NIFI-2789, NIFI-2790 - polishing

This closes #1027
This commit is contained in:
Oleg Zhurakousky 2016-09-20 09:29:00 -04:00
parent c238676058
commit b693a4a561
5 changed files with 19 additions and 17 deletions

View File

@ -66,7 +66,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.build(); .build();
static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("Destination Name") .name("Destination Name")
.description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic').") .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic' or 'myTopic').")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)

View File

@ -50,7 +50,7 @@ import org.springframework.jms.core.JmsTemplate;
@Tags({ "jms", "get", "message", "receive", "consume" }) @Tags({ "jms", "get", "message", "receive", "consume" })
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Consumes JMS Message of type BytesMessage or TextMessage transforming its content to " @CapabilityDescription("Consumes JMS Message of type BytesMessage or TextMessage transforming its content to "
+ "a FlowFile and transitioning it to 'success' relationship.") + "a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes.")
@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
@ -90,8 +90,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
}); });
Map<String, Object> jmsHeaders = response.getMessageHeaders(); Map<String, Object> jmsHeaders = response.getMessageHeaders();
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties()); Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, flowFile, processSession); flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, flowFile, processSession); flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
processSession.transfer(flowFile, REL_SUCCESS); processSession.transfer(flowFile, REL_SUCCESS);
} else { } else {
@ -116,11 +116,15 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
} }
/** /**
* * Copies JMS attributes (i.e., headers and properties) as FF attributes.
* Given that FF attributes mandate that values are of type String, the
* copied values of JMS attributes will be stringified via
* String.valueOf(attribute).
*/ */
private FlowFile updateFlowFileAttributesWithMap(Map<String, Object> map, FlowFile flowFile, ProcessSession processSession) { private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, Object> jmsAttributes, FlowFile flowFile,
ProcessSession processSession) {
Map<String, String> attributes = new HashMap<String, String>(); Map<String, String> attributes = new HashMap<String, String>();
for (Entry<String, Object> entry : map.entrySet()) { for (Entry<String, Object> entry : jmsAttributes.entrySet()) {
attributes.put(entry.getKey(), String.valueOf(entry.getValue())); attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
} }
attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName); attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);

View File

@ -61,7 +61,7 @@ final class JMSConsumer extends JMSWorker {
/** /**
* *
*/ */
public JMSResponse consume(final String destinationName) { public JMSResponse consume(String destinationName) {
Message message = this.jmsTemplate.receive(destinationName); Message message = this.jmsTemplate.receive(destinationName);
if (message != null) { if (message != null) {
byte[] messageBody = null; byte[] messageBody = null;

View File

@ -63,7 +63,7 @@ final class JMSPublisher extends JMSWorker {
* *
* @param messageBytes byte array representing contents of the message * @param messageBytes byte array representing contents of the message
*/ */
void publish(final String destinationName, byte[] messageBytes) { void publish(String destinationName, byte[] messageBytes) {
this.publish(destinationName, messageBytes, null); this.publish(destinationName, messageBytes, null);
} }
@ -74,7 +74,7 @@ final class JMSPublisher extends JMSWorker {
* @param flowFileAttributes * @param flowFileAttributes
* Map representing {@link FlowFile} attributes. * Map representing {@link FlowFile} attributes.
*/ */
void publish(final String destinationName, final byte[] messageBytes, final Map<String, String> flowFileAttributes) { void publish(final String destinationName, final byte[] messageBytes, Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(destinationName, new MessageCreator() { this.jmsTemplate.send(destinationName, new MessageCreator() {
@Override @Override
public Message createMessage(Session session) throws JMSException { public Message createMessage(Session session) throws JMSException {
@ -125,9 +125,8 @@ final class JMSPublisher extends JMSWorker {
* *
*/ */
private void logUnbuildableDestination(String destinationName, String headerName) { private void logUnbuildableDestination(String destinationName, String headerName) {
logger.warn("Failed to determine destination type from destination name '" + destinationName + "'. The '" this.processLog.warn("Failed to determine destination type from destination name '" + destinationName
+ headerName + "' will not be set."); + "'. The '"
processLog.warn("Failed to determine destination type from destination name '" + destinationName + "'. The '"
+ headerName + "' will not be set."); + headerName + "' will not be set.");
} }

View File

@ -55,7 +55,7 @@ import org.springframework.jms.support.JmsHeaders;
@Tags({ "jms", "put", "message", "send", "publish" }) @Tags({ "jms", "put", "message", "send", "publish" })
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Creates a JMS Message from the contents of a FlowFile and sends it to a " @CapabilityDescription("Creates a JMS Message from the contents of a FlowFile and sends it to a "
+ "JMS Destination (queue or topic) as JMS BytesMessage.") + "JMS Destination (queue or topic) as JMS BytesMessage. FlowFile attributes will be added as JMS headers and/or properties to the outgoing JMS message.")
@SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class }) @SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> { public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
@ -98,9 +98,8 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
FlowFile flowFile = processSession.get(); FlowFile flowFile = processSession.get();
if (flowFile != null) { if (flowFile != null) {
try { try {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue(); String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession), this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
flowFile.getAttributes());
processSession.transfer(flowFile, REL_SUCCESS); processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
} catch (Exception e) { } catch (Exception e) {