NIFI-11137 Add record support to Consume/PublishJMS (#6987)

This commit is contained in:
Nandor Soma Abonyi 2023-04-03 21:42:27 +02:00 committed by GitHub
parent 3fa25b6f11
commit 32df0fa484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1679 additions and 239 deletions

View File

@ -58,6 +58,14 @@
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId> <artifactId>activemq-client</artifactId>
@ -83,6 +91,20 @@
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -37,6 +37,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
@ -151,6 +153,20 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
static final PropertyDescriptor BASE_RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.identifiesControllerService(RecordReaderFactory.class)
.required(false)
.build();
static final PropertyDescriptor BASE_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.identifiesControllerService(RecordSetWriterFactory.class)
.dependsOn(BASE_RECORD_READER)
.required(true)
.build();
private volatile IJMSConnectionFactoryProvider connectionFactoryProvider; private volatile IJMSConnectionFactoryProvider connectionFactoryProvider;
private volatile BlockingQueue<T> workerPool; private volatile BlockingQueue<T> workerPool;

View File

@ -32,13 +32,18 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
import org.apache.nifi.jms.processors.ioconcept.writer.record.RecordWriter;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
@ -48,6 +53,7 @@ import javax.jms.Session;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -55,6 +61,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/** /**
* Consuming JMS processor which upon each invocation of * Consuming JMS processor which upon each invocation of
@ -88,19 +95,24 @@ import java.util.concurrent.TimeUnit;
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
public static final String JMS_MESSAGETYPE = "jms.messagetype"; public static final String JMS_MESSAGETYPE = "jms.messagetype";
private final static String COUNTER_PARSE_FAILURES = "Parse Failures";
private final static String COUNTER_RECORDS_RECEIVED = "Records Received";
private final static String COUNTER_RECORDS_PROCESSED = "Records Processed";
static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE), static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE),
"AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + ")", "AUTO_ACKNOWLEDGE (" + Session.AUTO_ACKNOWLEDGE + ")",
"Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. " "Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. "
+ "Can result in data loss in the event where NiFi abruptly stopped before session was commited."); + "Can result in data loss in the event where NiFi abruptly stopped before session was commited.");
static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE), static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE),
"CLIENT_ACKNOWLEDGE (" + String.valueOf(Session.CLIENT_ACKNOWLEDGE) + ")", "CLIENT_ACKNOWLEDGE (" + Session.CLIENT_ACKNOWLEDGE + ")",
"(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss"); "(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss");
static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE), static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
"DUPS_OK_ACKNOWLEDGE (" + String.valueOf(Session.DUPS_OK_ACKNOWLEDGE) + ")", "DUPS_OK_ACKNOWLEDGE (" + Session.DUPS_OK_ACKNOWLEDGE + ")",
"This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data " "This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data "
+ "duplication and data loss while achieving the best throughput."); + "duplication and data loss while achieving the best throughput.");
@ -170,11 +182,38 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build(); .build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_READER)
.description("The Record Reader to use for parsing received JMS Messages into Records.")
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_WRITER)
.description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
.build();
static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
.name("output-strategy")
.displayName("Output Strategy")
.description("The format used to output the JMS message into a FlowFile record.")
.dependsOn(RECORD_READER)
.required(true)
.defaultValue(OutputStrategy.USE_VALUE.getValue())
.allowableValues(OutputStrategy.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("All FlowFiles that are received from the JMS Destination are routed to this relationship") .description("All FlowFiles that are received from the JMS Destination are routed to this relationship")
.build(); .build();
public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
.name("parse.failure")
.description("If a message cannot be parsed using the configured Record Reader, the contents of the "
+ "message will be routed to this Relationship as its own individual FlowFile.")
.autoTerminateDefault(true) // to make sure flow are still valid after upgrades
.build();
private final static Set<Relationship> relationships; private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors; private final static List<PropertyDescriptor> propertyDescriptors;
@ -205,6 +244,10 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
_propertyDescriptors.add(TIMEOUT); _propertyDescriptors.add(TIMEOUT);
_propertyDescriptors.add(ERROR_QUEUE); _propertyDescriptors.add(ERROR_QUEUE);
_propertyDescriptors.add(RECORD_READER);
_propertyDescriptors.add(RECORD_WRITER);
_propertyDescriptors.add(OUTPUT_STRATEGY);
_propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES); _propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
_propertyDescriptors.addAll(JMS_CF_PROPERTIES); _propertyDescriptors.addAll(JMS_CF_PROPERTIES);
@ -212,6 +255,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
Set<Relationship> _relationships = new HashSet<>(); Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS); _relationships.add(REL_SUCCESS);
_relationships.add(REL_PARSE_FAILURE);
relationships = Collections.unmodifiableSet(_relationships); relationships = Collections.unmodifiableSet(_relationships);
} }
@ -268,35 +312,11 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
try { try {
consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, new ConsumerCallback() { if (context.getProperty(RECORD_READER).isSet()) {
@Override processMessageSet(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
public void accept(final JMSResponse response) { } else {
if (response == null) { processSingleMessage(processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
return;
} }
try {
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
final Map<String, String> jmsHeaders = response.getMessageHeaders();
final Map<String, String> jmsProperties = response.getMessageProperties();
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commitAsync(() -> acknowledge(response), throwable -> response.reject());
} catch (final Throwable t) {
response.reject();
throw t;
}
}
});
} catch(Exception e) { } catch(Exception e) {
getLogger().error("Error while trying to process JMS message", e); getLogger().error("Error while trying to process JMS message", e);
consumer.setValid(false); consumer.setValid(false);
@ -305,6 +325,92 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
} }
} }
private void processSingleMessage(ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName,
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
consumer.consumeSingleMessage(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, response -> {
if (response == null) {
return;
}
try {
final FlowFile flowFile = createFlowFileFromMessage(processSession, destinationName, response);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commitAsync(
() -> withLog(() -> acknowledge(response)),
__ -> withLog(() -> response.reject()));
} catch (final Throwable t) {
response.reject();
throw t;
}
});
}
private FlowFile createFlowFileFromMessage(ProcessSession processSession, String destinationName, JMSResponse response) {
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
final Map<String, String> jmsHeaders = response.getMessageHeaders();
final Map<String, String> jmsProperties = response.getMessageProperties();
flowFile = updateFlowFileAttributesWithJMSAttributes(mergeJmsAttributes(jmsHeaders, jmsProperties), flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
flowFile = processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
return flowFile;
}
private void processMessageSet(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName,String errorQueueName,
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
final FlowFileWriter<JMSResponse> flowFileWriter = new RecordWriter<>(
readerFactory,
writerFactory,
message -> message.getMessageBody() == null ? new byte[0] : message.getMessageBody(),
message -> mergeJmsAttributes(message.getMessageHeaders(), message.getMessageProperties()),
outputStrategy,
getLogger()
);
consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, jmsResponses -> {
flowFileWriter.write(session, jmsResponses, new FlowFileWriterCallback<>() {
@Override
public void onSuccess(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages) {
session.getProvenanceReporter().receive(flowFile, destinationName);
session.adjustCounter(COUNTER_RECORDS_RECEIVED, processedMessages.size() + failedMessages.size(), false);
session.adjustCounter(COUNTER_RECORDS_PROCESSED, processedMessages.size(), false);
session.transfer(flowFile, REL_SUCCESS);
session.commitAsync(
() -> withLog(() -> acknowledge(processedMessages, failedMessages)),
__ -> withLog(() -> reject(processedMessages, failedMessages))
);
}
@Override
public void onParseFailure(FlowFile flowFile, JMSResponse message, Exception e) {
session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
final FlowFile failedMessage = createFlowFileFromMessage(session, destinationName, message);
session.transfer(failedMessage, REL_PARSE_FAILURE);
}
@Override
public void onFailure(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages, Exception e) {
reject(processedMessages, failedMessages);
// It would be nicer to call rollback and yield here, but we are rethrowing the exception to have the same error handling with processSingleMessage.
throw new ProcessException(e);
}
});
});
}
private void acknowledge(final JMSResponse response) { private void acknowledge(final JMSResponse response) {
try { try {
response.acknowledge(); response.acknowledge();
@ -314,6 +420,26 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
} }
} }
private void acknowledge(final List<JMSResponse> processedMessages, final List<JMSResponse> failedMessages) {
acknowledge(findLastBatchedJmsResponse(processedMessages, failedMessages));
}
private void reject(final List<JMSResponse> processedMessages, final List<JMSResponse> failedMessages) {
findLastBatchedJmsResponse(processedMessages, failedMessages).reject();
}
private void withLog(Runnable runnable) {
try {
runnable.run();
} catch (Exception e) {
getLogger().error("An error happened during commitAsync callback", e);
throw e;
}
}
private JMSResponse findLastBatchedJmsResponse(List<JMSResponse> processedMessages, List<JMSResponse> failedMessages) {
return Stream.of(processedMessages, failedMessages).flatMap(Collection::stream).max(Comparator.comparing(JMSResponse::getBatchOrder)).get();
}
/** /**
* Will create an instance of {@link JMSConsumer} * Will create an instance of {@link JMSConsumer}
@ -375,4 +501,16 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
flowFile = processSession.putAllAttributes(flowFile, attributes); flowFile = processSession.putAllAttributes(flowFile, attributes);
return flowFile; return flowFile;
} }
private Map<String, String> mergeJmsAttributes(Map<String, String> headers, Map<String, String> properties) {
final Map<String, String> jmsAttributes = new HashMap<>(headers);
properties.forEach((key, value) -> {
if (jmsAttributes.containsKey(key)) {
getLogger().warn("JMS Header and Property name collides as an attribute. JMS Property will override the JMS Header attribute. attributeName=[{}]", key);
}
jmsAttributes.put(key, value);
});
return jmsAttributes;
}
} }

View File

@ -38,19 +38,24 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Consumer;
/** /**
* Generic consumer of messages from JMS compliant messaging system. * Generic consumer of messages from JMS compliant messaging system.
*/ */
class JMSConsumer extends JMSWorker { class JMSConsumer extends JMSWorker {
private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;
JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) { JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
super(connectionFactory, jmsTemplate, logger); super(connectionFactory, jmsTemplate, logger);
logger.debug("Created Message Consumer for '{}'", new Object[] {jmsTemplate}); logger.debug("Created Message Consumer for '{}'", jmsTemplate);
} }
@ -82,18 +87,83 @@ class JMSConsumer extends JMSWorker {
} }
} }
/** /**
* Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message. * Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
*/ */
public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector, public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
final String charset, final ConsumerCallback consumerCallback) { final String charset, final Consumer<JMSResponse> singleMessageConsumer) {
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName);
if (response != null) {
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
// the responsibility of the processor to handle closing the Message Consumer.
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
// the JMSResponse.
singleMessageConsumer.accept(response);
}
});
}
/**
* Receives a list of messages from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
*/
public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
final String charset, final Consumer<List<JMSResponse>> messageSetConsumer) {
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() {
@Override
public void consume(Session session, MessageConsumer messageConsumer) throws JMSException {
final List<JMSResponse> jmsResponses = new ArrayList<>();
int batchCounter = 0;
JMSResponse response;
while ((response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) {
response.setBatchOrder(batchCounter);
jmsResponses.add(response);
batchCounter++;
}
if (!jmsResponses.isEmpty()) {
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
// the responsibility of the processor to handle closing the Message Consumer.
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
// the JMSResponse.
messageSetConsumer.accept(jmsResponses);
}
}
});
}
private void doWithJmsTemplate(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageReceiver messageReceiver) {
this.jmsTemplate.execute(new SessionCallback<Void>() { this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override @Override
public Void doInJms(final Session session) throws JMSException { public Void doInJms(final Session session) throws JMSException {
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector); final MessageConsumer messageConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
try { try {
messageReceiver.consume(session, messageConsumer);
} catch (Exception e) {
// We need to call recover to ensure that in the event of
// abrupt end or exception the current session will stop message
// delivery and restart with the oldest unacknowledged message
try {
session.recover();
} catch (Exception e1) {
// likely the session is closed...need to catch this so that the root cause of failure is propagated
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
}
JmsUtils.closeMessageConsumer(messageConsumer);
throw e;
}
return null;
}
}, true);
}
private JMSResponse receiveMessage(Session session, MessageConsumer msgConsumer, String charset, String errorQueueName) throws JMSException {
final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
// If there is no message, there's nothing for us to do. We can simply close the consumer and return. // If there is no message, there's nothing for us to do. We can simply close the consumer and return.
@ -125,10 +195,10 @@ class JMSConsumer extends JMSWorker {
acknowledge(message, session); acknowledge(message, session);
if (errorQueueName != null) { if (errorQueueName != null) {
processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, errorQueueName}); processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", message, errorQueueName);
jmsTemplate.send(errorQueueName, __ -> message); jmsTemplate.send(errorQueueName, __ -> message);
} else { } else {
processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message}); processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", message);
} }
return null; return null;
@ -147,32 +217,8 @@ class JMSConsumer extends JMSWorker {
final Map<String, String> messageHeaders = extractMessageHeaders(message); final Map<String, String> messageHeaders = extractMessageHeaders(message);
final Map<String, String> messageProperties = extractMessageProperties(message); final Map<String, String> messageProperties = extractMessageProperties(message);
final JMSResponse response = new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer);
// Provide the JMSResponse to the processor to handle. It is the responsibility of the return new JMSResponse(message, session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, messageProperties, msgConsumer);
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
// the responsibility of the processor to handle closing the Message Consumer.
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
// the JMSResponse.
consumerCallback.accept(response);
} catch (Exception e) {
// We need to call recover to ensure that in the event of
// abrupt end or exception the current session will stop message
// delivery and restart with the oldest unacknowledged message
try {
session.recover();
} catch (Exception e1) {
// likely the session is closed...need to catch this so that the root cause of failure is propagated
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
}
JmsUtils.closeMessageConsumer(msgConsumer);
throw e;
}
return null;
}
}, true);
} }
private void acknowledge(final Message message, final Session session) throws JMSException { private void acknowledge(final Message message, final Session session) throws JMSException {
@ -245,6 +291,7 @@ class JMSConsumer extends JMSWorker {
private final Map<String, String> messageHeaders; private final Map<String, String> messageHeaders;
private final Map<String, String> messageProperties; private final Map<String, String> messageProperties;
private final MessageConsumer messageConsumer; private final MessageConsumer messageConsumer;
private Integer batchOrder;
JMSResponse(final Message message, final int acknowledgementMode, final String messageType, final byte[] messageBody, final Map<String, String> messageHeaders, JMSResponse(final Message message, final int acknowledgementMode, final String messageType, final byte[] messageBody, final Map<String, String> messageHeaders,
final Map<String, String> messageProperties, final MessageConsumer msgConsumer) { final Map<String, String> messageProperties, final MessageConsumer msgConsumer) {
@ -286,13 +333,18 @@ class JMSConsumer extends JMSWorker {
public void reject() { public void reject() {
JmsUtils.closeMessageConsumer(messageConsumer); JmsUtils.closeMessageConsumer(messageConsumer);
} }
public Integer getBatchOrder() {
return batchOrder;
} }
/** public void setBatchOrder(Integer batchOrder) {
* Callback to be invoked while executing inJMS call (the call within the this.batchOrder = batchOrder;
* live JMS session)
*/
static interface ConsumerCallback {
void accept(JMSResponse response);
} }
} }
interface MessageReceiver {
void consume(Session session, MessageConsumer messageConsumer) throws JMSException;
}
}

View File

@ -36,7 +36,7 @@ import java.util.Map.Entry;
/** /**
* Generic publisher of messages to JMS compliant messaging system. * Generic publisher of messages to JMS compliant messaging system.
*/ */
final class JMSPublisher extends JMSWorker { class JMSPublisher extends JMSWorker {
JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) { JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) {
super(connectionFactory, jmsTemplate, processLog); super(connectionFactory, jmsTemplate, processLog);

View File

@ -31,12 +31,18 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReader;
import org.apache.nifi.jms.processors.ioconcept.reader.FlowFileReaderCallback;
import org.apache.nifi.jms.processors.ioconcept.reader.StateTrackingFlowFileReader;
import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
@ -55,6 +61,10 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
/** /**
* An implementation of JMS Message publishing {@link Processor} which upon each * An implementation of JMS Message publishing {@link Processor} which upon each
* invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will * invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will
@ -122,6 +132,16 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_READER)
.description("The Record Reader to use for parsing the incoming FlowFile into Records.")
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_WRITER)
.description("The Record Writer to use for serializing Records before publishing them as an JMS Message.")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("All FlowFiles that are sent to the JMS destination are routed to this relationship") .description("All FlowFiles that are sent to the JMS destination are routed to this relationship")
@ -154,12 +174,14 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
_propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS); _propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
_propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX); _propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
_propertyDescriptors.add(RECORD_READER);
_propertyDescriptors.add(RECORD_WRITER);
_propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES); _propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
_propertyDescriptors.addAll(JMS_CF_PROPERTIES); _propertyDescriptors.addAll(JMS_CF_PROPERTIES);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>(); Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS); _relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE); _relationships.add(REL_FAILURE);
@ -182,12 +204,12 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
FlowFile flowFile = processSession.get(); FlowFile flowFile = processSession.get();
if (flowFile != null) { if (flowFile != null) {
try { try {
String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue(); final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue(); final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean(); final Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue(); final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
Map<String,String> attributesToSend = new HashMap<>(); final Map<String,String> attributesToSend = new HashMap<>();
// REGEX Attributes // REGEX Attributes
final Pattern pattern = Pattern.compile(attributeHeaderRegex); final Pattern pattern = Pattern.compile(attributeHeaderRegex);
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
@ -199,35 +221,62 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
} }
} }
switch (context.getProperty(MESSAGE_BODY).getValue()) { if (context.getProperty(RECORD_READER).isSet()) {
case TEXT_MESSAGE: final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
try { final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend);
} catch(Exception e) { final FlowFileReader flowFileReader = new StateTrackingFlowFileReader(
publisher.setValid(false); getIdentifier(),
throw e; new RecordSupplier(readerFactory, writerFactory),
getLogger()
);
flowFileReader.read(
processSession,
flowFile,
content -> publisher.publish(destinationName, content, attributesToSend),
new FlowFileReaderCallback() {
@Override
public void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis) {
final String eventTemplate = isRecover ? PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER : PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
processSession.getProvenanceReporter().send(
flowFile,
destinationName,
String.format(eventTemplate, processedRecords),
transmissionMillis);
processSession.transfer(flowFile, REL_SUCCESS);
} }
break;
case BYTES_MESSAGE: @Override
default: public void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e) {
try { processSession.getProvenanceReporter().send(
publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend); flowFile,
} catch(Exception e) { destinationName,
publisher.setValid(false); String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords),
throw e; transmissionMillis);
handleException(context, processSession, publisher, flowFile, e);
} }
break;
} }
);
} else {
processStandardFlowFile(context, processSession, publisher, flowFile, destinationName, charset, attributesToSend);
processSession.transfer(flowFile, REL_SUCCESS); processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, destinationName); processSession.getProvenanceReporter().send(flowFile, destinationName);
}
} catch (Exception e) { } catch (Exception e) {
handleException(context, processSession, publisher, flowFile, e);
}
}
}
private void handleException(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile, Exception e) {
processSession.transfer(flowFile, REL_FAILURE); processSession.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed while sending message to JMS via " + publisher, e); this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
context.yield(); context.yield();
publisher.setValid(false); publisher.setValid(false);
} }
}
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -250,6 +299,34 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
return new JMSPublisher(connectionFactory, jmsTemplate, this.getLogger()); return new JMSPublisher(connectionFactory, jmsTemplate, this.getLogger());
} }
private void processStandardFlowFile(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile,
String destinationName, String charset, Map<String,String> attributesToSend) {
publishMessage(context, processSession, publisher, flowFile, destinationName, charset, attributesToSend);
}
private void publishMessage(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile,
String destinationName, String charset, Map<String,String> attributesToSend) {
switch (context.getProperty(MESSAGE_BODY).getValue()) {
case TEXT_MESSAGE:
try {
publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend);
} catch(Exception e) {
publisher.setValid(false);
throw e;
}
break;
case BYTES_MESSAGE:
default:
try {
publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend);
} catch(Exception e) {
publisher.setValid(false);
throw e;
}
break;
}
}
/** /**
* Extracts contents of the {@link FlowFile} as byte array. * Extracts contents of the {@link FlowFile} as byte array.
*/ */
@ -264,4 +341,5 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
session.read(flowFile, in -> IOUtils.copy(in, writer, Charset.forName(charset))); session.read(flowFile, in -> IOUtils.copy(in, writer, Charset.forName(charset)));
return writer.toString(); return writer.toString();
} }
} }

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.reader;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
public interface FlowFileReader {
void read(ProcessSession session, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback);
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.reader;
import org.apache.nifi.flowfile.FlowFile;
public interface FlowFileReaderCallback {
void onSuccess(FlowFile flowFile, int processedRecords, boolean isRecover, long transmissionMillis);
void onFailure(FlowFile flowFile, int processedRecords, long transmissionMillis, Exception e);
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.reader;
public interface MessageHandler {
void handle(byte[] content);
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.reader;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.processors.ioconcept.reader.record.RecordSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.StopWatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Optional.ofNullable;
public class StateTrackingFlowFileReader implements FlowFileReader {
public static final String ATTR_READ_FAILED_INDEX_SUFFIX = ".read.failed.index";
private final String identifier;
private final RecordSupplier recordSupplier;
private final ComponentLog logger;
public StateTrackingFlowFileReader(String identifier, RecordSupplier recordSupplier, ComponentLog logger) {
this.identifier = identifier;
this.recordSupplier = recordSupplier;
this.logger = logger;
}
@Override
public void read(ProcessSession session, FlowFile flowFile, MessageHandler messageHandler, FlowFileReaderCallback flowFileReaderCallback) {
final StopWatch stopWatch = new StopWatch(true);
final AtomicInteger processedRecords = new AtomicInteger();
final String publishFailedIndexAttributeName = identifier + ATTR_READ_FAILED_INDEX_SUFFIX;
try {
final Long previousProcessFailedAt = ofNullable(flowFile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
session.read(flowFile, in -> recordSupplier.process(flowFile, in, processedRecords, previousProcessFailedAt, logger, messageHandler));
FlowFile successFlowFile = flowFile;
final boolean isRecover = previousProcessFailedAt != null;
if (isRecover) {
successFlowFile = session.removeAttribute(flowFile, publishFailedIndexAttributeName);
}
flowFileReaderCallback.onSuccess(successFlowFile, processedRecords.get(), isRecover, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
logger.error("An error happened while processing records. Routing to failure.", e);
final FlowFile failedFlowFile = session.putAttribute(flowFile, publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
flowFileReaderCallback.onFailure(failedFlowFile, processedRecords.get(), stopWatch.getElapsed(TimeUnit.MILLISECONDS), e);
}
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.reader.record;
public class ProvenanceEventTemplates {
public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
public static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.reader.record;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.processors.ioconcept.reader.MessageHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
public class RecordSupplier {
private final RecordReaderFactory readerFactory;
private final RecordSetWriterFactory writerFactory;
public RecordSupplier(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
}
public void process(FlowFile flowfile, InputStream in, AtomicInteger processedRecords, Long processFromIndex, ComponentLog logger, MessageHandler messageHandler) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
Record record;
while ((record = recordSet.next()) != null) {
if (processFromIndex != null && processedRecords.get() < processFromIndex) {
processedRecords.getAndIncrement();
continue;
}
baos.reset();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
writer.write(record);
writer.flush();
}
final byte[] messageContent = baos.toByteArray();
messageHandler.handle(messageContent);
processedRecords.getAndIncrement();
}
} catch (SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("An error happened during creating components for serialization.", e);
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.writer;
import java.util.Map;
public interface AttributeSource<T> {
Map<String, String> getAttributes(T message);
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.writer;
import org.apache.nifi.processor.ProcessSession;
import java.util.List;
public interface FlowFileWriter<T> {
void write(ProcessSession session, List<T> messages, FlowFileWriterCallback<T> flowFileWriterCallback);
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.writer;
import org.apache.nifi.flowfile.FlowFile;
import java.util.List;
public interface FlowFileWriterCallback<T> {
void onSuccess(FlowFile flowFile, List<T> processedMessages, List<T> failedMessages);
void onParseFailure(FlowFile flowFile, T message, Exception e);
void onFailure(FlowFile flowFile, List<T> processedMessages, List<T> failedMessages, Exception e);
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.writer;
public interface Marshaller<T> {
byte[] marshall(T message);
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.writer.record;
import org.apache.nifi.components.DescribedValue;
/**
* Enumeration of supported Output Strategies
*/
public enum OutputStrategy implements DescribedValue {
USE_VALUE("USE_VALUE", "Use Content as Value", "Write only the message to the FlowFile record."),
USE_WRAPPER("USE_WRAPPER", "Use Wrapper", "Write the additional attributes into the FlowFile record on a separate leaf. (See processor usage for more information.)"),
USE_APPENDER("USE_APPENDER", "Use Appender", "Write the additional attributes into the FlowFile record prefixed with \"_\". (See processor usage for more information.)");
private final String value;
private final String displayName;
private final String description;
OutputStrategy(final String value, final String displayName, final String description) {
this.value = value;
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return value;
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.writer.record;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RecordUtils {
public static Record append(final Record originalRecord, final Map<String, String> decoratorValues, final String decoratorPrefix) {
final List<RecordField> originalFields = originalRecord.getSchema().getFields();
final List<RecordField> mergedFields = new ArrayList<>(originalFields);
decoratorValues.forEach((key, value) -> mergedFields.add(new RecordField(decoratorPrefix + key, RecordFieldType.STRING.getDataType())));
final RecordSchema mergedSchema = new SimpleRecordSchema(mergedFields);
final Map<String, Object> recordValues = new HashMap<>();
originalFields.stream().map(RecordField::getFieldName).forEach(fieldName -> recordValues.put(fieldName, originalRecord.getValue(fieldName)));
decoratorValues.forEach((key, value) -> recordValues.put(decoratorPrefix + key, value));
return new MapRecord(mergedSchema, recordValues);
}
public static MapRecord wrap(final Record originalRecord, final String originalRecordKey, final Map<String, String> decoratorValues, final String decoratorKey)
throws IOException, MalformedRecordException {
// create schema
final Tuple<RecordField, Object> originalRecordLeaf = wrapStandardRecord(originalRecord, originalRecordKey);
final Tuple<RecordField, Object> decoratorLeaf = wrapDecoratorValues(decoratorValues, decoratorKey);
final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(originalRecordLeaf.getKey(), decoratorLeaf.getKey()));
// assign values
final Map<String, Object> recordValues = new HashMap<>();
recordValues.put(originalRecordLeaf.getKey().getFieldName(), originalRecordLeaf.getValue());
recordValues.put(decoratorLeaf.getKey().getFieldName(), decoratorLeaf.getValue());
return new MapRecord(rootRecordSchema, recordValues);
}
private static Tuple<RecordField, Object> wrapStandardRecord(final Record record, final String recordKey) {
final RecordSchema recordSchema = (record == null) ? null : record.getSchema();
final RecordField recordField = new RecordField(recordKey, RecordFieldType.RECORD.getRecordDataType(recordSchema));
return new Tuple<>(recordField, record);
}
private static Tuple<RecordField, Object> wrapDecoratorValues(final Map<String, String> decoratorValues, final String decoratorKey) {
final RecordField recordField = new RecordField(decoratorKey, RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
return new Tuple<>(recordField, decoratorValues);
}
}

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.ioconcept.writer.record;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.jms.processors.ioconcept.writer.AttributeSource;
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
import org.apache.nifi.jms.processors.ioconcept.writer.Marshaller;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_APPENDER;
import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_VALUE;
import static org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy.USE_WRAPPER;
public class RecordWriter<T> implements FlowFileWriter<T> {
private final static String RECORD_COUNT_KEY = "record.count";
private final RecordReaderFactory readerFactory;
private final RecordSetWriterFactory writerFactory;
private final Marshaller<T> marshaller;
private final AttributeSource<T> attributeSource;
private final OutputStrategy outputStrategy;
private final ComponentLog logger;
public RecordWriter(RecordReaderFactory readerFactory,
RecordSetWriterFactory writerFactory,
Marshaller<T> marshaller,
AttributeSource<T> attributeSource,
OutputStrategy outputStrategy,
ComponentLog logger) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.marshaller = marshaller;
this.attributeSource = attributeSource;
this.outputStrategy = outputStrategy;
this.logger = logger;
}
@Override
public void write(ProcessSession session, List<T> messages, FlowFileWriterCallback<T> flowFileWriterCallback) {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger();
final List<T> processedMessages = new ArrayList<>();
final List<T> failedMessages = new ArrayList<>();
RecordSetWriter writer = null;
boolean isWriterInitialized = false;
try {
for (T message : messages) {
if (message == null) {
break;
}
final byte[] recordBytes = marshaller.marshall(message);
try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
final RecordReader reader;
// parse incoming message which may contain multiple messages
try {
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final IOException ioe) {
logger.error("Failed to parse message due to comms failure. Will roll back session and try again momentarily.");
flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, ioe);
closeWriter(writer);
return;
} catch (final Exception e) {
logger.error("Failed to parse message, sending to the parse failure relationship", e);
failedMessages.add(message);
flowFileWriterCallback.onParseFailure(flowFile, message, e);
continue;
}
// write messages as records into FlowFile
try {
Record record;
while ((record = reader.nextRecord()) != null) {
if (attributeSource != null && !outputStrategy.equals(USE_VALUE)) {
final Map<String, String> additionalAttributes = attributeSource.getAttributes(message);
if (outputStrategy.equals(USE_APPENDER)) {
record = RecordUtils.append(record, additionalAttributes, "_");
} else if (outputStrategy.equals(USE_WRAPPER)){
record = RecordUtils.wrap(record, "value", additionalAttributes, "_");
}
}
if (!isWriterInitialized) {
final RecordSchema recordSchema = record.getSchema();
final OutputStream rawOut = session.write(flowFile);
RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
failedMessages.add(message);
flowFileWriterCallback.onParseFailure(flowFile, message, e);
continue;
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
}
try {
writer.write(record);
isWriterInitialized = true;
processedMessages.add(message);
} catch (final RuntimeException re) {
logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
failedMessages.add(message);
flowFileWriterCallback.onParseFailure(flowFile, message, re);
}
}
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
logger.error("Failed to write message, sending to the parse failure relationship", e);
failedMessages.add(message);
flowFileWriterCallback.onParseFailure(flowFile, message, e);
}
} catch (Exception e) {
logger.error("Failed to write message, sending to the parse failure relationship", e);
failedMessages.add(message);
flowFileWriterCallback.onParseFailure(flowFile, message, e);
}
}
if (writer != null) {
final WriteResult writeResult = writer.finishRecordSet();
attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
recordCount.set(writeResult.getRecordCount());
}
} catch (final Exception e) {
flowFileWriterCallback.onFailure(flowFile, processedMessages, failedMessages, e);
} finally {
closeWriter(writer);
}
if (recordCount.get() == 0) {
session.remove(flowFile);
return;
}
session.putAllAttributes(flowFile, attributes);
final int count = recordCount.get();
logger.info("Successfully processed {} records for {}", count, flowFile);
flowFileWriterCallback.onSuccess(flowFile, processedMessages, failedMessages);
}
private void closeWriter(final RecordSetWriter writer) {
try {
if (writer != null) {
writer.close();
}
} catch (final Exception ioe) {
logger.warn("Failed to close Record Writer", ioe);
}
}
}

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.nifi.jms.processors; package org.apache.nifi.jms.processors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
@ -27,6 +31,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties; import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -46,6 +51,7 @@ import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MapMessage; import javax.jms.MapMessage;
import javax.jms.Message; import javax.jms.Message;
@ -64,7 +70,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy; import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetReaderService;
import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetWriterService;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -77,6 +86,8 @@ import static org.mockito.Mockito.when;
public class ConsumeJMSIT { public class ConsumeJMSIT {
private static final String JMS_DESTINATION_ATTRIBUTE_NAME = "jms_destination";
@Test @Test
public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
final String destinationName = "cooQueue"; final String destinationName = "cooQueue";
@ -478,6 +489,194 @@ public class ConsumeJMSIT {
} }
} }
@Test
public void testConsumeRecords() throws InitializationException {
String destination = "testConsumeRecords";
ArrayNode expectedRecordSet = createTestJsonInput();
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(0).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(1).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(2).toString()));
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.run(1, false);
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
assertEquals(1, successFlowFiles.size());
assertEquals(expectedRecordSet.toString(), new String(successFlowFiles.get(0).toByteArray()));
List<MockFlowFile> parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
assertEquals(0, parseFailedFlowFiles.size());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void testConsumeMalformedRecords() throws InitializationException {
String destination = "testConsumeRecords";
ArrayNode expectedRecordSet = createTestJsonInput();
String expectedParseFailedContent1 = "this is not a json";
String expectedParseFailedContent2 = "this is still not a json";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(0).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(expectedParseFailedContent1));
jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(1).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(expectedParseFailedContent2));
jmsTemplate.send(destination, session -> session.createTextMessage(expectedRecordSet.get(2).toString()));
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setRelationshipAvailable(ConsumeJMS.REL_PARSE_FAILURE);
testRunner.run(1, false);
// checking whether the processor was able to construct a valid recordSet from the properly formatted messages
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
assertEquals(1, successFlowFiles.size());
assertEquals(expectedRecordSet.toString(), new String(successFlowFiles.get(0).toByteArray()));
// and checking whether it creates separate FlowFiles for the malformed messages
List<MockFlowFile> parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
assertEquals(2, parseFailedFlowFiles.size());
assertEquals(expectedParseFailedContent1, new String(parseFailedFlowFiles.get(0).toByteArray()));
assertEquals(expectedParseFailedContent2, new String(parseFailedFlowFiles.get(1).toByteArray()));
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void testConsumeRecordsWithAppenderOutputStrategy() throws InitializationException, JsonProcessingException {
String destination = "testConsumeRecordsWithAppenderOutputStrategy";
ArrayNode inputRecordSet = createTestJsonInput();
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(0).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(1).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(2).toString()));
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_APPENDER.getValue());
testRunner.run(1, false);
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
assertEquals(1, successFlowFiles.size());
JsonNode flowFileContentAsJson = deserializeToJsonNode(new String(successFlowFiles.get(0).toByteArray()));
// checking that the output contains at least a part of the original input
assertEquals(inputRecordSet.get(0).get("firstAttribute").asText(), flowFileContentAsJson.get(0).get("firstAttribute").asText());
assertEquals(inputRecordSet.get(1).get("firstAttribute").asText(), flowFileContentAsJson.get(1).get("firstAttribute").asText());
assertEquals(inputRecordSet.get(2).get("firstAttribute").asText(), flowFileContentAsJson.get(2).get("firstAttribute").asText());
// checking jms_destination attribute exists with the given value
// this attribute has been chosen because it is deterministic; others vary based on host, time, etc.
// not nice, but stubbing all attributes would be uglier with the current code structure
assertEquals(destination, flowFileContentAsJson.get(0).get("_" + JMS_DESTINATION_ATTRIBUTE_NAME).asText());
assertEquals(destination, flowFileContentAsJson.get(1).get("_" + JMS_DESTINATION_ATTRIBUTE_NAME).asText());
assertEquals(destination, flowFileContentAsJson.get(2).get("_" + JMS_DESTINATION_ATTRIBUTE_NAME).asText());
List<MockFlowFile> parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
assertEquals(0, parseFailedFlowFiles.size());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void testConsumeRecordsWithWrapperOutputStrategy() throws InitializationException, JsonProcessingException {
String destination = "testConsumeRecordsWithWrapperOutputStrategy";
String valueKey = "value";
String attributeKey = "_";
ArrayNode inputRecordSet = createTestJsonInput();
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(0).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(1).toString()));
jmsTemplate.send(destination, session -> session.createTextMessage(inputRecordSet.get(2).toString()));
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_WRAPPER.getValue());
testRunner.run(1, false);
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
assertEquals(1, successFlowFiles.size());
JsonNode flowFileContentAsJson = deserializeToJsonNode(new String(successFlowFiles.get(0).toByteArray()));
// checking that the original json is equal to the leaf
assertEquals(inputRecordSet.get(0), flowFileContentAsJson.get(0).get(valueKey));
assertEquals(inputRecordSet.get(1), flowFileContentAsJson.get(1).get(valueKey));
assertEquals(inputRecordSet.get(2), flowFileContentAsJson.get(2).get(valueKey));
// checking that the attribute leaf contains at least the jms_destination attribute
// this attribute has been chosen because it is deterministic; others vary based on host, time, etc.
// not nice, but stubbing all attributes would be uglier with the current code structure
assertEquals(destination, flowFileContentAsJson.get(0).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
assertEquals(destination, flowFileContentAsJson.get(1).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
assertEquals(destination, flowFileContentAsJson.get(2).get(attributeKey).get(JMS_DESTINATION_ATTRIBUTE_NAME).asText());
List<MockFlowFile> parseFailedFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeJMS.REL_PARSE_FAILURE);
assertEquals(0, parseFailedFlowFiles.size());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
private static ArrayNode createTestJsonInput() {
final ObjectMapper mapper = new ObjectMapper();
return mapper.createArrayNode().addAll(asList(
mapper.createObjectNode()
.put("recordId", 1)
.put("firstAttribute", "foo")
.put("secondAttribute", false),
mapper.createObjectNode()
.put("recordId", 2)
.put("firstAttribute", "bar")
.put("secondAttribute", true),
mapper.createObjectNode()
.put("recordId", 3)
.put("firstAttribute", "foobar")
.put("secondAttribute", false)
));
}
private JsonNode deserializeToJsonNode(String rawJson) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readTree(rawJson);
}
private TestRunner initializeTestRunner(ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
return initializeTestRunner(new ConsumeJMS(), connectionFactory, destinationName);
}
private TestRunner initializeTestRunner(ConsumeJMS processor, ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
TestRunner runner = TestRunners.newTestRunner(processor);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(connectionFactory);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
return runner;
}
private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException { private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException {
// Publish a message. // Publish a message.
try (Connection conn = cf.createConnection(); try (Connection conn = cf.createConnection();

View File

@ -16,21 +16,17 @@
*/ */
package org.apache.nifi.jms.processors; package org.apache.nifi.jms.processors;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import com.fasterxml.jackson.core.type.TypeReference;
import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.ObjectMapper;
import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.commons.lang3.SerializationUtils;
import static org.mockito.Mockito.mock; import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.logging.ComponentLog;
import java.io.ByteArrayOutputStream; import org.junit.jupiter.api.Test;
import java.io.DataOutputStream; import org.junit.jupiter.api.Timeout;
import java.io.IOException; import org.springframework.jms.connection.CachingConnectionFactory;
import java.nio.charset.StandardCharsets; import org.springframework.jms.core.JmsTemplate;
import java.util.HashMap; import org.springframework.jms.core.MessageCreator;
import java.util.Map; import org.springframework.jms.support.JmsHeaders;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -41,19 +37,22 @@ import javax.jms.Session;
import javax.jms.StreamMessage; import javax.jms.StreamMessage;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.type.TypeReference; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import com.fasterxml.jackson.databind.ObjectMapper; import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.commons.lang3.SerializationUtils; import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback; import static org.mockito.Mockito.mock;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.JmsHeaders;
public class JMSPublisherConsumerIT { public class JMSPublisherConsumerIT {
@ -69,7 +68,7 @@ public class JMSPublisherConsumerIT {
return message; return message;
}; };
ConsumerCallback responseChecker = response -> { Consumer<JMSResponse> responseChecker = response -> {
assertEquals( assertEquals(
"stringAsObject", "stringAsObject",
SerializationUtils.deserialize(response.getMessageBody()) SerializationUtils.deserialize(response.getMessageBody())
@ -123,7 +122,7 @@ public class JMSPublisherConsumerIT {
expected = byteArrayOutputStream.toByteArray(); expected = byteArrayOutputStream.toByteArray();
} }
ConsumerCallback responseChecker = response -> { Consumer<JMSResponse> responseChecker = response -> {
byte[] actual = response.getMessageBody(); byte[] actual = response.getMessageBody();
assertArrayEquals( assertArrayEquals(
@ -175,7 +174,7 @@ public class JMSPublisherConsumerIT {
} }
private void testMapMessage(String destinationName, MessageCreator messageCreator, String expectedJson) { private void testMapMessage(String destinationName, MessageCreator messageCreator, String expectedJson) {
ConsumerCallback responseChecker = response -> { Consumer<JMSResponse> responseChecker = response -> {
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
try { try {
@ -191,7 +190,7 @@ public class JMSPublisherConsumerIT {
testMessage(destinationName, messageCreator, responseChecker); testMessage(destinationName, messageCreator, responseChecker);
} }
private void testMessage(String destinationName, MessageCreator messageCreator, ConsumerCallback responseChecker) { private void testMessage(String destinationName, MessageCreator messageCreator, Consumer<JMSResponse> responseChecker) {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
AtomicBoolean callbackInvoked = new AtomicBoolean(); AtomicBoolean callbackInvoked = new AtomicBoolean();
@ -200,7 +199,7 @@ public class JMSPublisherConsumerIT {
jmsTemplate.send(destinationName, messageCreator); jmsTemplate.send(destinationName, messageCreator);
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", response -> { consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
callbackInvoked.set(true); callbackInvoked.set(true);
responseChecker.accept(response); responseChecker.accept(response);
}); });
@ -282,11 +281,8 @@ public class JMSPublisherConsumerIT {
}); });
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() { consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
@Override
public void accept(JMSResponse response) {
// noop // noop
}
}); });
} finally { } finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
@ -312,15 +308,12 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean(); final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() { consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("hello from the other side", new String(response.getMessageBody())); assertEquals("hello from the other side", new String(response.getMessageBody()));
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO)); assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
assertEquals("foo", response.getMessageProperties().get("foo")); assertEquals("foo", response.getMessageProperties().get("foo"));
assertEquals("false", response.getMessageProperties().get("bar")); assertEquals("false", response.getMessageProperties().get("bar"));
}
}); });
assertTrue(callbackInvoked.get()); assertTrue(callbackInvoked.get());
@ -348,13 +341,6 @@ public class JMSPublisherConsumerIT {
final AtomicInteger msgCount = new AtomicInteger(0); final AtomicInteger msgCount = new AtomicInteger(0);
final ConsumerCallback callback = new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
msgCount.incrementAndGet();
}
};
final Thread[] threads = new Thread[4]; final Thread[] threads = new Thread[4];
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
final Thread t = new Thread(() -> { final Thread t = new Thread(() -> {
@ -364,7 +350,8 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < messagesPerThreadCount && msgCount.get() < totalMessageCount; j++) { for (int j = 0; j < messagesPerThreadCount && msgCount.get() < totalMessageCount; j++) {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", callback); consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8",
response -> msgCount.incrementAndGet());
} }
} finally { } finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy(); ((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@ -404,13 +391,10 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean(); final AtomicBoolean callbackInvoked = new AtomicBoolean();
try { try {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() { consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody())); assertEquals("1", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack"); throw new RuntimeException("intentional to avoid explicit ack");
}
}); });
} catch (Exception e) { } catch (Exception e) {
// expected // expected
@ -421,9 +405,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully // should receive the same message, but will process it successfully
while (!callbackInvoked.get()) { while (!callbackInvoked.get()) {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() { consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
@Override
public void accept(JMSResponse response) {
if (response == null) { if (response == null) {
return; return;
} }
@ -431,7 +413,6 @@ public class JMSPublisherConsumerIT {
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody())); assertEquals("1", new String(response.getMessageBody()));
acknowledge(response); acknowledge(response);
}
}); });
} }
@ -441,9 +422,7 @@ public class JMSPublisherConsumerIT {
// receiving next message and fail again // receiving next message and fail again
try { try {
while (!callbackInvoked.get()) { while (!callbackInvoked.get()) {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() { consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
@Override
public void accept(JMSResponse response) {
if (response == null) { if (response == null) {
return; return;
} }
@ -451,7 +430,6 @@ public class JMSPublisherConsumerIT {
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody())); assertEquals("2", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack"); throw new RuntimeException("intentional to avoid explicit ack");
}
}); });
} }
} catch (Exception e) { } catch (Exception e) {
@ -463,9 +441,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully // should receive the same message, but will process it successfully
try { try {
while (!callbackInvoked.get()) { while (!callbackInvoked.get()) {
consumer.consume(destinationName, null, false, false, null, null, "UTF-8", new ConsumerCallback() { consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
@Override
public void accept(JMSResponse response) {
if (response == null) { if (response == null) {
return; return;
} }
@ -473,7 +449,6 @@ public class JMSPublisherConsumerIT {
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody())); assertEquals("2", new String(response.getMessageBody()));
acknowledge(response); acknowledge(response);
}
}); });
} }
} catch (Exception e) { } catch (Exception e) {
@ -514,12 +489,9 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
AtomicBoolean callbackInvoked = new AtomicBoolean(); AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, null, false, false, null, messageSelector, "UTF-8", new ConsumerCallback() { consumer.consumeSingleMessage(destinationName, null, false, false, null, messageSelector, "UTF-8", response -> {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("msg1", new String(response.getMessageBody())); assertEquals("msg1", new String(response.getMessageBody()));
}
}); });
assertTrue(callbackInvoked.get()); assertTrue(callbackInvoked.get());

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.jms.processors; package org.apache.nifi.jms.processors;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
@ -28,12 +30,17 @@ import org.apache.nifi.jms.processors.helpers.ConnectionFactoryInvocationHandler
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders; import org.springframework.jms.support.JmsHeaders;
@ -48,21 +55,42 @@ import java.lang.reflect.Proxy;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static org.apache.nifi.jms.processors.PublishJMS.REL_FAILURE;
import static org.apache.nifi.jms.processors.PublishJMS.REL_SUCCESS;
import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy; import static org.apache.nifi.jms.processors.helpers.AssertionUtils.assertCausedBy;
import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetReaderService;
import static org.apache.nifi.jms.processors.helpers.JMSTestUtil.createJsonRecordSetWriterService;
import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceEventTemplates.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
import static org.apache.nifi.jms.processors.ioconcept.reader.StateTrackingFlowFileReader.ATTR_READ_FAILED_INDEX_SUFFIX;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class PublishJMSIT { public class PublishJMSIT {
TestRunner testRunner;
@AfterEach
public void cleanup() {
if (testRunner != null) {
testRunner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
testRunner = null;
}
}
@Test @Test
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
@ -91,7 +119,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes); runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it.
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF); assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf); JmsTemplate jmst = new JmsTemplate(cf);
@ -134,7 +162,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes); runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it.
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF); assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf); JmsTemplate jmst = new JmsTemplate(cf);
@ -169,8 +197,8 @@ public class PublishJMSIT {
runner.run(); runner.run();
Thread.sleep(200); Thread.sleep(200);
assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty()); assertTrue(runner.getFlowFilesForRelationship(REL_SUCCESS).isEmpty());
assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0)); assertNotNull(runner.getFlowFilesForRelationship(REL_FAILURE).get(0));
} }
@Test @Test
@ -198,7 +226,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes); runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); runner.run(1, false);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF); assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf); JmsTemplate jmst = new JmsTemplate(cf);
@ -256,7 +284,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes); runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it. runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF); assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf); JmsTemplate jmst = new JmsTemplate(cf);
@ -316,7 +344,7 @@ public class PublishJMSIT {
runner.enqueue("Hey dude!".getBytes(), attributes); runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); runner.run(1, false);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); final MockFlowFile successFF = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertNotNull(successFF); assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf); JmsTemplate jmst = new JmsTemplate(cf);
@ -542,4 +570,210 @@ public class PublishJMSIT {
assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded."); assertTrue(((MockProcessContext) runner.getProcessContext()).isYieldCalled(), "In case of an exception, the processor should be yielded.");
} }
@Test
public void testPublishRecords() throws InitializationException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
String destination = "testPublishRecords";
testRunner = initializeTestRunner(cf, destination);
testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.assertValid();
final ArrayNode testInput = createTestJsonInput();
testRunner.enqueue(testInput.toString().getBytes());
testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(cf, destination, testInput.get(0).toString());
verifyPublishedMessage(cf, destination, testInput.get(1).toString());
verifyPublishedMessage(cf, destination, testInput.get(2).toString());
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile successfulFlowFile = flowFiles.get(0);
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
}
@Test
public void testPublishRecordsFailed() throws InitializationException {
PublishJMS processor = new PublishJMS() {
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
JMSPublisher spiedPublisher = Mockito.spy(publisher);
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(spiedPublisher).publish(any(), any(byte[].class), any());
super.rendezvousWithJms(context, processSession, spiedPublisher);
}
};
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
String destination = "testPublishRecords";
testRunner = initializeTestRunner(processor, cf, destination);
testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.assertValid();
final ArrayNode testInput = createTestJsonInput();
testRunner.enqueue(testInput.toString().getBytes());
testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 1));
verifyPublishedMessage(cf, destination, testInput.get(0).toString());
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
assertEquals(1, flowFiles.size());
final MockFlowFile failedFlowFile = flowFiles.get(0);
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
}
@Test
public void testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws InitializationException {
PublishJMS processor = new PublishJMS() {
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
JMSPublisher spiedPublisher = Mockito.spy(publisher);
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(spiedPublisher).publish(any(), any(byte[].class), any());
super.rendezvousWithJms(context, processSession, spiedPublisher);
}
};
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
String destination = "testPublishRecords";
testRunner = initializeTestRunner(processor, cf, destination);
testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.assertValid();
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
final ArrayNode testInput = createTestJsonInput();
final Map<String, String> attributes = new HashMap<>();
attributes.put(publishFailedIndexAttributeName, "1");
testRunner.enqueue(testInput.toString().getBytes(), attributes);
testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 2));
verifyPublishedMessage(cf, destination, testInput.get(1).toString());
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
assertEquals(1, flowFiles.size());
final MockFlowFile failedFlowFile = flowFiles.get(0);
assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
}
@Test
public void testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws InitializationException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
String destination = "testPublishRecords";
testRunner = initializeTestRunner(cf, destination);
testRunner.setProperty(PublishJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.assertValid();
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_READ_FAILED_INDEX_SUFFIX;
final ArrayNode testInput = createTestJsonInput();
final Map<String, String> attributes = new HashMap<>();
attributes.put(publishFailedIndexAttributeName, "1");
testRunner.enqueue(testInput.toString().getBytes(), attributes);
testRunner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, 3));
verifyPublishedMessage(cf, destination, testInput.get(1).toString());
verifyPublishedMessage(cf, destination, testInput.get(2).toString());
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile successfulFlowFile = flowFiles.get(0);
assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
}
private TestRunner initializeTestRunner(ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
PublishJMS processor = new PublishJMS();
return initializeTestRunner(processor, connectionFactory, destinationName);
}
private TestRunner initializeTestRunner(PublishJMS processor, ConnectionFactory connectionFactory, String destinationName) throws InitializationException {
TestRunner runner = TestRunners.newTestRunner(processor);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(connectionFactory);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationName);
return runner;
}
private void verifyPublishedMessage(ConnectionFactory connectionFactory, String destinationName, String content) {
JmsTemplate jmst = new JmsTemplate(connectionFactory);
BytesMessage message = (BytesMessage) jmst.receive(destinationName);
byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals(content, new String(messageBytes));
}
private ProvenanceEventRecord assertProvenanceEvent() {
final List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord event = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
return event;
}
private void assertProvenanceEvent(String expectedDetails) {
final ProvenanceEventRecord event = assertProvenanceEvent();
assertEquals(expectedDetails, event.getDetails());
}
private static ArrayNode createTestJsonInput() {
final ObjectMapper mapper = new ObjectMapper();
return mapper.createArrayNode().addAll(asList(
mapper.createObjectNode()
.put("recordId", 1)
.put("firstAttribute", "foo")
.put("secondAttribute", false),
mapper.createObjectNode()
.put("recordId", 2)
.put("firstAttribute", "bar")
.put("secondAttribute", true),
mapper.createObjectNode()
.put("recordId", 3)
.put("firstAttribute", "foobar")
.put("secondAttribute", false)
));
}
} }

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.helpers;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.TestRunner;
public class JMSTestUtil {
public static String createJsonRecordSetReaderService(TestRunner testRunner) throws InitializationException {
final String id = "record-reader";
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService(id, jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
return id;
}
public static String createJsonRecordSetWriterService(TestRunner testRunner) throws InitializationException {
final String id = "record-writer";
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService(id, jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
return id;
}
}