diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index d7c40f7643..cae3dc2631 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -66,7 +66,7 @@ abstract class AbstractJMSProcessor extends AbstractProcess .build(); static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() .name("Destination Name") - .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic').") + .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic' or 'myTopic').") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index cdd5fcd4b7..aea6c9cb36 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -50,7 +50,7 @@ import org.springframework.jms.core.JmsTemplate; @Tags({ "jms", "get", "message", "receive", "consume" }) @InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Consumes JMS Message of type BytesMessage or TextMessage transforming its content to " - + "a FlowFile and transitioning it to 'success' relationship.") + + "a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes.") @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) public class ConsumeJMS extends AbstractJMSProcessor { @@ -90,8 +90,8 @@ public class ConsumeJMS extends AbstractJMSProcessor { }); Map jmsHeaders = response.getMessageHeaders(); Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties()); - flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, flowFile, processSession); - flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, flowFile, processSession); + flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); + flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); processSession.transfer(flowFile, REL_SUCCESS); } else { @@ -116,11 +116,15 @@ public class ConsumeJMS extends AbstractJMSProcessor { } /** - * + * Copies JMS attributes (i.e., headers and properties) as FF attributes. + * Given that FF attributes mandate that values are of type String, the + * copied values of JMS attributes will be stringified via + * String.valueOf(attribute). */ - private FlowFile updateFlowFileAttributesWithMap(Map map, FlowFile flowFile, ProcessSession processSession) { + private FlowFile updateFlowFileAttributesWithJMSAttributes(Map jmsAttributes, FlowFile flowFile, + ProcessSession processSession) { Map attributes = new HashMap(); - for (Entry entry : map.entrySet()) { + for (Entry entry : jmsAttributes.entrySet()) { attributes.put(entry.getKey(), String.valueOf(entry.getValue())); } attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java index 1525738ca1..0ddbe486b8 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -61,7 +61,7 @@ final class JMSConsumer extends JMSWorker { /** * */ - public JMSResponse consume(final String destinationName) { + public JMSResponse consume(String destinationName) { Message message = this.jmsTemplate.receive(destinationName); if (message != null) { byte[] messageBody = null; diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index 49355f4573..526f0bd1bb 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -63,7 +63,7 @@ final class JMSPublisher extends JMSWorker { * * @param messageBytes byte array representing contents of the message */ - void publish(final String destinationName, byte[] messageBytes) { + void publish(String destinationName, byte[] messageBytes) { this.publish(destinationName, messageBytes, null); } @@ -74,7 +74,7 @@ final class JMSPublisher extends JMSWorker { * @param flowFileAttributes * Map representing {@link FlowFile} attributes. */ - void publish(final String destinationName, final byte[] messageBytes, final Map flowFileAttributes) { + void publish(final String destinationName, final byte[] messageBytes, Map flowFileAttributes) { this.jmsTemplate.send(destinationName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { @@ -125,9 +125,8 @@ final class JMSPublisher extends JMSWorker { * */ private void logUnbuildableDestination(String destinationName, String headerName) { - logger.warn("Failed to determine destination type from destination name '" + destinationName + "'. The '" - + headerName + "' will not be set."); - processLog.warn("Failed to determine destination type from destination name '" + destinationName + "'. The '" + this.processLog.warn("Failed to determine destination type from destination name '" + destinationName + + "'. The '" + headerName + "' will not be set."); } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 36f7e868bb..1f7a44e4b3 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -55,7 +55,7 @@ import org.springframework.jms.support.JmsHeaders; @Tags({ "jms", "put", "message", "send", "publish" }) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Creates a JMS Message from the contents of a FlowFile and sends it to a " - + "JMS Destination (queue or topic) as JMS BytesMessage.") + + "JMS Destination (queue or topic) as JMS BytesMessage. FlowFile attributes will be added as JMS headers and/or properties to the outgoing JMS message.") @SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class }) public class PublishJMS extends AbstractJMSProcessor { @@ -98,9 +98,8 @@ public class PublishJMS extends AbstractJMSProcessor { FlowFile flowFile = processSession.get(); if (flowFile != null) { try { - final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue(); - this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession), - flowFile.getAttributes()); + String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue(); + this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); processSession.transfer(flowFile, REL_SUCCESS); processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); } catch (Exception e) {