diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java index b8902a91ac..7405ca5e2a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java @@ -251,8 +251,9 @@ public class PutJMS extends AbstractProcessor { final String flowFileDescription = successfulFlowFiles.size() > 10 ? successfulFlowFiles.size() + " FlowFiles" : successfulFlowFiles.toString(); logger.info("Sent {} to JMS Server and transferred to 'success'", new Object[]{flowFileDescription}); } catch (JMSException e) { - logger.error("Failed to commit JMS Session due to {}; rolling back session", new Object[]{e}); - session.rollback(); + logger.error("Failed to commit JMS Session due to {} and transferred to 'failure'", new Object[]{e}); + session.transfer(flowFiles, REL_FAILURE); + context.yield(); wrappedProducer.close(logger); } } finally { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java new file mode 100644 index 0000000000..acba96cec6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java @@ -0,0 +1,565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.standard.util.JmsFactory; +import org.apache.nifi.processors.standard.util.JmsProperties; +import org.apache.nifi.processors.standard.util.WrappedMessageProducer; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +public class TestPutJMS { + + private final String TEST_PROVIDER = JmsProperties.ACTIVEMQ_PROVIDER; + private final String TEST_URL = "vm://localhost?broker.persistent=false"; + private final String TEST_DEST_TYPE = JmsProperties.DESTINATION_TYPE_QUEUE; + private final String TEST_DEST_NAME = "queue.testing"; + private final String TEST_ACK_MODE = JmsProperties.ACK_MODE_AUTO; + + private String testQueueSuffix() { + final StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + return "." + trace[2].getMethodName(); + } + + private void injectFieldValue(Class klass, Object instance, String fieldName, Object fieldValue) throws NoSuchFieldException, IllegalAccessException { + Field field = klass.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(instance, fieldValue); + } + + @Test + public void testGetRelationships() { + final PutJMS putJMS = new PutJMS(); + final Set relationships = putJMS.getRelationships(); + assertEquals(2, relationships.size()); + assertTrue(relationships.contains(PutJMS.REL_FAILURE)); + assertTrue(relationships.contains(PutJMS.REL_SUCCESS)); + } + + @Test + public void testCleanupResources() throws JMSException, NoSuchFieldException, IllegalAccessException { + final PutJMS putJMS = new PutJMS(); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + + final Queue wrappedMessageProducerQueue = (Queue) spy(new LinkedBlockingQueue<>()); + injectFieldValue(PutJMS.class, putJMS, "producerQueue", wrappedMessageProducerQueue); + + final WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runnerPut.getProcessContext(), true); + wrappedMessageProducerQueue.offer(wrappedProducer); + + assertNotNull(wrappedMessageProducerQueue.peek()); + putJMS.cleanupResources(); + assertNull(wrappedMessageProducerQueue.peek()); + } + + + @Test + public void testCreateMessageDirectly() throws JMSException { + final PutJMS putJMS = new PutJMS(); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPut.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, TEST_ACK_MODE); + + final WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runnerPut.getProcessContext(), true); + final Session jmsSession = wrappedProducer.getSession(); + final MessageProducer producer = wrappedProducer.getProducer(); + final Message message = jmsSession.createTextMessage("createMessageDirectly"); + + producer.send(message); + jmsSession.commit(); + + final GetJMSQueue getJmsQueue = new GetJMSQueue(); + final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue); + + runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerGet.setProperty(JmsProperties.URL, TEST_URL); + runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, TEST_ACK_MODE); + + runnerGet.run(); + + final List flowFiles = runnerGet.getFlowFilesForRelationship( + new Relationship.Builder().name("success").build()); + + assertEquals(1, flowFiles.size()); + final MockFlowFile successFlowFile = flowFiles.get(0); + successFlowFile.assertContentEquals("createMessageDirectly"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", TEST_DEST_NAME + testQueueSuffix()); + producer.close(); + jmsSession.close(); + } + + @Test + public void testPutGetAttributesAndProps() throws JMSException { + final PutJMS putJMS = spy(new PutJMS()); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPut.setProperty(JmsProperties.REPLY_TO_QUEUE, TEST_DEST_NAME + testQueueSuffix() + ".reply"); + runnerPut.setProperty(JmsProperties.ATTRIBUTES_TO_JMS_PROPS, "true"); + + runnerPut.run(); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "file1.txt"); + attributes.put("jms.string", "banana"); + attributes.put("jms.integer", "50"); + attributes.put("jms.integer.type", "integer"); + attributes.put("jms.float", "3.14159"); + attributes.put("jms.float.type", "float"); + attributes.put("jms.boolean", "true"); + attributes.put("jms.boolean.type", "boolean"); + attributes.put("jms.long", "123456789"); + attributes.put("jms.long.type", "long"); + attributes.put("jms.short", "16384"); + attributes.put("jms.short.type", "short"); + attributes.put("jms.byte", "127"); + attributes.put("jms.byte.type", "byte"); + attributes.put("jms.double", "3.1415626547"); + attributes.put("jms.double.type", "double"); + attributes.put("jms.object", "{\"id\":215, \"name\": \"john doe\"}"); + attributes.put("jms.object.type", "object"); + attributes.put("jms.eyes", "blue"); + attributes.put("jms.eyes.type", "color"); + attributes.put("jms.badinteger", "3.14"); + attributes.put("jms.badinteger.type", "integer"); + runnerPut.enqueue("putGetMessage".getBytes(), attributes); + + runnerPut.run(); + + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + final GetJMSQueue getJmsQueue = new GetJMSQueue(); + final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue); + runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerGet.setProperty(JmsProperties.URL, TEST_URL); + runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, TEST_ACK_MODE); + runnerGet.setProperty(JmsProperties.JMS_PROPS_TO_ATTRIBUTES, "true"); + + runnerGet.run(); + + assertEquals(1, runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS).size()); + + final List flowFilesGet = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + + assertEquals(1, flowFilesGet.size()); + final MockFlowFile successFlowFile = flowFilesGet.get(0); + + successFlowFile.assertContentEquals("putGetMessage"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", TEST_DEST_NAME + testQueueSuffix()); + successFlowFile.assertAttributeEquals("jms.JMSReplyTo", "queue://" + TEST_DEST_NAME + testQueueSuffix() + ".reply"); + successFlowFile.assertAttributeEquals("jms.string", "banana"); + successFlowFile.assertAttributeEquals("jms.integer", "50"); + successFlowFile.assertAttributeEquals("jms.float", "3.14159"); + successFlowFile.assertAttributeEquals("jms.boolean", "true"); + successFlowFile.assertAttributeEquals("jms.long", "123456789"); + successFlowFile.assertAttributeEquals("jms.short", "16384"); + successFlowFile.assertAttributeEquals("jms.byte", "127"); + successFlowFile.assertAttributeEquals("jms.double", "3.1415626547"); + successFlowFile.assertAttributeEquals("jms.object", "{\"id\":215, \"name\": \"john doe\"}"); + successFlowFile.assertAttributeEquals("jms.eyes", null); + successFlowFile.assertAttributeEquals("jms.badinteger", null); + } + + @Test + public void testPutGetMessageTypes() throws JMSException { + final GetJMSQueue getJmsQueue = new GetJMSQueue(); + final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue); + runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerGet.setProperty(JmsProperties.URL, TEST_URL); + runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, TEST_ACK_MODE); + runnerGet.setProperty(JmsProperties.JMS_PROPS_TO_ATTRIBUTES, "true"); + + //------------------------------------------------------------ + final PutJMS putJMStext = spy(new PutJMS()); + final TestRunner runnerPutText = TestRunners.newTestRunner(putJMStext); + runnerPutText.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPutText.setProperty(JmsProperties.URL, TEST_URL); + runnerPutText.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPutText.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPutText.setProperty(JmsProperties.MESSAGE_TYPE, JmsProperties.MSG_TYPE_TEXT); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "file1.txt"); + runnerPutText.enqueue("putGetTextMessage", attributes); + + runnerPutText.run(); + + assertEquals(0, runnerPutText.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPutText.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + runnerGet.run(); + + final List ffText = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + assertEquals(1, ffText.size()); + final MockFlowFile successText = ffText.get(0); + successText.assertContentEquals("putGetTextMessage"); + + //------------------------------------------------------------ + final PutJMS putJMSempty = spy(new PutJMS()); + final TestRunner runnerPutEmpty = TestRunners.newTestRunner(putJMSempty); + runnerPutEmpty.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPutEmpty.setProperty(JmsProperties.URL, TEST_URL); + runnerPutEmpty.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPutEmpty.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPutEmpty.setProperty(JmsProperties.MESSAGE_TYPE, JmsProperties.MSG_TYPE_EMPTY); + + final Map attributesEmpty = new HashMap<>(); + attributesEmpty.put("filename", "file1.txt"); + runnerPutEmpty.enqueue("putGetEmptyMessage", attributesEmpty); + + runnerPutEmpty.run(); + + assertEquals(0, runnerPutEmpty.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPutEmpty.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + runnerGet.clearTransferState(); + runnerGet.run(); + + final List ffEmpty = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + assertEquals(1, ffEmpty.size()); + final MockFlowFile successEmpty = ffEmpty.get(0); + successEmpty.assertContentEquals(""); + + //------------------------------------------------------------ + final PutJMS putJMSstream = spy(new PutJMS()); + final TestRunner runnerPutStream = TestRunners.newTestRunner(putJMSstream); + runnerPutStream.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPutStream.setProperty(JmsProperties.URL, TEST_URL); + runnerPutStream.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPutStream.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPutStream.setProperty(JmsProperties.MESSAGE_TYPE, JmsProperties.MSG_TYPE_STREAM); + + final Map attributesStream = new HashMap<>(); + attributesStream.put("filename", "file1.txt"); + runnerPutStream.enqueue("putGetStreamMessage", attributesStream); + + runnerGet.clearTransferState(); + runnerPutStream.run(); + + assertEquals(0, runnerPutStream.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPutStream.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + runnerGet.run(); + + final List ffStream = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + assertEquals(1, ffStream.size()); + final MockFlowFile successStream = ffStream.get(0); + successStream.assertContentEquals("putGetStreamMessage"); + + //------------------------------------------------------------ + final PutJMS putJMSmap = spy(new PutJMS()); + final TestRunner runnerPutMap = TestRunners.newTestRunner(putJMSmap); + runnerPutMap.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPutMap.setProperty(JmsProperties.URL, TEST_URL); + runnerPutMap.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPutMap.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPutMap.setProperty(JmsProperties.MESSAGE_TYPE, JmsProperties.MSG_TYPE_MAP); + + final Map attributesMap = new HashMap<>(); + attributesMap.put("filename", "file1.txt"); + runnerPutMap.enqueue("putGetMapMessage", attributesMap); + + runnerGet.clearTransferState(); + runnerPutMap.run(); + + assertEquals(0, runnerPutMap.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPutMap.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + runnerGet.run(); + + final List ffMap = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + assertEquals(1, ffMap.size()); + final MockFlowFile successMap = ffMap.get(0); + successMap.assertContentEquals(""); + + //------------------------------------------------------------ + final PutJMS putJMSByte = spy(new PutJMS()); + final TestRunner runnerPutByte = TestRunners.newTestRunner(putJMSByte); + runnerPutByte.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPutByte.setProperty(JmsProperties.URL, TEST_URL); + runnerPutByte.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPutByte.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPutByte.setProperty(JmsProperties.MESSAGE_TYPE, JmsProperties.MSG_TYPE_BYTE); + + final Map attributesByte = new HashMap<>(); + attributesByte.put("filename", "file1.txt"); + runnerPutByte.enqueue("putGetTextMessage", attributesByte); + + runnerPutByte.run(); + + assertEquals(0, runnerPutByte.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPutByte.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + runnerGet.clearTransferState(); + runnerGet.run(); + + final List ffByte = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + assertEquals(1, ffByte.size()); + final MockFlowFile successByte = ffByte.get(0); + successByte.assertContentEquals("putGetTextMessage"); + } + + @Test + public void testTTL() throws JMSException, InterruptedException { + final PutJMS putJMS = spy(new PutJMS()); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPut.setProperty(JmsProperties.MESSAGE_TTL, "3 s"); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "file1.txt"); + runnerPut.enqueue("ttl10secNotExpired".getBytes(), attributes); + + runnerPut.run(); + + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + final GetJMSQueue getJmsQueue = new GetJMSQueue(); + final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue); + runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerGet.setProperty(JmsProperties.URL, TEST_URL); + runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, TEST_ACK_MODE); + runnerGet.setProperty(JmsProperties.TIMEOUT, "1 s"); + + runnerGet.run(); + + final List flowFiles1 = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + assertEquals(1, flowFiles1.size()); + final MockFlowFile successFlowFile1 = flowFiles1.get(0); + successFlowFile1.assertContentEquals("ttl10secNotExpired"); + + runnerPut.clearTransferState(); + runnerGet.clearTransferState(); + + runnerPut.setProperty(JmsProperties.MESSAGE_TTL, "1 s"); + runnerPut.enqueue("ttl1secExpired".getBytes(), attributes); + + runnerPut.run(); + + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + Thread.sleep(2000L); + runnerGet.run(); + + final List flowFiles2 = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + assertEquals(0, flowFiles2.size()); + } + + @Test + public void testFailureOnFileExceedsBufferSize() throws JMSException { + final PutJMS putJMS = spy(new PutJMS()); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPut.setProperty(JmsProperties.MAX_BUFFER_SIZE, "10 B"); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "file1.txt"); + runnerPut.enqueue("failureOnFileExceedsBufferSize".getBytes(), attributes); + + runnerPut.run(); + + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + } + + @Test(expected = NumberFormatException.class) + public void testBadMessagePriorityValueFails() throws JMSException { + final PutJMS putJMS = spy(new PutJMS()); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPut.setProperty(JmsProperties.MESSAGE_PRIORITY, "negative one"); + assertEquals(PutJMS.DEFAULT_MESSAGE_PRIORITY, runnerPut.getProcessContext().getProperty(JmsProperties.MESSAGE_PRIORITY).asInteger().intValue()); + } + + @Test + public void testBadMessagePriorityRunSucceeds() throws JMSException { + final PutJMS putJMS = spy(new PutJMS()); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPut.setProperty(JmsProperties.MESSAGE_PRIORITY, "negative one"); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "file1.txt"); + runnerPut.enqueue("badMessagePriorityRunSucceeds".getBytes(), attributes); + + runnerPut.run(); + + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + } + + @Test + public void testPutSendRoutesToFailure() throws JMSException, NoSuchFieldException, IllegalAccessException { + + final PutJMS putJMS = spy(new PutJMS()); + + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + + final ProcessContext context = runnerPut.getProcessContext(); + + final Queue wrappedMessageProducerQueue = (Queue) spy(new LinkedBlockingQueue<>()); + injectFieldValue(PutJMS.class, putJMS, "producerQueue", wrappedMessageProducerQueue); + + final WrappedMessageProducer wrappedMessageProducer = spy(JmsFactory.createMessageProducer(context, true)); + final MessageProducer messageProducer = spy(wrappedMessageProducer.getProducer()); + + doAnswer(new Answer() { + @Override + public WrappedMessageProducer answer(InvocationOnMock invocationOnMock) { + return wrappedMessageProducer; + } + }).when(wrappedMessageProducerQueue).poll(); + assertEquals(wrappedMessageProducer, wrappedMessageProducerQueue.poll()); + + doAnswer(new Answer() { + @Override + public MessageProducer answer(InvocationOnMock invocationOnMock) { + return messageProducer; + } + }).when(wrappedMessageProducer).getProducer(); + + doThrow(new JMSException("force send to fail")).when(messageProducer).send(any(Message.class)); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "file1.txt"); + runnerPut.enqueue("putSendRoutesToFailure".getBytes(), attributes); + + runnerPut.run(); + + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + + final List flowFilesFail = runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE); + assertEquals(1, flowFilesFail.size()); + } + + @Test + public void testPutCommitRoutesToFailure() throws JMSException, NoSuchFieldException, IllegalAccessException { + + final PutJMS putJMS = spy(new PutJMS()); + + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + + final ProcessContext context = runnerPut.getProcessContext(); + final Queue wrappedMessageProducerQueue = (Queue) spy(new LinkedBlockingQueue<>()); + injectFieldValue(PutJMS.class, putJMS, "producerQueue", wrappedMessageProducerQueue); + + final WrappedMessageProducer wrappedMessageProducer = spy(JmsFactory.createMessageProducer(context, true)); + final MessageProducer messageProducer = spy(wrappedMessageProducer.getProducer()); + final Connection connection = JmsFactory.createConnection(context); + final Session jmsSession = spy(JmsFactory.createSession(context, connection, true)); + + doAnswer(new Answer() { + @Override + public WrappedMessageProducer answer(InvocationOnMock invocationOnMock) { + return wrappedMessageProducer; + } + }).when(wrappedMessageProducerQueue).poll(); + + doAnswer(new Answer() { + @Override + public MessageProducer answer(InvocationOnMock invocationOnMock) { + return messageProducer; + } + }).when(wrappedMessageProducer).getProducer(); + + doAnswer(new Answer() { + @Override + public Session answer(InvocationOnMock invocationOnMock) { + return jmsSession; + } + }).when(wrappedMessageProducer).getSession(); + + doThrow(new JMSException("force commit to fail")).when(jmsSession).commit(); + + final Map attributes = new HashMap<>(); + attributes.put("filename", "file1.txt"); + runnerPut.enqueue("putCommitRoutesToFailure".getBytes(), attributes); + + runnerPut.run(); + + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + + final List flowFilesFail = runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE); + assertEquals(1, flowFilesFail.size()); + } +} \ No newline at end of file