diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java index 2d262f3469..c48d52009f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java @@ -33,6 +33,8 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -40,6 +42,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -54,6 +57,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processors.standard.util.JmsFactory; +import org.apache.nifi.processors.standard.util.JmsProcessingSummary; import org.apache.nifi.processors.standard.util.WrappedMessageConsumer; import org.apache.nifi.util.BooleanHolder; import org.apache.nifi.util.IntegerHolder; @@ -63,6 +67,8 @@ import org.apache.nifi.util.StopWatch; public abstract class JmsConsumer extends AbstractProcessor { + public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage."; + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("All FlowFiles are routed to success").build(); @@ -108,22 +114,17 @@ public abstract class JmsConsumer extends AbstractProcessor { final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean(); final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - final ObjectHolder lastMessageReceived = new ObjectHolder<>(null); - final ObjectHolder> attributesFromJmsProps = new ObjectHolder<>(null); - final Set allFlowFilesCreated = new HashSet<>(); - final IntegerHolder messagesReceived = new IntegerHolder(0); - final LongHolder bytesReceived = new LongHolder(0L); - + final JmsProcessingSummary processingSummary = new JmsProcessingSummary(); + final StopWatch stopWatch = new StopWatch(true); for (int i = 0; i < batchSize; i++) { - final BooleanHolder failure = new BooleanHolder(false); final Message message; try { // If we haven't received a message, wait until one is available. If we have already received at least one // message, then we are not willing to wait for more to become available, but we are willing to keep receiving // all messages that are immediately available. - if (messagesReceived.get() == 0) { + if (processingSummary.getMessagesReceived() == 0) { message = consumer.receive(timeout); } else { message = consumer.receiveNoWait(); @@ -131,7 +132,6 @@ public abstract class JmsConsumer extends AbstractProcessor { } catch (final JMSException e) { logger.error("Failed to receive JMS Message due to {}", e); wrappedConsumer.close(logger); - failure.set(true); break; } @@ -139,48 +139,16 @@ public abstract class JmsConsumer extends AbstractProcessor { break; } - final IntegerHolder msgsThisFlowFile = new IntegerHolder(0); - FlowFile flowFile = session.create(); try { - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) { - messagesReceived.getAndIncrement(); - final Map attributes = (addAttributes ? JmsFactory.createAttributeMap(message) : null); - attributesFromJmsProps.set(attributes); - - final byte[] messageBody = JmsFactory.createByteArray(message); - out.write(messageBody); - bytesReceived.addAndGet(messageBody.length); - msgsThisFlowFile.incrementAndGet(); - lastMessageReceived.set(message); - } catch (final JMSException e) { - logger.error("Failed to receive JMS Message due to {}", e); - failure.set(true); - } - } - }); - } finally { - if (failure.get()) { // no flowfile created - session.remove(flowFile); - wrappedConsumer.close(logger); - } else { - allFlowFilesCreated.add(flowFile); - - final Map attributes = attributesFromJmsProps.get(); - if (attributes != null) { - flowFile = session.putAllAttributes(flowFile, attributes); - } - - session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue()); - session.transfer(flowFile, REL_SUCCESS); - logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()}); - } - } + processingSummary.add( map2FlowFile(context, session, message, addAttributes, logger) ); + } catch (Exception e) { + logger.error("Failed to receive JMS Message due to {}", e); + wrappedConsumer.close(logger); + break; + } } - - if (allFlowFilesCreated.isEmpty()) { + + if (processingSummary.getFlowFilesCreated()==0) { context.yield(); return; } @@ -188,21 +156,81 @@ public abstract class JmsConsumer extends AbstractProcessor { session.commit(); stopWatch.stop(); - if (!allFlowFilesCreated.isEmpty()) { + if (processingSummary.getFlowFilesCreated()>0) { final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F); - float messagesPerSec = ((float) messagesReceived.get()) / secs; - final String dataRate = stopWatch.calculateDataRate(bytesReceived.get()); - logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{messagesReceived.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate}); + float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs; + final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived()); + logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate}); } // if we need to acknowledge the messages, do so now. - final Message lastMessage = lastMessageReceived.get(); + final Message lastMessage = processingSummary.getLastMessageReceived(); if (clientAcknowledge && lastMessage != null) { try { lastMessage.acknowledge(); // acknowledge all received messages by acknowledging only the last. } catch (final JMSException e) { - logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{messagesReceived.get(), e}); + logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e}); } } } + + public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception { + + // Currently not very useful, because always one Message == one FlowFile + final IntegerHolder msgsThisFlowFile = new IntegerHolder(1); + + FlowFile flowFile = session.create(); + try { + // MapMessage is exception, add only name-value pairs to FlowFile attributes + if (message instanceof MapMessage) { + MapMessage mapMessage = (MapMessage) message; + flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage)); + } + // all other message types, write Message body to FlowFile content + else { + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) { + final byte[] messageBody = JmsFactory.createByteArray(message); + out.write(messageBody); + } catch (final JMSException e) { + throw new ProcessException("Failed to receive JMS Message due to {}", e); + } + } + }); + } + + if (addAttributes) + flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message)); + + session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue()); + session.transfer(flowFile, REL_SUCCESS); + logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()}); + + return new JmsProcessingSummary(flowFile.getSize(), message, flowFile); + + } catch (Exception e) { + session.remove(flowFile); + throw e; + } + } + + public static Map createMapMessageValues(final MapMessage mapMessage) throws JMSException { + final Map valueMap = new HashMap<>(); + + final Enumeration enumeration = mapMessage.getMapNames(); + while (enumeration.hasMoreElements()) { + final String name = (String) enumeration.nextElement(); + + final Object value = mapMessage.getObject(name); + if (value==null) + valueMap.put(MAP_MESSAGE_PREFIX+name, ""); + else + valueMap.put(MAP_MESSAGE_PREFIX+name, value.toString()); + } + + return valueMap; + } + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java new file mode 100644 index 0000000000..02a40965eb --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import javax.jms.Message; + +import org.apache.nifi.flowfile.FlowFile; + + +/** + * Data structure which allows to collect processing summary data. + * + */ +public class JmsProcessingSummary { + + private int messagesReceived; + private long bytesReceived; + private Message lastMessageReceived; + private int flowFilesCreated; + private FlowFile lastFlowFile; // helps testing + + public JmsProcessingSummary() { + super(); + this.messagesReceived = 0; + this.bytesReceived = 0; + this.lastMessageReceived = null; + this.flowFilesCreated = 0; + this.lastFlowFile = null; + } + + public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) { + super(); + this.messagesReceived = 1; + this.bytesReceived = bytesReceived; + this.lastMessageReceived = lastMessageReceived; + this.flowFilesCreated = 1; + this.lastFlowFile = lastFlowFile; + } + + public void add(JmsProcessingSummary jmsProcessingSummary) { + this.messagesReceived += jmsProcessingSummary.messagesReceived; + this.bytesReceived += jmsProcessingSummary.bytesReceived; + this.lastMessageReceived = jmsProcessingSummary.lastMessageReceived; + this.flowFilesCreated += jmsProcessingSummary.flowFilesCreated; + this.lastFlowFile = jmsProcessingSummary.lastFlowFile; + } + + public int getMessagesReceived() { + return messagesReceived; + } + + public long getBytesReceived() { + return bytesReceived; + } + + public Message getLastMessageReceived() { + return lastMessageReceived; + } + + public int getFlowFilesCreated() { + return flowFilesCreated; + } + + public FlowFile getLastFlowFile() { + return lastFlowFile; + } + +} + diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java new file mode 100644 index 0000000000..1777a896d8 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.standard.util.JmsProcessingSummary; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.MockProcessorInitializationContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +/** + * + */ +public class TestJmsConsumer { + + static protected MapMessage createMapMessage() throws JMSException { + MapMessage mapMessage = new ActiveMQMapMessage(); + mapMessage.setString("name", "Arnold"); + mapMessage.setInt ("age", 97); + mapMessage.setDouble("xyz", 89686.564); + mapMessage.setBoolean("good", true); + return mapMessage; + } + + /** + * Test method for {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageAttrs(javax.jms.MapMessage)}. + * @throws JMSException + */ + @Test + public void testCreateMapMessageValues() throws JMSException { + + MapMessage mapMessage = createMapMessage(); + + Map mapMessageValues = JmsConsumer.createMapMessageValues(mapMessage); + assertEquals("", 4, mapMessageValues.size()); + assertEquals("", "Arnold", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name")); + assertEquals("", "97", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age")); + assertEquals("", "89686.564", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz")); + assertEquals("", "true", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good")); + } + + /** + * Test MapMessage to FlowFile conversion + */ + @Test + public void testMap2FlowFileMapMessage() throws Exception { + + TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + MapMessage mapMessage = createMapMessage(); + + ProcessContext context = runner.getProcessContext(); + ProcessSession session = runner.getProcessSessionFactory().createSession(); + ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(), + (MockProcessContext) runner.getProcessContext()); + + JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, mapMessage, true, pic.getLogger()); + + assertEquals("MapMessage should not create FlowFile content", 0, summary.getBytesReceived()); + + Map attributes = summary.getLastFlowFile().getAttributes(); + assertEquals("", "Arnold", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name")); + assertEquals("", "97", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age")); + assertEquals("", "89686.564", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz")); + assertEquals("", "true", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good")); + } + + /** + * Test TextMessage to FlowFile conversion + */ + @Test + public void testMap2FlowFileTextMessage() throws Exception { + + TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + TextMessage textMessage = new ActiveMQTextMessage(); + + String payload = "Hello world!"; + textMessage.setText(payload); + + ProcessContext context = runner.getProcessContext(); + ProcessSession session = runner.getProcessSessionFactory().createSession(); + ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(), + (MockProcessContext) runner.getProcessContext()); + + JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, textMessage, true, pic.getLogger()); + + assertEquals("TextMessage content length should equal to FlowFile content size", payload.length(), summary.getLastFlowFile().getSize()); + + final byte[] buffer = new byte[payload.length()]; + runner.clearTransferState(); + + session.read(summary.getLastFlowFile(), new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer, false); + } + }); + + String contentString = new String(buffer,"UTF-8"); + assertEquals("", payload, contentString); + } + + /** + * Test BytesMessage to FlowFile conversion + */ + @Test + public void testMap2FlowFileBytesMessage() throws Exception { + + TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + BytesMessage bytesMessage = new ActiveMQBytesMessage(); + + String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!"; + byte[] payload = sourceString.getBytes("UTF-8"); + bytesMessage.writeBytes(payload); + bytesMessage.reset(); + + ProcessContext context = runner.getProcessContext(); + ProcessSession session = runner.getProcessSessionFactory().createSession(); + ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(), + (MockProcessContext) runner.getProcessContext()); + + JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, bytesMessage, true, pic.getLogger()); + + assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary.getLastFlowFile().getSize()); + + final byte[] buffer = new byte[payload.length]; + runner.clearTransferState(); + + session.read(summary.getLastFlowFile(), new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer, false); + } + }); + + String contentString = new String(buffer,"UTF-8"); + assertEquals("", sourceString, contentString); + } + +}