NIFI-3431: Support batch update in Notify processor

- Added Signal Counter Delta property
- Added Signal Buffer Count property
- Added processor property name and display name
- Changed IOException handling: instead of routing the FlowFile to failure, the processor now
  throws a RuntimeException, so that the NiFi framework can yield the processor for a while and try again

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1466.
This commit is contained in:
Koji Kawamura 2017-02-02 15:18:01 +09:00 committed by Pierre Villard
parent b7cdc6b382
commit a90fa9c285
4 changed files with 358 additions and 116 deletions

View File

@ -24,7 +24,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
@ -35,6 +34,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
@ -59,7 +59,8 @@ public class Notify extends AbstractProcessor {
// Identifies the distributed map cache client
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.name("distributed-cache-service")
.displayName("Distributed Cache Service")
.description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor")
.required(true)
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
@ -67,7 +68,8 @@ public class Notify extends AbstractProcessor {
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Release Signal Identifier")
.name("release-signal-id")
.displayName("Release Signal Identifier")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the release signal cache key")
.required(true)
@ -76,7 +78,8 @@ public class Notify extends AbstractProcessor {
.build();
public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("Signal Counter Name")
.name("signal-counter-name")
.displayName("Signal Counter Name")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter name. " +
"Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " +
@ -87,9 +90,35 @@ public class Notify extends AbstractProcessor {
.defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME)
.build();
// Amount to add to the signal counter for each FlowFile; supports expression language and defaults to "1".
// The validator only checks a string-typed expression, so a non-numeric result is not rejected here:
// onTrigger parses the evaluated value with Integer.parseInt and routes the FlowFile to failure if that fails.
public static final PropertyDescriptor SIGNAL_COUNTER_DELTA = new PropertyDescriptor.Builder()
.name("signal-counter-delta")
.displayName("Signal Counter Delta")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter delta. " +
"Specify how much the counter should increase. " +
"For example, if multiple signal events are processed at upstream flow in batch oriented way, " +
"the number of events processed can be notified with this property at once.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.defaultValue("1")
.build();
// Upper bound on how many FlowFiles a single onTrigger call pulls from the session (positive integer, default "1").
// onTrigger groups the pulled FlowFiles by signal identifier and issues one protocol.notify call per identifier,
// so a larger value means fewer cache round trips when many FlowFiles share an identifier.
// No expression language support is declared for this property.
public static final PropertyDescriptor SIGNAL_BUFFER_COUNT = new PropertyDescriptor.Builder()
.name("signal-buffer-count")
.displayName("Signal Buffer Count")
.description("Specify the maximum number of incoming flow files that can be buffered until signals are notified to cache service. " +
"The more buffer can provide the better performance, as it reduces the number of interactions with cache service " +
"by grouping signals by signal identifier when multiple incoming flow files share the same signal identifier.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.build();
// Specifies an optional regex used to identify which attributes to cache
public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder()
.name("Attribute Cache Regex")
.name("attribute-cache-regex")
.displayName("Attribute Cache Regex")
.description("Any attributes whose names match this regex will be stored in the distributed cache to be "
+ "copied to any FlowFiles released from a corresponding Wait processor. Note that the "
+ "uuid attribute will not be cached regardless of this value. If blank, no attributes "
@ -122,6 +151,8 @@ public class Notify extends AbstractProcessor {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(SIGNAL_COUNTER_DELTA);
descriptors.add(SIGNAL_BUFFER_COUNT);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_CACHE_REGEX);
return descriptors;
@ -132,58 +163,100 @@ public class Notify extends AbstractProcessor {
return relationships;
}
private class SignalBuffer {
final Map<String, Integer> deltas = new HashMap<>();
final Map<String, String> attributesToCache = new HashMap<>();
final List<FlowFile> flowFiles = new ArrayList<>();
int incrementDelta(final String counterName, final int delta) {
int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0;
int updated = current + delta;
deltas.put(counterName, updated);
return updated;
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
final ComponentLog logger = getLogger();
final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
final PropertyValue counterNameProperty = context.getProperty(SIGNAL_COUNTER_NAME);
final PropertyValue deltaProperty = context.getProperty(SIGNAL_COUNTER_DELTA);
final String attributeCacheRegex = context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue();
final Integer bufferCount = context.getProperty(SIGNAL_BUFFER_COUNT).asInteger();
// the cache client used to interact with the distributed cache.
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final Map<String, SignalBuffer> signalBuffers = new HashMap<>();
for (int i = 0; i < bufferCount; i++) {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
break;
}
final ComponentLog logger = getLogger();
// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String signalId = signalIdProperty.evaluateAttributeExpressions(flowFile).getValue();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
continue;
}
// the cache client used to interact with the distributed cache
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
String counterName = counterNameProperty.evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(counterName)) {
counterName = WaitNotifyProtocol.DEFAULT_COUNT_NAME;
}
int delta = 1;
if (deltaProperty.isSet()) {
final String deltaStr = deltaProperty.evaluateAttributeExpressions(flowFile).getValue();
try {
final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet())
? context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue()
: null;
Map<String, String> attributesToCache = new HashMap<>();
if (StringUtils.isNotEmpty(attributeCacheRegex)) {
attributesToCache = flowFile.getAttributes().entrySet()
.stream().filter(e -> (!e.getKey().equals("uuid") && e.getKey().matches(attributeCacheRegex)))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
delta = Integer.parseInt(deltaStr);
} catch (final NumberFormatException e) {
logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e);
session.transfer(flowFile, REL_FAILURE);
continue;
}
}
if (!signalBuffers.containsKey(signalId)) {
signalBuffers.put(signalId, new SignalBuffer());
}
final SignalBuffer signalBuffer = signalBuffers.get(signalId);
if (StringUtils.isNotEmpty(attributeCacheRegex)) {
flowFile.getAttributes().entrySet()
.stream().filter(e -> (!e.getKey().equals("uuid") && e.getKey().matches(attributeCacheRegex)))
.forEach(e -> signalBuffer.attributesToCache.put(e.getKey(), e.getValue()));
}
signalBuffer.incrementDelta(counterName, delta);
signalBuffer.flowFiles.add(flowFile);
if (logger.isDebugEnabled()) {
logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile});
}
// In case of ConcurrentModificationException, just throw the exception so that processor can
// retry after yielding for a while.
protocol.notify(signalId, counterName, 1, attributesToCache);
session.transfer(flowFile, REL_SUCCESS);
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
}
signalBuffers.forEach((signalId, signalBuffer) -> {
// In case of Exception, just throw the exception so that processor can
// retry after yielding for a while.
try {
protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
session.transfer(signalBuffer.flowFiles, REL_SUCCESS);
} catch (IOException e) {
throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e);
}
});
}
}

View File

@ -81,7 +81,7 @@ public class WaitNotifyProtocol {
public long getCount(final String counterName) {
final Long count = counts.get(counterName);
return count != null ? count : -1;
return count != null ? count : 0;
}
}
@ -92,6 +92,58 @@ public class WaitNotifyProtocol {
this.cache = cache;
}
/**
 * Notify a signal to increase one or more of its counters with a single cache update.
 * <p>Each attempt fetches the current cache entry, applies the attributes and deltas to the signal,
 * and tries to replace the entry using the fetched revision. When the replace is rejected, the method
 * sleeps for {@code REPLACE_RETRY_WAIT_MILLIS * attemptNumber} and tries again, up to
 * {@code MAX_REPLACE_RETRY_COUNT} attempts.</p>
 *
 * @param signalId a key in the underlying cache engine
 * @param deltas a map containing counterName and delta entries; must not be null
 * @param attributes attributes to save in the cache entry, or null to leave the stored attributes untouched
 * @return A Signal instance, merged with an existing signal if any
 * @throws IOException thrown when it failed interacting with the cache engine
 * @throws ConcurrentModificationException thrown if other process is also updating the same signal and failed
 *          to update after few retry attempts, or if the calling thread is interrupted while waiting to retry
 *          (the thread's interrupt status is restored before throwing)
 */
public Signal notify(final String signalId, final Map<String, Integer> deltas, final Map<String, String> attributes)
        throws IOException, ConcurrentModificationException {

    for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {

        // The revision used for the replace below comes from this fetch.
        final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);

        // NOTE(review): getSignal presumably reads the same cache entry again; if so, the entry is fetched
        // twice per attempt and the second read could be reused from existingEntry - confirm.
        final Signal existingSignal = getSignal(signalId);
        final Signal signal = existingSignal != null ? existingSignal : new Signal();

        if (attributes != null) {
            signal.attributes.putAll(attributes);
        }

        // Apply every counter delta to the in-memory signal; counters not present yet start from 0.
        deltas.forEach((counterName, delta) -> {
            long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0;
            count += delta;
            signal.counts.put(counterName, count);
        });

        final String signalJson = objectMapper.writeValueAsString(signal);
        // -1 is passed as the revision when no entry exists yet.
        final long revision = existingEntry != null ? existingEntry.getRevision() : -1;

        if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
            return signal;
        }

        // Replace was rejected: back off a little longer on every attempt before retrying.
        long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1);
        logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, deltas);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe the interruption;
            // translating to an unchecked exception would otherwise silently swallow it.
            Thread.currentThread().interrupt();
            final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, deltas);
            throw new ConcurrentModificationException(msg, e);
        }
    }

    final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, deltas, MAX_REPLACE_RETRY_COUNT);
    throw new ConcurrentModificationException(msg);
}
/**
* Notify a signal to increase a counter.
* @param signalId a key in the underlying cache engine
@ -105,43 +157,10 @@ public class WaitNotifyProtocol {
public Signal notify(final String signalId, final String counterName, final int delta, final Map<String, String> attributes)
throws IOException, ConcurrentModificationException {
for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {
final Map<String, Integer> deltas = new HashMap<>();
deltas.put(counterName, delta);
return notify(signalId, deltas, attributes);
final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);
Signal signal = getSignal(signalId);
if (signal == null) {
signal = new Signal();
}
if (attributes != null) {
signal.attributes.putAll(attributes);
}
long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0;
count += delta;
signal.counts.put(counterName, count);
final String signalJson = objectMapper.writeValueAsString(signal);
final long revision = existingEntry != null ? existingEntry.getRevision() : -1;
if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
return signal;
}
long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1);
logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, counterName);
try {
Thread.sleep(waitMillis);
} catch (InterruptedException e) {
final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, counterName);
throw new ConcurrentModificationException(msg, e);
}
}
final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, counterName, MAX_REPLACE_RETRY_COUNT);
throw new ConcurrentModificationException(msg);
}
/**

View File

@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestNotify {
@ -112,6 +113,145 @@ public class TestNotify {
assertEquals(1, signal.getCount("failure"));
}
/**
 * Queues three flow files for the same signal identifier with Signal Buffer Count set to 2 and checks
 * that the first run transfers only two of them, while a second run picks up the remaining one.
 */
@Test
public void testNotifyCountersBatch() throws InitializationException, IOException {
    runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
    runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
    runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
    runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "2");

    // One row per incoming flow file, in enqueue order: {key, status}
    final String[][] incoming = {
        {"data1", "success"},
        {"data2", "success"},
        {"data3", "failure"},
    };
    for (final String[] row : incoming) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "someDataProcessing");
        attributes.put("key", row[0]);
        attributes.put("status", row[1]);
        runner.enqueue(new byte[]{}, attributes);
    }

    // First run: limited by the buffer count, so only two flow files go through.
    runner.run();
    runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2);
    runner.clearTransferState();

    final Signal afterFirstRun = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
    assertEquals("Same attribute key will be overwritten by the latest signal", "data2", afterFirstRun.getAttributes().get("key"));
    assertTrue(afterFirstRun.isTotalCountReached(2));
    assertEquals(2, afterFirstRun.getCount("success"));
    assertEquals(0, afterFirstRun.getCount("failure"));

    // Second run: the one flow file left in the queue is processed.
    runner.run();
    runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
    runner.clearTransferState();

    final Signal afterSecondRun = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
    assertEquals("Same attribute key will be overwritten by the latest signal", "data3", afterSecondRun.getAttributes().get("key"));
    assertTrue(afterSecondRun.isTotalCountReached(3));
    assertEquals(2, afterSecondRun.getCount("success"));
    assertEquals(1, afterSecondRun.getCount("failure"));
}
/**
 * Drives the counter delta from the {@code record.count} attribute and checks that a single run
 * adds each flow file's delta to the counter named by its {@code status} attribute.
 */
@Test
public void testNotifyCountersUsingDelta() throws InitializationException, IOException {
    runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
    runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
    runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
    runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}");
    runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10");

    // One row per incoming flow file, in enqueue order: {key, status, record.count}
    final String[][] incoming = {
        {"data1", "success", "1024"},
        {"data2", "success", "2048"},
        {"data3", "failure", "512"},
    };
    for (final String[] row : incoming) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "someDataProcessing");
        attributes.put("key", row[0]);
        attributes.put("status", row[1]);
        attributes.put("record.count", row[2]);
        runner.enqueue(new byte[]{}, attributes);
    }

    runner.run();

    // The buffer count of 10 covers all three flow files in one run.
    runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
    runner.clearTransferState();

    final Signal notified = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
    assertEquals("Same attribute key will be overwritten by the latest signal", "data3", notified.getAttributes().get("key"));
    assertTrue(notified.isTotalCountReached(3584));
    assertEquals(3072, notified.getCount("success"));
    assertEquals(512, notified.getCount("failure"));
}
/**
 * Feeds one flow file whose {@code record.count} is not a plain integer and checks that only that
 * flow file is routed to failure, while the deltas of the other two are still applied to the signal.
 */
@Test
public void testIllegalDelta() throws InitializationException, IOException {
    runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
    runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
    runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
    runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}");
    runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10");

    // One row per incoming flow file, in enqueue order: {key, status, record.count}.
    // The second row carries a delta that cannot be parsed as an integer.
    final String[][] incoming = {
        {"data1", "success", "1024"},
        {"data2", "success", "2048 records"},
        {"data3", "failure", "512"},
    };
    for (final String[] row : incoming) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "someDataProcessing");
        attributes.put("key", row[0]);
        attributes.put("status", row[1]);
        attributes.put("record.count", row[2]);
        runner.enqueue(new byte[]{}, attributes);
    }

    runner.run();

    // Only the flow file with the unparsable delta ends up in failure.
    runner.assertTransferCount(Notify.REL_SUCCESS, 2);
    runner.assertTransferCount(Notify.REL_FAILURE, 1);
    runner.clearTransferState();

    final Signal notified = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
    assertEquals("Same attribute key will be overwritten by the latest signal", "data3", notified.getAttributes().get("key"));
    assertTrue(notified.isTotalCountReached(1536));
    assertEquals(1024, notified.getCount("success"));
    assertEquals(512, notified.getCount("failure"));
}
@Test
public void testRegex() throws InitializationException, IOException {
runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
@ -156,13 +296,14 @@ public class TestNotify {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "2");
runner.enqueue(new byte[] {}, props);
try {
runner.run();
//Expect the processor to receive an IO exception from the cache service and route to failure
runner.assertAllFlowFilesTransferred(Notify.REL_FAILURE, 1);
runner.assertTransferCount(Notify.REL_FAILURE, 1);
fail("Processor should throw RuntimeException in case it receives an IO exception from the cache service and yield for a while.");
} catch (final AssertionError e) {
assertTrue(e.getCause() instanceof RuntimeException);
}
service.setFailOnCalls(false);
}
static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient {

View File

@ -134,6 +134,15 @@ public class TestWaitNotifyProtocol {
assertEquals(4, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
final Map<String, Integer> deltas = new HashMap<>();
deltas.put("a", 10);
deltas.put("b", 25);
protocol.notify("signal-id", deltas, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(5, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
}
@Test