diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
index b05909b360..ebfe0f6dbc 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
@@ -58,6 +58,14 @@
commons-io
commons-io
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+
+
+ org.apache.nifi
+ nifi-record
+
org.apache.activemq
activemq-client
@@ -83,6 +91,20 @@
2.0.0-SNAPSHOT
test
+
+
+
+ org.apache.nifi
+ nifi-record-serialization-services
+ 2.0.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-schema-registry-service-api
+ 2.0.0-SNAPSHOT
+ test
+
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index aa8fa4dd64..470260cfc9 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -37,6 +37,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
@@ -151,6 +153,20 @@ public abstract class AbstractJMSProcessor extends Abstract
.collect(Collectors.toList())
);
+ static final PropertyDescriptor BASE_RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor BASE_RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .dependsOn(BASE_RECORD_READER)
+ .required(true)
+ .build();
private volatile IJMSConnectionFactoryProvider connectionFactoryProvider;
private volatile BlockingQueue workerPool;
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 6e99da3807..72d1cae292 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -32,13 +32,18 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
-import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
+import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
+import org.apache.nifi.jms.processors.ioconcept.writer.record.RecordWriter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
@@ -48,6 +53,7 @@ import javax.jms.Session;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -55,6 +61,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
/**
* Consuming JMS processor which upon each invocation of
@@ -88,19 +95,24 @@ import java.util.concurrent.TimeUnit;
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
public class ConsumeJMS extends AbstractJMSProcessor {
+
public static final String JMS_MESSAGETYPE = "jms.messagetype";
+ private final static String COUNTER_PARSE_FAILURES = "Parse Failures";
+ private final static String COUNTER_RECORDS_RECEIVED = "Records Received";
+ private final static String COUNTER_RECORDS_PROCESSED = "Records Processed";
+
static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE),
- "AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + ")",
+ "AUTO_ACKNOWLEDGE (" + Session.AUTO_ACKNOWLEDGE + ")",
"Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. "
+ "Can result in data loss in the event where NiFi abruptly stopped before session was commited.");
static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE),
- "CLIENT_ACKNOWLEDGE (" + String.valueOf(Session.CLIENT_ACKNOWLEDGE) + ")",
+ "CLIENT_ACKNOWLEDGE (" + Session.CLIENT_ACKNOWLEDGE + ")",
"(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss");
static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
- "DUPS_OK_ACKNOWLEDGE (" + String.valueOf(Session.DUPS_OK_ACKNOWLEDGE) + ")",
+ "DUPS_OK_ACKNOWLEDGE (" + Session.DUPS_OK_ACKNOWLEDGE + ")",
"This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data "
+ "duplication and data loss while achieving the best throughput.");
@@ -170,11 +182,38 @@ public class ConsumeJMS extends AbstractJMSProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_READER)
+ .description("The Record Reader to use for parsing received JMS Messages into Records.")
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_WRITER)
+ .description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
+ .build();
+
+ static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+ .name("output-strategy")
+ .displayName("Output Strategy")
+ .description("The format used to output the JMS message into a FlowFile record.")
+ .dependsOn(RECORD_READER)
+ .required(true)
+ .defaultValue(OutputStrategy.USE_VALUE.getValue())
+ .allowableValues(OutputStrategy.class)
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received from the JMS Destination are routed to this relationship")
.build();
+ public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+ .name("parse.failure")
+ .description("If a message cannot be parsed using the configured Record Reader, the contents of the "
+ + "message will be routed to this Relationship as its own individual FlowFile.")
+ .autoTerminateDefault(true) // to make sure flow are still valid after upgrades
+ .build();
+
private final static Set relationships;
private final static List propertyDescriptors;
@@ -205,6 +244,10 @@ public class ConsumeJMS extends AbstractJMSProcessor {
_propertyDescriptors.add(TIMEOUT);
_propertyDescriptors.add(ERROR_QUEUE);
+ _propertyDescriptors.add(RECORD_READER);
+ _propertyDescriptors.add(RECORD_WRITER);
+ _propertyDescriptors.add(OUTPUT_STRATEGY);
+
_propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
_propertyDescriptors.addAll(JMS_CF_PROPERTIES);
@@ -212,6 +255,7 @@ public class ConsumeJMS extends AbstractJMSProcessor {
Set _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
+ _relationships.add(REL_PARSE_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
@@ -268,35 +312,11 @@ public class ConsumeJMS extends AbstractJMSProcessor {
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
try {
- consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, new ConsumerCallback() {
- @Override
- public void accept(final JMSResponse response) {
- if (response == null) {
- return;
- }
-
- try {
- FlowFile flowFile = processSession.create();
- flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
-
- final Map jmsHeaders = response.getMessageHeaders();
- final Map jmsProperties = response.getMessageProperties();
-
- flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
- flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
- flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
-
- processSession.getProvenanceReporter().receive(flowFile, destinationName);
- processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
- processSession.transfer(flowFile, REL_SUCCESS);
-
- processSession.commitAsync(() -> acknowledge(response), throwable -> response.reject());
- } catch (final Throwable t) {
- response.reject();
- throw t;
- }
- }
- });
+ if (context.getProperty(RECORD_READER).isSet()) {
+ processMessageSet(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
+ } else {
+ processSingleMessage(processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
+ }
} catch(Exception e) {
getLogger().error("Error while trying to process JMS message", e);
consumer.setValid(false);
@@ -305,6 +325,92 @@ public class ConsumeJMS extends AbstractJMSProcessor {
}
}
+ private void processSingleMessage(ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName,
+ boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
+
+ consumer.consumeSingleMessage(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, response -> {
+ if (response == null) {
+ return;
+ }
+
+ try {
+ final FlowFile flowFile = createFlowFileFromMessage(processSession, destinationName, response);
+
+ processSession.getProvenanceReporter().receive(flowFile, destinationName);
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.commitAsync(
+ () -> withLog(() -> acknowledge(response)),
+ __ -> withLog(() -> response.reject()));
+ } catch (final Throwable t) {
+ response.reject();
+ throw t;
+ }
+ });
+ }
+
+ private FlowFile createFlowFileFromMessage(ProcessSession processSession, String destinationName, JMSResponse response) {
+ FlowFile flowFile = processSession.create();
+ flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
+
+ final Map jmsHeaders = response.getMessageHeaders();
+ final Map jmsProperties = response.getMessageProperties();
+
+ flowFile = updateFlowFileAttributesWithJMSAttributes(mergeJmsAttributes(jmsHeaders, jmsProperties), flowFile, processSession);
+ flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
+ flowFile = processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
+
+ return flowFile;
+ }
+
+ private void processMessageSet(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName,String errorQueueName,
+ boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
+
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
+
+ final FlowFileWriter flowFileWriter = new RecordWriter<>(
+ readerFactory,
+ writerFactory,
+ message -> message.getMessageBody() == null ? new byte[0] : message.getMessageBody(),
+ message -> mergeJmsAttributes(message.getMessageHeaders(), message.getMessageProperties()),
+ outputStrategy,
+ getLogger()
+ );
+
+ consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, jmsResponses -> {
+ flowFileWriter.write(session, jmsResponses, new FlowFileWriterCallback<>() {
+ @Override
+ public void onSuccess(FlowFile flowFile, List processedMessages, List failedMessages) {
+ session.getProvenanceReporter().receive(flowFile, destinationName);
+ session.adjustCounter(COUNTER_RECORDS_RECEIVED, processedMessages.size() + failedMessages.size(), false);
+ session.adjustCounter(COUNTER_RECORDS_PROCESSED, processedMessages.size(), false);
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.commitAsync(
+ () -> withLog(() -> acknowledge(processedMessages, failedMessages)),
+ __ -> withLog(() -> reject(processedMessages, failedMessages))
+ );
+ }
+
+ @Override
+ public void onParseFailure(FlowFile flowFile, JMSResponse message, Exception e) {
+ session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+
+ final FlowFile failedMessage = createFlowFileFromMessage(session, destinationName, message);
+ session.transfer(failedMessage, REL_PARSE_FAILURE);
+ }
+
+ @Override
+ public void onFailure(FlowFile flowFile, List processedMessages, List failedMessages, Exception e) {
+ reject(processedMessages, failedMessages);
+ // It would be nicer to call rollback and yield here, but we are rethrowing the exception to have the same error handling with processSingleMessage.
+ throw new ProcessException(e);
+ }
+ });
+ });
+ }
+
private void acknowledge(final JMSResponse response) {
try {
response.acknowledge();
@@ -314,6 +420,26 @@ public class ConsumeJMS extends AbstractJMSProcessor {
}
}
+ private void acknowledge(final List processedMessages, final List failedMessages) {
+ acknowledge(findLastBatchedJmsResponse(processedMessages, failedMessages));
+ }
+
+ private void reject(final List processedMessages, final List failedMessages) {
+ findLastBatchedJmsResponse(processedMessages, failedMessages).reject();
+ }
+
+ private void withLog(Runnable runnable) {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ getLogger().error("An error happened during commitAsync callback", e);
+ throw e;
+ }
+ }
+
+ private JMSResponse findLastBatchedJmsResponse(List processedMessages, List failedMessages) {
+ return Stream.of(processedMessages, failedMessages).flatMap(Collection::stream).max(Comparator.comparing(JMSResponse::getBatchOrder)).get();
+ }
/**
* Will create an instance of {@link JMSConsumer}
@@ -375,4 +501,16 @@ public class ConsumeJMS extends AbstractJMSProcessor {
flowFile = processSession.putAllAttributes(flowFile, attributes);
return flowFile;
}
+
+ private Map mergeJmsAttributes(Map headers, Map properties) {
+ final Map jmsAttributes = new HashMap<>(headers);
+ properties.forEach((key, value) -> {
+ if (jmsAttributes.containsKey(key)) {
+ getLogger().warn("JMS Header and Property name collides as an attribute. JMS Property will override the JMS Header attribute. attributeName=[{}]", key);
+ }
+ jmsAttributes.put(key, value);
+ });
+
+ return jmsAttributes;
+ }
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index 4d856ca157..141b01d9c7 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -38,19 +38,24 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
* Generic consumer of messages from JMS compliant messaging system.
*/
class JMSConsumer extends JMSWorker {
+ private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;
+
JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
super(connectionFactory, jmsTemplate, logger);
- logger.debug("Created Message Consumer for '{}'", new Object[] {jmsTemplate});
+ logger.debug("Created Message Consumer for '{}'", jmsTemplate);
}
@@ -82,79 +87,62 @@ class JMSConsumer extends JMSWorker {
}
}
-
/**
* Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
*/
- public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
- final String charset, final ConsumerCallback consumerCallback) {
- this.jmsTemplate.execute(new SessionCallback() {
+ public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
+ final String charset, final Consumer singleMessageConsumer) {
+ doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
+ final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName);
+ if (response != null) {
+ // Provide the JMSResponse to the processor to handle. It is the responsibility of the
+ // processor to handle acknowledgment of the message (if Client Acknowledge), and it is
+ // the responsibility of the processor to handle closing the Message Consumer.
+ // Both of these actions can be handled by calling the acknowledge() or reject() methods of
+ // the JMSResponse.
+ singleMessageConsumer.accept(response);
+ }
+ });
+ }
+
+ /**
+ * Receives a list of messages from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
+ */
+ public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
+ final String charset, final Consumer> messageSetConsumer) {
+ doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() {
@Override
- public Void doInJms(final Session session) throws JMSException {
+ public void consume(Session session, MessageConsumer messageConsumer) throws JMSException {
+ final List jmsResponses = new ArrayList<>();
+ int batchCounter = 0;
- final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
- try {
- final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
- // If there is no message, there's nothing for us to do. We can simply close the consumer and return.
- if (message == null) {
- JmsUtils.closeMessageConsumer(msgConsumer);
- return null;
- }
-
- String messageType;
- byte[] messageBody;
-
- try {
- if (message instanceof TextMessage) {
- messageType = TextMessage.class.getSimpleName();
- messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
- } else if (message instanceof BytesMessage) {
- messageType = BytesMessage.class.getSimpleName();
- messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
- } else if (message instanceof ObjectMessage) {
- messageType = ObjectMessage.class.getSimpleName();
- messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
- } else if (message instanceof StreamMessage) {
- messageType = StreamMessage.class.getSimpleName();
- messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message);
- } else if (message instanceof MapMessage) {
- messageType = MapMessage.class.getSimpleName();
- messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message);
- } else {
- acknowledge(message, session);
-
- if (errorQueueName != null) {
- processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, errorQueueName});
- jmsTemplate.send(errorQueueName, __ -> message);
- } else {
- processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message});
- }
-
- return null;
- }
- } catch (final MessageConversionException mce) {
- processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.",
- new Object[] {message}, mce);
- acknowledge(message, session);
-
- if (errorQueueName != null) {
- jmsTemplate.send(errorQueueName, __ -> message);
- }
-
- return null;
- }
-
- final Map messageHeaders = extractMessageHeaders(message);
- final Map messageProperties = extractMessageProperties(message);
- final JMSResponse response = new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer);
+ JMSResponse response;
+ while ((response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) {
+ response.setBatchOrder(batchCounter);
+ jmsResponses.add(response);
+ batchCounter++;
+ }
+ if (!jmsResponses.isEmpty()) {
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
// the responsibility of the processor to handle closing the Message Consumer.
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
// the JMSResponse.
- consumerCallback.accept(response);
+ messageSetConsumer.accept(jmsResponses);
+ }
+ }
+ });
+ }
+
+ private void doWithJmsTemplate(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageReceiver messageReceiver) {
+ this.jmsTemplate.execute(new SessionCallback() {
+ @Override
+ public Void doInJms(final Session session) throws JMSException {
+
+ final MessageConsumer messageConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
+ try {
+ messageReceiver.consume(session, messageConsumer);
} catch (Exception e) {
// We need to call recover to ensure that in the event of
// abrupt end or exception the current session will stop message
@@ -166,7 +154,7 @@ class JMSConsumer extends JMSWorker {
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
}
- JmsUtils.closeMessageConsumer(msgConsumer);
+ JmsUtils.closeMessageConsumer(messageConsumer);
throw e;
}
@@ -175,6 +163,64 @@ class JMSConsumer extends JMSWorker {
}, true);
}
+ private JMSResponse receiveMessage(Session session, MessageConsumer msgConsumer, String charset, String errorQueueName) throws JMSException {
+ final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
+
+ // If there is no message, there's nothing for us to do. We can simply close the consumer and return.
+ if (message == null) {
+ JmsUtils.closeMessageConsumer(msgConsumer);
+ return null;
+ }
+
+ String messageType;
+ byte[] messageBody;
+
+ try {
+ if (message instanceof TextMessage) {
+ messageType = TextMessage.class.getSimpleName();
+ messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
+ } else if (message instanceof BytesMessage) {
+ messageType = BytesMessage.class.getSimpleName();
+ messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
+ } else if (message instanceof ObjectMessage) {
+ messageType = ObjectMessage.class.getSimpleName();
+ messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
+ } else if (message instanceof StreamMessage) {
+ messageType = StreamMessage.class.getSimpleName();
+ messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message);
+ } else if (message instanceof MapMessage) {
+ messageType = MapMessage.class.getSimpleName();
+ messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message);
+ } else {
+ acknowledge(message, session);
+
+ if (errorQueueName != null) {
+ processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", message, errorQueueName);
+ jmsTemplate.send(errorQueueName, __ -> message);
+ } else {
+ processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", message);
+ }
+
+ return null;
+ }
+ } catch (final MessageConversionException mce) {
+ processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.",
+ new Object[] {message}, mce);
+ acknowledge(message, session);
+
+ if (errorQueueName != null) {
+ jmsTemplate.send(errorQueueName, __ -> message);
+ }
+
+ return null;
+ }
+
+ final Map messageHeaders = extractMessageHeaders(message);
+ final Map messageProperties = extractMessageProperties(message);
+
+ return new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer);
+ }
+
private void acknowledge(final Message message, final Session session) throws JMSException {
if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
@@ -245,6 +291,7 @@ class JMSConsumer extends JMSWorker {
private final Map messageHeaders;
private final Map messageProperties;
private final MessageConsumer messageConsumer;
+ private Integer batchOrder;
JMSResponse(final Message message, final int acknowledgementMode, final String messageType, final byte[] messageBody, final Map messageHeaders,
final Map messageProperties, final MessageConsumer msgConsumer) {
@@ -286,13 +333,18 @@ class JMSConsumer extends JMSWorker {
public void reject() {
JmsUtils.closeMessageConsumer(messageConsumer);
}
+
+ public Integer getBatchOrder() {
+ return batchOrder;
+ }
+
+ public void setBatchOrder(Integer batchOrder) {
+ this.batchOrder = batchOrder;
+ }
}
- /**
- * Callback to be invoked while executing inJMS call (the call within the
- * live JMS session)
- */
- static interface ConsumerCallback {
- void accept(JMSResponse response);
+ interface MessageReceiver {
+ void consume(Session session, MessageConsumer messageConsumer) throws JMSException;
}
+
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 3a65bababd..67a49a9dd6 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -36,7 +36,7 @@ import java.util.Map.Entry;
/**
* Generic publisher of messages to JMS compliant messaging system.
*/
-final class JMSPublisher extends JMSWorker {
+class JMSPublisher extends JMSWorker {
JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) {
super(connectionFactory, jmsTemplate, processLog);
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 2a236adfb1..03158f4cbc 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -31,12 +31,18 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
+import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReader;
+import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReaderCallback;
+import org.apache.nifi.jms.processors.ioconcept.reader.StateTrackingFlowFileReader;
+import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.stream.io.StreamUtils;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
@@ -55,6 +61,10 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+
/**
* An implementation of JMS Message publishing {@link Processor} which upon each
* invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will
@@ -122,6 +132,16 @@ public class PublishJMS extends AbstractJMSProcessor {
.required(true)
.build();
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_READER)
+ .description("The Record Reader to use for parsing the incoming FlowFile into Records.")
+ .build();
+
+ static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_WRITER)
+ .description("The Record Writer to use for serializing Records before publishing them as an JMS Message.")
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are sent to the JMS destination are routed to this relationship")
@@ -154,12 +174,14 @@ public class PublishJMS extends AbstractJMSProcessor {
_propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
_propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
+ _propertyDescriptors.add(RECORD_READER);
+ _propertyDescriptors.add(RECORD_WRITER);
+
_propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
_propertyDescriptors.addAll(JMS_CF_PROPERTIES);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
-
Set _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
@@ -173,7 +195,7 @@ public class PublishJMS extends AbstractJMSProcessor {
* as JMS headers on the newly constructed message. For the list of
* available message headers please see {@link JmsHeaders}.
*
- * Upon success the incoming {@link FlowFile} is transferred to the'success'
+ * Upon success the incoming {@link FlowFile} is transferred to the 'success'
* {@link Relationship} and upon failure FlowFile is penalized and
* transferred to the 'failure' {@link Relationship}
*/
@@ -182,12 +204,12 @@ public class PublishJMS extends AbstractJMSProcessor {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
try {
- String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
- String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
- Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
- String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
+ final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
+ final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+ final Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
+ final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
- Map attributesToSend = new HashMap<>();
+ final Map attributesToSend = new HashMap<>();
// REGEX Attributes
final Pattern pattern = Pattern.compile(attributeHeaderRegex);
for (final Map.Entry entry : flowFile.getAttributes().entrySet()) {
@@ -199,36 +221,63 @@ public class PublishJMS extends AbstractJMSProcessor {
}
}
- switch (context.getProperty(MESSAGE_BODY).getValue()) {
- case TEXT_MESSAGE:
- try {
- publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend);
- } catch(Exception e) {
- publisher.setValid(false);
- throw e;
- }
- break;
- case BYTES_MESSAGE:
- default:
- try {
- publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend);
- } catch(Exception e) {
- publisher.setValid(false);
- throw e;
- }
- break;
+ if (context.getProperty(RECORD_READER).isSet()) {
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ final FlowFileReader flowFileReader = new StateTrackingFlowFileReader(
+ getIdentifier(),
+ new RecordSupplier(readerFactory, writerFactory),
+ getLogger()
+ );
+
+ flowFileReader.read(
+ processSession,
+ flowFile,
+ content -> publisher.publish(destinationName, content, attributesToSend),
+ new FlowFileReaderCallback() {
+ @Override
+ public void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis) {
+ final String eventTemplate = isRecover ? PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER : PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+ processSession.getProvenanceReporter().send(
+ flowFile,
+ destinationName,
+ String.format(eventTemplate, processedRecords),
+ transmissionMillis);
+
+ processSession.transfer(flowFile, REL_SUCCESS);
+ }
+
+ @Override
+ public void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e) {
+ processSession.getProvenanceReporter().send(
+ flowFile,
+ destinationName,
+ String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords),
+ transmissionMillis);
+
+ handleException(context, processSession, publisher, flowFile, e);
+ }
+ }
+ );
+ } else {
+ processStandardFlowFile(context, processSession, publisher, flowFile, destinationName, charset, attributesToSend);
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.getProvenanceReporter().send(flowFile, destinationName);
}
- processSession.transfer(flowFile, REL_SUCCESS);
- processSession.getProvenanceReporter().send(flowFile, destinationName);
} catch (Exception e) {
- processSession.transfer(flowFile, REL_FAILURE);
- getLogger().error("Failed while sending message to JMS via " + publisher, e);
- context.yield();
- publisher.setValid(false);
+ handleException(context, processSession, publisher, flowFile, e);
}
}
}
+ private void handleException(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile, Exception e) {
+ processSession.transfer(flowFile, REL_FAILURE);
+ this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
+ context.yield();
+ publisher.setValid(false);
+ }
+
@Override
protected List getSupportedPropertyDescriptors() {
return propertyDescriptors;
@@ -250,6 +299,34 @@ public class PublishJMS extends AbstractJMSProcessor {
return new JMSPublisher(connectionFactory, jmsTemplate, this.getLogger());
}
+ private void processStandardFlowFile(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile,
+ String destinationName, String charset, Map attributesToSend) {
+ publishMessage(context, processSession, publisher, flowFile, destinationName, charset, attributesToSend);
+ }
+
+ private void publishMessage(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile,
+ String destinationName, String charset, Map attributesToSend) {
+ switch (context.getProperty(MESSAGE_BODY).getValue()) {
+ case TEXT_MESSAGE:
+ try {
+ publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend);
+ } catch(Exception e) {
+ publisher.setValid(false);
+ throw e;
+ }
+ break;
+ case BYTES_MESSAGE:
+ default:
+ try {
+ publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend);
+ } catch(Exception e) {
+ publisher.setValid(false);
+ throw e;
+ }
+ break;
+ }
+ }
+
/**
* Extracts contents of the {@link FlowFile} as byte array.
*/
@@ -264,4 +341,5 @@ public class PublishJMS extends AbstractJMSProcessor {
session.read(flowFile, in -> IOUtils.copy(in, writer, Charset.forName(charset)));
return writer.toString();
}
+
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java
new file mode 100644
index 0000000000..14529f3460
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReader.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface FlowFileReader {
+ void read(ProcessSession session, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback);
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java
new file mode 100644
index 0000000000..eb9d612806
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/FlowFileReaderCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface FlowFileReaderCallback {
+ void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis);
+ void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e);
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java
new file mode 100644
index 0000000000..6c5e2a8d7b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/MessageHandler.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+public interface MessageHandler {
+ void handle(byte[] content);
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java
new file mode 100644
index 0000000000..e2d0f77486
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/StateTrackingFlowFileReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
+
+public class StateTrackingFlowFileReader implements FlowFileReader {
+
+ public static final String ATTR_READ_FAILED_INDEX_SUFFIX = ".read.failed.index";
+
+ private final String identifier;
+ private final RecordSupplier recordSupplier;
+ private final ComponentLog logger;
+
+ public StateTrackingFlowFileReader(String identifier, RecordSupplier recordSupplier, ComponentLog logger) {
+ this.identifier = identifier;
+ this.recordSupplier = recordSupplier;
+ this.logger = logger;
+ }
+
+ @Override
+ public void read(ProcessSession session, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback) {
+ final StopWatch stopWatch = new StopWatch(true);
+ final AtomicInteger processedRecords = new AtomicInteger();
+
+ final String publishFailedIndexAttributeName = identifier + ATTR_READ_FAILED_INDEX_SUFFIX;
+
+ try {
+ final Long previousProcessFailedAt = ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+ session.read(flowFile, in -> recordSupplier.process(flowFile, in, processedRecords, previousProcessFailedAt, logger, messageHandler));
+
+ FlowFile successFlowFile = flowFile;
+
+ final boolean isRecover = previousProcessFailedAt != null;
+ if (isRecover) {
+ successFlowFile = session.removeAttribute(flowFile, publishFailedIndexAttributeName);
+ }
+
+ flowFileReaderCallback.onSuccess(successFlowFile, processedRecords.get(), isRecover, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ logger.error("An error happened while processing records. Routing to failure.", e);
+
+ final FlowFile failedFlowFile = session.putAttribute(flowFile, publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+ flowFileReaderCallback.onFailure(failedFlowFile, processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/ProvenanceEventTemplates.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/ProvenanceEventTemplates.java
new file mode 100644
index 0000000000..d02dcb9198
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/ProvenanceEventTemplates.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader.record;
+
+public class ProvenanceEventTemplates {
+
+ public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
+ public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
+ public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
+
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java
new file mode 100644
index 0000000000..95172c5d98
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/reader/record/RecordSupplier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.reader.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.processors.ioconcept.reader.MessageHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RecordSupplier {
+
+ private final RecordReaderFactory readerFactory;
+ private final RecordSetWriterFactory writerFactory;
+
+ public RecordSupplier(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory) {
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
+ }
+
+ public void process(FlowFile flowfile, InputStream in, AtomicInteger processedRecords, Long processFromIndex, ComponentLog logger, MessageHandler messageHandler) throws IOException {
+
+ try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
+ final RecordSet recordSet = reader.createRecordSet();
+
+ final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+ Record record;
+ while ((record = recordSet.next()) != null) {
+ if (processFromIndex != null && processedRecords.get() < processFromIndex) {
+ processedRecords.getAndIncrement();
+ continue;
+ }
+
+ baos.reset();
+
+ try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
+ writer.write(record);
+ writer.flush();
+ }
+
+ final byte[] messageContent = baos.toByteArray();
+
+ messageHandler.handle(messageContent);
+
+ processedRecords.getAndIncrement();
+ }
+ } catch (SchemaNotFoundException | MalformedRecordException e) {
+ throw new ProcessException("An error happened during creating components for serialization.", e);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java
new file mode 100644
index 0000000000..390a8aa25b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/AttributeSource.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import java.util.Map;
+
+public interface AttributeSource {
+ Map getAttributes(T message);
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java
new file mode 100644
index 0000000000..85efcc1739
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+public interface FlowFileWriter {
+ void write(ProcessSession session, List messages, FlowFileWriterCallback flowFileWriterCallback);
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java
new file mode 100644
index 0000000000..7a8c464351
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/FlowFileWriterCallback.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.List;
+
+public interface FlowFileWriterCallback {
+ void onSuccess(FlowFile flowFile, List processedMessages, List failedMessages);
+ void onParseFailure(FlowFile flowFile, T message, Exception e);
+ void onFailure(FlowFile flowFile, List processedMessages, List failedMessages, Exception e);
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java
new file mode 100644
index 0000000000..40c3342b06
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/Marshaller.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer;
+
+public interface Marshaller {
+ byte[] marshall(T message);
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/OutputStrategy.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/OutputStrategy.java
new file mode 100644
index 0000000000..6c8f6c8cc2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/OutputStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer.record;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration of supported Output Strategies
+ */
+public enum OutputStrategy implements DescribedValue {
+ USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the message to the FlowFile record."),
+
+ USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the additional attributes into the FlowFile record on a separate leaf. (See processor usage for more information.)"),
+
+ USE_APPENDER("USE_APPENDER", "Use Appender", "Write the additional attributes into the FlowFile record prefixed with \"_\". (See processor usage for more information.)");
+
+ private final String value;
+
+ private final String displayName;
+
+ private final String description;
+
+ OutputStrategy(final String value, final String displayName, final String description) {
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordUtils.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordUtils.java
new file mode 100644
index 0000000000..114d8b9c03
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer.record;
+
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordUtils {
+
+ public static Record append(final Record originalRecord, final Map decoratorValues, final String decoratorPrefix) {
+ final List originalFields = originalRecord.getSchema().getFields();
+
+ final List mergedFields = new ArrayList<>(originalFields);
+ decoratorValues.forEach((key, value) -> mergedFields.add(new RecordField(decoratorPrefix + key, RecordFieldType.STRING.getDataType())));
+
+ final RecordSchema mergedSchema = new SimpleRecordSchema(mergedFields);
+
+ final Map recordValues = new HashMap<>();
+ originalFields.stream().map(RecordField::getFieldName).forEach(fieldName -> recordValues.put(fieldName, originalRecord.getValue(fieldName)));
+ decoratorValues.forEach((key, value) -> recordValues.put(decoratorPrefix + key, value));
+
+ return new MapRecord(mergedSchema, recordValues);
+ }
+
+ public static MapRecord wrap(final Record originalRecord, final String originalRecordKey, final Map decoratorValues, final String decoratorKey)
+ throws IOException, MalformedRecordException {
+
+ // create schema
+ final Tuple originalRecordLeaf = wrapStandardRecord(originalRecord, originalRecordKey);
+ final Tuple decoratorLeaf = wrapDecoratorValues(decoratorValues, decoratorKey);
+ final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(originalRecordLeaf.getKey(), decoratorLeaf.getKey()));
+
+ // assign values
+ final Map recordValues = new HashMap<>();
+ recordValues.put(originalRecordLeaf.getKey().getFieldName(), originalRecordLeaf.getValue());
+ recordValues.put(decoratorLeaf.getKey().getFieldName(), decoratorLeaf.getValue());
+ return new MapRecord(rootRecordSchema, recordValues);
+ }
+
+ private static Tuple wrapStandardRecord(final Record record, final String recordKey) {
+ final RecordSchema recordSchema = (record == null) ? null : record.getSchema();
+ final RecordField recordField = new RecordField(recordKey, RecordFieldType.RECORD.getRecordDataType(recordSchema));
+ return new Tuple<>(recordField, record);
+ }
+
+ private static Tuple wrapDecoratorValues(final Map decoratorValues, final String decoratorKey) {
+ final RecordField recordField = new RecordField(decoratorKey, RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+ return new Tuple<>(recordField, decoratorValues);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java
new file mode 100644
index 0000000000..0bea02da8d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ioconcept/writer/record/RecordWriter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.ioconcept.writer.record;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.jms.processors.ioconcept.writer.AttributeSource;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
+import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
+import org.apache.nifi.jms.processors.ioconcept.writer.Marshaller;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_APPENDER;
+import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_VALUE;
+import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_WRAPPER;
+
+public class RecordWriter implements FlowFileWriter {
+
+ private final static String RECORD_COUNT_KEY = "record.count";
+
+ private final RecordReaderFactory readerFactory;
+ private final RecordSetWriterFactory writerFactory;
+ private final Marshaller marshaller;
+ private final AttributeSource attributeSource;
+ private final OutputStrategy outputStrategy;
+ private final ComponentLog logger;
+
+ public RecordWriter(RecordReaderFactory readerFactory,
+ RecordSetWriterFactory writerFactory,
+ Marshaller marshaller,
+ AttributeSource attributeSource,
+ OutputStrategy outputStrategy,
+ ComponentLog logger) {
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
+ this.marshaller = marshaller;
+ this.attributeSource = attributeSource;
+ this.outputStrategy = outputStrategy;
+ this.logger = logger;
+ }
+
+ @Override
+ public void write(ProcessSession session, List messages, FlowFileWriterCallback flowFileWriterCallback) {
+ FlowFile flowFile = session.create();
+
+ final Map attributes = new HashMap<>();
+ final AtomicInteger recordCount = new AtomicInteger();
+
+ final List processedMessages = new ArrayList<>();
+ final List failedMessages = new ArrayList<>();
+
+ RecordSetWriter writer = null;
+ boolean isWriterInitialized = false;
+
+ try {
+ for (T message : messages) {
+ if (message == null) {
+ break;
+ }
+
+ final byte[] recordBytes = marshaller.marshall(message);
+ try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+ final RecordReader reader;
+
+ // parse incoming message which may contain multiple messages
+ try {
+ reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
+ } catch (final IOException ioe) {
+ logger.error("Failed to parse message due to comms failure. Will roll back session and try again momentarily.");
+ flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, ioe);
+ closeWriter(writer);
+ return;
+ } catch (final Exception e) {
+ logger.error("Failed to parse message, sending to the parse failure relationship", e);
+ failedMessages.add(message);
+ flowFileWriterCallback.onParseFailure(flowFile, message, e);
+ continue;
+ }
+
+ // write messages as records into FlowFile
+ try {
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+
+ if (attributeSource != null && !outputStrategy.equals(USE_VALUE)) {
+ final Map additionalAttributes = attributeSource.getAttributes(message);
+ if (outputStrategy.equals(USE_APPENDER)) {
+ record = RecordUtils.append(record, additionalAttributes, "_");
+ } else if (outputStrategy.equals(USE_WRAPPER)){
+ record = RecordUtils.wrap(record, "value", additionalAttributes, "_");
+ }
+ }
+
+ if (!isWriterInitialized) {
+ final RecordSchema recordSchema = record.getSchema();
+ final OutputStream rawOut = session.write(flowFile);
+
+ RecordSchema writeSchema;
+ try {
+ writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
+ failedMessages.add(message);
+ flowFileWriterCallback.onParseFailure(flowFile, message, e);
+ continue;
+ }
+
+ writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+ writer.beginRecordSet();
+ }
+
+ try {
+ writer.write(record);
+ isWriterInitialized = true;
+ processedMessages.add(message);
+ } catch (final RuntimeException re) {
+ logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
+ failedMessages.add(message);
+ flowFileWriterCallback.onParseFailure(flowFile, message, re);
+ }
+ }
+ } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+ logger.error("Failed to write message, sending to the parse failure relationship", e);
+ failedMessages.add(message);
+ flowFileWriterCallback.onParseFailure(flowFile, message, e);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to write message, sending to the parse failure relationship", e);
+ failedMessages.add(message);
+ flowFileWriterCallback.onParseFailure(flowFile, message, e);
+ }
+ }
+
+ if (writer != null) {
+ final WriteResult writeResult = writer.finishRecordSet();
+ attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ recordCount.set(writeResult.getRecordCount());
+ }
+
+ } catch (final Exception e) {
+ flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, e);
+ } finally {
+ closeWriter(writer);
+ }
+
+ if (recordCount.get() == 0) {
+ session.remove(flowFile);
+ return;
+ }
+
+ session.putAllAttributes(flowFile, attributes);
+
+ final int count = recordCount.get();
+ logger.info("Successfully processed {} records for {}", count, flowFile);
+
+ flowFileWriterCallback.onSuccess(flowFile, processedMessages, failedMessages);
+ }
+
+ private void closeWriter(final RecordSetWriter writer) {
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (final Exception ioe) {
+ logger.warn("Failed to close Record Writer", ioe);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index e61d1a38cc..de3e97761a 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.jms.processors;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@@ -27,6 +31,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -46,6 +51,7 @@ import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -64,7 +70,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static java.util.Arrays.asList;
import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
+import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetWriterService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -77,6 +86,8 @@ import static org.mockito.Mockito.when;
public class ConsumeJMSIT {
+ private static final String JMS_DESTINATION_ATTRIBUTE_NAME = "jms_destination";
+
@Test
public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
final String destinationName = "cooQueue";
@@ -478,6 +489,194 @@ public class ConsumeJMSIT {
}
}
+ @Test
+ public void testConsumeRecords() throws InitializationException {
+ String destination = "testConsumeRecords";
+ ArrayNode expectedRecordSet = createTestJsonInput();
+
+ JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
+ try {
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(0).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(1).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(2).toString()));
+
+ TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+ testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+
+ testRunner.run(1, false);
+
+ List successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertEquals(1, successFlowFiles.size());
+ assertEquals(expectedRecordSet.toString(), new String(successFlowFiles.get(0).toByteArray()));
+
+ List parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+ assertEquals(0, parseFailedFlowFiles.size());
+ } finally {
+ ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+ }
+ }
+
+ @Test
+ public void testConsumeMalformedRecords() throws InitializationException {
+ String destination = "testConsumeRecords";
+ ArrayNode expectedRecordSet = createTestJsonInput();
+ String expectedParseFailedContent1 = "this is not a json";
+ String expectedParseFailedContent2 = "this is still not a json";
+
+ JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
+ try {
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(0).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedParseFailedContent1));
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(1).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedParseFailedContent2));
+ jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(2).toString()));
+
+ TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+ testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setRelationshipAvailable(ConsumeJMS.REL_PARSE_FAILURE);
+
+ testRunner.run(1, false);
+
+ // checking whether the processor was able to construct a valid recordSet from the properly formatted messages
+ List successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertEquals(1, successFlowFiles.size());
+ assertEquals(expectedRecordSet.toString(), new String(successFlowFiles.get(0).toByteArray()));
+
+ // and checking whether it creates separate FlowFiles for the malformed messages
+ List parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+ assertEquals(2, parseFailedFlowFiles.size());
+ assertEquals(expectedParseFailedContent1, new String(parseFailedFlowFiles.get(0).toByteArray()));
+ assertEquals(expectedParseFailedContent2, new String(parseFailedFlowFiles.get(1).toByteArray()));
+ } finally {
+ ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+ }
+ }
+
+ @Test
+ public void testConsumeRecordsWithAppenderOutputStrategy() throws InitializationException, JsonProcessingException {
+ String destination = "testConsumeRecordsWithAppenderOutputStrategy";
+ ArrayNode inputRecordSet = createTestJsonInput();
+
+ JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
+ try {
+ jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(0).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(1).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(2).toString()));
+
+ TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+ testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_APPENDER.getValue());
+
+ testRunner.run(1, false);
+
+ List successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertEquals(1, successFlowFiles.size());
+ JsonNode flowFileContentAsJson = deserializeToJsonNode(new String(successFlowFiles.get(0).toByteArray()));
+ // checking that the output contains at least a part of the original input
+ assertEquals(inputRecordSet.get(0).get("firstAttribute").asText(), flowFileContentAsJson.get(0).get("firstAttribute").asText());
+ assertEquals(inputRecordSet.get(1).get("firstAttribute").asText(), flowFileContentAsJson.get(1).get("firstAttribute").asText());
+ assertEquals(inputRecordSet.get(2).get("firstAttribute").asText(), flowFileContentAsJson.get(2).get("firstAttribute").asText());
+ // checking jms_destination attribute exists with the given value
+ // this attribute has been chosen because it is deterministic; others vary based on host, time, etc.
+ // not nice, but stubbing all attributes would be uglier with the current code structure
+ assertEquals(destination, flowFileContentAsJson.get(0).get("_" + JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+ assertEquals(destination, flowFileContentAsJson.get(1).get("_" + JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+ assertEquals(destination, flowFileContentAsJson.get(2).get("_" + JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+
+ List parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+ assertEquals(0, parseFailedFlowFiles.size());
+ } finally {
+ ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+ }
+ }
+
+ @Test
+ public void testConsumeRecordsWithWrapperOutputStrategy() throws InitializationException, JsonProcessingException {
+ String destination = "testConsumeRecordsWithWrapperOutputStrategy";
+ String valueKey = "value";
+ String attributeKey = "_";
+ ArrayNode inputRecordSet = createTestJsonInput();
+
+ JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
+ try {
+ jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(0).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(1).toString()));
+ jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(2).toString()));
+
+ TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
+ testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_WRAPPER.getValue());
+
+ testRunner.run(1, false);
+
+ List successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertEquals(1, successFlowFiles.size());
+ JsonNode flowFileContentAsJson = deserializeToJsonNode(new String(successFlowFiles.get(0).toByteArray()));
+ // checking that the original json is equal to the leaf
+ assertEquals(inputRecordSet.get(0), flowFileContentAsJson.get(0).get(valueKey));
+ assertEquals(inputRecordSet.get(1), flowFileContentAsJson.get(1).get(valueKey));
+ assertEquals(inputRecordSet.get(2), flowFileContentAsJson.get(2).get(valueKey));
+ // checking that the attribute leaf contains at least the jms_destination attribute
+ // this attribute has been chosen because it is deterministic; others vary based on host, time, etc.
+ // not nice, but stubbing all attributes would be uglier with the current code structure
+ assertEquals(destination, flowFileContentAsJson.get(0).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+ assertEquals(destination, flowFileContentAsJson.get(1).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+ assertEquals(destination, flowFileContentAsJson.get(2).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
+
+ List parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
+ assertEquals(0, parseFailedFlowFiles.size());
+ } finally {
+ ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+ }
+ }
+
+ private static ArrayNode createTestJsonInput() {
+ final ObjectMapper mapper = new ObjectMapper();
+
+ return mapper.createArrayNode().addAll(asList(
+ mapper.createObjectNode()
+ .put("recordId", 1)
+ .put("firstAttribute", "foo")
+ .put("secondAttribute", false),
+ mapper.createObjectNode()
+ .put("recordId", 2)
+ .put("firstAttribute", "bar")
+ .put("secondAttribute", true),
+ mapper.createObjectNode()
+ .put("recordId", 3)
+ .put("firstAttribute", "foobar")
+ .put("secondAttribute", false)
+ ));
+ }
+
+ private JsonNode deserializeToJsonNode(String rawJson) throws JsonProcessingException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readTree(rawJson);
+ }
+
+ private TestRunner initializeTestRunner(ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
+ return initializeTestRunner(new ConsumeJMS(), connectionFactory, destinationName);
+ }
+
+ private TestRunner initializeTestRunner(ConsumeJMS processor, ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
+ TestRunner runner = TestRunners.newTestRunner(processor);
+ JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(connectionFactory);
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
+ runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
+
+ return runner;
+ }
+
private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException {
// Publish a message.
try (Connection conn = cf.createConnection();
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index b681bc09e8..e78734ac15 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -16,21 +16,17 @@
*/
package org.apache.nifi.jms.processors;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -41,19 +37,22 @@ import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
-import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
-import org.apache.nifi.logging.ComponentLog;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.support.JmsHeaders;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
public class JMSPublisherConsumerIT {
@@ -69,7 +68,7 @@ public class JMSPublisherConsumerIT {
return message;
};
- ConsumerCallback responseChecker = response -> {
+ Consumer responseChecker = response -> {
assertEquals(
"stringAsObject",
SerializationUtils.deserialize(response.getMessageBody())
@@ -123,7 +122,7 @@ public class JMSPublisherConsumerIT {
expected = byteArrayOutputStream.toByteArray();
}
- ConsumerCallback responseChecker = response -> {
+ Consumer responseChecker = response -> {
byte[] actual = response.getMessageBody();
assertArrayEquals(
@@ -175,7 +174,7 @@ public class JMSPublisherConsumerIT {
}
private void testMapMessage(String destinationName, MessageCreator messageCreator, String expectedJson) {
- ConsumerCallback responseChecker = response -> {
+ Consumer responseChecker = response -> {
ObjectMapper objectMapper = new ObjectMapper();
try {
@@ -191,7 +190,7 @@ public class JMSPublisherConsumerIT {
testMessage(destinationName, messageCreator, responseChecker);
}
- private void testMessage(String destinationName, MessageCreator messageCreator, ConsumerCallback responseChecker) {
+ private void testMessage(String destinationName, MessageCreator messageCreator, Consumer responseChecker) {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
AtomicBoolean callbackInvoked = new AtomicBoolean();
@@ -200,7 +199,7 @@ public class JMSPublisherConsumerIT {
jmsTemplate.send(destinationName, messageCreator);
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", response -> {
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
callbackInvoked.set(true);
responseChecker.accept(response);
});
@@ -282,11 +281,8 @@ public class JMSPublisherConsumerIT {
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- // noop
- }
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
+ // noop
});
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
@@ -312,15 +308,12 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- callbackInvoked.set(true);
- assertEquals("hello from the other side", new String(response.getMessageBody()));
- assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
- assertEquals("foo", response.getMessageProperties().get("foo"));
- assertEquals("false", response.getMessageProperties().get("bar"));
- }
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
+ callbackInvoked.set(true);
+ assertEquals("hello from the other side", new String(response.getMessageBody()));
+ assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
+ assertEquals("foo", response.getMessageProperties().get("foo"));
+ assertEquals("false", response.getMessageProperties().get("bar"));
});
assertTrue(callbackInvoked.get());
@@ -348,13 +341,6 @@ public class JMSPublisherConsumerIT {
final AtomicInteger msgCount = new AtomicInteger(0);
- final ConsumerCallback callback = new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- msgCount.incrementAndGet();
- }
- };
-
final Thread[] threads = new Thread[4];
for (int i = 0; i < 4; i++) {
final Thread t = new Thread(() -> {
@@ -364,7 +350,8 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < messagesPerThreadCount && msgCount.get() < totalMessageCount; j++) {
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", callback);
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8",
+ response -> msgCount.incrementAndGet());
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@@ -404,13 +391,10 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- callbackInvoked.set(true);
- assertEquals("1", new String(response.getMessageBody()));
- throw new RuntimeException("intentional to avoid explicit ack");
- }
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
+ callbackInvoked.set(true);
+ assertEquals("1", new String(response.getMessageBody()));
+ throw new RuntimeException("intentional to avoid explicit ack");
});
} catch (Exception e) {
// expected
@@ -421,17 +405,14 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
while (!callbackInvoked.get()) {
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- if (response == null) {
- return;
- }
-
- callbackInvoked.set(true);
- assertEquals("1", new String(response.getMessageBody()));
- acknowledge(response);
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
+ if (response == null) {
+ return;
}
+
+ callbackInvoked.set(true);
+ assertEquals("1", new String(response.getMessageBody()));
+ acknowledge(response);
});
}
@@ -441,17 +422,14 @@ public class JMSPublisherConsumerIT {
// receiving next message and fail again
try {
while (!callbackInvoked.get()) {
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- if (response == null) {
- return;
- }
-
- callbackInvoked.set(true);
- assertEquals("2", new String(response.getMessageBody()));
- throw new RuntimeException("intentional to avoid explicit ack");
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
+ if (response == null) {
+ return;
}
+
+ callbackInvoked.set(true);
+ assertEquals("2", new String(response.getMessageBody()));
+ throw new RuntimeException("intentional to avoid explicit ack");
});
}
} catch (Exception e) {
@@ -463,17 +441,14 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
- consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- if (response == null) {
- return;
- }
-
- callbackInvoked.set(true);
- assertEquals("2", new String(response.getMessageBody()));
- acknowledge(response);
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
+ if (response == null) {
+ return;
}
+
+ callbackInvoked.set(true);
+ assertEquals("2", new String(response.getMessageBody()));
+ acknowledge(response);
});
}
} catch (Exception e) {
@@ -514,12 +489,9 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
AtomicBoolean callbackInvoked = new AtomicBoolean();
- consumer.consume(destinationName, null, false, false, null, messageSelector, "UTF-8", new ConsumerCallback() {
- @Override
- public void accept(JMSResponse response) {
- callbackInvoked.set(true);
- assertEquals("msg1", new String(response.getMessageBody()));
- }
+ consumer.consumeSingleMessage(destinationName, null, false, false, null, messageSelector, "UTF-8", response -> {
+ callbackInvoked.set(true);
+ assertEquals("msg1", new String(response.getMessageBody()));
});
assertTrue(callbackInvoked.get());
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index 802f4956b2..43c51d6916 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.jms.processors;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@@ -28,12 +30,17 @@ import org.apache.nifi.jms.processors.helpers.ConnectionFactoryInvocationHandler
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
@@ -48,21 +55,42 @@ import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static java.util.Arrays.asList;
+import static org.apache.nifi.jms.processors.PublishJMS.REL_FAILURE;
+import static org.apache.nifi.jms.processors.PublishJMS.REL_SUCCESS;
import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
+import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetWriterService;
+import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.jms.processors.ioconcept.reader.StateTrackingFlowFileReader.ATTR_READ_FAILED_INDEX_SUFFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PublishJMSIT {
+ TestRunner testRunner;
+
+ @AfterEach
+ public void cleanup() {
+ if (testRunner != null) {
+ testRunner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
+ testRunner = null;
+ }
+ }
+
@Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
@@ -91,7 +119,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it.
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
@@ -134,7 +162,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it.
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
@@ -169,8 +197,8 @@ public class PublishJMSIT {
runner.run();
Thread.sleep(200);
- assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty());
- assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0));
+ assertTrue(runner.getFlowFilesForRelationship(REL_SUCCESS).isEmpty());
+ assertNotNull(runner.getFlowFilesForRelationship(REL_FAILURE).get(0));
}
@Test
@@ -198,7 +226,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
@@ -256,7 +284,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
@@ -316,7 +344,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);
- final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
@@ -542,4 +570,210 @@ public class PublishJMSIT {
assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded.");
}
+
+ @Test
+ public void testPublishRecords() throws InitializationException {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ String destination = "testPublishRecords";
+ testRunner = initializeTestRunner(cf, destination);
+ testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.assertValid();
+
+ final ArrayNode testInput = createTestJsonInput();
+
+ testRunner.enqueue(testInput.toString().getBytes());
+
+ testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(cf, destination, testInput.get(0).toString());
+ verifyPublishedMessage(cf, destination, testInput.get(1).toString());
+ verifyPublishedMessage(cf, destination, testInput.get(2).toString());
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile successfulFlowFile = flowFiles.get(0);
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+ assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
+ }
+
+ @Test
+ public void testPublishRecordsFailed() throws InitializationException {
+ PublishJMS processor = new PublishJMS() {
+ @Override
+ protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
+ JMSPublisher spiedPublisher = Mockito.spy(publisher);
+ Mockito.doCallRealMethod()
+ .doThrow(new RuntimeException("Second publish failed."))
+ .when(spiedPublisher).publish(any(), any(byte[].class), any());
+ super.rendezvousWithJms(context, processSession, spiedPublisher);
+ }
+ };
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ String destination = "testPublishRecords";
+ testRunner = initializeTestRunner(processor, cf, destination);
+ testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.assertValid();
+
+ final ArrayNode testInput = createTestJsonInput();
+
+ testRunner.enqueue(testInput.toString().getBytes());
+
+ testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
+
+ testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 1));
+
+ verifyPublishedMessage(cf, destination, testInput.get(0).toString());
+
+ List flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile failedFlowFile = flowFiles.get(0);
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+ assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+ }
+
+ @Test
+ public void testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws InitializationException {
+ PublishJMS processor = new PublishJMS() {
+ @Override
+ protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
+ JMSPublisher spiedPublisher = Mockito.spy(publisher);
+ Mockito.doCallRealMethod()
+ .doThrow(new RuntimeException("Second publish failed."))
+ .when(spiedPublisher).publish(any(), any(byte[].class), any());
+ super.rendezvousWithJms(context, processSession, spiedPublisher);
+ }
+ };
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ String destination = "testPublishRecords";
+ testRunner = initializeTestRunner(processor, cf, destination);
+ testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.assertValid();
+
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+ final ArrayNode testInput = createTestJsonInput();
+
+ final Map attributes = new HashMap<>();
+ attributes.put(publishFailedIndexAttributeName, "1");
+ testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+ testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
+
+ testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 2));
+
+ verifyPublishedMessage(cf, destination, testInput.get(1).toString());
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile failedFlowFile = flowFiles.get(0);
+ assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+ }
+
+ @Test
+ public void testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws InitializationException {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ String destination = "testPublishRecords";
+ testRunner = initializeTestRunner(cf, destination);
+ testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.assertValid();
+
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
+ final ArrayNode testInput = createTestJsonInput();
+
+ final Map attributes = new HashMap<>();
+ attributes.put(publishFailedIndexAttributeName, "1");
+ testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+ testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, 3));
+
+ verifyPublishedMessage(cf, destination, testInput.get(1).toString());
+ verifyPublishedMessage(cf, destination, testInput.get(2).toString());
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile successfulFlowFile = flowFiles.get(0);
+ assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+ publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
+ }
+
+ private TestRunner initializeTestRunner(ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
+ PublishJMS processor = new PublishJMS();
+ return initializeTestRunner(processor, connectionFactory, destinationName);
+ }
+
+ private TestRunner initializeTestRunner(PublishJMS processor, ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
+ TestRunner runner = TestRunners.newTestRunner(processor);
+ JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(connectionFactory);
+
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(PublishJMS.DESTINATION, destinationName);
+
+ return runner;
+ }
+
+ private void verifyPublishedMessage(ConnectionFactory connectionFactory, String destinationName, String content) {
+ JmsTemplate jmst = new JmsTemplate(connectionFactory);
+ BytesMessage message = (BytesMessage) jmst.receive(destinationName);
+
+ byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+ assertEquals(content, new String(messageBytes));
+ }
+
+ private ProvenanceEventRecord assertProvenanceEvent() {
+ final List provenanceEvents = testRunner.getProvenanceEvents();
+ assertNotNull(provenanceEvents);
+ assertEquals(1, provenanceEvents.size());
+
+ final ProvenanceEventRecord event = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, event.getEventType());
+
+ return event;
+ }
+
+ private void assertProvenanceEvent(String expectedDetails) {
+ final ProvenanceEventRecord event = assertProvenanceEvent();
+ assertEquals(expectedDetails, event.getDetails());
+ }
+
+ private static ArrayNode createTestJsonInput() {
+ final ObjectMapper mapper = new ObjectMapper();
+
+ return mapper.createArrayNode().addAll(asList(
+ mapper.createObjectNode()
+ .put("recordId", 1)
+ .put("firstAttribute", "foo")
+ .put("secondAttribute", false),
+ mapper.createObjectNode()
+ .put("recordId", 2)
+ .put("firstAttribute", "bar")
+ .put("secondAttribute", true),
+ mapper.createObjectNode()
+ .put("recordId", 3)
+ .put("firstAttribute", "foobar")
+ .put("secondAttribute", false)
+ ));
+ }
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/JMSTestUtil.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/JMSTestUtil.java
new file mode 100644
index 0000000000..d6c95cbae4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/JMSTestUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.TestRunner;
+
+public class JMSTestUtil {
+
+ public static String createJsonRecordSetReaderService(TestRunner testRunner) throws InitializationException {
+ final String id = "record-reader";
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ testRunner.addControllerService(id, jsonReader);
+ testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+ testRunner.enableControllerService(jsonReader);
+ return id;
+ }
+
+ public static String createJsonRecordSetWriterService(TestRunner testRunner) throws InitializationException {
+ final String id = "record-writer";
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ testRunner.addControllerService(id, jsonWriter);
+ testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+ testRunner.enableControllerService(jsonWriter);
+ return id;
+ }
+}