mirror of https://github.com/apache/nifi.git
NIFI-1055: Fixed checkstyle violations
This commit is contained in:
parent
5d90c9be07
commit
0fc5d30461
|
@ -50,7 +50,7 @@ import org.apache.nifi.processor.io.StreamCallback;
|
||||||
|
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@Tags({ "json", "avro", "binary" })
|
@Tags({"json", "avro", "binary"})
|
||||||
@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
|
@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
|
||||||
+ "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
|
+ "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
|
||||||
+ "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
|
+ "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
|
||||||
|
@ -60,41 +60,41 @@ public class ConvertAvroToJSON extends AbstractProcessor {
|
||||||
protected static final String CONTAINER_ARRAY = "array";
|
protected static final String CONTAINER_ARRAY = "array";
|
||||||
protected static final String CONTAINER_NONE = "none";
|
protected static final String CONTAINER_NONE = "none";
|
||||||
|
|
||||||
static final PropertyDescriptor CONTAINER_OPTIONS
|
static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
|
||||||
= new PropertyDescriptor.Builder()
|
.name("JSON container options")
|
||||||
.name("JSON container options")
|
.description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE
|
||||||
.description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
|
+ ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
|
||||||
.allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
|
.allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
|
||||||
.required(true)
|
.required(true)
|
||||||
.defaultValue(CONTAINER_ARRAY)
|
.defaultValue(CONTAINER_ARRAY)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
.description("A FlowFile is routed to this relationship after it has been converted to JSON")
|
.description("A FlowFile is routed to this relationship after it has been converted to JSON")
|
||||||
.build();
|
.build();
|
||||||
static final Relationship REL_FAILURE = new Relationship.Builder()
|
static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||||
.name("failure")
|
.name("failure")
|
||||||
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
|
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private List<PropertyDescriptor> properties;
|
private List<PropertyDescriptor> properties;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(ProcessorInitializationContext context) {
|
protected void init(ProcessorInitializationContext context) {
|
||||||
super.init(context);
|
super.init(context);
|
||||||
|
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
properties.add(CONTAINER_OPTIONS);
|
properties.add(CONTAINER_OPTIONS);
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<Relationship> getRelationships() {
|
public Set<Relationship> getRelationships() {
|
||||||
final Set<Relationship> rels = new HashSet<>();
|
final Set<Relationship> rels = new HashSet<>();
|
||||||
|
@ -118,8 +118,8 @@ public class ConvertAvroToJSON extends AbstractProcessor {
|
||||||
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
|
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
|
||||||
try (final InputStream in = new BufferedInputStream(rawIn);
|
try (final InputStream in = new BufferedInputStream(rawIn);
|
||||||
|
|
||||||
final OutputStream out = new BufferedOutputStream(rawOut);
|
final OutputStream out = new BufferedOutputStream(rawOut);
|
||||||
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
|
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
|
||||||
|
|
||||||
final GenericData genericData = GenericData.get();
|
final GenericData genericData = GenericData.get();
|
||||||
GenericRecord record = reader.next();
|
GenericRecord record = reader.next();
|
||||||
|
|
|
@ -58,7 +58,6 @@ public class TestCSVToAvroProcessor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Basic test for tab separated files, similar to #test
|
* Basic test for tab separated files, similar to #test
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testTabSeparatedConversion() throws IOException {
|
public void testTabSeparatedConversion() throws IOException {
|
||||||
|
|
|
@ -31,10 +31,12 @@ import java.util.regex.Pattern;
|
||||||
import javax.servlet.Servlet;
|
import javax.servlet.Servlet;
|
||||||
import javax.ws.rs.Path;
|
import javax.ws.rs.Path;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
|
|
||||||
import org.apache.nifi.stream.io.StreamThrottler;
|
|
||||||
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
|
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
|
||||||
import org.apache.nifi.processor.DataUnit;
|
import org.apache.nifi.processor.DataUnit;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
@ -42,15 +44,12 @@ import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
|
import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
|
||||||
import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
|
import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
|
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
|
||||||
|
import org.apache.nifi.stream.io.StreamThrottler;
|
||||||
import org.eclipse.jetty.server.Connector;
|
import org.eclipse.jetty.server.Connector;
|
||||||
import org.eclipse.jetty.server.HttpConfiguration;
|
import org.eclipse.jetty.server.HttpConfiguration;
|
||||||
import org.eclipse.jetty.server.HttpConnectionFactory;
|
import org.eclipse.jetty.server.HttpConnectionFactory;
|
||||||
|
@ -70,56 +69,56 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
||||||
private List<PropertyDescriptor> properties;
|
private List<PropertyDescriptor> properties;
|
||||||
|
|
||||||
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
|
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
.description("Relationship for successfully received FlowFiles")
|
.description("Relationship for successfully received FlowFiles")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
|
||||||
.name("Base Path")
|
.name("Base Path")
|
||||||
.description("Base path for incoming connections")
|
.description("Base path for incoming connections")
|
||||||
.required(true)
|
.required(true)
|
||||||
.defaultValue("contentListener")
|
.defaultValue("contentListener")
|
||||||
.addValidator(StandardValidators.URI_VALIDATOR)
|
.addValidator(StandardValidators.URI_VALIDATOR)
|
||||||
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
|
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
|
||||||
.name("Listening Port")
|
.name("Listening Port")
|
||||||
.description("The Port to listen on for incoming connections")
|
.description("The Port to listen on for incoming connections")
|
||||||
.required(true)
|
.required(true)
|
||||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
|
||||||
.name("Authorized DN Pattern")
|
.name("Authorized DN Pattern")
|
||||||
.description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
|
.description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.defaultValue(".*")
|
.defaultValue(".*")
|
||||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder()
|
||||||
.name("Max Unconfirmed Flowfile Time")
|
.name("Max Unconfirmed Flowfile Time")
|
||||||
.description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
|
.description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache")
|
||||||
.required(true)
|
.required(true)
|
||||||
.defaultValue("60 secs")
|
.defaultValue("60 secs")
|
||||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
|
||||||
.name("Max Data to Receive per Second")
|
.name("Max Data to Receive per Second")
|
||||||
.description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
|
.description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
|
||||||
.required(false)
|
.required(false)
|
||||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||||
.name("SSL Context Service")
|
.name("SSL Context Service")
|
||||||
.description("The Controller Service to use in order to obtain an SSL Context")
|
.description("The Controller Service to use in order to obtain an SSL Context")
|
||||||
.required(false)
|
.required(false)
|
||||||
.identifiesControllerService(SSLContextService.class)
|
.identifiesControllerService(SSLContextService.class)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
|
||||||
.name("HTTP Headers to receive as Attributes (Regex)")
|
.name("HTTP Headers to receive as Attributes (Regex)")
|
||||||
.description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
|
.description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
|
||||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
|
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
|
||||||
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
|
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
|
||||||
|
@ -173,7 +172,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
||||||
toShutdown.stop();
|
toShutdown.stop();
|
||||||
toShutdown.destroy();
|
toShutdown.destroy();
|
||||||
} catch (final Exception ex) {
|
} catch (final Exception ex) {
|
||||||
getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex});
|
getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[] {ex});
|
||||||
this.server = null;
|
this.server = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -235,18 +234,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
||||||
connector.setPort(port);
|
connector.setPort(port);
|
||||||
|
|
||||||
// add the connector to the server
|
// add the connector to the server
|
||||||
server.setConnectors(new Connector[]{connector});
|
server.setConnectors(new Connector[] {connector});
|
||||||
|
|
||||||
final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
|
final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
|
||||||
for (final Class<? extends Servlet> cls : getServerClasses()) {
|
for (final Class<? extends Servlet> cls : getServerClasses()) {
|
||||||
final Path path = cls.getAnnotation(Path.class);
|
final Path path = cls.getAnnotation(Path.class);
|
||||||
// Note: servlets must have a path annotation - this will NPE otherwise
|
// Note: servlets must have a path annotation - this will NPE otherwise
|
||||||
// also, servlets other than ListenHttpServlet must have a path starting with /
|
// also, servlets other than ListenHttpServlet must have a path starting with /
|
||||||
if(basePath.isEmpty() && !path.value().isEmpty()){
|
if (basePath.isEmpty() && !path.value().isEmpty()) {
|
||||||
// Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
|
// Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
|
||||||
contextHandler.addServlet(cls, path.value());
|
contextHandler.addServlet(cls, path.value());
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
contextHandler.addServlet(cls, "/" + basePath + path.value());
|
contextHandler.addServlet(cls, "/" + basePath + path.value());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -304,7 +302,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
||||||
for (final String id : findOldFlowFileIds(context)) {
|
for (final String id : findOldFlowFileIds(context)) {
|
||||||
final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
|
final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
|
||||||
if (wrapper != null) {
|
if (wrapper != null) {
|
||||||
getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id});
|
getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[] {id});
|
||||||
wrapper.session.rollback();
|
wrapper.session.rollback();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
@ -33,24 +43,22 @@ import org.apache.nifi.distributed.cache.client.exception.SerializationException
|
||||||
import org.apache.nifi.expression.AttributeExpression.ResultType;
|
import org.apache.nifi.expression.AttributeExpression.ResultType;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ProcessorLog;
|
import org.apache.nifi.logging.ProcessorLog;
|
||||||
import org.apache.nifi.processor.*;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@Tags({"map", "cache", "put", "distributed"})
|
@Tags({"map", "cache", "put", "distributed"})
|
||||||
@CapabilityDescription("Gets the content of a FlowFile and puts it to a distributed map cache, using a cache key " +
|
@CapabilityDescription("Gets the content of a FlowFile and puts it to a distributed map cache, using a cache key " +
|
||||||
"computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
|
"computed from FlowFile attributes. If the cache already contains the entry and the cache update strategy is " +
|
||||||
"'keep original' the entry is not replaced.'")
|
"'keep original' the entry is not replaced.'")
|
||||||
@WritesAttribute(attribute = "cached", description = "All FlowFiles will have an attribute 'cached'. The value of this " +
|
@WritesAttribute(attribute = "cached", description = "All FlowFiles will have an attribute 'cached'. The value of this " +
|
||||||
"attribute is true, is the FlowFile is cached, otherwise false.")
|
"attribute is true, is the FlowFile is cached, otherwise false.")
|
||||||
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
|
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
|
||||||
public class PutDistributedMapCache extends AbstractProcessor {
|
public class PutDistributedMapCache extends AbstractProcessor {
|
||||||
|
|
||||||
|
@ -58,55 +66,55 @@ public class PutDistributedMapCache extends AbstractProcessor {
|
||||||
|
|
||||||
// Identifies the distributed map cache client
|
// Identifies the distributed map cache client
|
||||||
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
|
||||||
.name("Distributed Cache Service")
|
.name("Distributed Cache Service")
|
||||||
.description("The Controller Service that is used to cache flow files")
|
.description("The Controller Service that is used to cache flow files")
|
||||||
.required(true)
|
.required(true)
|
||||||
.identifiesControllerService(DistributedMapCacheClient.class)
|
.identifiesControllerService(DistributedMapCacheClient.class)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Selects the FlowFile attribute, whose value is used as cache key
|
// Selects the FlowFile attribute, whose value is used as cache key
|
||||||
public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
|
||||||
.name("Cache Entry Identifier")
|
.name("Cache Entry Identifier")
|
||||||
.description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
|
.description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " +
|
||||||
"be evaluated against a FlowFile in order to determine the cache key")
|
"be evaluated against a FlowFile in order to determine the cache key")
|
||||||
.required(true)
|
.required(true)
|
||||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
|
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
|
||||||
.expressionLanguageSupported(true)
|
.expressionLanguageSupported(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue("replace", "Replace if present",
|
public static final AllowableValue CACHE_UPDATE_REPLACE = new AllowableValue("replace", "Replace if present",
|
||||||
"Adds the specified entry to the cache, replacing any value that is currently set.");
|
"Adds the specified entry to the cache, replacing any value that is currently set.");
|
||||||
|
|
||||||
public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
|
public static final AllowableValue CACHE_UPDATE_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
|
||||||
"Adds the specified entry to the cache, if the key does not exist.");
|
"Adds the specified entry to the cache, if the key does not exist.");
|
||||||
|
|
||||||
public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor CACHE_UPDATE_STRATEGY = new PropertyDescriptor.Builder()
|
||||||
.name("Cache update strategy")
|
.name("Cache update strategy")
|
||||||
.description("Determines how the cache is updated if the cache already contains the entry")
|
.description("Determines how the cache is updated if the cache already contains the entry")
|
||||||
.required(true)
|
.required(true)
|
||||||
.allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
|
.allowableValues(CACHE_UPDATE_REPLACE, CACHE_UPDATE_KEEP_ORIGINAL)
|
||||||
.defaultValue(CACHE_UPDATE_REPLACE.getValue())
|
.defaultValue(CACHE_UPDATE_REPLACE.getValue())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor CACHE_ENTRY_MAX_BYTES = new PropertyDescriptor.Builder()
|
||||||
.name("Max cache entry size")
|
.name("Max cache entry size")
|
||||||
.description("The maximum amount of data to put into cache")
|
.description("The maximum amount of data to put into cache")
|
||||||
.required(false)
|
.required(false)
|
||||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||||
.defaultValue("1 MB")
|
.defaultValue("1 MB")
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(false)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
.description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
|
.description("Any FlowFile that is successfully inserted into cache will be routed to this relationship")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||||
.name("failure")
|
.name("failure")
|
||||||
.description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
|
.description("Any FlowFile that cannot be inserted into the cache will be routed to this relationship")
|
||||||
.build();
|
.build();
|
||||||
private final Set<Relationship> relationships;
|
private final Set<Relationship> relationships;
|
||||||
|
|
||||||
private final Serializer<String> keySerializer = new StringSerializer();
|
private final Serializer<String> keySerializer = new StringSerializer();
|
||||||
|
@ -207,7 +215,7 @@ public class PutDistributedMapCache extends AbstractProcessor {
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
|
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.distributed.cache.client.Deserializer;
|
import org.apache.nifi.distributed.cache.client.Deserializer;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
||||||
|
@ -26,22 +34,11 @@ import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
|
|
||||||
public class TestPutDistributedMapCache {
|
public class TestPutDistributedMapCache {
|
||||||
|
|
||||||
private TestRunner runner;
|
private TestRunner runner;
|
||||||
private MockCacheClient service;
|
private MockCacheClient service;
|
||||||
private PutDistributedMapCache processor;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws InitializationException {
|
public void setup() throws InitializationException {
|
||||||
|
@ -57,7 +54,7 @@ public class TestPutDistributedMapCache {
|
||||||
public void testNoCacheKey() throws InitializationException {
|
public void testNoCacheKey() throws InitializationException {
|
||||||
|
|
||||||
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
|
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${caheKeyAttribute}");
|
||||||
runner.enqueue(new byte[]{});
|
runner.enqueue(new byte[] {});
|
||||||
|
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
|
@ -99,7 +96,7 @@ public class TestPutDistributedMapCache {
|
||||||
props.put("caheKeyAttribute", "2");
|
props.put("caheKeyAttribute", "2");
|
||||||
|
|
||||||
// flow file without content
|
// flow file without content
|
||||||
runner.enqueue(new byte[]{}, props);
|
runner.enqueue(new byte[] {}, props);
|
||||||
|
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
|
@ -171,7 +168,7 @@ public class TestPutDistributedMapCache {
|
||||||
|
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
//we expect that the cache entry is replaced
|
// we expect that the cache entry is replaced
|
||||||
value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
|
value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
|
||||||
assertEquals(replaced, new String(value, "UTF-8"));
|
assertEquals(replaced, new String(value, "UTF-8"));
|
||||||
}
|
}
|
||||||
|
@ -215,7 +212,7 @@ public class TestPutDistributedMapCache {
|
||||||
|
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
//we expect that the cache entry is NOT replaced
|
// we expect that the cache entry is NOT replaced
|
||||||
value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
|
value = service.get("replaceme", new PutDistributedMapCache.StringSerializer(), new PutDistributedMapCache.CacheValueDeserializer());
|
||||||
assertEquals(original, new String(value, "UTF-8"));
|
assertEquals(original, new String(value, "UTF-8"));
|
||||||
}
|
}
|
||||||
|
@ -225,7 +222,7 @@ public class TestPutDistributedMapCache {
|
||||||
private boolean failOnCalls = false;
|
private boolean failOnCalls = false;
|
||||||
|
|
||||||
private void verifyNotFail() throws IOException {
|
private void verifyNotFail() throws IOException {
|
||||||
if ( failOnCalls ) {
|
if (failOnCalls) {
|
||||||
throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
|
throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -240,7 +237,7 @@ public class TestPutDistributedMapCache {
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
|
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
|
||||||
final Deserializer<V> valueDeserializer) throws IOException {
|
final Deserializer<V> valueDeserializer) throws IOException {
|
||||||
verifyNotFail();
|
verifyNotFail();
|
||||||
return (V) values.putIfAbsent(key, value);
|
return (V) values.putIfAbsent(key, value);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue