NIFI-84 add MapMessage value pairs as FlowFile attributes

Signed-off-by: Toivo Adams <toivo.adams@gmail.com>
Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Toivo Adams 2015-02-10 10:03:51 +02:00 committed by Mark Payne
parent afe446774d
commit 602fa7a860
3 changed files with 339 additions and 55 deletions

View File

@ -33,6 +33,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -40,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -54,6 +57,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.util.BooleanHolder;
import org.apache.nifi.util.IntegerHolder;
@ -63,6 +67,8 @@ import org.apache.nifi.util.StopWatch;
public abstract class JmsConsumer extends AbstractProcessor {
public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles are routed to success").build();
@ -108,22 +114,17 @@ public abstract class JmsConsumer extends AbstractProcessor {
final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ObjectHolder<Message> lastMessageReceived = new ObjectHolder<>(null);
final ObjectHolder<Map<String, String>> attributesFromJmsProps = new ObjectHolder<>(null);
final Set<FlowFile> allFlowFilesCreated = new HashSet<>();
final IntegerHolder messagesReceived = new IntegerHolder(0);
final LongHolder bytesReceived = new LongHolder(0L);
final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
final StopWatch stopWatch = new StopWatch(true);
for (int i = 0; i < batchSize; i++) {
final BooleanHolder failure = new BooleanHolder(false);
final Message message;
try {
// If we haven't received a message, wait until one is available. If we have already received at least one
// message, then we are not willing to wait for more to become available, but we are willing to keep receiving
// all messages that are immediately available.
if (messagesReceived.get() == 0) {
if (processingSummary.getMessagesReceived() == 0) {
message = consumer.receive(timeout);
} else {
message = consumer.receiveNoWait();
@ -131,7 +132,6 @@ public abstract class JmsConsumer extends AbstractProcessor {
} catch (final JMSException e) {
logger.error("Failed to receive JMS Message due to {}", e);
wrappedConsumer.close(logger);
failure.set(true);
break;
}
@ -139,48 +139,16 @@ public abstract class JmsConsumer extends AbstractProcessor {
break;
}
final IntegerHolder msgsThisFlowFile = new IntegerHolder(0);
FlowFile flowFile = session.create();
try {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
messagesReceived.getAndIncrement();
final Map<String, String> attributes = (addAttributes ? JmsFactory.createAttributeMap(message) : null);
attributesFromJmsProps.set(attributes);
final byte[] messageBody = JmsFactory.createByteArray(message);
out.write(messageBody);
bytesReceived.addAndGet(messageBody.length);
msgsThisFlowFile.incrementAndGet();
lastMessageReceived.set(message);
} catch (final JMSException e) {
logger.error("Failed to receive JMS Message due to {}", e);
failure.set(true);
}
}
});
} finally {
if (failure.get()) { // no flowfile created
session.remove(flowFile);
wrappedConsumer.close(logger);
} else {
allFlowFilesCreated.add(flowFile);
final Map<String, String> attributes = attributesFromJmsProps.get();
if (attributes != null) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
}
}
processingSummary.add( map2FlowFile(context, session, message, addAttributes, logger) );
} catch (Exception e) {
logger.error("Failed to receive JMS Message due to {}", e);
wrappedConsumer.close(logger);
break;
}
}
if (allFlowFilesCreated.isEmpty()) {
if (processingSummary.getFlowFilesCreated()==0) {
context.yield();
return;
}
@ -188,21 +156,81 @@ public abstract class JmsConsumer extends AbstractProcessor {
session.commit();
stopWatch.stop();
if (!allFlowFilesCreated.isEmpty()) {
if (processingSummary.getFlowFilesCreated()>0) {
final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
float messagesPerSec = ((float) messagesReceived.get()) / secs;
final String dataRate = stopWatch.calculateDataRate(bytesReceived.get());
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{messagesReceived.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
}
// if we need to acknowledge the messages, do so now.
final Message lastMessage = lastMessageReceived.get();
final Message lastMessage = processingSummary.getLastMessageReceived();
if (clientAcknowledge && lastMessage != null) {
try {
lastMessage.acknowledge(); // acknowledge all received messages by acknowledging only the last.
} catch (final JMSException e) {
logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{messagesReceived.get(), e});
logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e});
}
}
}
public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception {
// Currently not very useful, because always one Message == one FlowFile
final IntegerHolder msgsThisFlowFile = new IntegerHolder(1);
FlowFile flowFile = session.create();
try {
// MapMessage is exception, add only name-value pairs to FlowFile attributes
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
}
// all other message types, write Message body to FlowFile content
else {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
final byte[] messageBody = JmsFactory.createByteArray(message);
out.write(messageBody);
} catch (final JMSException e) {
throw new ProcessException("Failed to receive JMS Message due to {}", e);
}
}
});
}
if (addAttributes)
flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
} catch (Exception e) {
session.remove(flowFile);
throw e;
}
}
public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
final Map<String, String> valueMap = new HashMap<>();
final Enumeration<?> enumeration = mapMessage.getMapNames();
while (enumeration.hasMoreElements()) {
final String name = (String) enumeration.nextElement();
final Object value = mapMessage.getObject(name);
if (value==null)
valueMap.put(MAP_MESSAGE_PREFIX+name, "");
else
valueMap.put(MAP_MESSAGE_PREFIX+name, value.toString());
}
return valueMap;
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import javax.jms.Message;
import org.apache.nifi.flowfile.FlowFile;
/**
* Data structure which allows to collect processing summary data.
*
*/
public class JmsProcessingSummary {
private int messagesReceived;
private long bytesReceived;
private Message lastMessageReceived;
private int flowFilesCreated;
private FlowFile lastFlowFile; // helps testing
public JmsProcessingSummary() {
super();
this.messagesReceived = 0;
this.bytesReceived = 0;
this.lastMessageReceived = null;
this.flowFilesCreated = 0;
this.lastFlowFile = null;
}
public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) {
super();
this.messagesReceived = 1;
this.bytesReceived = bytesReceived;
this.lastMessageReceived = lastMessageReceived;
this.flowFilesCreated = 1;
this.lastFlowFile = lastFlowFile;
}
public void add(JmsProcessingSummary jmsProcessingSummary) {
this.messagesReceived += jmsProcessingSummary.messagesReceived;
this.bytesReceived += jmsProcessingSummary.bytesReceived;
this.lastMessageReceived = jmsProcessingSummary.lastMessageReceived;
this.flowFilesCreated += jmsProcessingSummary.flowFilesCreated;
this.lastFlowFile = jmsProcessingSummary.lastFlowFile;
}
public int getMessagesReceived() {
return messagesReceived;
}
public long getBytesReceived() {
return bytesReceived;
}
public Message getLastMessageReceived() {
return lastMessageReceived;
}
public int getFlowFilesCreated() {
return flowFilesCreated;
}
public FlowFile getLastFlowFile() {
return lastFlowFile;
}
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
/**
*
*/
public class TestJmsConsumer {
static protected MapMessage createMapMessage() throws JMSException {
MapMessage mapMessage = new ActiveMQMapMessage();
mapMessage.setString("name", "Arnold");
mapMessage.setInt ("age", 97);
mapMessage.setDouble("xyz", 89686.564);
mapMessage.setBoolean("good", true);
return mapMessage;
}
/**
* Test method for {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageAttrs(javax.jms.MapMessage)}.
* @throws JMSException
*/
@Test
public void testCreateMapMessageValues() throws JMSException {
MapMessage mapMessage = createMapMessage();
Map<String, String> mapMessageValues = JmsConsumer.createMapMessageValues(mapMessage);
assertEquals("", 4, mapMessageValues.size());
assertEquals("", "Arnold", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
assertEquals("", "97", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
assertEquals("", "89686.564", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
assertEquals("", "true", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
}
/**
* Test MapMessage to FlowFile conversion
*/
@Test
public void testMap2FlowFileMapMessage() throws Exception {
TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
MapMessage mapMessage = createMapMessage();
ProcessContext context = runner.getProcessContext();
ProcessSession session = runner.getProcessSessionFactory().createSession();
ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
(MockProcessContext) runner.getProcessContext());
JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, mapMessage, true, pic.getLogger());
assertEquals("MapMessage should not create FlowFile content", 0, summary.getBytesReceived());
Map<String, String> attributes = summary.getLastFlowFile().getAttributes();
assertEquals("", "Arnold", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
assertEquals("", "97", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
assertEquals("", "89686.564", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
assertEquals("", "true", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
}
/**
* Test TextMessage to FlowFile conversion
*/
@Test
public void testMap2FlowFileTextMessage() throws Exception {
TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
TextMessage textMessage = new ActiveMQTextMessage();
String payload = "Hello world!";
textMessage.setText(payload);
ProcessContext context = runner.getProcessContext();
ProcessSession session = runner.getProcessSessionFactory().createSession();
ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
(MockProcessContext) runner.getProcessContext());
JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, textMessage, true, pic.getLogger());
assertEquals("TextMessage content length should equal to FlowFile content size", payload.length(), summary.getLastFlowFile().getSize());
final byte[] buffer = new byte[payload.length()];
runner.clearTransferState();
session.read(summary.getLastFlowFile(), new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
String contentString = new String(buffer,"UTF-8");
assertEquals("", payload, contentString);
}
/**
* Test BytesMessage to FlowFile conversion
*/
@Test
public void testMap2FlowFileBytesMessage() throws Exception {
TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
BytesMessage bytesMessage = new ActiveMQBytesMessage();
String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!";
byte[] payload = sourceString.getBytes("UTF-8");
bytesMessage.writeBytes(payload);
bytesMessage.reset();
ProcessContext context = runner.getProcessContext();
ProcessSession session = runner.getProcessSessionFactory().createSession();
ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
(MockProcessContext) runner.getProcessContext());
JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, bytesMessage, true, pic.getLogger());
assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary.getLastFlowFile().getSize());
final byte[] buffer = new byte[payload.length];
runner.clearTransferState();
session.read(summary.getLastFlowFile(), new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false);
}
});
String contentString = new String(buffer,"UTF-8");
assertEquals("", sourceString, contentString);
}
}