NIFI-1381 Removing the hardcoded jms:// prefix and instead deferring to the URI specified by the processor properties of PutJMS

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Aldrin Piri 2016-01-10 22:28:09 -05:00 committed by joewitt
parent d42f1d4add
commit 8d37af07b9
2 changed files with 46 additions and 2 deletions

View File

@ -241,7 +241,7 @@ public class PutJMS extends AbstractProcessor {
}
successfulFlowFiles.add(flowFile);
session.getProvenanceReporter().send(flowFile, "jms://" + context.getProperty(URL).getValue());
session.getProvenanceReporter().send(flowFile, context.getProperty(URL).getValue());
}
try {

View File

@ -21,6 +21,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -561,4 +562,47 @@ public class TestPutJMS {
final List<MockFlowFile> flowFilesFail = runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE);
assertEquals(1, flowFilesFail.size());
}
@Test
public void testPutProvenanceSendEventTransitUri() throws JMSException {
final PutJMS putJMS = spy(new PutJMS());
final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
runnerPut.setProperty(JmsProperties.URL, TEST_URL);
runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix());
runnerPut.setProperty(JmsProperties.ATTRIBUTES_TO_JMS_PROPS, "true");
runnerPut.enqueue("putGetMessage".getBytes());
runnerPut.run();
assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
final List<ProvenanceEventRecord> putProvenanceEvents = runnerPut.getProvenanceEvents();
assertEquals(1, putProvenanceEvents.size());
// Verify the transitUri is the same as that configured in the properties
assertEquals(TEST_URL, putProvenanceEvents.get(0).getTransitUri());
final GetJMSQueue getJmsQueue = new GetJMSQueue();
final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue);
runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
runnerGet.setProperty(JmsProperties.URL, TEST_URL);
runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix());
runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, TEST_ACK_MODE);
runnerGet.run();
assertEquals(1, runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS).size());
final List<MockFlowFile> flowFilesGet = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
assertEquals(1, flowFilesGet.size());
final MockFlowFile successFlowFile = flowFilesGet.get(0);
successFlowFile.assertContentEquals("putGetMessage");
}
}