NIFI-5869 Support Reconnection for JMS

resets worker if it doesn't work anymore for any reason. this will add "reconnect" capabilities. Will solve issues for following use cases:
- authentication changed after successful connection
- JNDI mapping changed and requires recaching.
- JMS server isn't available anymore or restarted.

improved controller reset on exception

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3261
This commit is contained in:
Ed 2019-01-10 11:55:29 -05:00 committed by Matthew Burgess
parent da8c8a14a1
commit 3492313d0b
7 changed files with 93 additions and 25 deletions

View File

@ -35,4 +35,12 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic
*/
ConnectionFactory getConnectionFactory();
/**
* Resets {@link ConnectionFactory}.
* Provider should reset {@link ConnectionFactory} only if a copy provided by a client matches
* current {@link ConnectionFactory}.
* @param cachedFactory - {@link ConnectionFactory} cached by client.
*/
void resetConnectionFactory(ConnectionFactory cachedFactory);
}

View File

@ -139,6 +139,14 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
.build();
}
@Override
public void resetConnectionFactory(ConnectionFactory cachedFactory) {
if (cachedFactory == connectionFactory) {
getLogger().debug("Resetting connection factory");
connectionFactory = null;
}
}
/**
* @return new instance of {@link ConnectionFactory}
*/
@ -316,5 +324,4 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
}
}
}

View File

@ -138,6 +138,14 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService
connectionFactory = null;
}
@Override
public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) {
if (cachedFactory == connectionFactory) {
getLogger().debug("Resetting connection factory");
connectionFactory = null;
}
}
@Override
public synchronized ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {

View File

@ -158,7 +158,27 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
try {
rendezvousWithJms(context, session, worker);
} finally {
workerPool.offer(worker);
//in case of exception during worker's connection (consumer or publisher),
//an appropriate service is responsible to invalidate the worker.
//if worker is not valid anymore, don't put it back into a pool, try to rebuild it first, or discard.
//this will be helpful in a situation, when JNDI has changed, or JMS server is not available
//and reconnection is required.
if (worker == null || !worker.isValid()){
getLogger().debug("Worker is invalid. Will try re-create... ");
final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
try {
// Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
worker = buildTargetResource(context);
}catch(Exception e) {
getLogger().error("Failed to rebuild: " + cfProvider);
worker = null;
}
}
if (worker != null) {
workerPool.offer(worker);
}
}
}

View File

@ -188,28 +188,33 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {
return;
try {
consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {
return;
}
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
final Map<String, String> jmsHeaders = response.getMessageHeaders();
final Map<String, String> jmsProperties = response.getMessageProperties();
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
}
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
final Map<String, String> jmsHeaders = response.getMessageHeaders();
final Map<String, String> jmsProperties = response.getMessageProperties();
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
}
});
});
} catch(Exception e) {
consumer.setValid(false);
throw e; // for backward compatibility with exception handling in flows
}
}
/**

View File

@ -36,6 +36,7 @@ abstract class JMSWorker {
protected final JmsTemplate jmsTemplate;
protected final ComponentLog processLog;
private final CachingConnectionFactory connectionFactory;
private boolean isValid = true;
/**
@ -61,4 +62,12 @@ abstract class JMSWorker {
return this.getClass().getSimpleName() + "[destination:" + this.jmsTemplate.getDefaultDestinationName()
+ "; pub-sub:" + this.jmsTemplate.isPubSubDomain() + ";]";
}
public boolean isValid() {
return isValid;
}
public void setValid(boolean isValid) {
this.isValid = isValid;
}
}

View File

@ -123,11 +123,21 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
switch (context.getProperty(MESSAGE_BODY).getValue()) {
case TEXT_MESSAGE:
publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes());
try {
publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes());
} catch(Exception e) {
publisher.setValid(false);
throw e;
}
break;
case BYTES_MESSAGE:
default:
publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
try {
publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
} catch(Exception e) {
publisher.setValid(false);
throw e;
}
break;
}
processSession.transfer(flowFile, REL_SUCCESS);
@ -136,6 +146,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
processSession.transfer(flowFile, REL_FAILURE);
this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
context.yield();
publisher.setValid(false);
}
}
}