From e1584207d114d4915cc793d633039b724f6a07cb Mon Sep 17 00:00:00 2001 From: Gardella Juan Pablo Date: Wed, 1 Jul 2020 06:30:39 -0300 Subject: [PATCH] NIFI-7563: Optimize the usage of JMS sessions and message producers The introduced changes prevent creating unnecesary sessions and producers in some scenarios. This closes #4378. Signed-off-by: Joey Frazee --- .../nifi/jms/processors/JMSPublisher.java | 35 ++--- .../nifi/jms/processors/PublishJMSIT.java | 128 ++++++++++++++++++ .../ConnectionFactoryInvocationHandler.java | 112 +++++++++++++++ .../helpers/ConnectionInvocationHandler.java | 108 +++++++++++++++ .../MessageProducerInvocationHandler.java | 63 +++++++++ .../helpers/SessionInvocationHandler.java | 91 +++++++++++++ 6 files changed, 511 insertions(+), 26 deletions(-) create mode 100644 nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java create mode 100644 nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java create mode 100644 nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java create mode 100644 nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index 17d5690e7e..3a65bababd 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -20,17 +20,14 @@ import org.apache.nifi.logging.ComponentLog; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; -import org.springframework.jms.core.SessionCallback; import org.springframework.jms.support.JmsHeaders; import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import javax.jms.Topic; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -56,7 +53,7 @@ final class JMSPublisher extends JMSWorker { public Message createMessage(Session session) throws JMSException { BytesMessage message = session.createBytesMessage(); message.writeBytes(messageBytes); - setMessageHeaderAndProperties(message, flowFileAttributes); + setMessageHeaderAndProperties(session, message, flowFileAttributes); return message; } }); @@ -67,13 +64,13 @@ final class JMSPublisher extends JMSWorker { @Override public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(messageText); - setMessageHeaderAndProperties(message, flowFileAttributes); + setMessageHeaderAndProperties(session, message, flowFileAttributes); return message; } }); } - void setMessageHeaderAndProperties(final Message message, final Map flowFileAttributes) throws JMSException { + void setMessageHeaderAndProperties(final Session session, final Message message, final Map flowFileAttributes) throws JMSException { if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) { for (Entry entry : flowFileAttributes.entrySet()) { @@ -96,14 +93,14 @@ final class JMSPublisher extends JMSWorker { } else if (entry.getKey().equals(JmsHeaders.TYPE)) { message.setJMSType(entry.getValue()); } else if (entry.getKey().equals(JmsHeaders.REPLY_TO)) { - Destination destination = buildDestination(entry.getValue()); + Destination destination = buildDestination(session, entry.getValue()); if (destination != null) { message.setJMSReplyTo(destination); } else { logUnbuildableDestination(entry.getValue(), JmsHeaders.REPLY_TO); } } else if (entry.getKey().equals(JmsHeaders.DESTINATION)) { - Destination destination = buildDestination(entry.getValue()); + Destination destination = buildDestination(session, entry.getValue()); if (destination != null) { message.setJMSDestination(destination); } else { @@ -128,27 +125,13 @@ final class JMSPublisher extends JMSWorker { } - private Destination buildDestination(final String destinationName) { - Destination destination; + private static Destination buildDestination(final Session session, final String destinationName) throws JMSException { if (destinationName.toLowerCase().contains("topic")) { - destination = this.jmsTemplate.execute(new SessionCallback() { - @Override - public Topic doInJms(Session session) throws JMSException { - return session.createTopic(destinationName); - } - }); + return session.createTopic(destinationName); } else if (destinationName.toLowerCase().contains("queue")) { - destination = this.jmsTemplate.execute(new SessionCallback() { - @Override - public Queue doInJms(Session session) throws JMSException { - return session.createQueue(destinationName); - } - }); - } else { - destination = null; + return session.createQueue(destinationName); } - - return destination; + return null; } /** diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java index b901b7391d..8070194146 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java @@ -33,6 +33,7 @@ import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.wireformat.WireFormat; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties; +import org.apache.nifi.jms.processors.helpers.ConnectionFactoryInvocationHandler; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; @@ -45,6 +46,7 @@ import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; import java.io.IOException; +import java.lang.reflect.Proxy; import java.net.URI; import java.net.UnknownHostException; import java.util.HashMap; @@ -325,6 +327,7 @@ public class PublishJMSIT { runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory } + /** *

* This test validates the connection resources are closed if the publisher is marked as invalid. @@ -385,6 +388,131 @@ public class PublishJMSIT { } } + /** + *

+ * This test validates the optimal resources usage. To process one message is expected to create only one connection, one session and one message producer. + *

+ *

+ * See https://issues.apache.org/jira/browse/NIFI-7563 for details. + *

+ * @throws Exception any error related to the broker. + */ + @Test(timeout = 10000) + public void validateNIFI7563UsingOneThread() throws Exception { + BrokerService broker = new BrokerService(); + try { + broker.setPersistent(false); + TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0"); + int port = connector.getServer().getSocketAddress().getPort(); + broker.start(); + + final ActiveMQConnectionFactory innerCf = new ActiveMQConnectionFactory("tcp://127.0.0.1:" + port); + ConnectionFactoryInvocationHandler connectionFactoryProxy = new ConnectionFactoryInvocationHandler(innerCf); + + // Create a connection Factory proxy to catch metrics and usage. + ConnectionFactory cf = (ConnectionFactory) Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { ConnectionFactory.class }, connectionFactoryProxy); + + TestRunner runner = TestRunners.newTestRunner(new PublishJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + + String destinationName = "myDestinationName"; + // The destination option according current implementation should contain topic or queue to infer the destination type + // from the name. Check https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name can be + // randomly created. + String topicNameInHeader = "topic-foo"; + runner.setProperty(PublishJMS.DESTINATION, destinationName); + runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE); + + int threads = 1; + Map flowFileAttributes = new HashMap<>(); + // This method will be removed once https://issues.apache.org/jira/browse/NIFI-7564 is fixed. + flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader); + flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader); + runner.setThreadCount(threads); + runner.enqueue("hi".getBytes(), flowFileAttributes); + runner.enqueue("hi".getBytes(), flowFileAttributes); + runner.run(2); + assertTrue("It is expected at least " + threads + " Connection to be opened.", threads == connectionFactoryProxy.openedConnections()); + assertTrue("It is expected " + threads + " Session to be opened and there are " + connectionFactoryProxy.openedSessions(), threads == connectionFactoryProxy.openedSessions()); + assertTrue("It is expected " + threads + " MessageProducer to be opened and there are " + connectionFactoryProxy.openedProducers(), threads == connectionFactoryProxy.openedProducers()); + assertTrue("Some resources were not closed.", connectionFactoryProxy.isAllResourcesClosed()); + } finally { + if (broker != null) { + broker.stop(); + } + } + } + + /** + *

+ * This test validates the optimal resources usage. To process one message is expected to create only one connection, one session and one message producer. + *

+ *

+ * See https://issues.apache.org/jira/browse/NIFI-7563 for details. + *

+ * @throws Exception any error related to the broker. + */ + @Test(timeout = 10000) + public void validateNIFI7563UsingMultipleThreads() throws Exception { + BrokerService broker = new BrokerService(); + try { + broker.setPersistent(false); + TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0"); + int port = connector.getServer().getSocketAddress().getPort(); + broker.start(); + + final ActiveMQConnectionFactory innerCf = new ActiveMQConnectionFactory("tcp://127.0.0.1:" + port); + ConnectionFactoryInvocationHandler connectionFactoryProxy = new ConnectionFactoryInvocationHandler(innerCf); + + // Create a connection Factory proxy to catch metrics and usage. + ConnectionFactory cf = (ConnectionFactory) Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { ConnectionFactory.class }, connectionFactoryProxy); + + TestRunner runner = TestRunners.newTestRunner(new PublishJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + + String destinationName = "myDestinationName"; + // The destination option according current implementation should contain topic or queue to infer the destination type + // from the name. Check https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name can be + // randomly created. + String topicNameInHeader = "topic-foo"; + runner.setProperty(PublishJMS.DESTINATION, destinationName); + runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE); + + int messagesToGenerate = 1000; + int threads = 10; + runner.setThreadCount(threads); + Map flowFileAttributes = new HashMap<>(); + // This method will be removed once https://issues.apache.org/jira/browse/NIFI-7564 is fixed. + flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader); + flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader); + byte[] messageContent = "hi".getBytes(); + for (int i = 0; i < messagesToGenerate; i++) { + runner.enqueue(messageContent, flowFileAttributes); + } + runner.run(messagesToGenerate); + assertTrue("It is expected at least " + threads + " Connection to be opened.", connectionFactoryProxy.openedConnections() <= threads); + assertTrue("It is expected " + threads + " Session to be opened and there are " + connectionFactoryProxy.openedSessions(), connectionFactoryProxy.openedSessions() <= threads); + assertTrue("It is expected " + threads + " MessageProducer to be opened and there are " + connectionFactoryProxy.openedProducers(), connectionFactoryProxy.openedProducers() <= threads); + assertTrue("Some resources were not closed.", connectionFactoryProxy.isAllResourcesClosed()); + } finally { + if (broker != null) { + broker.stop(); + } + } + } + @Test public void whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded() throws Exception { TestRunner runner = TestRunners.newTestRunner(PublishJMS.class); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java new file mode 100644 index 0000000000..8510ad60d3 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.helpers; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.concurrent.ThreadSafe; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ConnectionFactory}'s invocation handler to be used to create {@link Proxy} instances. This handler stores + * useful information to validate the proper resources handling of underlying connection factory. + */ +@ThreadSafe +public final class ConnectionFactoryInvocationHandler implements InvocationHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactoryInvocationHandler.class); + + private final ConnectionFactory connectionFactory; + private final List handlers = new CopyOnWriteArrayList<>(); + private final AtomicInteger openedConnections = new AtomicInteger(); + + public ConnectionFactoryInvocationHandler(ConnectionFactory connectionFactory) { + this.connectionFactory = Objects.requireNonNull(connectionFactory); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + final Object o = connectionFactory.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(connectionFactory, args); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Method {} called on {}", method.getName(), connectionFactory); + } + if ("createConnection".equals(method.getName())) { + Connection connection = (Connection) o; + ConnectionInvocationHandler cp = new ConnectionInvocationHandler(connection); + handlers.add(cp); + openedConnections.incrementAndGet(); + LOGGER.info("Connection created {}", connection); + return (Connection) Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { Connection.class }, cp); + } + return o; + } + + /** + * @return true if all opened resources were closed. + */ + public boolean isAllResourcesClosed() { + boolean closed = true; + for (ConnectionInvocationHandler handler : handlers) { + boolean handlerClosed = handler.isClosed(); + closed = closed && handlerClosed; + if (!handlerClosed) { + LOGGER.warn("Connection is not closed {}", handler.getConnection()); + } + } + return closed; + } + + /** + * @return number of opened connections. + */ + public int openedConnections() { + return openedConnections.get(); + } + + /** + * @return all opened producers. + */ + public int openedProducers() { + int producers = 0; + for (ConnectionInvocationHandler handler : handlers) { + producers += handler.openedProducers(); + } + return producers; + } + + /** + * @return number of opened sessions. + */ + public int openedSessions() { + int sessions = 0; + for (ConnectionInvocationHandler handler : handlers) { + sessions += handler.openedSessions(); + } + return sessions; + } + +} diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java new file mode 100644 index 0000000000..fba52783f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.helpers; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.concurrent.ThreadSafe; +import javax.jms.Connection; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple {@link Connection} proxy utility to detect opened and unclosed resources. + */ +@ThreadSafe +final class ConnectionInvocationHandler implements InvocationHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionInvocationHandler.class); + private final AtomicInteger closeCalled = new AtomicInteger(); + private final Connection connection; + private final List handlers = new CopyOnWriteArrayList<>(); + private final AtomicInteger openedSessions = new AtomicInteger(); + + public ConnectionInvocationHandler(Connection connection) { + this.connection = Objects.requireNonNull(connection); + } + + public Connection getConnection() { + return connection; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + final Object o = connection.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(connection, args); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Method {} called on {}", method.getName(), connection); + } + if (method.getName().equals("createSession")) { + Session session = (Session) o; + LOGGER.info("Session created {} using connection {}", session, connection); + openedSessions.incrementAndGet(); + SessionInvocationHandler sp = new SessionInvocationHandler(session); + handlers.add(sp); + Session sessionProxy = (Session) Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { Session.class }, sp); + return sessionProxy; + } + if ("close".equals(method.getName())) { + closeCalled.incrementAndGet(); + LOGGER.info("Connection close method called {} times for {}", closeCalled, connection); + } + return o; + } + + /** + * @return true if {@link Connection#close()} method was closed to {@link #connection} and all resources created from the connection were closed too. + */ + public boolean isClosed() { + boolean closed = closeCalled.get() >= 1; + for (SessionInvocationHandler handler : handlers) { + boolean handlerClosed = handler.isClosed(); + closed = closed && handlerClosed; + if (!handlerClosed) { + LOGGER.warn("Session is not closed {}", handler.getSession()); + } + } + return closed; + } + + /** + * @return number opened producers. + */ + public int openedProducers() { + int producers = 0; + for (SessionInvocationHandler handler : handlers) { + producers += handler.openedProducers(); + } + return producers; + } + + /** + * @return the number of opened sessions. + */ + public int openedSessions() { + return openedSessions.get(); + } + +} diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java new file mode 100644 index 0000000000..5fd6f2e4cf --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.helpers; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.MessageProducer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link MessageProducer} invocation handler. + */ +final class MessageProducerInvocationHandler implements InvocationHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SessionInvocationHandler.class); + + private final AtomicInteger closeCalled = new AtomicInteger(); + private final MessageProducer messageProducer; + + public MessageProducerInvocationHandler(MessageProducer messageProducer) { + this.messageProducer = Objects.requireNonNull(messageProducer); + } + + public MessageProducer getMessageProducer() { + return messageProducer; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + final Object o = messageProducer.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(messageProducer, args); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Method {} called on {}", method.getName(), messageProducer); + } + if ("close".equals(method.getName())) { + closeCalled.incrementAndGet(); + LOGGER.info("MessageProducer closed {}", messageProducer); + } + return o; + } + + public boolean isClosed() { + return closeCalled.get() >= 1; + } + +} diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java new file mode 100644 index 0000000000..d73415d97c --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors.helpers; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link Session} invocation handler for Session proxy instances. + * @see ConnectionFactoryInvocationHandler + */ +final class SessionInvocationHandler implements InvocationHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SessionInvocationHandler.class); + + private final AtomicInteger closeCalled = new AtomicInteger(); + private final List handlers = new CopyOnWriteArrayList<>(); + private final AtomicInteger openedProducers = new AtomicInteger(); + private final Session session; + + public SessionInvocationHandler(Session session) { + this.session = Objects.requireNonNull(session); + } + + public Session getSession() { + return session; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + final Object o = session.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(session, args); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Method {} called on {}", method.getName(), session); + } + if (method.getName().equals("createProducer")) { + MessageProducer messageProducer = (MessageProducer) o; + LOGGER.info("Created a Message Producer {} using session {}", messageProducer, session); + openedProducers.incrementAndGet(); + MessageProducerInvocationHandler mp = new MessageProducerInvocationHandler(messageProducer); + handlers.add(mp); + MessageProducer messageProducerProxy = (MessageProducer) Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { MessageProducer.class }, mp); + return messageProducerProxy; + } + if ("close".equals(method.getName())) { + closeCalled.incrementAndGet(); + LOGGER.info("Session close method called {} times for {}", closeCalled, session); + } + return o; + } + + public boolean isClosed() { + boolean closed = closeCalled.get() >= 1; + for (MessageProducerInvocationHandler handler : handlers) { + boolean handlerClosed = handler.isClosed(); + closed = closed && handlerClosed; + if (!handlerClosed) { + LOGGER.warn("MessageProducer is not closed {}", handler.getMessageProducer()); + } + } + return closed; + } + + public int openedProducers() { + return openedProducers.get(); + } + +}