diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java index 0e2c19d6ea..5edb7ddfa0 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java +++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java @@ -64,7 +64,14 @@ public interface FlowFile extends Comparable { * @return a set of identifiers that are unique to this FlowFile's lineage. * If FlowFile X is derived from FlowFile Y, both FlowFiles will have the * same value for the Lineage Claim ID. + * + * @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems + * when dealing with FlowFiles with many ancestors. This Collection is + * now capped at 100 lineage identifiers. This method was introduced with the idea of providing + * future performance improvements but due to the high cost of heap consumption will not be used + * in such a manner. As a result, this method will be removed in a future release. */ + @Deprecated Set getLineageIdentifiers(); /** diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java index dc251b3f17..fc26d93467 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java @@ -51,7 +51,14 @@ public interface ProvenanceEventRecord { /** * @return the set of all lineage identifiers that are associated with the * FlowFile for which this Event was created + * + * @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems + * when querying Provenance Events about FlowFiles with many ancestors. This Collection is + * now capped at 100 lineage identifiers. This method was introduced with the idea of providing + * future performance improvements but due to the high cost of heap consumption will not be used + * in such a manner. As a result, this method will be removed in a future release. */ + @Deprecated Set getLineageIdentifiers(); /** diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java index 9a9a27d79a..03ab3eabab 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java @@ -18,8 +18,12 @@ package org.apache.nifi.provenance; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.Date; +import java.util.Iterator; import java.util.List; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -40,7 +44,7 @@ public class StandardQueryResult implements QueryResult { private final Lock writeLock = rwLock.writeLock(); // guarded by writeLock - private final List matchingRecords = new ArrayList<>(); + private final Set matchingRecords = new TreeSet<>(new EventIdComparator()); private long totalHitCount; private int numCompletedSteps = 0; private Date expirationDate; @@ -66,8 +70,14 @@ public class StandardQueryResult implements QueryResult { } final List copy = new ArrayList<>(query.getMaxResults()); - for (int i = 0; i < query.getMaxResults(); i++) { - copy.add(matchingRecords.get(i)); + + int i = 0; + final Iterator itr = matchingRecords.iterator(); + while (itr.hasNext()) { + copy.add(itr.next()); + if (++i >= query.getMaxResults()) { + break; + } } return copy; @@ -165,4 +175,11 @@ public class StandardQueryResult implements QueryResult { private void updateExpiration() { expirationDate = new Date(System.currentTimeMillis() + TTL); } + + private static class EventIdComparator implements Comparator { + @Override + public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) { + return Long.compare(o2.getEventId(), o1.getEventId()); + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java index cc8c734c99..5474c7a165 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java @@ -25,24 +25,25 @@ import java.util.Map; import java.util.Set; import java.util.regex.Pattern; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; - import org.apache.commons.lang3.builder.CompareToBuilder; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; /** *

- * A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content.

+ * A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content. + *

* * Immutable - Thread Safe * */ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { + private static final int MAX_LINEAGE_IDENTIFIERS = 100; private final long id; private final long entryDate; @@ -182,7 +183,18 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { public Builder lineageIdentifiers(final Collection lineageIdentifiers) { if (null != lineageIdentifiers) { bLineageIdentifiers.clear(); - bLineageIdentifiers.addAll(lineageIdentifiers); + + if (lineageIdentifiers.size() > MAX_LINEAGE_IDENTIFIERS) { + int i = 0; + for (final String id : lineageIdentifiers) { + bLineageIdentifiers.add(id); + if (i++ >= MAX_LINEAGE_IDENTIFIERS) { + break; + } + } + } else { + bLineageIdentifiers.addAll(lineageIdentifiers); + } } return this; } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 687574351e..5e4aed05eb 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -117,16 +117,16 @@ public class TestPersistentProvenanceRepository { // Delete all of the storage files. We do this in order to clean up the tons of files that // we create but also to ensure that we have closed all of the file handles. If we leave any // streams open, for instance, this will throw an IOException, causing our unit test to fail. - for ( final File storageDir : config.getStorageDirectories() ) { + for (final File storageDir : config.getStorageDirectories()) { int i; - for (i=0; i < 3; i++) { + for (i = 0; i < 3; i++) { try { FileUtils.deleteFile(storageDir, true); break; } catch (final IOException ioe) { // if there is a virus scanner, etc. running in the background we may not be able to // delete the file. Wait a sec and try again. - if ( i == 2 ) { + if (i == 2) { throw ioe; } else { try { @@ -441,7 +441,7 @@ public class TestPersistentProvenanceRepository { repo.waitForRollover(); final Query query = new Query(UUID.randomUUID().toString()); - // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*")); + // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4")); query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); @@ -464,68 +464,6 @@ public class TestPersistentProvenanceRepository { assertTrue(newRecordSet.getMatchingEvents().isEmpty()); } - @Test - public void testIndexAndCompressOnRolloverAndSubsequentSearchAsync() throws IOException, InterruptedException, ParseException { - final RepositoryConfiguration config = createConfiguration(); - config.setMaxRecordLife(3, TimeUnit.SECONDS); - config.setMaxStorageCapacity(1024L * 1024L); - config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); - config.setMaxEventFileCapacity(1024L * 1024L); - config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); - - repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); - - final String uuid = "00000000-0000-0000-0000-000000000000"; - final Map attributes = new HashMap<>(); - attributes.put("abc", "xyz"); - attributes.put("xyz", "abc"); - attributes.put("filename", "file-" + uuid); - - final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); - builder.setEventTime(System.currentTimeMillis()); - builder.setEventType(ProvenanceEventType.RECEIVE); - builder.setTransitUri("nifi://unit-test"); - builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); - builder.setComponentId("1234"); - builder.setComponentType("dummy processor"); - - for (int i = 0; i < 10; i++) { - attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); - builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); - repo.registerEvent(builder.build()); - } - - repo.waitForRollover(); - - final Query query = new Query(UUID.randomUUID().toString()); - query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000*")); - query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*")); - query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4")); - query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*")); - query.setMaxResults(100); - - final QuerySubmission submission = repo.submitQuery(query); - while (!submission.getResult().isFinished()) { - Thread.sleep(100L); - } - - assertEquals(10, submission.getResult().getMatchingEvents().size()); - for (final ProvenanceEventRecord match : submission.getResult().getMatchingEvents()) { - System.out.println(match); - } - - Thread.sleep(2000L); - - config.setMaxStorageCapacity(100L); - config.setMaxRecordLife(500, TimeUnit.MILLISECONDS); - repo.purgeOldEvents(); - Thread.sleep(2000L); - - final QueryResult newRecordSet = repo.queryEvents(query); - assertTrue(newRecordSet.getMatchingEvents().isEmpty()); - } - @Test public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException { final RepositoryConfiguration config = createConfiguration(); @@ -603,7 +541,7 @@ public class TestPersistentProvenanceRepository { repo.purgeOldEvents(); - Thread.sleep(2000L); // purge is async. Give it time to do its job. + Thread.sleep(2000L); // purge is async. Give it time to do its job. query.setMaxResults(100); final QuerySubmission noResultSubmission = repo.submitQuery(query); @@ -939,7 +877,7 @@ public class TestPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setMaxEventFileCapacity(1024L * 1024L); config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); - config.setDesiredIndexSize(10); // force new index to be created for each rollover + config.setDesiredIndexSize(10); // force new index to be created for each rollover repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); repo.initialize(getEventReporter()); @@ -961,7 +899,7 @@ public class TestPersistentProvenanceRepository { for (int i = 0; i < 10; i++) { attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i); builder.fromFlowFile(createFlowFile(i, 3000L, attributes)); - builder.setEventTime(10L); // make sure the events are destroyed when we call purge + builder.setEventTime(10L); // make sure the events are destroyed when we call purge repo.registerEvent(builder.build()); } @@ -1019,7 +957,7 @@ public class TestPersistentProvenanceRepository { @Test public void testBackPressure() throws IOException, InterruptedException { final RepositoryConfiguration config = createConfiguration(); - config.setMaxEventFileCapacity(1L); // force rollover on each record. + config.setMaxEventFileCapacity(1L); // force rollover on each record. config.setJournalCount(1); final AtomicInteger journalCountRef = new AtomicInteger(0); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 2d939446dd..76f9daf323 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -154,6 +154,11 @@ language governing permissions and limitations under the License. --> org.apache.activemq activemq-client + + org.apache.activemq + activemq-broker + test + com.jayway.jsonpath json-path diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java index 461d3816b9..d4e1969712 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java @@ -21,6 +21,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CL import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE; import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX; import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME; +import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE; import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES; import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER; import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR; @@ -89,6 +90,7 @@ public abstract class JmsConsumer extends AbstractProcessor { descriptors.add(USERNAME); descriptors.add(PASSWORD); descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(DESTINATION_TYPE); descriptors.add(ACKNOWLEDGEMENT_MODE); descriptors.add(MESSAGE_SELECTOR); descriptors.add(JMS_PROPS_TO_ATTRIBUTES); @@ -158,8 +160,8 @@ public abstract class JmsConsumer extends AbstractProcessor { stopWatch.stop(); if (processingSummary.getFlowFilesCreated() > 0) { - final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F); - float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs; + final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F); + float messagesPerSec = (processingSummary.getMessagesReceived()) / secs; final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived()); logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java index ca5df9f4eb..5f6bea514c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java @@ -481,7 +481,13 @@ public class JmsFactory { attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID()); } if (message.getJMSDestination() != null) { - attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, message.getJMSDestination().toString()); + String destinationName; + if (message.getJMSDestination() instanceof Queue) { + destinationName = ((Queue) message.getJMSDestination()).getQueueName(); + } else { + destinationName = ((Topic) message.getJMSDestination()).getTopicName(); + } + attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName); } if (message.getJMSMessageID() != null) { attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java index 9c833f54c9..bfc56a5ed1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertTrue; + +import java.util.List; + import javax.jms.BytesMessage; import javax.jms.MapMessage; import javax.jms.Message; @@ -24,61 +28,86 @@ import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processors.standard.util.JmsFactory; import org.apache.nifi.processors.standard.util.JmsProperties; import org.apache.nifi.processors.standard.util.WrappedMessageProducer; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.Revision; +import org.junit.Test; public class TestGetJMSQueue { - @org.junit.Ignore + @Test public void testSendTextToQueue() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); - runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); + runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false"); runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); - final Message message = jmsSession.createTextMessage("Hello World"); producer.send(message); jmsSession.commit(); + + runner.run(); + + List flowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("success").build()); + + assertTrue(flowFiles.size() == 1); + MockFlowFile successFlowFile = flowFiles.get(0); + successFlowFile.assertContentEquals("Hello World"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing"); producer.close(); jmsSession.close(); } - @org.junit.Ignore + @Test public void testSendBytesToQueue() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); - runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); + runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false"); runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); - final BytesMessage message = jmsSession.createBytesMessage(); message.writeBytes("Hello Bytes".getBytes()); producer.send(message); jmsSession.commit(); + + runner.run(); + + List flowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("success").build()); + + assertTrue(flowFiles.size() == 1); + MockFlowFile successFlowFile = flowFiles.get(0); + successFlowFile.assertContentEquals("Hello Bytes"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing"); producer.close(); jmsSession.close(); } - @org.junit.Ignore + @Test public void testSendStreamToQueue() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); - runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); + runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false"); runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); @@ -91,6 +120,17 @@ public class TestGetJMSQueue { producer.send(message); jmsSession.commit(); + + runner.run(); + + List flowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("success").build()); + + assertTrue(flowFiles.size() == 1); + MockFlowFile successFlowFile = flowFiles.get(0); + successFlowFile.assertContentEquals("Hello Stream"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing"); + producer.close(); jmsSession.close(); } diff --git a/pom.xml b/pom.xml index e0744eb2f2..cf6f018cda 100644 --- a/pom.xml +++ b/pom.xml @@ -513,7 +513,13 @@ org.apache.activemq activemq-client - 5.12.0 + 5.12.1 + + + org.apache.activemq + activemq-broker + 5.12.1 + tests org.apache.lucene