mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-1000'
This commit is contained in:
commit
72008035b1
|
@ -64,7 +64,14 @@ public interface FlowFile extends Comparable<FlowFile> {
|
||||||
* @return a set of identifiers that are unique to this FlowFile's lineage.
|
* @return a set of identifiers that are unique to this FlowFile's lineage.
|
||||||
* If FlowFile X is derived from FlowFile Y, both FlowFiles will have the
|
* If FlowFile X is derived from FlowFile Y, both FlowFiles will have the
|
||||||
* same value for the Lineage Claim ID.
|
* same value for the Lineage Claim ID.
|
||||||
|
*
|
||||||
|
* @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems
|
||||||
|
* when dealing with FlowFiles with many ancestors. This Collection is
|
||||||
|
* now capped at 100 lineage identifiers. This method was introduced with the idea of providing
|
||||||
|
* future performance improvements but due to the high cost of heap consumption will not be used
|
||||||
|
* in such a manner. As a result, this method will be removed in a future release.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
Set<String> getLineageIdentifiers();
|
Set<String> getLineageIdentifiers();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -51,7 +51,14 @@ public interface ProvenanceEventRecord {
|
||||||
/**
|
/**
|
||||||
* @return the set of all lineage identifiers that are associated with the
|
* @return the set of all lineage identifiers that are associated with the
|
||||||
* FlowFile for which this Event was created
|
* FlowFile for which this Event was created
|
||||||
|
*
|
||||||
|
* @deprecated this collection was erroneously unbounded and caused a lot of OutOfMemoryError problems
|
||||||
|
* when querying Provenance Events about FlowFiles with many ancestors. This Collection is
|
||||||
|
* now capped at 100 lineage identifiers. This method was introduced with the idea of providing
|
||||||
|
* future performance improvements but due to the high cost of heap consumption will not be used
|
||||||
|
* in such a manner. As a result, this method will be removed in a future release.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
Set<String> getLineageIdentifiers();
|
Set<String> getLineageIdentifiers();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,8 +18,12 @@ package org.apache.nifi.provenance;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
@ -40,7 +44,7 @@ public class StandardQueryResult implements QueryResult {
|
||||||
|
|
||||||
private final Lock writeLock = rwLock.writeLock();
|
private final Lock writeLock = rwLock.writeLock();
|
||||||
// guarded by writeLock
|
// guarded by writeLock
|
||||||
private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
|
private final Set<ProvenanceEventRecord> matchingRecords = new TreeSet<>(new EventIdComparator());
|
||||||
private long totalHitCount;
|
private long totalHitCount;
|
||||||
private int numCompletedSteps = 0;
|
private int numCompletedSteps = 0;
|
||||||
private Date expirationDate;
|
private Date expirationDate;
|
||||||
|
@ -66,8 +70,14 @@ public class StandardQueryResult implements QueryResult {
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
|
final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
|
||||||
for (int i = 0; i < query.getMaxResults(); i++) {
|
|
||||||
copy.add(matchingRecords.get(i));
|
int i = 0;
|
||||||
|
final Iterator<ProvenanceEventRecord> itr = matchingRecords.iterator();
|
||||||
|
while (itr.hasNext()) {
|
||||||
|
copy.add(itr.next());
|
||||||
|
if (++i >= query.getMaxResults()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return copy;
|
return copy;
|
||||||
|
@ -165,4 +175,11 @@ public class StandardQueryResult implements QueryResult {
|
||||||
private void updateExpiration() {
|
private void updateExpiration() {
|
||||||
expirationDate = new Date(System.currentTimeMillis() + TTL);
|
expirationDate = new Date(System.currentTimeMillis() + TTL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class EventIdComparator implements Comparator<ProvenanceEventRecord> {
|
||||||
|
@Override
|
||||||
|
public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
|
||||||
|
return Long.compare(o2.getEventId(), o1.getEventId());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,24 +25,25 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.builder.CompareToBuilder;
|
import org.apache.commons.lang3.builder.CompareToBuilder;
|
||||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
import org.apache.commons.lang3.builder.ToStringStyle;
|
import org.apache.commons.lang3.builder.ToStringStyle;
|
||||||
|
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content.</p>
|
* A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content.
|
||||||
|
* </p>
|
||||||
*
|
*
|
||||||
* <b>Immutable - Thread Safe</b>
|
* <b>Immutable - Thread Safe</b>
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
|
public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
|
||||||
|
private static final int MAX_LINEAGE_IDENTIFIERS = 100;
|
||||||
|
|
||||||
private final long id;
|
private final long id;
|
||||||
private final long entryDate;
|
private final long entryDate;
|
||||||
|
@ -182,8 +183,19 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
|
||||||
public Builder lineageIdentifiers(final Collection<String> lineageIdentifiers) {
|
public Builder lineageIdentifiers(final Collection<String> lineageIdentifiers) {
|
||||||
if (null != lineageIdentifiers) {
|
if (null != lineageIdentifiers) {
|
||||||
bLineageIdentifiers.clear();
|
bLineageIdentifiers.clear();
|
||||||
|
|
||||||
|
if (lineageIdentifiers.size() > MAX_LINEAGE_IDENTIFIERS) {
|
||||||
|
int i = 0;
|
||||||
|
for (final String id : lineageIdentifiers) {
|
||||||
|
bLineageIdentifiers.add(id);
|
||||||
|
if (i++ >= MAX_LINEAGE_IDENTIFIERS) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
bLineageIdentifiers.addAll(lineageIdentifiers);
|
bLineageIdentifiers.addAll(lineageIdentifiers);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -464,68 +464,6 @@ public class TestPersistentProvenanceRepository {
|
||||||
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
|
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testIndexAndCompressOnRolloverAndSubsequentSearchAsync() throws IOException, InterruptedException, ParseException {
|
|
||||||
final RepositoryConfiguration config = createConfiguration();
|
|
||||||
config.setMaxRecordLife(3, TimeUnit.SECONDS);
|
|
||||||
config.setMaxStorageCapacity(1024L * 1024L);
|
|
||||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
|
||||||
config.setMaxEventFileCapacity(1024L * 1024L);
|
|
||||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
|
||||||
|
|
||||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
|
||||||
repo.initialize(getEventReporter());
|
|
||||||
|
|
||||||
final String uuid = "00000000-0000-0000-0000-000000000000";
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
|
||||||
attributes.put("abc", "xyz");
|
|
||||||
attributes.put("xyz", "abc");
|
|
||||||
attributes.put("filename", "file-" + uuid);
|
|
||||||
|
|
||||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
|
|
||||||
builder.setEventTime(System.currentTimeMillis());
|
|
||||||
builder.setEventType(ProvenanceEventType.RECEIVE);
|
|
||||||
builder.setTransitUri("nifi://unit-test");
|
|
||||||
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
|
|
||||||
builder.setComponentId("1234");
|
|
||||||
builder.setComponentType("dummy processor");
|
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
|
|
||||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
|
||||||
repo.registerEvent(builder.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
repo.waitForRollover();
|
|
||||||
|
|
||||||
final Query query = new Query(UUID.randomUUID().toString());
|
|
||||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000*"));
|
|
||||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
|
|
||||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
|
|
||||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
|
|
||||||
query.setMaxResults(100);
|
|
||||||
|
|
||||||
final QuerySubmission submission = repo.submitQuery(query);
|
|
||||||
while (!submission.getResult().isFinished()) {
|
|
||||||
Thread.sleep(100L);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(10, submission.getResult().getMatchingEvents().size());
|
|
||||||
for (final ProvenanceEventRecord match : submission.getResult().getMatchingEvents()) {
|
|
||||||
System.out.println(match);
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(2000L);
|
|
||||||
|
|
||||||
config.setMaxStorageCapacity(100L);
|
|
||||||
config.setMaxRecordLife(500, TimeUnit.MILLISECONDS);
|
|
||||||
repo.purgeOldEvents();
|
|
||||||
Thread.sleep(2000L);
|
|
||||||
|
|
||||||
final QueryResult newRecordSet = repo.queryEvents(query);
|
|
||||||
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
|
public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
|
||||||
final RepositoryConfiguration config = createConfiguration();
|
final RepositoryConfiguration config = createConfiguration();
|
||||||
|
|
|
@ -154,6 +154,11 @@ language governing permissions and limitations under the License. -->
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>activemq-client</artifactId>
|
<artifactId>activemq-client</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.activemq</groupId>
|
||||||
|
<artifactId>activemq-broker</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.jayway.jsonpath</groupId>
|
<groupId>com.jayway.jsonpath</groupId>
|
||||||
<artifactId>json-path</artifactId>
|
<artifactId>json-path</artifactId>
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CL
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
|
||||||
|
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
|
||||||
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
|
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
|
||||||
|
@ -89,6 +90,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
|
||||||
descriptors.add(USERNAME);
|
descriptors.add(USERNAME);
|
||||||
descriptors.add(PASSWORD);
|
descriptors.add(PASSWORD);
|
||||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||||
|
descriptors.add(DESTINATION_TYPE);
|
||||||
descriptors.add(ACKNOWLEDGEMENT_MODE);
|
descriptors.add(ACKNOWLEDGEMENT_MODE);
|
||||||
descriptors.add(MESSAGE_SELECTOR);
|
descriptors.add(MESSAGE_SELECTOR);
|
||||||
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
|
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
|
||||||
|
@ -158,8 +160,8 @@ public abstract class JmsConsumer extends AbstractProcessor {
|
||||||
|
|
||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
if (processingSummary.getFlowFilesCreated() > 0) {
|
if (processingSummary.getFlowFilesCreated() > 0) {
|
||||||
final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
|
final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
|
||||||
float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
|
float messagesPerSec = (processingSummary.getMessagesReceived()) / secs;
|
||||||
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
|
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
|
||||||
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
|
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
|
||||||
new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
|
new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
|
||||||
|
|
|
@ -481,7 +481,13 @@ public class JmsFactory {
|
||||||
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
|
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
|
||||||
}
|
}
|
||||||
if (message.getJMSDestination() != null) {
|
if (message.getJMSDestination() != null) {
|
||||||
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, message.getJMSDestination().toString());
|
String destinationName;
|
||||||
|
if (message.getJMSDestination() instanceof Queue) {
|
||||||
|
destinationName = ((Queue) message.getJMSDestination()).getQueueName();
|
||||||
|
} else {
|
||||||
|
destinationName = ((Topic) message.getJMSDestination()).getTopicName();
|
||||||
|
}
|
||||||
|
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
|
||||||
}
|
}
|
||||||
if (message.getJMSMessageID() != null) {
|
if (message.getJMSMessageID() != null) {
|
||||||
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
|
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.MapMessage;
|
import javax.jms.MapMessage;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
@ -24,61 +28,86 @@ import javax.jms.ObjectMessage;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.StreamMessage;
|
import javax.jms.StreamMessage;
|
||||||
|
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processors.standard.util.JmsFactory;
|
import org.apache.nifi.processors.standard.util.JmsFactory;
|
||||||
import org.apache.nifi.processors.standard.util.JmsProperties;
|
import org.apache.nifi.processors.standard.util.JmsProperties;
|
||||||
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
|
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.apache.nifi.web.Revision;
|
import org.apache.nifi.web.Revision;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestGetJMSQueue {
|
public class TestGetJMSQueue {
|
||||||
|
|
||||||
@org.junit.Ignore
|
@Test
|
||||||
public void testSendTextToQueue() throws Exception {
|
public void testSendTextToQueue() throws Exception {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
|
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||||
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||||
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
|
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
|
||||||
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
||||||
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
||||||
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
||||||
|
|
||||||
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
|
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
|
||||||
final Session jmsSession = wrappedProducer.getSession();
|
final Session jmsSession = wrappedProducer.getSession();
|
||||||
final MessageProducer producer = wrappedProducer.getProducer();
|
final MessageProducer producer = wrappedProducer.getProducer();
|
||||||
|
|
||||||
final Message message = jmsSession.createTextMessage("Hello World");
|
final Message message = jmsSession.createTextMessage("Hello World");
|
||||||
|
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
jmsSession.commit();
|
jmsSession.commit();
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFiles = runner
|
||||||
|
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
|
||||||
|
|
||||||
|
assertTrue(flowFiles.size() == 1);
|
||||||
|
MockFlowFile successFlowFile = flowFiles.get(0);
|
||||||
|
successFlowFile.assertContentEquals("Hello World");
|
||||||
|
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
|
||||||
producer.close();
|
producer.close();
|
||||||
jmsSession.close();
|
jmsSession.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@org.junit.Ignore
|
@Test
|
||||||
public void testSendBytesToQueue() throws Exception {
|
public void testSendBytesToQueue() throws Exception {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
|
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||||
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||||
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
|
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
|
||||||
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
||||||
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
||||||
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
||||||
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
|
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
|
||||||
final Session jmsSession = wrappedProducer.getSession();
|
final Session jmsSession = wrappedProducer.getSession();
|
||||||
final MessageProducer producer = wrappedProducer.getProducer();
|
final MessageProducer producer = wrappedProducer.getProducer();
|
||||||
|
|
||||||
final BytesMessage message = jmsSession.createBytesMessage();
|
final BytesMessage message = jmsSession.createBytesMessage();
|
||||||
message.writeBytes("Hello Bytes".getBytes());
|
message.writeBytes("Hello Bytes".getBytes());
|
||||||
|
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
jmsSession.commit();
|
jmsSession.commit();
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFiles = runner
|
||||||
|
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
|
||||||
|
|
||||||
|
assertTrue(flowFiles.size() == 1);
|
||||||
|
MockFlowFile successFlowFile = flowFiles.get(0);
|
||||||
|
successFlowFile.assertContentEquals("Hello Bytes");
|
||||||
|
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
|
||||||
producer.close();
|
producer.close();
|
||||||
jmsSession.close();
|
jmsSession.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@org.junit.Ignore
|
@Test
|
||||||
public void testSendStreamToQueue() throws Exception {
|
public void testSendStreamToQueue() throws Exception {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
|
GetJMSQueue getJmsQueue = new GetJMSQueue();
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(getJmsQueue);
|
||||||
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
|
||||||
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
|
runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
|
||||||
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
|
||||||
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
|
||||||
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
|
||||||
|
@ -91,6 +120,17 @@ public class TestGetJMSQueue {
|
||||||
|
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
jmsSession.commit();
|
jmsSession.commit();
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
List<MockFlowFile> flowFiles = runner
|
||||||
|
.getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
|
||||||
|
|
||||||
|
assertTrue(flowFiles.size() == 1);
|
||||||
|
MockFlowFile successFlowFile = flowFiles.get(0);
|
||||||
|
successFlowFile.assertContentEquals("Hello Stream");
|
||||||
|
successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");
|
||||||
|
|
||||||
producer.close();
|
producer.close();
|
||||||
jmsSession.close();
|
jmsSession.close();
|
||||||
}
|
}
|
||||||
|
|
8
pom.xml
8
pom.xml
|
@ -513,7 +513,13 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>activemq-client</artifactId>
|
<artifactId>activemq-client</artifactId>
|
||||||
<version>5.12.0</version>
|
<version>5.12.1</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.activemq</groupId>
|
||||||
|
<artifactId>activemq-broker</artifactId>
|
||||||
|
<version>5.12.1</version>
|
||||||
|
<scope>tests</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.lucene</groupId>
|
<groupId>org.apache.lucene</groupId>
|
||||||
|
|
Loading…
Reference in New Issue