Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions extra/bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
<artifactId>live-intent-omni-channel-identity</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.prebid.server.hooks.modules</groupId>
<artifactId>pb-rule-engine</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
15 changes: 15 additions & 0 deletions extra/modules/pb-rule-engine/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.prebid.server.hooks.modules</groupId>
<artifactId>all-modules</artifactId>
<version>3.33.0-SNAPSHOT</version>
</parent>

<artifactId>pb-rule-engine</artifactId>

<name>pb-rule-engine</name>
<description>Rule engine module</description>
</project>
1 change: 1 addition & 0 deletions extra/modules/pb-rule-engine/src/lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lombok.anyConstructor.addConstructorProperties = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.prebid.server.hooks.modules.rule.engine.config;

import com.iab.openrtb.request.BidRequest;
import io.vertx.core.Vertx;
import org.prebid.server.bidder.BidderCatalog;
import org.prebid.server.execution.retry.ExponentialBackoffRetryPolicy;
import org.prebid.server.hooks.execution.model.Stage;
import org.prebid.server.hooks.modules.rule.engine.core.config.AccountConfigParser;
import org.prebid.server.hooks.modules.rule.engine.core.config.RuleParser;
import org.prebid.server.hooks.modules.rule.engine.core.config.StageConfigParser;
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestConditionalRuleFactory;
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestRuleContext;
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestStageSpecification;
import org.prebid.server.hooks.modules.rule.engine.v1.PbRuleEngineModule;
import org.prebid.server.json.ObjectMapperProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Clock;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;

@Configuration
@ConditionalOnProperty(prefix = "hooks." + PbRuleEngineModule.CODE, name = "enabled", havingValue = "true")
public class PbRuleEngineModuleConfiguration {

@Bean
PbRuleEngineModule ruleEngineModule(RuleParser ruleParser,
@Value("${datacenter-region:#{null}}") String datacenter) {

return new PbRuleEngineModule(ruleParser, datacenter);
}

@Bean
StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser(
BidderCatalog bidderCatalog) {

final RandomGenerator randomGenerator = () -> ThreadLocalRandom.current().nextLong();

return new StageConfigParser<>(
randomGenerator,
Stage.processed_auction_request,
new RequestStageSpecification(ObjectMapperProvider.mapper(), bidderCatalog, randomGenerator),
new RequestConditionalRuleFactory());
}

@Bean
AccountConfigParser accountConfigParser(
StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser) {

return new AccountConfigParser(ObjectMapperProvider.mapper(), processedAuctionRequestStageParser);
}

@Bean
RuleParser ruleParser(
@Value("${hooks.pb-rule-engine.rule-cache.expire-after-minutes}") long cacheExpireAfterMinutes,
@Value("${hooks.pb-rule-engine.rule-cache.max-size}") long cacheMaxSize,
@Value("${hooks.pb-rule-engine.rule-parsing.retry-initial-delay-millis}") long delay,
@Value("${hooks.pb-rule-engine.rule-parsing.retry-max-delay-millis}") long maxDelay,
@Value("${hooks.pb-rule-engine.rule-parsing.retry-exponential-factor}") double factor,
@Value("${hooks.pb-rule-engine.rule-parsing.retry-exponential-jitter}") double jitter,
AccountConfigParser accountConfigParser,
Vertx vertx,
Clock clock) {

return new RuleParser(
cacheExpireAfterMinutes,
cacheMaxSize,
ExponentialBackoffRetryPolicy.of(delay, maxDelay, factor, jitter),
accountConfigParser,
vertx,
clock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.prebid.server.hooks.modules.rule.engine.core.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.iab.openrtb.request.BidRequest;
import org.prebid.server.exception.PreBidException;
import org.prebid.server.hooks.modules.rule.engine.core.config.model.AccountConfig;
import org.prebid.server.hooks.modules.rule.engine.core.request.RequestRuleContext;
import org.prebid.server.hooks.modules.rule.engine.core.rules.NoOpRule;
import org.prebid.server.hooks.modules.rule.engine.core.rules.PerStageRule;

import java.util.Objects;

public class AccountConfigParser {

private final ObjectMapper mapper;
private final StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser;

public AccountConfigParser(
ObjectMapper mapper,
StageConfigParser<BidRequest, RequestRuleContext> processedAuctionRequestStageParser) {

this.mapper = Objects.requireNonNull(mapper);
this.processedAuctionRequestStageParser = Objects.requireNonNull(processedAuctionRequestStageParser);
}

public PerStageRule parse(ObjectNode accountConfig) {
final AccountConfig parsedConfig;
try {
parsedConfig = mapper.treeToValue(accountConfig, AccountConfig.class);
} catch (JsonProcessingException e) {
throw new PreBidException(e.getMessage());
}

if (!parsedConfig.isEnabled()) {
return PerStageRule.builder()
.timestamp(parsedConfig.getTimestamp())
.processedAuctionRequestRule(NoOpRule.create())
.build();
}

return PerStageRule.builder()
.timestamp(parsedConfig.getTimestamp())
.processedAuctionRequestRule(processedAuctionRequestStageParser.parse(parsedConfig))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package org.prebid.server.hooks.modules.rule.engine.core.config;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.apache.commons.lang3.ObjectUtils;
import org.prebid.server.execution.retry.RetryPolicy;
import org.prebid.server.execution.retry.Retryable;
import org.prebid.server.hooks.modules.rule.engine.core.rules.PerStageRule;
import org.prebid.server.log.Logger;
import org.prebid.server.log.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class RuleParser {

private static final Logger logger = LoggerFactory.getLogger(RuleParser.class);

private final AccountConfigParser parser;
private final Vertx vertx;
private final Clock clock;

private final RetryPolicy retryPolicy;

private final Map<String, ParsingAttempt> accountIdToParsingAttempt;
private final Map<String, PerStageRule> accountIdToRules;

public RuleParser(long cacheExpireAfterMinutes,
long cacheMaxSize,
RetryPolicy retryPolicy,
AccountConfigParser parser,
Vertx vertx,
Clock clock) {

this.parser = Objects.requireNonNull(parser);
this.vertx = Objects.requireNonNull(vertx);
this.clock = Objects.requireNonNull(clock);
this.retryPolicy = Objects.requireNonNull(retryPolicy);

this.accountIdToParsingAttempt = Caffeine.newBuilder()
.expireAfterAccess(cacheExpireAfterMinutes, TimeUnit.MINUTES)
.maximumSize(cacheMaxSize)
.<String, ParsingAttempt>build()
.asMap();

this.accountIdToRules = Caffeine.newBuilder()
.expireAfterAccess(cacheExpireAfterMinutes, TimeUnit.MINUTES)
.maximumSize(cacheMaxSize)
.<String, PerStageRule>build()
.asMap();
}

public Future<PerStageRule> parseForAccount(String accountId, ObjectNode config) {
final PerStageRule cachedRule = accountIdToRules.get(accountId);

if (cachedRule != null && cachedRule.timestamp().compareTo(getConfigTimestamp(config)) >= 0) {
return Future.succeededFuture(cachedRule);
}

parseConfig(accountId, config);
return Future.succeededFuture(ObjectUtils.defaultIfNull(cachedRule, PerStageRule.noOp()));
}

private Instant getConfigTimestamp(ObjectNode config) {
try {
return Optional.of(config)
.map(node -> node.get("timestamp"))
.filter(JsonNode::isTextual)
.map(JsonNode::asText)
.map(Instant::parse)
.orElse(Instant.EPOCH);
} catch (DateTimeParseException exception) {
return Instant.EPOCH;
}
}

private void parseConfig(String accountId, ObjectNode config) {
final Instant now = clock.instant();
final ParsingAttempt attempt = accountIdToParsingAttempt.compute(
accountId, (ignored, previousAttempt) -> tryRegisteringNewAttempt(previousAttempt, now));

// reference equality used on purpose - if references are equal - then we should parse
if (attempt.timestamp() == now) {
logger.info("Parsing rule for account {}", accountId);
vertx.executeBlocking(() -> parser.parse(config))
.onSuccess(result -> succeedParsingAttempt(accountId, result))
.onFailure(error -> failParsingAttempt(accountId, attempt, error));
}
}

private ParsingAttempt tryRegisteringNewAttempt(ParsingAttempt previousAttempt, Instant currentAttemptStart) {
if (previousAttempt == null) {
return new ParsingAttempt.InProgress(currentAttemptStart, retryPolicy);
}

if (previousAttempt instanceof ParsingAttempt.InProgress) {
return previousAttempt;
}

if (previousAttempt.retryPolicy() instanceof Retryable previousAttemptRetryPolicy) {
final Instant previouslyDecidedToRetryAfter = previousAttempt.timestamp().plus(
Duration.ofMillis(previousAttemptRetryPolicy.delay()));

return previouslyDecidedToRetryAfter.isBefore(currentAttemptStart)
? new ParsingAttempt.InProgress(currentAttemptStart, previousAttemptRetryPolicy.next())
: previousAttempt;
}

return previousAttempt;
}

private void succeedParsingAttempt(String accountId, PerStageRule result) {
accountIdToRules.put(accountId, result);
accountIdToParsingAttempt.remove(accountId);

logger.debug("Successfully parsed rule-engine config for account {}", accountId);
}

private void failParsingAttempt(String accountId, ParsingAttempt attempt, Throwable cause) {
accountIdToParsingAttempt.put(accountId, ((ParsingAttempt.InProgress) attempt).failed());

logger.error(
"Failed to parse rule-engine config for account %s: %s".formatted(accountId, cause.getMessage()),
cause);
}

private sealed interface ParsingAttempt {

Instant timestamp();

RetryPolicy retryPolicy();

record Failed(Instant timestamp, RetryPolicy retryPolicy) implements ParsingAttempt {
}

record InProgress(Instant timestamp, RetryPolicy retryPolicy) implements ParsingAttempt {

public Failed failed() {
return new Failed(timestamp, retryPolicy);
}
}
}
}
Loading
Loading