mirror of https://github.com/apache/nifi.git
NIFI-1000 addressed PR comments
This commit is contained in:
parent
ef0be5a5d6
commit
8699e35108
|
@ -153,7 +153,6 @@ language governing permissions and limitations under the License. -->
|
|||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jayway.jsonpath</groupId>
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CL
|
|||
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
|
||||
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
|
||||
|
@ -89,6 +90,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
|
|||
descriptors.add(USERNAME);
|
||||
descriptors.add(PASSWORD);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(DESTINATION_TYPE);
|
||||
descriptors.add(ACKNOWLEDGEMENT_MODE);
|
||||
descriptors.add(MESSAGE_SELECTOR);
|
||||
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
|
||||
|
@ -158,8 +160,8 @@ public abstract class JmsConsumer extends AbstractProcessor {
|
|||
|
||||
stopWatch.stop();
|
||||
if (processingSummary.getFlowFilesCreated() > 0) {
|
||||
final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
|
||||
float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
|
||||
final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
|
||||
float messagesPerSec = (processingSummary.getMessagesReceived()) / secs;
|
||||
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
|
||||
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
|
||||
new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
|
||||
|
|
|
@ -46,14 +46,13 @@ public class TestGetJMSQueue {
|
|||
@Test
|
||||
public void testSendTextToQueue() throws Exception {
|
||||
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||
StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
|
||||
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
|
||||
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
||||
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
||||
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
||||
|
||||
MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
|
||||
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
|
||||
final Session jmsSession = wrappedProducer.getSession();
|
||||
final MessageProducer producer = wrappedProducer.getProducer();
|
||||
|
@ -62,16 +61,15 @@ public class TestGetJMSQueue {
|
|||
producer.send(message);
|
||||
jmsSession.commit();
|
||||
|
||||
getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
|
||||
runner.run();
|
||||
|
||||
List<MockFlowFile> flowFiles = pSession
|
||||
List<MockFlowFile> flowFiles = runner
|
||||
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
|
||||
|
||||
assertTrue(flowFiles.size() == 1);
|
||||
MockFlowFile successFlowFile = flowFiles.get(0);
|
||||
String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
|
||||
assertEquals("Hello World", receivedMessage);
|
||||
assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
|
||||
successFlowFile.assertContentEquals("Hello World");
|
||||
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
|
||||
producer.close();
|
||||
jmsSession.close();
|
||||
}
|
||||
|
@ -79,7 +77,7 @@ public class TestGetJMSQueue {
|
|||
@Test
|
||||
public void testSendBytesToQueue() throws Exception {
|
||||
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||
StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
|
||||
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
|
||||
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
||||
|
@ -88,23 +86,21 @@ public class TestGetJMSQueue {
|
|||
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
|
||||
final Session jmsSession = wrappedProducer.getSession();
|
||||
final MessageProducer producer = wrappedProducer.getProducer();
|
||||
MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
|
||||
final BytesMessage message = jmsSession.createBytesMessage();
|
||||
message.writeBytes("Hello Bytes".getBytes());
|
||||
|
||||
producer.send(message);
|
||||
jmsSession.commit();
|
||||
|
||||
getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
|
||||
runner.run();
|
||||
|
||||
List<MockFlowFile> flowFiles = pSession
|
||||
List<MockFlowFile> flowFiles = runner
|
||||
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
|
||||
|
||||
assertTrue(flowFiles.size() == 1);
|
||||
MockFlowFile successFlowFile = flowFiles.get(0);
|
||||
String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
|
||||
assertEquals("Hello Bytes", receivedMessage);
|
||||
assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
|
||||
successFlowFile.assertContentEquals("Hello Bytes");
|
||||
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
|
||||
producer.close();
|
||||
jmsSession.close();
|
||||
}
|
||||
|
@ -112,7 +108,7 @@ public class TestGetJMSQueue {
|
|||
@Test
|
||||
public void testSendStreamToQueue() throws Exception {
|
||||
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||
StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
|
||||
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
|
||||
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
||||
|
@ -121,23 +117,22 @@ public class TestGetJMSQueue {
|
|||
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
|
||||
final Session jmsSession = wrappedProducer.getSession();
|
||||
final MessageProducer producer = wrappedProducer.getProducer();
|
||||
MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
|
||||
|
||||
final StreamMessage message = jmsSession.createStreamMessage();
|
||||
message.writeBytes("Hello Stream".getBytes());
|
||||
|
||||
producer.send(message);
|
||||
jmsSession.commit();
|
||||
|
||||
getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
|
||||
runner.run();
|
||||
|
||||
List<MockFlowFile> flowFiles = pSession
|
||||
List<MockFlowFile> flowFiles = runner
|
||||
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
|
||||
|
||||
assertTrue(flowFiles.size() == 1);
|
||||
MockFlowFile successFlowFile = flowFiles.get(0);
|
||||
String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
|
||||
assertEquals("Hello Stream", receivedMessage);
|
||||
assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
|
||||
successFlowFile.assertContentEquals("Hello Stream");
|
||||
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
|
||||
|
||||
producer.close();
|
||||
jmsSession.close();
|
||||
|
|
Loading…
Reference in New Issue