Merge remote-tracking branch 'apache/develop' into develop

Brian Ghigiarelli 2015-06-26 11:21:49 -04:00
commit 97f33d5c51
34 changed files with 795 additions and 180 deletions

View File

@ -22,6 +22,7 @@ public interface LineageNode {
* @return the identifier of the Clustered NiFi Node that generated the
* event
*/
@Deprecated
String getClusterNodeIdentifier();
/**

View File

@ -276,6 +276,7 @@ language governing permissions and limitations under the License. -->
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
<nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count>
<nifi.provenance.repository.max.attribute.length>65536</nifi.provenance.repository.max.attribute.length>
<!-- volatile provenance repository properties -->
<nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size>

View File

@ -36,11 +36,13 @@ public class EventNode implements ProvenanceEventLineageNode {
return String.valueOf(getEventIdentifier());
}
@Deprecated
@Override
public String getClusterNodeIdentifier() {
return clusterNodeIdentifier;
}
@Deprecated
public void setClusterNodeIdentifier(final String nodeIdentifier) {
this.clusterNodeIdentifier = nodeIdentifier;
}

View File

@ -39,6 +39,7 @@ public class FlowFileNode implements LineageNode {
return creationTime;
}
@Deprecated
@Override
public String getClusterNodeIdentifier() {
return clusterNodeIdentifier;

View File

@ -233,8 +233,7 @@ public class NiFiProperties extends Properties {
* obtained.
*
* @return the NiFiProperties object to use
* @throws RuntimeException if unable to load properties file
*/
public static synchronized NiFiProperties getInstance() {
if (null == instance) {
@ -794,7 +793,7 @@ public class NiFiProperties extends Properties {
final String scheme = (rawScheme == null) ? "http" : rawScheme;
final String host;
final Integer port;
if ("http".equalsIgnoreCase(scheme)) {
// get host
if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) {
@ -804,6 +803,10 @@ public class NiFiProperties extends Properties {
}
// get port
port = getPort();
if (port == null) {
throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to false.", WEB_HTTP_PORT, CLUSTER_PROTOCOL_IS_SECURE));
}
} else {
// get host
if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) {
@ -813,6 +816,10 @@ public class NiFiProperties extends Properties {
}
// get port
port = getSslPort();
if (port == null) {
throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to true.", WEB_HTTPS_PORT, CLUSTER_PROTOCOL_IS_SECURE));
}
}
return InetSocketAddress.createUnresolved(host, port);
@ -824,8 +831,7 @@ public class NiFiProperties extends Properties {
* configured. No directories will be created as a result of this operation.
*
* @return database repository path
* @throws InvalidPathException If the configured path is invalid
*/
public Path getDatabaseRepositoryPath() {
return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY));
@ -836,8 +842,7 @@ public class NiFiProperties extends Properties {
* configured. No directories will be created as a result of this operation.
*
* @return database repository path
* @throws InvalidPathException If the configured path is invalid
*/
public Path getFlowFileRepositoryPath() {
return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY));
@ -850,8 +855,7 @@ public class NiFiProperties extends Properties {
* operation.
*
* @return file repositories paths
* @throws InvalidPathException If any of the configured paths are invalid
*/
public Map<String, Path> getContentRepositoryPaths() {
final Map<String, Path> contentRepositoryPaths = new HashMap<>();

View File

@ -210,7 +210,7 @@ Here is an example entry using the name John Smith:
----
<users>
<user dn="cn=John Smith,ou=people,dc=example,dc=com">
<role name="ROLE_ADMIN"/>
</user>
</users>

View File

@ -22,14 +22,21 @@ import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
public class MockConfigurationContext implements ConfigurationContext {
private final Map<PropertyDescriptor, String> properties;
private final ControllerServiceLookup serviceLookup;
private final ControllerService service;
public MockConfigurationContext(final Map<PropertyDescriptor, String> properties, final ControllerServiceLookup serviceLookup) {
this(null, properties, serviceLookup);
}
public MockConfigurationContext(final ControllerService service, final Map<PropertyDescriptor, String> properties, final ControllerServiceLookup serviceLookup) {
this.service = service;
this.properties = properties;
this.serviceLookup = serviceLookup;
}
@ -38,7 +45,7 @@ public class MockConfigurationContext implements ConfigurationContext {
public PropertyValue getProperty(final PropertyDescriptor property) {
String value = properties.get(property);
if (value == null) {
value = getActualDescriptor(property).getDefaultValue();
}
return new MockPropertyValue(value, serviceLookup);
}
@ -47,4 +54,13 @@ public class MockConfigurationContext implements ConfigurationContext {
public Map<PropertyDescriptor, String> getProperties() {
return new HashMap<>(this.properties);
}
private PropertyDescriptor getActualDescriptor(final PropertyDescriptor property) {
if (service == null) {
return property;
}
final PropertyDescriptor resolved = service.getPropertyDescriptor(property.getName());
return resolved == null ? property : resolved;
}
}
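The service-aware constructor above lets a property default be resolved from the enabled service's own descriptor rather than the (possibly stripped-down) descriptor the caller passes in. A minimal sketch of that behavior, assuming a hypothetical MyLookupService with a "Cache Size" property; none of these names come from this commit:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.util.MockConfigurationContext;
import org.junit.Assert;
import org.junit.Test;

public class MockConfigurationContextSketch {

    // Hypothetical controller service, used only for illustration
    private static class MyLookupService extends AbstractControllerService {
        static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
                .name("Cache Size")
                .defaultValue("100")
                .build();

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return Collections.singletonList(CACHE_SIZE);
        }
    }

    @Test
    public void defaultComesFromServiceDescriptor() {
        // no explicit value configured for the property
        final MockConfigurationContext context =
                new MockConfigurationContext(new MyLookupService(), new HashMap<PropertyDescriptor, String>(), null);

        // a caller-side descriptor that only carries the name, no default
        final PropertyDescriptor byNameOnly = new PropertyDescriptor.Builder().name("Cache Size").build();

        // getActualDescriptor() resolves the service's descriptor, so its default ("100") is used
        Assert.assertEquals("100", context.getProperty(byNameOnly).getValue());
    }
}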

View File

@ -39,6 +39,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -182,6 +183,11 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) {
run(iterations, stopOnFinish, initialize, 5000);
}
@Override
public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) {
if (iterations < 1) {
throw new IllegalArgumentException();
}
@ -207,6 +213,10 @@ public class StandardProcessorTestRunner implements TestRunner {
}
executorService.shutdown();
try {
executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e1) {
}
int finishedCount = 0;
boolean unscheduledRun = false;
@ -599,7 +609,7 @@ public class StandardProcessorTestRunner implements TestRunner {
}
try {
final ConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), context);
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext);
} catch (final InvocationTargetException ite) {
ite.getCause().printStackTrace();

View File

@ -125,6 +125,51 @@ public interface TestRunner {
*/
void run(int iterations, boolean stopOnFinish, final boolean initialize);
/**
* This method runs the {@link Processor} <code>iterations</code> times,
* using the sequence of steps below:
* <ul>
* <li>
* If {@code initialize} is true, run all methods on the Processor that are
* annotated with the
* {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. If
* any of these methods throws an Exception, the Unit Test will fail.
* </li>
* <li>
* Schedule the
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}
* method to be invoked <code>iterations</code> times. The number of threads
* used to run these iterations is determined by the ThreadCount of this
* <code>TestRunner</code>. By default, the value is set to 1, but it can be
* modified by calling the {@link #setThreadCount(int)} method.
* </li>
* <li>
* As soon as the first thread finishes its execution of
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger},
* all methods on the Processor that are annotated with the
* {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
* are invoked. If any of these methods throws an Exception, the Unit Test
* will fail.
* </li>
* <li>
* Waits for all threads to finish execution.
* </li>
* <li>
* If and only if the value of <code>stopOnFinish</code> is true: call all
* methods on the Processor that are annotated with the
* {@link nifi.processor.annotation.OnStopped @OnStopped} annotation.
* </li>
* </ul>
*
* @param iterations number of iterations
* @param stopOnFinish whether or not to run the Processor methods that are
* annotated with {@link nifi.processor.annotation.OnStopped @OnStopped}
* @param initialize true if must initialize
* @param runWait indicates the amount of time in milliseconds that the framework should wait for
* processors to stop running before invoking methods annotated with {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled}
*/
void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait);
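For illustration, a test might use the new overload as follows; MyProcessor and the 10-second wait are placeholders rather than anything defined in this change:

final TestRunner runner = TestRunners.newTestRunner(new MyProcessor()); // MyProcessor is hypothetical
runner.setThreadCount(4);
// run 5 iterations, stop on finish, initialize, and give in-flight onTrigger
// calls up to 10 seconds to complete before the @OnUnscheduled methods run
runner.run(5, true, true, 10000L);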
/**
* Invokes all methods on the Processor that are annotated with the
* {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.junit.Assert;
import org.junit.Test;
public class CurrentTestStandardProcessorTestRunner {
/**
* This test will verify that all iterations of the run are finished before unscheduled is called
*/
@Test
public void testOnScheduledCalledAfterRunFinished() {
SlowRunProcessor processor = new SlowRunProcessor();
StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor);
final int iterations = 5;
runner.run(iterations);
// if the counter is not equal to iterations, then the processor must have been unscheduled
// before all the run calls were made, which would be bad.
Assert.assertEquals(iterations, processor.getCounter());
}
/**
* This processor simulates a "slow" processor that checks whether it is scheduled before doing something
*
*
*/
private static class SlowRunProcessor extends AbstractProcessor {
private int counter = 0;
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
try {
// be slow
Thread.sleep(50);
// make sure we are still scheduled
if (isScheduled()) {
// increment counter
++counter;
}
} catch (InterruptedException e) {
}
}
public int getCounter() {
return counter;
}
}
}

View File

@ -188,7 +188,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
return;
}
if (primaryEntry == null && restoreEntry != null || primaryEntry != null && restoreEntry == null) {
throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
}
@ -352,7 +352,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
final File[] files = dir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT);
}
});
@ -515,19 +515,10 @@ public class DataFlowDaoImpl implements DataFlowDao {
final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
dataFlow.setAutoStartProcessors(autoStart);
return new ClusterDataFlow(dataFlow, clusterMetadata == null ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes);
}
private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
// get the data flow
DataFlow dataFlow = clusterDataFlow.getDataFlow();
// if no dataflow, then write a new dataflow
if (dataFlow == null) {
dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
}
// setup the cluster metadata
final ClusterMetadata clusterMetadata = new ClusterMetadata();
clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());

View File

@ -260,7 +260,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final FlowFileRecord clone = builder.build();
final StandardRepositoryRecord newRecord = new StandardRepositoryRecord(destination.getFlowFileQueue());
provenanceReporter.clone(currRec, clone, false);
final ContentClaim claim = clone.getContentClaim();
if (claim != null) {

View File

@ -327,7 +327,13 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
@Override
public void clone(final FlowFile parent, final FlowFile child) {
clone(parent, child, true);
}
void clone(final FlowFile parent, final FlowFile child, final boolean verifyFlowFile) {
if (verifyFlowFile) {
verifyFlowFileKnown(child);
}
try {
final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE);

View File

@ -140,6 +140,10 @@ install() {
cp $0 $SVC_FILE
sed -i s:NIFI_HOME=.*:NIFI_HOME="$NIFI_HOME": $SVC_FILE
sed -i s:PROGNAME=.*:PROGNAME=$(basename "$0"): $SVC_FILE
rm -f /etc/rc2.d/S65${SVC_NAME}
ln -s /etc/init.d/$SVC_NAME /etc/rc2.d/S65${SVC_NAME}
rm -f /etc/rc2.d/K65${SVC_NAME}
ln -s /etc/init.d/$SVC_NAME /etc/rc2.d/K65${SVC_NAME}
echo Service $SVC_NAME installed
}

View File

@ -82,6 +82,9 @@ nifi.provenance.repository.indexed.attributes=${nifi.provenance.repository.index
# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
# but should provide better performance
nifi.provenance.repository.index.shard.size=${nifi.provenance.repository.index.shard.size}
# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
nifi.provenance.repository.max.attribute.length=${nifi.provenance.repository.max.attribute.length}
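Conceptually, the truncation amounts to something like the sketch below; this is an illustration of the behavior described above, not the repository's actual implementation:

// maxAttributeChars corresponds to nifi.provenance.repository.max.attribute.length
static String truncateAttribute(final String value, final int maxAttributeChars) {
    if (value == null || value.length() <= maxAttributeChars) {
        return value;
    }
    return value.substring(0, maxAttributeChars);
}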
# Volatile Provenance Repository Properties
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}

View File

@ -115,6 +115,7 @@ div.nifi-tooltip {
border: 1px solid #454545;
background-color: #FFFFA3;
color: #454545;
max-width: 500px;
}
.ellipsis {

View File

@ -811,7 +811,7 @@ nf.Canvas = (function () {
// bulletins for this processor are now gone
if (bulletins.length === 0) {
if (bulletinIcon.data('qtip')) {
bulletinIcon.removeClass('has-bulletins').qtip('api').destroy(true);
}
// hide the icon

View File

@ -264,7 +264,7 @@ nf.ControllerService = (function () {
}, nf.CanvasUtils.config.systemTooltipConfig));
}
} else if (icon.data('qtip')) {
icon.qtip('api').destroy(true);
}
return state;
});
@ -294,7 +294,7 @@ nf.ControllerService = (function () {
}, nf.CanvasUtils.config.systemTooltipConfig));
}
} else if (icon.data('qtip')) {
icon.qtip('api').destroy(true);
}
return state;
});

View File

@ -184,6 +184,23 @@ nf.Common = {
* @argument {string} error The error
*/
handleAjaxError: function (xhr, status, error) {
// if an error occurs while the splash screen is visible, close the canvas and show the error message
if ($('#splash').is(':visible')) {
$('#message-title').text('An unexpected error has occurred');
if ($.trim(xhr.responseText) === '') {
$('#message-content').text('Please check the logs.');
} else {
$('#message-content').text(xhr.responseText);
}
// show the error pane
$('#message-pane').show();
// close the canvas
nf.Common.closeCanvas();
return;
}
// status code 400, 404, and 409 are expected response codes for common errors.
if (xhr.status === 400 || xhr.status === 404 || xhr.status === 409) {
nf.Dialog.showOkDialog({

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
@ -39,6 +40,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@ -58,7 +61,7 @@ import org.apache.nifi.util.LongHolder;
import scala.actors.threadpool.Arrays;
@SupportsBatching
@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor {
@ -66,94 +69,190 @@ public class PutKafka extends AbstractProcessor {
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to"
+ " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
+ " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
+ " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after"
+ " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+ " in data loss.");
/**
* AllowableValue for a Producer Type that synchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately.");
/**
* AllowableValue for a Producer Type that asynchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka."
+ " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data.");
/**
* AllowableValue for sending messages to Kafka without compression
*/
public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", "Compression will not be used for any topic.");
/**
* AllowableValue for sending messages to Kafka with GZIP compression
*/
public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", "Compress messages using GZIP");
/**
* AllowableValue for sending messages to Kafka with Snappy compression
*/
public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
.name("Known Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name")
.description("The Kafka Topic of interest")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.name("Kafka Key")
.description("The Key to use for the Message")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
.name("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
.required(true)
.expressionLanguageSupported(false)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ "If not specified, the entire content of the FlowFile will be used as a single message. "
+ "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ "sent as a separate Kafka message.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Buffer Size")
.description("The maximum amount of data to buffer in memory before sending to Kafka")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("30 secs")
.build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
.name("Producer Type")
.description("This parameter specifies whether the messages are sent asynchronously in a background thread.")
.required(true)
.allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
.build();
public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
.name("Async Batch Size")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
+ " The producer will wait until either this number of messages are ready"
+ " to send or \"Queue Buffering Max Time\" is reached.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("200")
.build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder()
.name("Queue Buffering Max Time")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms"
+ " will try to batch together 100ms of messages to send at once. This will improve"
+ " throughput but adds message delivery latency due to the buffering.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("5 secs")
.build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
.name("Queue Buffer Max Count")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The maximum number of unsent messages that can be queued up in the producer when"
+ " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000")
.build();
public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder()
.name("Queue Enqueue Timeout")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The amount of time to block before dropping messages when running in "
+ PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
+ " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will"
+ " be enqueued immediately or dropped if the queue is full (the producer send call will"
+ " never block). If not set, the producer will block indefinitely and never willingly"
+ " drop a send.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression Codec")
.description("This parameter allows you to specify the compression codec for all"
+ " data generated by this producer.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
.defaultValue(COMPRESSION_CODEC_NONE.getValue())
.build();
public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
.name("Compressed Topics")
.description("This parameter allows you to set whether compression should be turned on"
+ " for particular topics. If the compression codec is anything other than"
+ " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any."
+ " If the list of compressed topics is empty, then enable the specified"
+ " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + ","
+ " compression is disabled for all topics")
.required(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
.build();
private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CLIENT_NAME)
.defaultValue("NiFi-" + getIdentifier())
.build();
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEED_BROKERS);
@ -163,10 +262,32 @@ public class PutKafka extends AbstractProcessor {
props.add(MESSAGE_DELIMITER);
props.add(MAX_BUFFER_SIZE);
props.add(TIMEOUT);
props.add(PRODUCER_TYPE);
props.add(BATCH_NUM_MESSAGES);
props.add(QUEUE_BUFFERING_MAX_MESSAGES);
props.add(QUEUE_BUFFERING_MAX);
props.add(QUEUE_ENQUEUE_TIMEOUT);
props.add(COMPRESSION_CODEC);
props.add(COMPRESSED_TOPICS);
props.add(clientName);
return props;
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger();
final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
if (batchMessages > bufferMaxMessages) {
errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false)
.explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build());
}
return errors;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>(1);
@ -194,7 +315,27 @@ public class PutKafka extends AbstractProcessor {
properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue())); properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
properties.setProperty("message.send.max.retries", "1"); properties.setProperty("message.send.max.retries", "1");
properties.setProperty("producer.type", "sync"); properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue());
properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue());
final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
if (queueBufferingMillis != null) {
properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis));
}
properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
if (queueEnqueueTimeoutMillis != null) {
properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis));
}
final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
properties.setProperty("compression.codec", compressionCodec);
final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue();
if (compressedTopics != null) {
properties.setProperty("compressed.topics", compressedTopics);
}
return new ProducerConfig(properties);
}
@ -204,7 +345,7 @@ public class PutKafka extends AbstractProcessor {
}
private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
final Producer<byte[], byte[]> producer = producers.poll();
return producer == null ? createProducer(context) : producer;
}
@ -214,7 +355,7 @@ public class PutKafka extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@ -222,7 +363,7 @@ public class PutKafka extends AbstractProcessor {
final long start = System.nanoTime();
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
if (delimiter != null) {
delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
@ -255,9 +396,9 @@ public class PutKafka extends AbstractProcessor {
session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
} catch (final Exception e) {
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e });
session.transfer(flowFile, REL_FAILURE);
error = true;
} finally {
@ -292,7 +433,7 @@ public class PutKafka extends AbstractProcessor {
int nextByte;
try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
// read until we're out of data.
while (!streamFinished) {
@ -380,7 +521,7 @@ public class PutKafka extends AbstractProcessor {
final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
} catch (final ProcessException pe) {
error = true;
@ -390,7 +531,7 @@ public class PutKafka extends AbstractProcessor {
final long offset = lastMessageOffset.get();
if (offset == 0L) {
// all of the messages failed to send. Route FlowFile to failure
getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() });
session.transfer(flowFile, REL_FAILURE);
} else {
// Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
@ -398,8 +539,8 @@ public class PutKafka extends AbstractProcessor {
final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into"
+ " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
session.transfer(successfulMessages, REL_SUCCESS);
session.transfer(failedMessages, REL_FAILURE);

View File

@ -28,6 +28,7 @@ import java.util.Map;
import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.message.CompressionCodec;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@ -41,6 +42,8 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
import scala.collection.Seq;
public class TestPutKafka {
@Test
@ -191,6 +194,24 @@ public class TestPutKafka {
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
keyValuePutExecute(runner);
}
@Test
@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
public void testKeyValuePutAsync() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
keyValuePutExecute(runner);
}
private void keyValuePutExecute(final TestRunner runner) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", "test");
attributes.put("kafka.key", "key3");
@ -210,6 +231,140 @@ public class TestPutKafka {
assertTrue(Arrays.equals(data, mff.toByteArray()));
}
@Test
public void testProducerConfigDefault() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
// Check the codec
final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
// Check compressed topics
final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(0, compressedTopics.size());
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
}
@Test
public void testProducerConfigAsyncWithCompression() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
// Check that the codec is snappy
final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
// Check compressed topics
final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(3, compressedTopics.size());
assertTrue(compressedTopics.contains("topic01"));
assertTrue(compressedTopics.contains("topic02"));
assertTrue(compressedTopics.contains("topic03"));
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
@Test
public void testProducerConfigAsyncQueueThresholds() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
// Check that the queue thresholds were properly translated
assertEquals(7000, config.queueBufferingMaxMs());
assertEquals(535, config.queueBufferingMaxMessages());
assertEquals(200, config.queueEnqueueTimeoutMs());
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
@Test
public void testProducerConfigInvalidBatchSize() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100");
runner.assertNotValid();
}
@Test
public void testProducerConfigAsyncDefaultEnqueueTimeout() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
// Do not set QUEUE_ENQUEUE_TIMEOUT
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
// Check that the enqueue timeout defaults to -1
assertEquals(-1, config.queueEnqueueTimeoutMs());
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
private static class TestableProcessor extends PutKafka { private static class TestableProcessor extends PutKafka {
private MockProducer producer; private MockProducer producer;
@ -236,6 +391,14 @@ public class TestPutKafka {
public MockProducer getProducer() { public MockProducer getProducer() {
return producer; return producer;
} }
/**
* Exposed for test verification
*/
@Override
public ProducerConfig createConfig(final ProcessContext context) {
return super.createConfig(context);
}
} }
private static class MockProducer extends Producer<byte[], byte[]> { private static class MockProducer extends Producer<byte[], byte[]> {

View File

@ -85,7 +85,7 @@ public class IndexConfiguration {
} }
private Long getFirstEntryTime(final File provenanceLogFile) { private Long getFirstEntryTime(final File provenanceLogFile) {
try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null)) { try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord firstRecord = reader.nextRecord(); final StandardProvenanceEventRecord firstRecord = reader.nextRecord();
if (firstRecord == null) { if (firstRecord == null) {
return provenanceLogFile.lastModified(); return provenanceLogFile.lastModified();

View File

@ -134,6 +134,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final IndexManager indexManager; private final IndexManager indexManager;
private final boolean alwaysSync; private final boolean alwaysSync;
private final int rolloverCheckMillis; private final int rolloverCheckMillis;
private final int maxAttributeChars;
private final ScheduledExecutorService scheduledExecService; private final ScheduledExecutorService scheduledExecService;
private final ScheduledExecutorService rolloverExecutor; private final ScheduledExecutorService rolloverExecutor;
@ -167,6 +168,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} }
this.configuration = configuration; this.configuration = configuration;
this.maxAttributeChars = configuration.getMaxAttributeChars();
for (final File file : configuration.getStorageDirectories()) { for (final File file : configuration.getStorageDirectories()) {
final Path storageDirectory = file.toPath(); final Path storageDirectory = file.toPath();
@ -289,6 +291,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false")); final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false"));
final int defaultMaxAttrChars = 65536;
final String maxAttrLength = properties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars));
int maxAttrChars;
try {
maxAttrChars = Integer.parseInt(maxAttrLength);
// must be at least 36 characters because that's the length of the uuid attribute,
// which must be kept intact
if (maxAttrChars < 36) {
maxAttrChars = 36;
logger.warn("Found max attribute length property set to " + maxAttrLength + " but minimum length is 36; using 36 instead");
}
} catch (final Exception e) {
maxAttrChars = defaultMaxAttrChars;
}
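As a rough illustration of the clamping just above (the helper name and sample values are hypothetical; only the 36-character minimum and the 65536 default come from the code in this hunk):

// Rough sketch of the clamping behavior; not part of the repository code.
static int clampMaxAttributeChars(final String configuredValue) {
    final int defaultMaxAttrChars = 65536;
    try {
        // never allow fewer than the 36 characters needed to keep the uuid attribute intact
        return Math.max(36, Integer.parseInt(configuredValue));
    } catch (final NumberFormatException nfe) {
        return defaultMaxAttrChars;
    }
}

// clampMaxAttributeChars("10")   -> 36
// clampMaxAttributeChars("8192") -> 8192
// clampMaxAttributeChars("abc")  -> 65536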
final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true); final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
final List<SearchableField> searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false); final List<SearchableField> searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false);
@ -310,6 +327,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
config.setMaxStorageCapacity(maxStorageBytes); config.setMaxStorageCapacity(maxStorageBytes);
config.setQueryThreadPoolSize(queryThreads); config.setQueryThreadPoolSize(queryThreads);
config.setJournalCount(journalCount); config.setJournalCount(journalCount);
config.setMaxAttributeChars(maxAttrChars);
if (shardSize != null) { if (shardSize != null) {
config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue()); config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
@ -337,6 +355,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return writers; return writers;
} }
/**
* @return the maximum number of characters that any Event attribute should contain. If an attribute contains
* more characters than this, it may be truncated on retrieval
*/
public int getMaxAttributeCharacters() {
return maxAttributeChars;
}
@Override @Override
public StandardProvenanceEventRecord.Builder eventBuilder() { public StandardProvenanceEventRecord.Builder eventBuilder() {
return new StandardProvenanceEventRecord.Builder(); return new StandardProvenanceEventRecord.Builder();
@ -362,7 +388,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} }
for (final Path path : paths) { for (final Path path : paths) {
try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) { try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles(), maxAttributeChars)) {
// if this is the first record, try to find out the block index and jump directly to // if this is the first record, try to find out the block index and jump directly to
// the block index. This avoids having to read through a lot of data that we don't care about // the block index. This avoids having to read through a lot of data that we don't care about
// just to get to the first record that we want. // just to get to the first record that we want.
@ -377,7 +403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} }
StandardProvenanceEventRecord record; StandardProvenanceEventRecord record;
while (records.size() < maxRecords && ((record = reader.nextRecord()) != null)) { while (records.size() < maxRecords && (record = reader.nextRecord()) != null) {
if (record.getEventId() >= firstRecordId) { if (record.getEventId() >= firstRecordId) {
records.add(record); records.add(record);
} }
@ -507,7 +533,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
if (maxIdFile != null) { if (maxIdFile != null) {
// Determine the max ID in the last file. // Determine the max ID in the last file.
try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) { try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles(), maxAttributeChars)) {
final long eventId = reader.getMaxEventId(); final long eventId = reader.getMaxEventId();
if (eventId > maxId) { if (eventId > maxId) {
maxId = eventId; maxId = eventId;
@ -571,7 +597,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// Read the records in the last file to find its max id // Read the records in the last file to find its max id
if (greatestMinIdFile != null) { if (greatestMinIdFile != null) {
try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) { try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path> emptyList(), maxAttributeChars)) {
maxId = recordReader.getMaxEventId(); maxId = recordReader.getMaxEventId();
} }
} }
@ -1224,7 +1250,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
try { try {
for (final File journalFile : journalFiles) { for (final File journalFile : journalFiles) {
try { try {
readers.add(RecordReaders.newRecordReader(journalFile, null)); // Use MAX_VALUE for number of chars because we don't want to truncate the value as we write it
// out. This allows us to later decide that we want more characters and still be able to retrieve
// the entire event.
readers.add(RecordReaders.newRecordReader(journalFile, null, Integer.MAX_VALUE));
} catch (final EOFException eof) { } catch (final EOFException eof) {
// there's nothing here. Skip over it. // there's nothing here. Skip over it.
} catch (final IOException ioe) { } catch (final IOException ioe) {
@ -1314,7 +1343,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
indexingAction.index(record, indexWriter, blockIndex); indexingAction.index(record, indexWriter, blockIndex);
maxId = record.getEventId(); maxId = record.getEventId();
latestRecords.add(record); latestRecords.add(truncateAttributes(record));
records++; records++;
// Remove this entry from the map // Remove this entry from the map
@ -1383,6 +1412,39 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return writerFile; return writerFile;
} }
private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
boolean requireTruncation = false;
for (final Map.Entry<String, String> entry : original.getAttributes().entrySet()) {
if (entry.getValue().length() > maxAttributeChars) {
requireTruncation = true;
break;
}
}
if (!requireTruncation) {
return original;
}
final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder().fromEvent(original);
builder.setAttributes(truncateAttributes(original.getPreviousAttributes()), truncateAttributes(original.getUpdatedAttributes()));
final StandardProvenanceEventRecord truncated = builder.build();
truncated.setEventId(original.getEventId());
return truncated;
}
private Map<String, String> truncateAttributes(final Map<String, String> original) {
final Map<String, String> truncatedAttrs = new HashMap<>();
for (final Map.Entry<String, String> entry : original.entrySet()) {
if (entry.getValue().length() > maxAttributeChars) {
truncatedAttrs.put(entry.getKey(), entry.getValue().substring(0, maxAttributeChars));
} else {
truncatedAttrs.put(entry.getKey(), entry.getValue());
}
}
return truncatedAttrs;
}
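To make the truncation semantics above concrete, here is a minimal standalone sketch; the class name and the 10-character limit are illustrative, and only the substring behavior mirrors the repository method:

import java.util.HashMap;
import java.util.Map;

public class TruncateAttributesSketch {

    private static final int MAX_ATTRIBUTE_CHARS = 10; // illustrative limit

    static Map<String, String> truncate(final Map<String, String> original) {
        final Map<String, String> truncated = new HashMap<>();
        for (final Map.Entry<String, String> entry : original.entrySet()) {
            final String value = entry.getValue();
            // keep at most MAX_ATTRIBUTE_CHARS characters of each value, as the method above does
            truncated.put(entry.getKey(), value.length() > MAX_ATTRIBUTE_CHARS ? value.substring(0, MAX_ATTRIBUTE_CHARS) : value);
        }
        return truncated;
    }

    public static void main(final String[] args) {
        final Map<String, String> attrs = new HashMap<>();
        attrs.put("short", "abc");
        attrs.put("long", "abcdefghijklmnop");
        System.out.println(truncate(attrs)); // prints {short=abc, long=abcdefghij} (map order may vary)
    }
}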
@Override @Override
public List<SearchableField> getSearchableFields() { public List<SearchableField> getSearchableFields() {
final List<SearchableField> searchableFields = new ArrayList<>(configuration.getSearchableFields()); final List<SearchableField> searchableFields = new ArrayList<>(configuration.getSearchableFields());
@ -1612,7 +1674,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
for (final File file : potentialFiles) { for (final File file : potentialFiles) {
try { try {
reader = RecordReaders.newRecordReader(file, allLogFiles); reader = RecordReaders.newRecordReader(file, allLogFiles, maxAttributeChars);
} catch (final IOException ioe) { } catch (final IOException ioe) {
continue; continue;
} }
@ -1788,7 +1850,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return true; return true;
} }
if (repoDirty.get() || (writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) { if (repoDirty.get() || writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) {
return true; return true;
} }
@ -1797,7 +1859,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
public Collection<Path> getAllLogFiles() { public Collection<Path> getAllLogFiles() {
final SortedMap<Long, Path> map = idToPathMap.get(); final SortedMap<Long, Path> map = idToPathMap.get();
return (map == null) ? new ArrayList<Path>() : map.values(); return map == null ? new ArrayList<Path>() : map.values();
} }
private static class PathMapComparator implements Comparator<Long> { private static class PathMapComparator implements Comparator<Long> {
@ -1885,7 +1947,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
@Override @Override
public void run() { public void run() {
try { try {
final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager); final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
final StandardQueryResult queryResult = search.search(query, retrievalCount); final StandardQueryResult queryResult = search.search(query, retrievalCount);
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
if (queryResult.isFinished()) { if (queryResult.isFinished()) {
@ -1926,7 +1988,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} }
try { try {
final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexManager, indexDir, null, flowFileUuids); final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
indexManager, indexDir, null, flowFileUuids, maxAttributeChars);
final StandardLineageResult result = submission.getResult(); final StandardLineageResult result = submission.getResult();
result.update(matchingRecords); result.update(matchingRecords);
@ -1959,7 +2023,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next(); final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next();
final StandardQueryResult result = entry.getValue().getResult(); final StandardQueryResult result = entry.getValue().getResult();
if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) { if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
queryIterator.remove(); queryIterator.remove();
} }
} }
@ -1969,7 +2033,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next(); final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next();
final StandardLineageResult result = entry.getValue().getResult(); final StandardLineageResult result = entry.getValue().getResult();
if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) { if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) {
lineageIterator.remove(); lineageIterator.remove();
} }
} }

View File

@ -34,6 +34,7 @@ public class RepositoryConfiguration {
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
private int journalCount = 16; private int journalCount = 16;
private int compressionBlockBytes = 1024 * 1024; private int compressionBlockBytes = 1024 * 1024;
private int maxAttributeChars = 65536;
private List<SearchableField> searchableFields = new ArrayList<>(); private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>(); private List<SearchableField> searchableAttributes = new ArrayList<>();
@ -278,4 +279,21 @@ public class RepositoryConfiguration {
public void setAlwaysSync(boolean alwaysSync) { public void setAlwaysSync(boolean alwaysSync) {
this.alwaysSync = alwaysSync; this.alwaysSync = alwaysSync;
} }
/**
* @return the maximum number of characters to include in any attribute. If an attribute in a Provenance
* Event has more than this number of characters, it will be truncated when the event is retrieved.
*/
public int getMaxAttributeChars() {
return maxAttributeChars;
}
/**
* Sets the maximum number of characters to include in any attribute. If an attribute in a Provenance
* Event has more than this number of characters, it will be truncated when it is retrieved.
*/
public void setMaxAttributeChars(int maxAttributeChars) {
this.maxAttributeChars = maxAttributeChars;
}
} }
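A brief usage sketch of the new setting, modeled on the testTruncateAttributes test later in this commit; createConfiguration() and getEventReporter() are assumed to be the same test helpers used there, and the rollover-check value is a placeholder:

final RepositoryConfiguration config = createConfiguration(); // test helper, as in testTruncateAttributes
config.setMaxAttributeChars(50);                              // truncate attributes longer than 50 characters
config.setMaxEventFileLife(3, TimeUnit.SECONDS);

final PersistentProvenanceRepository repo = new PersistentProvenanceRepository(config, 10000); // placeholder rollover-check millis
repo.initialize(getEventReporter());                          // reporter helper, also from the test class
// from here on, any attribute longer than 50 characters comes back truncated when events are retrieved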

View File

@ -47,18 +47,20 @@ public class StandardRecordReader implements RecordReader {
private final boolean compressed; private final boolean compressed;
private final TocReader tocReader; private final TocReader tocReader;
private final int headerLength; private final int headerLength;
private final int maxAttributeChars;
private DataInputStream dis; private DataInputStream dis;
private ByteCountingInputStream byteCountingIn; private ByteCountingInputStream byteCountingIn;
public StandardRecordReader(final InputStream in, final String filename) throws IOException { public StandardRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
this(in, filename, null); this(in, filename, null, maxAttributeChars);
} }
public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException { public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
logger.trace("Creating RecordReader for {}", filename); logger.trace("Creating RecordReader for {}", filename);
rawInputStream = new ByteCountingInputStream(in); rawInputStream = new ByteCountingInputStream(in);
this.maxAttributeChars = maxAttributeChars;
final InputStream limitedStream; final InputStream limitedStream;
if ( tocReader == null ) { if ( tocReader == null ) {
@ -367,7 +369,8 @@ public class StandardRecordReader implements RecordReader {
for (int i = 0; i < numAttributes; i++) { for (int i = 0; i < numAttributes; i++) {
final String key = readLongString(dis); final String key = readLongString(dis);
final String value = valueNullable ? readLongNullableString(dis) : readLongString(dis); final String value = valueNullable ? readLongNullableString(dis) : readLongString(dis);
attrs.put(key, value); final String truncatedValue = value.length() > maxAttributeChars ? value.substring(0, maxAttributeChars) : value;
attrs.put(key, truncatedValue);
} }
return attrs; return attrs;
@ -429,7 +432,7 @@ public class StandardRecordReader implements RecordReader {
byteCountingIn.reset(); byteCountingIn.reset();
} }
return (nextByte >= 0); return nextByte >= 0;
} }
@Override @Override
@ -451,7 +454,7 @@ public class StandardRecordReader implements RecordReader {
// committed, so we can just process the FlowFile again. // committed, so we can just process the FlowFile again.
} }
return (lastRecord == null) ? -1L : lastRecord.getEventId(); return lastRecord == null ? -1L : lastRecord.getEventId();
} }
@Override @Override

View File

@ -46,9 +46,9 @@ public class DeleteIndexAction implements ExpirationAction {
@Override @Override
public File execute(final File expiredFile) throws IOException { public File execute(final File expiredFile) throws IOException {
// count the number of records and determine the max event id that we are deleting. // count the number of records and determine the max event id that we are deleting.
long numDeleted = 0; final long numDeleted = 0;
long maxEventId = -1L; long maxEventId = -1L;
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) { try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles(), Integer.MAX_VALUE)) {
maxEventId = reader.getMaxEventId(); maxEventId = reader.getMaxEventId();
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath()); logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
@ -65,7 +65,7 @@ public class DeleteIndexAction implements ExpirationAction {
writer.deleteDocuments(term); writer.deleteDocuments(term);
writer.commit(); writer.commit();
final int docsLeft = writer.numDocs(); final int docsLeft = writer.numDocs();
deleteDir = (docsLeft <= 0); deleteDir = docsLeft <= 0;
logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory); logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
} finally { } finally {
indexManager.returnIndexWriter(indexingDirectory, writer); indexManager.returnIndexWriter(indexingDirectory, writer);

View File

@ -51,7 +51,7 @@ public class DocsReader {
} }
public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
final AtomicInteger retrievalCount, final int maxResults) throws IOException { final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
if (retrievalCount.get() >= maxResults) { if (retrievalCount.get() >= maxResults) {
return Collections.emptySet(); return Collections.emptySet();
} }
@ -68,7 +68,7 @@ public class DocsReader {
final long readDocuments = System.nanoTime() - start; final long readDocuments = System.nanoTime() - start;
logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments)); logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars);
} }
@ -108,7 +108,8 @@ public class DocsReader {
} }
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException { public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
if (retrievalCount.get() >= maxResults) { if (retrievalCount.get() >= maxResults) {
return Collections.emptySet(); return Collections.emptySet();
} }
@ -161,7 +162,7 @@ public class DocsReader {
for (final File file : potentialFiles) { for (final File file : potentialFiles) {
try { try {
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles); reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
matchingRecords.add(getRecord(d, reader)); matchingRecords.add(getRecord(d, reader));
if ( retrievalCount.incrementAndGet() >= maxResults ) { if ( retrievalCount.incrementAndGet() >= maxResults ) {

View File

@ -39,11 +39,13 @@ public class IndexSearch {
private final PersistentProvenanceRepository repository; private final PersistentProvenanceRepository repository;
private final File indexDirectory; private final File indexDirectory;
private final IndexManager indexManager; private final IndexManager indexManager;
private final int maxAttributeChars;
public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) { public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager, final int maxAttributeChars) {
this.repository = repo; this.repository = repo;
this.indexDirectory = indexDirectory; this.indexDirectory = indexDirectory;
this.indexManager = indexManager; this.indexManager = indexManager;
this.maxAttributeChars = maxAttributeChars;
} }
public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException { public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException {
@ -82,7 +84,8 @@ public class IndexSearch {
} }
final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories()); final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults()); matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
provenanceQuery.getMaxResults(), maxAttributeChars);
final long readRecordsNanos = System.nanoTime() - finishSearch; final long readRecordsNanos = System.nanoTime() - finishSearch;
logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this); logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);

View File

@ -46,7 +46,7 @@ public class LineageQuery {
private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class); private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory, public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory,
final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException { final String lineageIdentifier, final Collection<String> flowFileUuids, final int maxAttributeChars) throws IOException {
if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) { if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size())); throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
} }
@ -94,7 +94,9 @@ public class LineageQuery {
final long searchEnd = System.nanoTime(); final long searchEnd = System.nanoTime();
final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
final long readDocsEnd = System.nanoTime(); final long readDocsEnd = System.nanoTime();
logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis", logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis",
indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));

View File

@ -32,7 +32,18 @@ import org.apache.nifi.provenance.toc.TocUtil;
public class RecordReaders { public class RecordReaders {
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException { /**
* Creates a new Record Reader that is capable of reading Provenance Event Journals
*
* @param file the Provenance Event Journal to read data from
* @param provenanceLogFiles collection of all provenance journal files
* @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This guards against
* a FlowFile with an extremely large attribute causing that attribute to be loaded into memory
* many times when its events are read, which can exhaust the Java heap
* @return a Record Reader capable of reading Provenance Event Journals
* @throws IOException if unable to create a Record Reader for the given file
*/
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles, final int maxAttributeChars) throws IOException {
final File originalFile = file; final File originalFile = file;
InputStream fis = null; InputStream fis = null;
@ -92,9 +103,9 @@ public class RecordReaders {
final File tocFile = TocUtil.getTocFile(file); final File tocFile = TocUtil.getTocFile(file);
if ( tocFile.exists() ) { if ( tocFile.exists() ) {
final TocReader tocReader = new StandardTocReader(tocFile); final TocReader tocReader = new StandardTocReader(tocFile);
return new StandardRecordReader(fis, filename, tocReader); return new StandardRecordReader(fis, filename, tocReader, maxAttributeChars);
} else { } else {
return new StandardRecordReader(fis, filename); return new StandardRecordReader(fis, filename, maxAttributeChars);
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
if ( fis != null ) { if ( fis != null ) {

View File

@ -252,7 +252,7 @@ public class TestPersistentProvenanceRepository {
assertEquals(10, recoveredRecords.size()); assertEquals(10, recoveredRecords.size());
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
final ProvenanceEventRecord recovered = recoveredRecords.get(i); final ProvenanceEventRecord recovered = recoveredRecords.get(i);
assertEquals((long) i, recovered.getEventId()); assertEquals(i, recovered.getEventId());
assertEquals("nifi://unit-test", recovered.getTransitUri()); assertEquals("nifi://unit-test", recovered.getTransitUri());
assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType()); assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());
assertEquals(attributes, recovered.getAttributes()); assertEquals(attributes, recovered.getAttributes());
@ -283,7 +283,7 @@ public class TestPersistentProvenanceRepository {
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234"); builder.setComponentId("1234");
builder.setComponentType("dummy processor"); builder.setComponentType("dummy processor");
ProvenanceEventRecord record = builder.build(); final ProvenanceEventRecord record = builder.build();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
repo.registerEvent(record); repo.registerEvent(record);
@ -1106,7 +1106,7 @@ public class TestPersistentProvenanceRepository {
final Query q = new Query(""); final Query q = new Query("");
q.setMaxResults(1000); q.setMaxResults(1000);
TopDocs topDocs = searcher.search(luceneQuery, 1000); final TopDocs topDocs = searcher.search(luceneQuery, 1000);
final List<Document> docs = new ArrayList<>(); final List<Document> docs = new ArrayList<>();
for (int i = 0; i < topDocs.scoreDocs.length; i++) { for (int i = 0; i < topDocs.scoreDocs.length; i++) {
@ -1157,7 +1157,7 @@ public class TestPersistentProvenanceRepository {
for (final File file : storageDir.listFiles()) { for (final File file : storageDir.listFiles()) {
if (file.isFile()) { if (file.isFile()) {
try (RecordReader reader = RecordReaders.newRecordReader(file, null)) { try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
ProvenanceEventRecord r = null; ProvenanceEventRecord r = null;
while ((r = reader.nextRecord()) != null) { while ((r = reader.nextRecord()) != null) {
@ -1169,4 +1169,35 @@ public class TestPersistentProvenanceRepository {
assertEquals(10000, counter); assertEquals(10000, counter);
} }
@Test
public void testTruncateAttributes() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();
config.setMaxAttributeChars(50);
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
repo.initialize(getEventReporter());
final Map<String, String> attributes = new HashMap<>();
attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
final ProvenanceEventRecord record = builder.build();
repo.registerEvent(record);
repo.waitForRollover();
final ProvenanceEventRecord retrieved = repo.getEvent(0L);
assertNotNull(retrieved);
assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid"));
assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
}
} }

View File

@ -74,7 +74,7 @@ public class TestStandardRecordReaderWriter {
final TocReader tocReader = new StandardTocReader(tocFile); final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile); try (final FileInputStream fis = new FileInputStream(journalFile);
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
assertEquals(0, reader.getBlockIndex()); assertEquals(0, reader.getBlockIndex());
reader.skipToBlock(0); reader.skipToBlock(0);
final StandardProvenanceEventRecord recovered = reader.nextRecord(); final StandardProvenanceEventRecord recovered = reader.nextRecord();
@ -102,7 +102,7 @@ public class TestStandardRecordReaderWriter {
final TocReader tocReader = new StandardTocReader(tocFile); final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile); try (final FileInputStream fis = new FileInputStream(journalFile);
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
assertEquals(0, reader.getBlockIndex()); assertEquals(0, reader.getBlockIndex());
reader.skipToBlock(0); reader.skipToBlock(0);
final StandardProvenanceEventRecord recovered = reader.nextRecord(); final StandardProvenanceEventRecord recovered = reader.nextRecord();
@ -133,7 +133,7 @@ public class TestStandardRecordReaderWriter {
final TocReader tocReader = new StandardTocReader(tocFile); final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile); try (final FileInputStream fis = new FileInputStream(journalFile);
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
for (int i=0; i < 10; i++) { for (int i=0; i < 10; i++) {
assertEquals(0, reader.getBlockIndex()); assertEquals(0, reader.getBlockIndex());
@ -172,12 +172,12 @@ public class TestStandardRecordReaderWriter {
final TocReader tocReader = new StandardTocReader(tocFile); final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile); try (final FileInputStream fis = new FileInputStream(journalFile);
final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) {
for (int i=0; i < 10; i++) { for (int i=0; i < 10; i++) {
final StandardProvenanceEventRecord recovered = reader.nextRecord(); final StandardProvenanceEventRecord recovered = reader.nextRecord();
System.out.println(recovered); System.out.println(recovered);
assertNotNull(recovered); assertNotNull(recovered);
assertEquals((long) i, recovered.getEventId()); assertEquals(i, recovered.getEventId());
assertEquals("nifi://unit-test", recovered.getTransitUri()); assertEquals("nifi://unit-test", recovered.getTransitUri());
} }

View File

@ -60,7 +60,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ "Regular Expressions are entered by adding user-defined properties; " + "Regular Expressions are entered by adding user-defined properties; "
+ "the name of the property maps to the Attribute Name into which the result will be placed. " + "the name of the property maps to the Attribute Name into which the result will be placed. "
+ "The first capture group, if any found, will be placed into that attribute name." + "The first capture group, if any found, will be placed into that attribute name."
+ "But all catpure groups, including the matching string sequence itself will also be " + "But all capture groups, including the matching string sequence itself will also be "
+ "provided at that attribute name with an index value provided." + "provided at that attribute name with an index value provided."
+ "The value of the property must be a valid Regular Expressions with one or more capturing groups. " + "The value of the property must be a valid Regular Expressions with one or more capturing groups. "
+ "If the Regular Expression matches more than once, only the first match will be used. " + "If the Regular Expression matches more than once, only the first match will be used. "
@ -69,7 +69,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ "and no attributes will be applied to the FlowFile.") + "and no attributes will be applied to the FlowFile.")
@DynamicProperty(name = "A FlowFile attribute", value = "A Regular Expression with one or more capturing group", @DynamicProperty(name = "A FlowFile attribute", value = "A Regular Expression with one or more capturing group",
description = "The first capture group, if any found, will be placed into that attribute name." description = "The first capture group, if any found, will be placed into that attribute name."
+ "But all catpure groups, including the matching string sequence itself will also be " + "But all capture groups, including the matching string sequence itself will also be "
+ "provided at that attribute name with an index value provided.") + "provided at that attribute name with an index value provided.")
public class ExtractText extends AbstractProcessor { public class ExtractText extends AbstractProcessor {
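A hedged sketch of exercising the dynamic-property behavior described above with the TestRunner API used elsewhere in this commit; the relationship constant, the indexed attribute names, and the sample regex are assumptions based on the description rather than on the processor source:

final TestRunner runner = TestRunners.newTestRunner(new ExtractText());
// dynamic property: the property name maps to the attribute name, the value is a regex with capturing groups
runner.setProperty("greeting.word", "(hello) (world)");
runner.enqueue("hello world".getBytes());
runner.run();

final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractText.REL_MATCH).get(0); // assumed "matched" relationship constant
out.assertAttributeEquals("greeting.word", "hello");          // first capture group, per the description
out.assertAttributeEquals("greeting.word.0", "hello world");  // the matching sequence itself (assumed index 0)
out.assertAttributeEquals("greeting.word.1", "hello");
out.assertAttributeEquals("greeting.word.2", "world");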

View File

@ -82,31 +82,34 @@ import org.apache.nifi.util.ObjectHolder;
+ "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.") + "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
@ReadsAttributes({ @ReadsAttributes({
@ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. " @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "All FlowFiles with the same value for this attribute will be bundled together"), + "All FlowFiles with the same value for this attribute will be bundled together."),
@ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This " @ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute and must be a unique integer " + "This attribute indicates the order in which the fragments should be assembled. This "
+ "between 0 and the value of the fragment.count attribute. This attribute indicates the order in which the fragments should be assembled"), + "attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all "
+ "FlowFiles that have the same value for the \"fragment.identifier\" attribute) integer "
+ "between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "
+ "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the behavior of this Processor is undefined."),
@ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This " @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+ "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected " + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+ "in the given bundle"), + "in the given bundle."),
@ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the <Merge Strategy> property is set to Defragment. " @ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged " + "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged "
+ "FlowFile"), + "FlowFile."),
@ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the <Merge Format> property is set to TAR. The value of this " @ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the <Merge Format> property is set to TAR. The value of this "
+ "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should " + "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should "
+ "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used")}) + "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used") })
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename " @WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename "
+ "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching " + "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching "
+ "system time. Then a filename extension may be applied:" + "system time. Then a filename extension may be applied:"
+ "if Merge Format is TAR, then the filename will be appended with .tar, " + "if Merge Format is TAR, then the filename will be appended with .tar, "
+ "if Merge Format is ZIP, then the filename will be appended with .zip, " + "if Merge Format is ZIP, then the filename will be appended with .zip, "
+ "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"), + "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"),
@WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"), @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
@WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively " @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output")}) + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
@SeeAlso(SegmentContent.class) @SeeAlso(SegmentContent.class)
public class MergeContent extends BinFiles { public class MergeContent extends BinFiles {
@ -131,7 +134,9 @@ public class MergeContent extends BinFiles {
"Defragment", "Defragment",
"Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+ "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or alternatively (for backward compatibility " + "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or alternatively (for backward compatibility "
+ "purposes) <segment.identifier>, <segment.count>, and <segment.index>"); + "purposes) <segment.identifier>, <segment.count>, and <segment.index>. All FlowFiles with the same value for \"fragment.identifier\" "
+ "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles "
+ "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute.");
public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue( public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue(
"Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file"); "Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file");
@ -307,7 +312,7 @@ public class MergeContent extends BinFiles {
@Override @Override
protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) { protected Collection<ValidationResult> additionalCustomValidation(ValidationContext context) {
Collection<ValidationResult> results = new ArrayList<>(); final Collection<ValidationResult> results = new ArrayList<>();
final String delimiterStrategy = context.getProperty(DELIMITER_STRATEGY).getValue(); final String delimiterStrategy = context.getProperty(DELIMITER_STRATEGY).getValue();
if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) { if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) {
@ -353,7 +358,7 @@ public class MergeContent extends BinFiles {
@Override @Override
protected String getGroupId(final ProcessContext context, final FlowFile flowFile) { protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue(); final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName); String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
// when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier
if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
@ -442,7 +447,7 @@ public class MergeContent extends BinFiles {
bundle = session.putAllAttributes(bundle, bundleAttributes); bundle = session.putAllAttributes(bundle, bundleAttributes);
final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles"; final String inputDescription = binCopy.size() < 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle}); getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
session.transfer(bundle, REL_MERGED); session.transfer(bundle, REL_MERGED);
@ -640,18 +645,18 @@ public class MergeContent extends BinFiles {
} }
if (".".equals(path.getName(0).toString())) { if (".".equals(path.getName(0).toString())) {
path = (path.getNameCount() == 1) ? null : path.subpath(1, path.getNameCount()); path = path.getNameCount() == 1 ? null : path.subpath(1, path.getNameCount());
} }
return (path == null) ? "" : path.toString() + "/"; return path == null ? "" : path.toString() + "/";
} }
private String createFilename(final List<FlowFileSessionWrapper> wrappers) { private String createFilename(final List<FlowFileSessionWrapper> wrappers) {
if (wrappers.size() == 1) { if (wrappers.size() == 1) {
return wrappers.get(0).getFlowFile().getAttribute(CoreAttributes.FILENAME.key()); return wrappers.get(0).getFlowFile().getAttribute(CoreAttributes.FILENAME.key());
} else { } else {
FlowFile ff = wrappers.get(0).getFlowFile(); final FlowFile ff = wrappers.get(0).getFlowFile();
String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME); final String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME);
if (origFilename != null) { if (origFilename != null) {
return origFilename; return origFilename;
} else { } else {