mirror of https://github.com/apache/nifi.git
NIFI-7085 add flowFile batching to ConsumeJMS and PublishJMS
This closes #8584 load some configuration in onScheduled instead of a loop, reinstated and fixed a failing integration test
This commit is contained in:
parent
1bcc61ebb4
commit
80e889305f
|
@ -128,6 +128,14 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
|
|||
.identifiesControllerService(JMSConnectionFactoryProviderDefinition.class)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Maximum Batch Size")
|
||||
.description("The maximum number of messages to publish or consume in each invocation of the processor.")
|
||||
.required(true)
|
||||
.defaultValue("1")
|
||||
.addValidator(StandardValidators.createLongValidator(1, 10_000, true))
|
||||
.build();
|
||||
|
||||
static final List<PropertyDescriptor> JNDI_JMS_CF_PROPERTIES = Collections.unmodifiableList(
|
||||
JndiJmsConnectionFactoryProperties.getPropertyDescriptors().stream()
|
||||
.map(pd -> new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriter;
|
|||
import org.apache.nifi.jms.processors.ioconcept.writer.FlowFileWriterCallback;
|
||||
import org.apache.nifi.jms.processors.ioconcept.writer.record.OutputStrategy;
|
||||
import org.apache.nifi.jms.processors.ioconcept.writer.record.RecordWriter;
|
||||
import org.apache.nifi.migration.PropertyConfiguration;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
|
@ -251,6 +252,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
_propertyDescriptors.add(SHARED_SUBSCRIBER);
|
||||
_propertyDescriptors.add(SUBSCRIPTION_NAME);
|
||||
_propertyDescriptors.add(TIMEOUT);
|
||||
_propertyDescriptors.add(MAX_BATCH_SIZE);
|
||||
_propertyDescriptors.add(ERROR_QUEUE);
|
||||
|
||||
_propertyDescriptors.add(RECORD_READER);
|
||||
|
@ -268,6 +270,17 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
relationships = Collections.unmodifiableSet(_relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void migrateProperties(PropertyConfiguration config) {
|
||||
super.migrateProperties(config);
|
||||
|
||||
if (!config.hasProperty(MAX_BATCH_SIZE)) {
|
||||
if (config.isPropertySet(BASE_RECORD_READER)) {
|
||||
config.setProperty(MAX_BATCH_SIZE, "10000");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isDurableSubscriber(final ProcessContext context) {
|
||||
final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
|
||||
return durableBoolean == null ? false : durableBoolean;
|
||||
|
@ -322,9 +335,9 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
|
||||
try {
|
||||
if (context.getProperty(RECORD_READER).isSet()) {
|
||||
processMessageSet(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
|
||||
processMessagesAsRecords(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
|
||||
} else {
|
||||
processSingleMessage(processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
|
||||
processMessages(context, processSession, consumer, destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Error while trying to process JMS message", e);
|
||||
|
@ -334,26 +347,25 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
}
|
||||
}
|
||||
|
||||
private void processSingleMessage(ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName,
|
||||
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
|
||||
private void processMessages(ProcessContext context, ProcessSession processSession, JMSConsumer consumer, String destinationName, String errorQueueName,
|
||||
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
|
||||
|
||||
consumer.consumeSingleMessage(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, response -> {
|
||||
if (response == null) {
|
||||
return;
|
||||
}
|
||||
int batchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
|
||||
consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, batchSize, jmsResponses -> {
|
||||
jmsResponses.forEach(response -> {
|
||||
try {
|
||||
final FlowFile flowFile = createFlowFileFromMessage(processSession, destinationName, response);
|
||||
|
||||
try {
|
||||
final FlowFile flowFile = createFlowFileFromMessage(processSession, destinationName, response);
|
||||
|
||||
processSession.getProvenanceReporter().receive(flowFile, destinationName);
|
||||
processSession.transfer(flowFile, REL_SUCCESS);
|
||||
processSession.commitAsync(
|
||||
() -> withLog(() -> acknowledge(response)),
|
||||
__ -> withLog(() -> response.reject()));
|
||||
} catch (final Throwable t) {
|
||||
response.reject();
|
||||
throw t;
|
||||
}
|
||||
processSession.getProvenanceReporter().receive(flowFile, destinationName);
|
||||
processSession.transfer(flowFile, REL_SUCCESS);
|
||||
processSession.commitAsync(
|
||||
() -> withLog(() -> acknowledge(response)),
|
||||
__ -> withLog(() -> response.reject()));
|
||||
} catch (final Throwable t) {
|
||||
response.reject();
|
||||
throw t;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -369,9 +381,10 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
return processSession.putAllAttributes(flowFile, attributes);
|
||||
}
|
||||
|
||||
private void processMessageSet(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName, String errorQueueName,
|
||||
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
|
||||
private void processMessagesAsRecords(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName, String errorQueueName,
|
||||
boolean durable, boolean shared, String subscriptionName, String messageSelector, String charset) {
|
||||
|
||||
int batchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
|
||||
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
|
||||
|
@ -385,7 +398,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
getLogger()
|
||||
);
|
||||
|
||||
consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, jmsResponses -> {
|
||||
consumer.consumeMessageSet(destinationName, errorQueueName, durable, shared, subscriptionName, messageSelector, charset, batchSize, jmsResponses -> {
|
||||
flowFileWriter.write(session, jmsResponses, new FlowFileWriterCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(FlowFile flowFile, List<JMSResponse> processedMessages, List<JMSResponse> failedMessages) {
|
||||
|
@ -477,7 +490,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
|||
* Use provided clientId for non shared durable consumers, if not set
|
||||
* always a different value as defined in {@link AbstractJMSProcessor#setClientId(ProcessContext, SingleConnectionFactory)}.
|
||||
* </p>
|
||||
* See {@link Session#createDurableConsumer(javax.jms.Topic, String, String, boolean)},
|
||||
* See {@link Session#createDurableConsumer(jakarta.jms.Topic, String, String, boolean)},
|
||||
* in special following part: <i>An unshared durable subscription is
|
||||
* identified by a name specified by the client and by the client identifier,
|
||||
* which must be set. An application which subsequently wishes to create
|
||||
|
|
|
@ -52,8 +52,6 @@ import java.util.function.Consumer;
|
|||
*/
|
||||
class JMSConsumer extends JMSWorker {
|
||||
|
||||
private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;
|
||||
|
||||
JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
|
||||
super(connectionFactory, jmsTemplate, logger);
|
||||
logger.debug("Created Message Consumer for '{}'", jmsTemplate);
|
||||
|
@ -88,29 +86,11 @@ class JMSConsumer extends JMSWorker {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives a message from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
|
||||
*/
|
||||
public void consumeSingleMessage(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
|
||||
final String charset, final Consumer<JMSResponse> singleMessageConsumer) {
|
||||
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
|
||||
final JMSResponse response = receiveMessage(session, messageConsumer, charset, errorQueueName);
|
||||
if (response != null) {
|
||||
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
|
||||
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
|
||||
// the responsibility of the processor to handle closing the Message Consumer.
|
||||
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
|
||||
// the JMSResponse.
|
||||
singleMessageConsumer.accept(response);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives a list of messages from the broker. It is the consumerCallback's responsibility to acknowledge the received message.
|
||||
*/
|
||||
public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
|
||||
final String charset, final Consumer<List<JMSResponse>> messageSetConsumer) {
|
||||
final String charset, final int batchSize, final Consumer<List<JMSResponse>> messageSetConsumer) {
|
||||
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() {
|
||||
@Override
|
||||
public void consume(Session session, MessageConsumer messageConsumer) throws JMSException {
|
||||
|
@ -118,7 +98,7 @@ class JMSConsumer extends JMSWorker {
|
|||
int batchCounter = 0;
|
||||
|
||||
JMSResponse response;
|
||||
while ((response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) {
|
||||
while (batchCounter < batchSize && (response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null) {
|
||||
response.setBatchOrder(batchCounter);
|
||||
jmsResponses.add(response);
|
||||
batchCounter++;
|
||||
|
|
|
@ -153,7 +153,7 @@ class JMSPublisher extends JMSWorker {
|
|||
}
|
||||
|
||||
/**
|
||||
* Implementations of this interface use {@link javax.jms.Message} methods to set strongly typed properties.
|
||||
* Implementations of this interface use {@link jakarta.jms.Message} methods to set strongly typed properties.
|
||||
*/
|
||||
public interface JmsPropertySetter {
|
||||
void setProperty(final Message message, final String name, final String value) throws JMSException, NumberFormatException;
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.RequiredPermission;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
|
@ -183,6 +184,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
|
|||
_propertyDescriptors.add(CHARSET);
|
||||
_propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
|
||||
_propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
|
||||
_propertyDescriptors.add(MAX_BATCH_SIZE);
|
||||
|
||||
_propertyDescriptors.add(RECORD_READER);
|
||||
_propertyDescriptors.add(RECORD_WRITER);
|
||||
|
@ -198,6 +200,22 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
|
|||
relationships = Collections.unmodifiableSet(_relationships);
|
||||
}
|
||||
|
||||
volatile Boolean allowIllegalChars;
|
||||
volatile Pattern attributeHeaderPattern;
|
||||
volatile RecordReaderFactory readerFactory;
|
||||
volatile RecordSetWriterFactory writerFactory;
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
|
||||
|
||||
final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
|
||||
attributeHeaderPattern = Pattern.compile(attributeHeaderRegex);
|
||||
|
||||
readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will construct JMS {@link Message} by extracting its body from the
|
||||
* incoming {@link FlowFile}. {@link FlowFile} attributes that represent
|
||||
|
@ -211,20 +229,21 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
|
|||
*/
|
||||
@Override
|
||||
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
|
||||
FlowFile flowFile = processSession.get();
|
||||
if (flowFile != null) {
|
||||
final List<FlowFile> flowFiles = processSession.get(context.getProperty(MAX_BATCH_SIZE).asInteger());
|
||||
if (flowFiles.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
flowFiles.forEach(flowFile -> {
|
||||
try {
|
||||
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
|
||||
final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
|
||||
|
||||
final Map<String, String> attributesToSend = new HashMap<>();
|
||||
// REGEX Attributes
|
||||
final Pattern pattern = Pattern.compile(attributeHeaderRegex);
|
||||
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
|
||||
final String key = entry.getKey();
|
||||
if (pattern.matcher(key).matches()) {
|
||||
if (attributeHeaderPattern.matcher(key).matches()) {
|
||||
if (allowIllegalChars || key.endsWith(".type") || (!key.contains("-") && !key.contains("."))) {
|
||||
attributesToSend.put(key, flowFile.getAttribute(key));
|
||||
}
|
||||
|
@ -232,9 +251,6 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
|
|||
}
|
||||
|
||||
if (context.getProperty(RECORD_READER).isSet()) {
|
||||
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
|
||||
final FlowFileReader flowFileReader = new StateTrackingFlowFileReader(
|
||||
getIdentifier(),
|
||||
new RecordSupplier(readerFactory, writerFactory),
|
||||
|
@ -278,7 +294,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
|
|||
} catch (Exception e) {
|
||||
handleException(context, processSession, publisher, flowFile, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void handleException(ProcessContext context, ProcessSession processSession, JMSPublisher publisher, FlowFile flowFile, Exception e) {
|
||||
|
|
|
@ -503,6 +503,7 @@ public class ConsumeJMSIT {
|
|||
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
|
||||
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");
|
||||
|
||||
testRunner.run(1, false);
|
||||
|
||||
|
@ -535,6 +536,7 @@ public class ConsumeJMSIT {
|
|||
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
|
||||
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");
|
||||
testRunner.setRelationshipAvailable(ConsumeJMS.REL_PARSE_FAILURE);
|
||||
|
||||
testRunner.run(1, false);
|
||||
|
@ -568,6 +570,7 @@ public class ConsumeJMSIT {
|
|||
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
|
||||
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");
|
||||
testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_APPENDER.getValue());
|
||||
|
||||
testRunner.run(1, false);
|
||||
|
@ -609,6 +612,7 @@ public class ConsumeJMSIT {
|
|||
TestRunner testRunner = initializeTestRunner(jmsTemplate.getConnectionFactory(), destination);
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_READER, createJsonRecordSetReaderService(testRunner));
|
||||
testRunner.setProperty(ConsumeJMS.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
|
||||
testRunner.setProperty(AbstractJMSProcessor.MAX_BATCH_SIZE, "10");
|
||||
testRunner.setProperty(ConsumeJMS.OUTPUT_STRATEGY, OutputStrategy.USE_WRAPPER.getValue());
|
||||
|
||||
testRunner.run(1, false);
|
||||
|
|
|
@ -43,8 +43,10 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
|
@ -197,9 +199,9 @@ public class JMSPublisherConsumerIT {
|
|||
jmsTemplate.send(destinationName, messageCreator);
|
||||
|
||||
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 1, responses -> {
|
||||
callbackInvoked.set(true);
|
||||
responseChecker.accept(response);
|
||||
responseChecker.accept(responses.getFirst());
|
||||
});
|
||||
|
||||
assertTrue(callbackInvoked.get());
|
||||
|
@ -300,6 +302,7 @@ public class JMSPublisherConsumerIT {
|
|||
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* At the moment the only two supported message types are TextMessage and
|
||||
* BytesMessage which is sufficient for the type if JMS use cases NiFi is
|
||||
|
@ -320,7 +323,7 @@ public class JMSPublisherConsumerIT {
|
|||
});
|
||||
|
||||
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 1, responses -> {
|
||||
// noop
|
||||
});
|
||||
} finally {
|
||||
|
@ -347,8 +350,9 @@ public class JMSPublisherConsumerIT {
|
|||
|
||||
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
|
||||
final AtomicBoolean callbackInvoked = new AtomicBoolean();
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 1, responses -> {
|
||||
callbackInvoked.set(true);
|
||||
JMSResponse response = responses.getFirst();
|
||||
assertEquals("hello from the other side", new String(response.getMessageBody()));
|
||||
assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
|
||||
assertEquals("foo", response.getMessageProperties().get("foo"));
|
||||
|
@ -361,6 +365,76 @@ public class JMSPublisherConsumerIT {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(value = 20000, unit = TimeUnit.MILLISECONDS)
|
||||
public void testMultipleThreadsWithBatchConsume() throws Exception {
|
||||
final int threadCount = 4;
|
||||
final int totalMessageCount = 1000;
|
||||
|
||||
String destinationName = "testMultipleThreads";
|
||||
JmsTemplate publishTemplate = CommonTest.buildJmsTemplateForDestination(false);
|
||||
final CountDownLatch consumerTemplateCloseCount = new CountDownLatch(threadCount);
|
||||
final AtomicInteger msgCounter = new AtomicInteger(0);
|
||||
final boolean[] msgConsumed = new boolean[totalMessageCount];
|
||||
|
||||
try {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
// Start "threadCount" consumers
|
||||
final Thread t = new Thread(() -> {
|
||||
JmsTemplate consumeTemplate = CommonTest.buildJmsTemplateForDestination(false);
|
||||
|
||||
try {
|
||||
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
|
||||
|
||||
while (msgCounter.get() < totalMessageCount) {
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 5,
|
||||
responses -> {
|
||||
responses.forEach( response -> {
|
||||
msgCounter.incrementAndGet();
|
||||
String body = new String(response.getMessageBody(), StandardCharsets.UTF_8);
|
||||
int msgNum = 0;
|
||||
try {
|
||||
msgNum = Integer.parseInt(body);
|
||||
} catch (NumberFormatException e) {
|
||||
System.out.println("Bad message with unexpected body: " + body);
|
||||
}
|
||||
msgConsumed[msgNum] = true;
|
||||
});
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
|
||||
consumerTemplateCloseCount.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
t.start();
|
||||
}
|
||||
|
||||
// Publish "totalMessageCount" messages
|
||||
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), publishTemplate, mock(ComponentLog.class));
|
||||
for (int i = 0; i < totalMessageCount; i++) {
|
||||
publisher.publish(destinationName, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
} finally {
|
||||
((CachingConnectionFactory) publishTemplate.getConnectionFactory()).destroy();
|
||||
|
||||
consumerTemplateCloseCount.await();
|
||||
}
|
||||
|
||||
// Verify we consumed all messages
|
||||
boolean receivedAllMessages = true;
|
||||
for (int i = 0; i < totalMessageCount; i++) {
|
||||
if (!msgConsumed[i]) {
|
||||
System.out.println("Did not receive message " + i);
|
||||
receivedAllMessages = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue(receivedAllMessages, "Did not receive all messages!");
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
|
||||
public void validateMessageRedeliveryWhenNotAcked() {
|
||||
|
@ -374,9 +448,9 @@ public class JMSPublisherConsumerIT {
|
|||
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
|
||||
final AtomicBoolean callbackInvoked = new AtomicBoolean();
|
||||
try {
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 1, responses -> {
|
||||
callbackInvoked.set(true);
|
||||
assertEquals("1", new String(response.getMessageBody()));
|
||||
assertEquals("1", new String(responses.getFirst().getMessageBody()));
|
||||
throw new RuntimeException("intentional to avoid explicit ack");
|
||||
});
|
||||
} catch (Exception e) {
|
||||
|
@ -388,11 +462,8 @@ public class JMSPublisherConsumerIT {
|
|||
|
||||
// should receive the same message, but will process it successfully
|
||||
while (!callbackInvoked.get()) {
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
|
||||
if (response == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 1, responses -> {
|
||||
JMSResponse response = responses.getFirst();
|
||||
callbackInvoked.set(true);
|
||||
assertEquals("2", new String(response.getMessageBody()));
|
||||
acknowledge(response);
|
||||
|
@ -405,13 +476,9 @@ public class JMSPublisherConsumerIT {
|
|||
// receiving next message and fail again
|
||||
try {
|
||||
while (!callbackInvoked.get()) {
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
|
||||
if (response == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 1, responses -> {
|
||||
callbackInvoked.set(true);
|
||||
assertEquals("1", new String(response.getMessageBody()));
|
||||
assertEquals("1", new String(responses.getFirst().getMessageBody()));
|
||||
throw new RuntimeException("intentional to avoid explicit ack");
|
||||
});
|
||||
}
|
||||
|
@ -424,11 +491,8 @@ public class JMSPublisherConsumerIT {
|
|||
// should receive the same message, but will process it successfully
|
||||
try {
|
||||
while (!callbackInvoked.get()) {
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, null, "UTF-8", response -> {
|
||||
if (response == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, null, "UTF-8", 1, responses -> {
|
||||
JMSResponse response = responses.getFirst();
|
||||
callbackInvoked.set(true);
|
||||
assertEquals("1", new String(response.getMessageBody()));
|
||||
acknowledge(response);
|
||||
|
@ -472,9 +536,9 @@ public class JMSPublisherConsumerIT {
|
|||
|
||||
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
|
||||
AtomicBoolean callbackInvoked = new AtomicBoolean();
|
||||
consumer.consumeSingleMessage(destinationName, null, false, false, null, messageSelector, "UTF-8", response -> {
|
||||
consumer.consumeMessageSet(destinationName, null, false, false, null, messageSelector, "UTF-8", 1, responses -> {
|
||||
callbackInvoked.set(true);
|
||||
assertEquals("msg1", new String(response.getMessageBody()));
|
||||
assertEquals("msg1", new String(responses.getFirst().getMessageBody()));
|
||||
});
|
||||
assertTrue(callbackInvoked.get());
|
||||
|
||||
|
|
|
@ -182,7 +182,7 @@ public class GetFile extends AbstractProcessor {
|
|||
.build();
|
||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Batch Size")
|
||||
.description("The maximum number of files to pull in each iteration")
|
||||
.description("The maximum number of files to pull in each invocation of the processor")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.defaultValue("10")
|
||||
|
|
Loading…
Reference in New Issue