From 3492313d0b3436cdd0f7390d46d403fed9d65b77 Mon Sep 17 00:00:00 2001 From: Ed Date: Thu, 10 Jan 2019 11:55:29 -0500 Subject: [PATCH] NIFI-5869 Support Reconnection for JMS resets worker if it doesn't work anymore for any reason. this will add "reconnect" capabilities. Will solve issues for following use cases: - authentication changed after successful connection - JNDI mapping changed and requires recaching. - JMS server isn't available anymore or restarted. improved controller reset on exception Signed-off-by: Matthew Burgess This closes #3261 --- ...MSConnectionFactoryProviderDefinition.java | 8 ++++ .../jms/cf/JMSConnectionFactoryProvider.java | 9 +++- .../cf/JndiJmsConnectionFactoryProvider.java | 8 ++++ .../jms/processors/AbstractJMSProcessor.java | 22 ++++++++- .../nifi/jms/processors/ConsumeJMS.java | 47 ++++++++++--------- .../apache/nifi/jms/processors/JMSWorker.java | 9 ++++ .../nifi/jms/processors/PublishJMS.java | 15 +++++- 7 files changed, 93 insertions(+), 25 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java index adb94fd445..6bab920b77 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java @@ -35,4 +35,12 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic */ ConnectionFactory getConnectionFactory(); + /** + * Resets {@link ConnectionFactory}. + * Provider should reset {@link ConnectionFactory} only if a copy provided by a client matches + * current {@link ConnectionFactory}. + * @param cachedFactory - {@link ConnectionFactory} cached by client. + */ + void resetConnectionFactory(ConnectionFactory cachedFactory); + } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java index ecb4e7a538..781ce653fc 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java @@ -139,6 +139,14 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl .build(); } + @Override + public void resetConnectionFactory(ConnectionFactory cachedFactory) { + if (cachedFactory == connectionFactory) { + getLogger().debug("Resetting connection factory"); + connectionFactory = null; + } + } + /** * @return new instance of {@link ConnectionFactory} */ @@ -316,5 +324,4 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context); } } - } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java index a293d8431a..44d8d99e48 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java @@ -138,6 +138,14 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService connectionFactory = null; } + @Override + public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) { + if (cachedFactory == connectionFactory) { + getLogger().debug("Resetting connection factory"); + connectionFactory = null; + } + } + @Override public synchronized ConnectionFactory getConnectionFactory() { if (connectionFactory == null) { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 0094eaf788..f47cf784da 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -158,7 +158,27 @@ abstract class AbstractJMSProcessor extends AbstractProcess try { rendezvousWithJms(context, session, worker); } finally { - workerPool.offer(worker); + //in case of exception during worker's connection (consumer or publisher), + //an appropriate service is responsible to invalidate the worker. + //if worker is not valid anymore, don't put it back into a pool, try to rebuild it first, or discard. + //this will be helpful in a situation, when JNDI has changed, or JMS server is not available + //and reconnection is required. + if (worker == null || !worker.isValid()){ + getLogger().debug("Worker is invalid. Will try re-create... "); + final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class); + try { + // Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory + CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory(); + cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory()); + worker = buildTargetResource(context); + }catch(Exception e) { + getLogger().error("Failed to rebuild: " + cfProvider); + worker = null; + } + } + if (worker != null) { + workerPool.offer(worker); + } } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index 997c6dd2e4..4b149e2652 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -188,28 +188,33 @@ public class ConsumeJMS extends AbstractJMSProcessor { final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue(); final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); - consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() { - @Override - public void accept(final JMSResponse response) { - if (response == null) { - return; + try { + consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() { + @Override + public void accept(final JMSResponse response) { + if (response == null) { + return; + } + + FlowFile flowFile = processSession.create(); + flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody())); + + final Map jmsHeaders = response.getMessageHeaders(); + final Map jmsProperties = response.getMessageProperties(); + + flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); + flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); + flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName); + + processSession.getProvenanceReporter().receive(flowFile, destinationName); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); } - - FlowFile flowFile = processSession.create(); - flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody())); - - final Map jmsHeaders = response.getMessageHeaders(); - final Map jmsProperties = response.getMessageProperties(); - - flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); - flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); - flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName); - - processSession.getProvenanceReporter().receive(flowFile, destinationName); - processSession.transfer(flowFile, REL_SUCCESS); - processSession.commit(); - } - }); + }); + } catch(Exception e) { + consumer.setValid(false); + throw e; // for backward compatibility with exception handling in flows + } } /** diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java index e6fa1bb071..ee4d76dfde 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java @@ -36,6 +36,7 @@ abstract class JMSWorker { protected final JmsTemplate jmsTemplate; protected final ComponentLog processLog; private final CachingConnectionFactory connectionFactory; + private boolean isValid = true; /** @@ -61,4 +62,12 @@ abstract class JMSWorker { return this.getClass().getSimpleName() + "[destination:" + this.jmsTemplate.getDefaultDestinationName() + "; pub-sub:" + this.jmsTemplate.isPubSubDomain() + ";]"; } + + public boolean isValid() { + return isValid; + } + + public void setValid(boolean isValid) { + this.isValid = isValid; + } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index a32a895351..12451cf3dd 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -123,11 +123,21 @@ public class PublishJMS extends AbstractJMSProcessor { String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue(); switch (context.getProperty(MESSAGE_BODY).getValue()) { case TEXT_MESSAGE: - publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes()); + try { + publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes()); + } catch(Exception e) { + publisher.setValid(false); + throw e; + } break; case BYTES_MESSAGE: default: - publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); + try { + publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); + } catch(Exception e) { + publisher.setValid(false); + throw e; + } break; } processSession.transfer(flowFile, REL_SUCCESS); @@ -136,6 +146,7 @@ public class PublishJMS extends AbstractJMSProcessor { processSession.transfer(flowFile, REL_FAILURE); this.getLogger().error("Failed while sending message to JMS via " + publisher, e); context.yield(); + publisher.setValid(false); } } }