NIFI-7563: Optimize the usage of JMS sessions and message producers

The introduced changes prevent creating unnecesary sessions and producers
in some scenarios.

This closes #4378.

Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
Gardella Juan Pablo 2020-07-01 06:30:39 -03:00 committed by Joey Frazee
parent 239a2e884c
commit e1584207d1
6 changed files with 511 additions and 26 deletions

View File

@ -20,17 +20,14 @@ import org.apache.nifi.logging.ComponentLog;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@ -56,7 +53,7 @@ final class JMSPublisher extends JMSWorker {
public Message createMessage(Session session) throws JMSException {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageBytes);
setMessageHeaderAndProperties(message, flowFileAttributes);
setMessageHeaderAndProperties(session, message, flowFileAttributes);
return message;
}
});
@ -67,13 +64,13 @@ final class JMSPublisher extends JMSWorker {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage(messageText);
setMessageHeaderAndProperties(message, flowFileAttributes);
setMessageHeaderAndProperties(session, message, flowFileAttributes);
return message;
}
});
}
void setMessageHeaderAndProperties(final Message message, final Map<String, String> flowFileAttributes) throws JMSException {
void setMessageHeaderAndProperties(final Session session, final Message message, final Map<String, String> flowFileAttributes) throws JMSException {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
@ -96,14 +93,14 @@ final class JMSPublisher extends JMSWorker {
} else if (entry.getKey().equals(JmsHeaders.TYPE)) {
message.setJMSType(entry.getValue());
} else if (entry.getKey().equals(JmsHeaders.REPLY_TO)) {
Destination destination = buildDestination(entry.getValue());
Destination destination = buildDestination(session, entry.getValue());
if (destination != null) {
message.setJMSReplyTo(destination);
} else {
logUnbuildableDestination(entry.getValue(), JmsHeaders.REPLY_TO);
}
} else if (entry.getKey().equals(JmsHeaders.DESTINATION)) {
Destination destination = buildDestination(entry.getValue());
Destination destination = buildDestination(session, entry.getValue());
if (destination != null) {
message.setJMSDestination(destination);
} else {
@ -128,27 +125,13 @@ final class JMSPublisher extends JMSWorker {
}
private Destination buildDestination(final String destinationName) {
Destination destination;
private static Destination buildDestination(final Session session, final String destinationName) throws JMSException {
if (destinationName.toLowerCase().contains("topic")) {
destination = this.jmsTemplate.execute(new SessionCallback<Topic>() {
@Override
public Topic doInJms(Session session) throws JMSException {
return session.createTopic(destinationName);
}
});
return session.createTopic(destinationName);
} else if (destinationName.toLowerCase().contains("queue")) {
destination = this.jmsTemplate.execute(new SessionCallback<Queue>() {
@Override
public Queue doInJms(Session session) throws JMSException {
return session.createQueue(destinationName);
}
});
} else {
destination = null;
return session.createQueue(destinationName);
}
return destination;
return null;
}
/**

View File

@ -33,6 +33,7 @@ import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
import org.apache.nifi.jms.processors.helpers.ConnectionFactoryInvocationHandler;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
@ -45,6 +46,7 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
@ -325,6 +327,7 @@ public class PublishJMSIT {
runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
/**
* <p>
* This test validates the connection resources are closed if the publisher is marked as invalid.
@ -385,6 +388,131 @@ public class PublishJMSIT {
}
}
/**
* <p>
* This test validates the optimal resources usage. To process one message is expected to create only one connection, one session and one message producer.
* </p>
* <p>
* See <a href="NIFI-7563">https://issues.apache.org/jira/browse/NIFI-7563</a> for details.
* </p>
* @throws Exception any error related to the broker.
*/
@Test(timeout = 10000)
public void validateNIFI7563UsingOneThread() throws Exception {
BrokerService broker = new BrokerService();
try {
broker.setPersistent(false);
TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0");
int port = connector.getServer().getSocketAddress().getPort();
broker.start();
final ActiveMQConnectionFactory innerCf = new ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
ConnectionFactoryInvocationHandler connectionFactoryProxy = new ConnectionFactoryInvocationHandler(innerCf);
// Create a connection Factory proxy to catch metrics and usage.
ConnectionFactory cf = (ConnectionFactory) Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { ConnectionFactory.class }, connectionFactoryProxy);
TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
String destinationName = "myDestinationName";
// The destination option according current implementation should contain topic or queue to infer the destination type
// from the name. Check https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name can be
// randomly created.
String topicNameInHeader = "topic-foo";
runner.setProperty(PublishJMS.DESTINATION, destinationName);
runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
int threads = 1;
Map<String, String> flowFileAttributes = new HashMap<>();
// This method will be removed once https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
runner.setThreadCount(threads);
runner.enqueue("hi".getBytes(), flowFileAttributes);
runner.enqueue("hi".getBytes(), flowFileAttributes);
runner.run(2);
assertTrue("It is expected at least " + threads + " Connection to be opened.", threads == connectionFactoryProxy.openedConnections());
assertTrue("It is expected " + threads + " Session to be opened and there are " + connectionFactoryProxy.openedSessions(), threads == connectionFactoryProxy.openedSessions());
assertTrue("It is expected " + threads + " MessageProducer to be opened and there are " + connectionFactoryProxy.openedProducers(), threads == connectionFactoryProxy.openedProducers());
assertTrue("Some resources were not closed.", connectionFactoryProxy.isAllResourcesClosed());
} finally {
if (broker != null) {
broker.stop();
}
}
}
/**
* <p>
* This test validates the optimal resources usage. To process one message is expected to create only one connection, one session and one message producer.
* </p>
* <p>
* See <a href="NIFI-7563">https://issues.apache.org/jira/browse/NIFI-7563</a> for details.
* </p>
* @throws Exception any error related to the broker.
*/
@Test(timeout = 10000)
public void validateNIFI7563UsingMultipleThreads() throws Exception {
BrokerService broker = new BrokerService();
try {
broker.setPersistent(false);
TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0");
int port = connector.getServer().getSocketAddress().getPort();
broker.start();
final ActiveMQConnectionFactory innerCf = new ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
ConnectionFactoryInvocationHandler connectionFactoryProxy = new ConnectionFactoryInvocationHandler(innerCf);
// Create a connection Factory proxy to catch metrics and usage.
ConnectionFactory cf = (ConnectionFactory) Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { ConnectionFactory.class }, connectionFactoryProxy);
TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
String destinationName = "myDestinationName";
// The destination option according current implementation should contain topic or queue to infer the destination type
// from the name. Check https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name can be
// randomly created.
String topicNameInHeader = "topic-foo";
runner.setProperty(PublishJMS.DESTINATION, destinationName);
runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
int messagesToGenerate = 1000;
int threads = 10;
runner.setThreadCount(threads);
Map<String, String> flowFileAttributes = new HashMap<>();
// This method will be removed once https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
byte[] messageContent = "hi".getBytes();
for (int i = 0; i < messagesToGenerate; i++) {
runner.enqueue(messageContent, flowFileAttributes);
}
runner.run(messagesToGenerate);
assertTrue("It is expected at least " + threads + " Connection to be opened.", connectionFactoryProxy.openedConnections() <= threads);
assertTrue("It is expected " + threads + " Session to be opened and there are " + connectionFactoryProxy.openedSessions(), connectionFactoryProxy.openedSessions() <= threads);
assertTrue("It is expected " + threads + " MessageProducer to be opened and there are " + connectionFactoryProxy.openedProducers(), connectionFactoryProxy.openedProducers() <= threads);
assertTrue("Some resources were not closed.", connectionFactoryProxy.isAllResourcesClosed());
} finally {
if (broker != null) {
broker.stop();
}
}
}
@Test
public void whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded() throws Exception {
TestRunner runner = TestRunners.newTestRunner(PublishJMS.class);

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.helpers;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link ConnectionFactory}'s invocation handler to be used to create {@link Proxy} instances. This handler stores
* useful information to validate the proper resources handling of underlying connection factory.
*/
@ThreadSafe
public final class ConnectionFactoryInvocationHandler implements InvocationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactoryInvocationHandler.class);
private final ConnectionFactory connectionFactory;
private final List<ConnectionInvocationHandler> handlers = new CopyOnWriteArrayList<>();
private final AtomicInteger openedConnections = new AtomicInteger();
public ConnectionFactoryInvocationHandler(ConnectionFactory connectionFactory) {
this.connectionFactory = Objects.requireNonNull(connectionFactory);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final Object o = connectionFactory.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(connectionFactory, args);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Method {} called on {}", method.getName(), connectionFactory);
}
if ("createConnection".equals(method.getName())) {
Connection connection = (Connection) o;
ConnectionInvocationHandler cp = new ConnectionInvocationHandler(connection);
handlers.add(cp);
openedConnections.incrementAndGet();
LOGGER.info("Connection created {}", connection);
return (Connection) Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { Connection.class }, cp);
}
return o;
}
/**
* @return true if all opened resources were closed.
*/
public boolean isAllResourcesClosed() {
boolean closed = true;
for (ConnectionInvocationHandler handler : handlers) {
boolean handlerClosed = handler.isClosed();
closed = closed && handlerClosed;
if (!handlerClosed) {
LOGGER.warn("Connection is not closed {}", handler.getConnection());
}
}
return closed;
}
/**
* @return number of opened connections.
*/
public int openedConnections() {
return openedConnections.get();
}
/**
* @return all opened producers.
*/
public int openedProducers() {
int producers = 0;
for (ConnectionInvocationHandler handler : handlers) {
producers += handler.openedProducers();
}
return producers;
}
/**
* @return number of opened sessions.
*/
public int openedSessions() {
int sessions = 0;
for (ConnectionInvocationHandler handler : handlers) {
sessions += handler.openedSessions();
}
return sessions;
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.helpers;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import javax.jms.Connection;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple {@link Connection} proxy utility to detect opened and unclosed resources.
*/
@ThreadSafe
final class ConnectionInvocationHandler implements InvocationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionInvocationHandler.class);
private final AtomicInteger closeCalled = new AtomicInteger();
private final Connection connection;
private final List<SessionInvocationHandler> handlers = new CopyOnWriteArrayList<>();
private final AtomicInteger openedSessions = new AtomicInteger();
public ConnectionInvocationHandler(Connection connection) {
this.connection = Objects.requireNonNull(connection);
}
public Connection getConnection() {
return connection;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final Object o = connection.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(connection, args);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Method {} called on {}", method.getName(), connection);
}
if (method.getName().equals("createSession")) {
Session session = (Session) o;
LOGGER.info("Session created {} using connection {}", session, connection);
openedSessions.incrementAndGet();
SessionInvocationHandler sp = new SessionInvocationHandler(session);
handlers.add(sp);
Session sessionProxy = (Session) Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { Session.class }, sp);
return sessionProxy;
}
if ("close".equals(method.getName())) {
closeCalled.incrementAndGet();
LOGGER.info("Connection close method called {} times for {}", closeCalled, connection);
}
return o;
}
/**
* @return true if {@link Connection#close()} method was closed to {@link #connection} and all resources created from the connection were closed too.
*/
public boolean isClosed() {
boolean closed = closeCalled.get() >= 1;
for (SessionInvocationHandler handler : handlers) {
boolean handlerClosed = handler.isClosed();
closed = closed && handlerClosed;
if (!handlerClosed) {
LOGGER.warn("Session is not closed {}", handler.getSession());
}
}
return closed;
}
/**
* @return number opened producers.
*/
public int openedProducers() {
int producers = 0;
for (SessionInvocationHandler handler : handlers) {
producers += handler.openedProducers();
}
return producers;
}
/**
* @return the number of opened sessions.
*/
public int openedSessions() {
return openedSessions.get();
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.helpers;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link MessageProducer} invocation handler.
*/
final class MessageProducerInvocationHandler implements InvocationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionInvocationHandler.class);
private final AtomicInteger closeCalled = new AtomicInteger();
private final MessageProducer messageProducer;
public MessageProducerInvocationHandler(MessageProducer messageProducer) {
this.messageProducer = Objects.requireNonNull(messageProducer);
}
public MessageProducer getMessageProducer() {
return messageProducer;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final Object o = messageProducer.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(messageProducer, args);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Method {} called on {}", method.getName(), messageProducer);
}
if ("close".equals(method.getName())) {
closeCalled.incrementAndGet();
LOGGER.info("MessageProducer closed {}", messageProducer);
}
return o;
}
public boolean isClosed() {
return closeCalled.get() >= 1;
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors.helpers;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link Session} invocation handler for Session proxy instances.
* @see ConnectionFactoryInvocationHandler
*/
final class SessionInvocationHandler implements InvocationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionInvocationHandler.class);
private final AtomicInteger closeCalled = new AtomicInteger();
private final List<MessageProducerInvocationHandler> handlers = new CopyOnWriteArrayList<>();
private final AtomicInteger openedProducers = new AtomicInteger();
private final Session session;
public SessionInvocationHandler(Session session) {
this.session = Objects.requireNonNull(session);
}
public Session getSession() {
return session;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final Object o = session.getClass().getMethod(method.getName(), method.getParameterTypes()).invoke(session, args);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Method {} called on {}", method.getName(), session);
}
if (method.getName().equals("createProducer")) {
MessageProducer messageProducer = (MessageProducer) o;
LOGGER.info("Created a Message Producer {} using session {}", messageProducer, session);
openedProducers.incrementAndGet();
MessageProducerInvocationHandler mp = new MessageProducerInvocationHandler(messageProducer);
handlers.add(mp);
MessageProducer messageProducerProxy = (MessageProducer) Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { MessageProducer.class }, mp);
return messageProducerProxy;
}
if ("close".equals(method.getName())) {
closeCalled.incrementAndGet();
LOGGER.info("Session close method called {} times for {}", closeCalled, session);
}
return o;
}
public boolean isClosed() {
boolean closed = closeCalled.get() >= 1;
for (MessageProducerInvocationHandler handler : handlers) {
boolean handlerClosed = handler.isClosed();
closed = closed && handlerClosed;
if (!handlerClosed) {
LOGGER.warn("MessageProducer is not closed {}", handler.getMessageProducer());
}
}
return closed;
}
public int openedProducers() {
return openedProducers.get();
}
}