From 34627f78b7816f5c0cf9d919787408aefb204545 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Wed, 14 Dec 2016 15:32:16 +0000 Subject: [PATCH] NIFI-190: Initial commit of Wait and Notify processors This closes #1329. Signed-off-by: Bryan Bende --- .../nifi/processors/standard/Notify.java | 191 ++++++++++++ .../apache/nifi/processors/standard/Wait.java | 266 ++++++++++++++++ .../util/FlowFileAttributesSerializer.java | 71 +++++ .../org.apache.nifi.processor.Processor | 2 + .../nifi/processors/standard/TestNotify.java | 190 ++++++++++++ .../nifi/processors/standard/TestWait.java | 290 ++++++++++++++++++ .../TestFlowFileAttributesSerializer.java | 69 +++++ 7 files changed, 1079 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FlowFileAttributesSerializer.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestFlowFileAttributesSerializer.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java new file mode 100644 index 0000000000..23052fca7b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.expression.AttributeExpression.ResultType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; + +@EventDriven +@SupportsBatching +@Tags({"map", "cache", "notify", "distributed", "signal", "release"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Caches a release signal identifier in the distributed cache, optionally along with " + + "the FlowFile's attributes. Any flow files held at a corresponding Wait processor will be " + + "released once this signal in the cache is discovered.") +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", + "org.apache.nifi.processors.standard.Wait"}) +public class Notify extends AbstractProcessor { + + // Identifies the distributed map cache client + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("The Controller Service that is used to check for release signals from a corresponding Notify processor") + .required(true) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + // Selects the FlowFile attribute or expression, whose value is used as cache key + public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder() + .name("Release Signal Identifier") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the release signal cache key") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); + + // Specifies an optional regex used to identify which attributes to cache + public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder() + .name("Attribute Cache Regex") + .description("Any attributes whose names match this regex will be stored in the distributed cache to be " + + "copied to any FlowFiles released from a corresponding Wait processor. Note that the " + + "uuid attribute will not be cached regardless of this value. If blank, no attributes " + + "will be cached.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship") + .build(); + + private final Set relationships; + + private final Serializer keySerializer = new StringSerializer(); + private final Serializer> valueSerializer = new FlowFileAttributesSerializer(); + + public Notify() { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(RELEASE_SIGNAL_IDENTIFIER); + descriptors.add(DISTRIBUTED_CACHE_SERVICE); + descriptors.add(ATTRIBUTE_CACHE_REGEX); + return descriptors; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + + // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support + final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + + // if the computed value is null, or empty, we transfer the flow file to failure relationship + if (StringUtils.isBlank(cacheKey)) { + logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + // the cache client used to interact with the distributed cache + final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + + try { + final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet()) + ? context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue() + : null; + + Map attributesToCache = new HashMap<>(); + if (StringUtils.isNotEmpty(attributeCacheRegex)) { + attributesToCache = flowFile.getAttributes().entrySet() + .stream().filter(e -> (!e.getKey().equals("uuid") && e.getKey().matches(attributeCacheRegex))) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + } + + if (logger.isDebugEnabled()) { + logger.debug("Cached release signal identifier {} from FlowFile {}", new Object[] {cacheKey, flowFile}); + } + + cache.put(cacheKey, attributesToCache, keySerializer, valueSerializer); + + session.transfer(flowFile, REL_SUCCESS); + } catch (final IOException e) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e}); + } + } + + /** + * Simple string serializer, used for serializing the cache key + */ + public static class StringSerializer implements Serializer { + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java new file mode 100644 index 0000000000..4a4f8033bd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.expression.AttributeExpression.ResultType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; + +@EventDriven +@SupportsBatching +@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal " + + "is stored in the distributed cache from a corresponding Notify processor. At this point, a waiting FlowFile is routed to " + + "the 'success' relationship, with attributes copied from the FlowFile that produced " + + "the release signal from the Notify processor. The release signal entry is then removed from " + + "the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration.") +@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the " + + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile.") +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", + "org.apache.nifi.processors.standard.Notify"}) +public class Wait extends AbstractProcessor { + + public static final String WAIT_START_TIMESTAMP = "wait.start.timestamp"; + + // Identifies the distributed map cache client + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("The Controller Service that is used to check for release signals from a corresponding Notify processor") + .required(true) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + // Selects the FlowFile attribute or expression, whose value is used as cache key + public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder() + .name("Release Signal Identifier") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the release signal cache key") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); + + // Selects the FlowFile attribute or expression, whose value is used as cache key + public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder() + .name("Expiration Duration") + .description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship") + .required(true) + .defaultValue("10 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue("replace", "Replace if present", + "When cached attributes are copied onto released FlowFiles, they replace any matching attributes."); + + public static final AllowableValue ATTRIBUTE_COPY_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original", + "Attributes on released FlowFiles are not overwritten by copied cached attributes."); + + public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder() + .name("Attribute Copy Mode") + .description("Specifies how to handle attributes copied from flow files entering the Notify processor") + .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()) + .required(true) + .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL) + .expressionLanguageSupported(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile with a matching release signal in the cache will be routed to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship") + .build(); + + public static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("A FlowFile with no matching release signal in the cache will be routed to this relationship") + .build(); + + public static final Relationship REL_EXPIRED = new Relationship.Builder() + .name("expired") + .description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship") + .build(); + private final Set relationships; + + private final Serializer keySerializer = new StringSerializer(); + private final Deserializer> valueDeserializer = new FlowFileAttributesSerializer(); + + public Wait() { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_WAIT); + rels.add(REL_EXPIRED); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(RELEASE_SIGNAL_IDENTIFIER); + descriptors.add(EXPIRATION_DURATION); + descriptors.add(DISTRIBUTED_CACHE_SERVICE); + descriptors.add(ATTRIBUTE_COPY_MODE); + return descriptors; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + + // cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support + final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + + // if the computed value is null, or empty, we transfer the flow file to failure relationship + if (StringUtils.isBlank(cacheKey)) { + logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + // the cache client used to interact with the distributed cache + final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + + try { + // check for expiration + String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP); + if (waitStartTimestamp == null) { + waitStartTimestamp = String.valueOf(System.currentTimeMillis()); + flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp); + } + + long lWaitStartTimestamp = 0L; + try { + lWaitStartTimestamp = Long.parseLong(waitStartTimestamp); + } catch (NumberFormatException nfe) { + logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + long expirationDuration = context.getProperty(EXPIRATION_DURATION) + .asTimePeriod(TimeUnit.MILLISECONDS); + long now = System.currentTimeMillis(); + if (now > (lWaitStartTimestamp + expirationDuration)) { + logger.warn("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)}); + session.transfer(flowFile, REL_EXPIRED); + return; + } + + // get notifying flow file attributes + Map cachedAttributes = cache.get(cacheKey, keySerializer, valueDeserializer); + + if (cachedAttributes == null) { + if (logger.isDebugEnabled()) { + logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {cacheKey, flowFile}); + } + session.transfer(flowFile, REL_WAIT); + return; + } + + // copy over attributes from release signal flow file, if provided + if (!cachedAttributes.isEmpty()) { + cachedAttributes.remove("uuid"); + String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); + if (ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode)) { + flowFile = session.putAllAttributes(flowFile, cachedAttributes); + } else { + Map attributesToCopy = new HashMap<>(); + for(Entry entry : cachedAttributes.entrySet()) { + // if the current flow file does *not* have the cached attribute, copy it + if (flowFile.getAttribute(entry.getKey()) == null) { + attributesToCopy.put(entry.getKey(), entry.getValue()); + } + } + flowFile = session.putAllAttributes(flowFile, attributesToCopy); + } + } + + session.transfer(flowFile, REL_SUCCESS); + + cache.remove(cacheKey, keySerializer); + } catch (final IOException e) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e}); + } + } + + /** + * Simple string serializer, used for serializing the cache key + */ + public static class StringSerializer implements Serializer { + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FlowFileAttributesSerializer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FlowFileAttributesSerializer.java new file mode 100644 index 0000000000..2e0995de51 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FlowFileAttributesSerializer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Pattern; + +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +/** + * Offers serialization and deserialization for flow file attribute maps. + * + */ +public class FlowFileAttributesSerializer implements Deserializer>, Serializer> { + + private static final String ATTRIBUTE_SEPARATOR = "<|--|>"; + + @Override + public Map deserialize(final byte[] input) throws DeserializationException, IOException { + if (input == null || input.length == 0) { + return null; + } + Map attributes = new HashMap<>(); + + String attributesText = new String(input, StandardCharsets.UTF_8); + String[] entries = attributesText.split(Pattern.quote(ATTRIBUTE_SEPARATOR)); + for(String entry : entries) { + int equalsIndex = entry.indexOf('='); + String key = entry.substring(0, equalsIndex); + String value = entry.substring(equalsIndex + 1); + attributes.put(key, value); + } + + return attributes; + } + + @Override + public void serialize(Map value, OutputStream output) throws SerializationException, IOException { + int i = 0; + for(Entry entry : value.entrySet()) { + output.write((entry.getKey() + '=' + entry.getValue()).getBytes(StandardCharsets.UTF_8)); + if (i < value.size() - 1) { + output.write(ATTRIBUTE_SEPARATOR.getBytes(StandardCharsets.UTF_8)); + } + i++; + } + output.flush(); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b9aae35b36..2b4ad7682b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -58,6 +58,7 @@ org.apache.nifi.processors.standard.LogAttribute org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.ModifyBytes org.apache.nifi.processors.standard.MonitorActivity +org.apache.nifi.processors.standard.Notify org.apache.nifi.processors.standard.ParseCEF org.apache.nifi.processors.standard.ParseSyslog org.apache.nifi.processors.standard.PostHTTP @@ -89,6 +90,7 @@ org.apache.nifi.processors.standard.TransformXml org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.ValidateXml org.apache.nifi.processors.standard.ValidateCsv +org.apache.nifi.processors.standard.Wait org.apache.nifi.processors.standard.ExecuteSQL org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.ListFTP diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java new file mode 100644 index 0000000000..37c4c436ae --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestNotify { + + private TestRunner runner; + private MockCacheClient service; + + @Before + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(Notify.class); + + service = new MockCacheClient(); + runner.addControllerService("service", service); + runner.enableControllerService(service); + runner.setProperty(Notify.DISTRIBUTED_CACHE_SERVICE, "service"); + } + + @Test + public void testNotify() throws InitializationException, IOException { + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "1"); + props.put("key", "value"); + runner.enqueue(new byte[] {},props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1); + runner.clearTransferState(); + + Map cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer()); + assertEquals("value", cachedAttributes.get("key")); + } + + @Test + public void testRegex() throws InitializationException, IOException { + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, "key[0-9]*"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "1"); + props.put("key1", "value"); + props.put("other.key1", "value"); + runner.enqueue(new byte[] {},props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1); + runner.clearTransferState(); + + Map cachedAttributes = service.get("1", new Notify.StringSerializer(), new FlowFileAttributesSerializer()); + assertEquals("value", cachedAttributes.get("key1")); + assertNull(cachedAttributes.get("other.key1")); + } + + @Test + public void testEmptyReleaseSignal() throws InitializationException, InterruptedException { + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + + final Map props = new HashMap<>(); + runner.enqueue(new byte[] {},props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Notify.REL_FAILURE, 1); + runner.clearTransferState(); + } + + @Test + public void testFailingCacheService() throws InitializationException, IOException { + service.setFailOnCalls(true); + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "2"); + runner.enqueue(new byte[] {}, props); + runner.run(); + + //Expect the processor to receive an IO exception from the cache service and route to failure + runner.assertAllFlowFilesTransferred(Notify.REL_FAILURE, 1); + runner.assertTransferCount(Notify.REL_FAILURE, 1); + + service.setFailOnCalls(false); + } + + private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { + private final ConcurrentMap values = new ConcurrentHashMap<>(); + private boolean failOnCalls = false; + + public void setFailOnCalls(boolean failOnCalls){ + this.failOnCalls = failOnCalls; + } + + + private void verifyNotFail() throws IOException { + if (failOnCalls) { + throw new IOException("Could not call to remote service because Unit Test marked service unavailable"); + } + } + + @Override + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + verifyNotFail(); + final Object retValue = values.putIfAbsent(key, value); + return (retValue == null); + } + + @Override + @SuppressWarnings("unchecked") + public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, + final Deserializer valueDeserializer) throws IOException { + verifyNotFail(); + return (V) values.putIfAbsent(key, value); + } + + @Override + public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { + verifyNotFail(); + return values.containsKey(key); + } + + @Override + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + verifyNotFail(); + values.put(key, value); + } + + @Override + @SuppressWarnings("unchecked") + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + verifyNotFail(); + if(values.containsKey(key)) { + return (V) values.get(key); + } else { + return null; + } + } + + @Override + public void close() throws IOException { + } + + @Override + public boolean remove(final K key, final Serializer serializer) throws IOException { + verifyNotFail(); + values.remove(key); + return true; + } + } + + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java new file mode 100644 index 0000000000..f42740b81c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestWait { + + private TestRunner runner; + private MockCacheClient service; + + @Before + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(Wait.class); + + service = new MockCacheClient(); + runner.addControllerService("service", service); + runner.enableControllerService(service); + runner.setProperty(Wait.DISTRIBUTED_CACHE_SERVICE, "service"); + } + + @Test + public void testWait() throws InitializationException { + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "1"); + runner.enqueue(new byte[] {},props); + + runner.run(); + + // no cache key attribute + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + runner.clearTransferState(); + } + + @Test + public void testExpired() throws InitializationException, InterruptedException { + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "1"); + runner.enqueue(new byte[] {},props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + + runner.clearTransferState(); + runner.enqueue(ff); + + Thread.sleep(101L); + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1); + runner.clearTransferState(); + } + + @Test + public void testBadWaitStartTimestamp() throws InitializationException, InterruptedException { + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "1"); + props.put("wait.start.timestamp", "blue bunny"); + runner.enqueue(new byte[] {},props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); + runner.clearTransferState(); + } + + @Test + public void testEmptyReleaseSignal() throws InitializationException, InterruptedException { + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + + final Map props = new HashMap<>(); + runner.enqueue(new byte[] {},props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); + runner.clearTransferState(); + } + + @Test + public void testFailingCacheService() throws InitializationException, IOException { + service.setFailOnCalls(true); + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + + final Map props = new HashMap<>(); + props.put("releaseSignalAttribute", "2"); + runner.enqueue(new byte[] {}, props); + runner.run(); + + //Expect the processor to receive an IO exception from the cache service and route to failure + runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); + runner.assertTransferCount(Wait.REL_FAILURE, 1); + + service.setFailOnCalls(false); + } + + @Test + public void testReplaceAttributes() throws InitializationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("both", "notifyValue"); + cachedAttributes.put("uuid", "notifyUuid"); + cachedAttributes.put("notify.only", "notifyValue"); + + service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer()); + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_REPLACE.getValue()); + + final Map waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("wait.only", "waitValue"); + waitAttributes.put("both", "waitValue"); + waitAttributes.put("uuid", UUID.randomUUID().toString()); + String flowFileContent = "content"; + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); + + // make sure the key is in the cache before Wait runs + assertNotNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer())); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + runner.assertTransferCount(Wait.REL_SUCCESS, 1); + + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + + // show a new attribute was copied from the cache + assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); + // show that uuid was not overwritten + assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid")); + // show that the original attributes are still there + assertEquals("waitValue", outputFlowFile.getAttribute("wait.only")); + + // here's the important part: show that the cached attribute replaces the original + assertEquals("notifyValue", outputFlowFile.getAttribute("both")); + runner.clearTransferState(); + + // make sure Wait removed this key from the cache + assertNull(service.get("key", new Wait.StringSerializer(), new FlowFileAttributesSerializer())); + } + + @Test + public void testKeepOriginalAttributes() throws InitializationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("both", "notifyValue"); + cachedAttributes.put("uuid", "notifyUuid"); + cachedAttributes.put("notify.only", "notifyValue"); + + service.put("key", cachedAttributes, new Wait.StringSerializer(), new FlowFileAttributesSerializer()); + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()); + + final Map waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("wait.only", "waitValue"); + waitAttributes.put("both", "waitValue"); + waitAttributes.put("uuid", UUID.randomUUID().toString()); + String flowFileContent = "content"; + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + runner.assertTransferCount(Wait.REL_SUCCESS, 1); + + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + + // show a new attribute was copied from the cache + assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); + // show that uuid was not overwritten + assertEquals(waitAttributes.get("uuid"), outputFlowFile.getAttribute("uuid")); + // show that the original attributes are still there + assertEquals("waitValue", outputFlowFile.getAttribute("wait.only")); + + // here's the important part: show that the original attribute is kept + assertEquals("waitValue", outputFlowFile.getAttribute("both")); + runner.clearTransferState(); + } + + private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { + private final ConcurrentMap values = new ConcurrentHashMap<>(); + private boolean failOnCalls = false; + + public void setFailOnCalls(boolean failOnCalls){ + this.failOnCalls = failOnCalls; + } + + + private void verifyNotFail() throws IOException { + if (failOnCalls) { + throw new IOException("Could not call to remote service because Unit Test marked service unavailable"); + } + } + + @Override + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + verifyNotFail(); + final Object retValue = values.putIfAbsent(key, value); + return (retValue == null); + } + + @Override + @SuppressWarnings("unchecked") + public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, + final Deserializer valueDeserializer) throws IOException { + verifyNotFail(); + return (V) values.putIfAbsent(key, value); + } + + @Override + public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { + verifyNotFail(); + return values.containsKey(key); + } + + @Override + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + verifyNotFail(); + values.put(key, value); + } + + @Override + @SuppressWarnings("unchecked") + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + verifyNotFail(); + if(values.containsKey(key)) { + return (V) values.get(key); + } else { + return null; + } + } + + @Override + public void close() throws IOException { + } + + @Override + public boolean remove(final K key, final Serializer serializer) throws IOException { + verifyNotFail(); + values.remove(key); + return true; + } + } + + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestFlowFileAttributesSerializer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestFlowFileAttributesSerializer.java new file mode 100644 index 0000000000..8574762ca7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestFlowFileAttributesSerializer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.junit.Assert; +import org.junit.Test; + +public class TestFlowFileAttributesSerializer { + + private FlowFileAttributesSerializer serializer = new FlowFileAttributesSerializer(); + + @Test + public void testBothWays() throws SerializationException, IOException { + Map attributes = new HashMap<>(); + attributes.put("a", "1"); + attributes.put("b", "2"); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + serializer.serialize(attributes, output); + output.flush(); + + Map result = serializer.deserialize(output.toByteArray()); + Assert.assertEquals(attributes, result); + } + + @Test + public void testEmptyIsNull() throws SerializationException, IOException { + Map attributes = new HashMap<>(); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + serializer.serialize(attributes, output); + output.flush(); + + Map result = serializer.deserialize(output.toByteArray()); + Assert.assertNull(result); + } + + @Test + public void testEmptyIsNull2() throws SerializationException, IOException { + Map result = serializer.deserialize("".getBytes()); + Assert.assertNull(result); + } + + @Test + public void testNullIsNull() throws SerializationException, IOException { + Map result = serializer.deserialize(null); + Assert.assertNull(result); + } +} \ No newline at end of file