NIFI-10192 Caffeine cache schema request for reuse

This closes #6364

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Aerilym 2022-09-09 12:47:49 +10:00 committed by Mike Thomsen
parent 315e54a812
commit 4b691b133b
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
1 changed files with 41 additions and 2 deletions

View File

@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -45,6 +47,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
@ -194,6 +197,18 @@ public class LookupRecord extends AbstractProcessor {
.required(true)
.build();
static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("record-path-lookup-miss-result-cache-size")
.displayName("Cache Size")
.description("Specifies how many lookup values/records should be cached."
+ "Setting this property to zero means no caching will be done and the table will be queried for each lookup value in each record. If the lookup "
+ "table changes often or the most recent data must be retrieved, do not use the cache.")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("0")
.required(true)
.build();
static final Relationship REL_MATCHED = new Relationship.Builder()
.name("matched")
.description("All records for which the lookup returns a value will be routed to this relationship")
@ -238,6 +253,7 @@ public class LookupRecord extends AbstractProcessor {
properties.add(ROUTING_STRATEGY);
properties.add(RESULT_CONTENTS);
properties.add(REPLACEMENT_STRATEGY);
properties.add(CACHE_SIZE);
return properties;
}
@ -451,7 +467,7 @@ public class LookupRecord extends AbstractProcessor {
if (isInPlaceReplacement) {
return new InPlaceReplacementStrategy();
} else {
return new RecordPathReplacementStrategy();
return new RecordPathReplacementStrategy(context);
}
}
@ -536,6 +552,19 @@ public class LookupRecord extends AbstractProcessor {
private class RecordPathReplacementStrategy implements ReplacementStrategy {
private int lookupCount = 0;
private volatile Cache<Map<String, Object>, Optional<?>> cache;
public RecordPathReplacementStrategy(ProcessContext context) {
final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
if (this.cache == null || cacheSize > 0) {
this.cache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.build();
}
}
@Override
public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) {
lookupCount++;
@ -548,8 +577,15 @@ public class LookupRecord extends AbstractProcessor {
final FlowFile flowFile = lookupContext.getOriginalFlowFile();
final Optional<?> lookupValueOption;
final Optional<?> lookupValueCacheOption;
try {
lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
lookupValueCacheOption = (Optional<?>) cache.get(lookupCoordinates, k -> null);
if (lookupValueCacheOption == null) {
lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
} else {
lookupValueOption = lookupValueCacheOption;
}
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}
@ -634,6 +670,9 @@ public class LookupRecord extends AbstractProcessor {
}
final Optional<?> lookupResult = lookupService.lookup(lookupCoordinates, flowFileAttributes);
cache.put(lookupCoordinates, lookupResult);
if (!lookupResult.isPresent()) {
continue;
}