NIFI-6915 This closes #3961. Jms Durable non shared subscription is broken

Revert NIFI-4834 enhancement for durable non shared consumers only.

Updated also AbstractJMSProcessor class to be public. The testing are not working
without that change, as org.apache.nifi.jms.processors.PublishJMSIT and
org.apache.nifi.jms.processors.ConsumeJMSIT are not working, as @OnSchedule
method is not called (because it is not public).
The method org.apache.nifi.util.StandardProcessorTestRunner.run(int iterations, boolean stopOnFinish, boolean initialize, long runWait) uses ReflectionUtils.invokeMethodsWithAnnotation which does not call non public
methods.

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Gardella Juan Pablo 2020-01-07 03:42:43 -03:00 committed by Joe Witt
parent b205b99668
commit c1301e196c
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 183 additions and 16 deletions

View File

@ -29,6 +29,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsTemplate;
@ -49,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @see ConsumeJMS
* @see JMSConnectionFactoryProviderDefinition
*/
abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcessor {
public abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcessor {
static final String QUEUE = "QUEUE";
static final String TOPIC = "TOPIC";
@ -164,6 +165,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
}
protected static String getClientId(ProcessContext context) {
return context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
@ -258,12 +263,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue());
final CachingConnectionFactory cachingFactory = new CachingConnectionFactory(cfCredentialsAdapter);
String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
if (clientId != null) {
clientId = clientId + "-" + clientIdCounter.getAndIncrement();
cachingFactory.setClientId(clientId);
}
setClientId(context, cachingFactory);
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(cachingFactory);
@ -271,4 +271,21 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
}
/**
* Set clientId for JMS connections when <tt>clientId</tt> is not null.
* It is overridden by {@code}ConsumeJMS{@code} when durable subscriptions
* is configured on the processor.
* @param context context.
* @param connectionFactory the connection factory.
* @since NIFI-6915
*/
protected void setClientId(ProcessContext context, final SingleConnectionFactory connectionFactory) {
String clientId = getClientId(context);
if (clientId != null) {
clientId = clientId + "-" + clientIdCounter.getAndIncrement();
connectionFactory.setClientId(clientId);
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -38,6 +39,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
@ -188,6 +190,23 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
relationships = Collections.unmodifiableSet(_relationships);
}
private static boolean isDurableSubscriber(final ProcessContext context) {
final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
return durableBoolean == null ? false : durableBoolean;
}
private static boolean isShared(final ProcessContext context) {
final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
return sharedBoolean == null ? false : sharedBoolean;
}
@OnScheduled
public void onSchedule(ProcessContext context) {
if (context.getMaxConcurrentTasks() > 1 && isDurableSubscriber(context) && !isShared(context)) {
throw new ProcessException("Durable non shared subscriptions cannot work on multiple threads. Check javax/jms/Session#createDurableConsumer API doc.");
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
@ -203,7 +222,6 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
"'" + DESTINATION_TYPE.getDisplayName() + "'='" + QUEUE + "'")
.build());
}
return validationResults;
}
@ -218,10 +236,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final String errorQueueName = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
final boolean durable = durableBoolean == null ? false : durableBoolean;
final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
final boolean shared = sharedBoolean == null ? false : sharedBoolean;
final boolean durable = isDurableSubscriber(context);
final boolean shared = isShared(context);
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
@ -279,6 +295,27 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
return thisPropertyDescriptors;
}
/**
* <p>
* Use provided clientId for non shared durable consumers, if not set
* always a different value as defined in {@link AbstractJMSProcessor#setClientId(ProcessContext, SingleConnectionFactory)}.
* </p>
* See {@link Session#createDurableConsumer(javax.jms.Topic, String, String, boolean)},
* in special following part: <i>An unshared durable subscription is
* identified by a name specified by the client and by the client identifier,
* which must be set. An application which subsequently wishes to create
* a consumer on that unshared durable subscription must use the same
* client identifier.</i>
*/
@Override
protected void setClientId(ProcessContext context, SingleConnectionFactory cachingFactory) {
if (isDurableSubscriber(context) && !isShared(context)) {
cachingFactory.setClientId(getClientId(context));
} else {
super.setClientId(context, cachingFactory);
}
}
/**
* Copies JMS attributes (i.e., headers and properties) as FF attributes.
* Given that FF attributes mandate that values are of type String, the

View File

@ -18,16 +18,17 @@ package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -37,10 +38,16 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.JmsHeaders;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
@ -50,7 +57,7 @@ public class ConsumeJMSIT {
@Test
public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
final String destinationName = "cooQueue";
final String destinationName = "cooQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
@ -258,4 +265,110 @@ public class ConsumeJMSIT {
return message;
}
/**
* Validates <a href="https://issues.apache.org/jira/browse/NIFI-6915">NIFI-6915</a>.
* <p>
* The test consists on:
* <ul>
* <li>Start a durable non shared consumer <tt>C1</tt> with client id <tt>client1</tt> subscribed to topic <tt>T</tt>.</li>
* <li>Stop <tt>C1</tt>.</li>
* <li>Publish a message <tt>M1</tt> to topic <tt>T</tt>.</li>
* <li>Start <tt>C1</tt>.</li>
* </ul>
* It is expected <tt>C1</tt> receives message <tt>M1</tt>.
* </p>
* @throws Exception unexpected
*/
@Test(timeout = 10000)
public void validateNifi6915() throws Exception {
BrokerService broker = new BrokerService();
try {
broker.setPersistent(false);
broker.setBrokerName("broker1");
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://broker1");
final String destinationName = "validateNifi6915";
TestRunner c1Consumer = createNonSharedDurableConsumer(cf, destinationName);
// 1. Start a durable non shared consumer C1 with client id client1 subscribed to topic T.
boolean stopConsumer = true;
c1Consumer.run(1, stopConsumer);
List<MockFlowFile> flowFiles = c1Consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
assertTrue("Expected no messages", flowFiles.isEmpty());
// 2. Publish a message M1 to topic T.
publishAMessage(cf, destinationName, "Hi buddy!!");
// 3. Start C1.
c1Consumer.run(1, true);
flowFiles = c1Consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
// It is expected C1 receives message M1.
final MockFlowFile successFF = flowFiles.get(0);
assertNotNull(successFF);
successFF.assertAttributeExists(JmsHeaders.DESTINATION);
successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName);
successFF.assertContentEquals("Hi buddy!!".getBytes());
assertEquals(destinationName, successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME));
} catch (Exception e) {
throw e;
} finally {
if (broker != null) {
broker.stop();
}
}
}
@Test(timeout = 10000)
public void validateNifi6915OnlyOneThreadAllowed() {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final String destinationName = "validateNifi6915";
try {
TestRunner runner = createNonSharedDurableConsumer(cf, destinationName);
runner.setThreadCount(2);
runner.run(1, true);
fail();
} catch (Throwable e) {
// Unable to capture the message :(
}
TestRunner runner = createNonSharedDurableConsumer(cf, destinationName);
// using one thread, it should not fail.
runner.setThreadCount(1);
runner.run(1, true);
}
private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException {
// Publish a message.
try (Connection conn = cf.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createTopic(destinationName))) {
producer.send(session.createTextMessage(messageContent));
}
}
private static TestRunner createNonSharedDurableConsumer(ActiveMQConnectionFactory cf, final String destinationName) {
ConsumeJMS c1 = new ConsumeJMS();
TestRunner c1Consumer = TestRunners.newTestRunner(c1);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
try {
c1Consumer.addControllerService("cfProvider", cs);
} catch (InitializationException e) {
throw new IllegalStateException(e);
}
c1Consumer.enableControllerService(cs);
c1Consumer.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
c1Consumer.setProperty(ConsumeJMS.DESTINATION, destinationName);
c1Consumer.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
c1Consumer.setProperty(ConsumeJMS.DURABLE_SUBSCRIBER, "true");
c1Consumer.setProperty(ConsumeJMS.SUBSCRIPTION_NAME, "SubscriptionName");
c1Consumer.setProperty(ConsumeJMS.SHARED_SUBSCRIBER, "false");
c1Consumer.setProperty(ConsumeJMS.CLIENT_ID, "client1");
return c1Consumer;
}
}