Merge branch 'NIFI-1000' of https://github.com/olegz/nifi into NIFI-1000

This commit is contained in:
Mark Payne 2015-11-10 13:34:56 -05:00
commit 53725b5c72
5 changed files with 68 additions and 17 deletions

View File

@ -152,7 +152,7 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<artifactId>activemq-all</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>

View File

@ -21,6 +21,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CL
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
@ -89,6 +90,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(DESTINATION_TYPE);
descriptors.add(ACKNOWLEDGEMENT_MODE);
descriptors.add(MESSAGE_SELECTOR);
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
@ -158,8 +160,8 @@ public abstract class JmsConsumer extends AbstractProcessor {
stopWatch.stop();
if (processingSummary.getFlowFilesCreated() > 0) {
final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
float messagesPerSec = (processingSummary.getMessagesReceived()) / secs;
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});

View File

@ -481,7 +481,13 @@ public class JmsFactory {
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
}
if (message.getJMSDestination() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, message.getJMSDestination().toString());
String destinationName;
if (message.getJMSDestination() instanceof Queue) {
destinationName = ((Queue) message.getJMSDestination()).getQueueName();
} else {
destinationName = ((Topic) message.getJMSDestination()).getTopicName();
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
}
if (message.getJMSMessageID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());

View File

@ -16,6 +16,11 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
@ -24,61 +29,88 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.StandardProcessorTestRunner;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.Revision;
import org.junit.Test;
public class TestGetJMSQueue {
@org.junit.Ignore
@Test
public void testSendTextToQueue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
final Message message = jmsSession.createTextMessage("Hello World");
producer.send(message);
jmsSession.commit();
runner.run();
List<MockFlowFile> flowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
assertTrue(flowFiles.size() == 1);
MockFlowFile successFlowFile = flowFiles.get(0);
successFlowFile.assertContentEquals("Hello World");
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
producer.close();
jmsSession.close();
}
@org.junit.Ignore
@Test
public void testSendBytesToQueue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
final BytesMessage message = jmsSession.createBytesMessage();
message.writeBytes("Hello Bytes".getBytes());
producer.send(message);
jmsSession.commit();
runner.run();
List<MockFlowFile> flowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
assertTrue(flowFiles.size() == 1);
MockFlowFile successFlowFile = flowFiles.get(0);
successFlowFile.assertContentEquals("Hello Bytes");
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
producer.close();
jmsSession.close();
}
@org.junit.Ignore
@Test
public void testSendStreamToQueue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
GetJMSQueue getJmsQueue = new GetJMSQueue();
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
@ -91,6 +123,17 @@ public class TestGetJMSQueue {
producer.send(message);
jmsSession.commit();
runner.run();
List<MockFlowFile> flowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
assertTrue(flowFiles.size() == 1);
MockFlowFile successFlowFile = flowFiles.get(0);
successFlowFile.assertContentEquals("Hello Stream");
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
producer.close();
jmsSession.close();
}

View File

@ -511,8 +511,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.12.0</version>
<artifactId>activemq-all</artifactId>
<version>5.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>