NIFI-7034 Thise closes #4002. Connection leak with JMSConsumer and JMSPublisher

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Gardella Juan Pablo 2020-01-19 13:09:49 -03:00 committed by Joe Witt
parent b045c4ce0b
commit cddaac591b
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 178 additions and 47 deletions

View File

@ -196,6 +196,9 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
getLogger().debug("Worker is invalid. Will try re-create... ");
final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
try {
if (worker != null) {
worker.shutdown();
}
// Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());

View File

@ -17,6 +17,7 @@
package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -25,9 +26,16 @@ import static org.mockito.Mockito.when;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -38,9 +46,13 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.JmsHeaders;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -52,6 +64,7 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.net.SocketFactory;
public class ConsumeJMSIT {
@ -148,49 +161,29 @@ public class ConsumeJMSIT {
@Test
public void testTextMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testTextMessage",
Session::createTextMessage,
TextMessage.class.getSimpleName()
);
testMessageTypeAttribute("testTextMessage", Session::createTextMessage, TextMessage.class.getSimpleName());
}
@Test
public void testByteMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testByteMessage",
Session::createBytesMessage,
BytesMessage.class.getSimpleName()
);
testMessageTypeAttribute("testByteMessage", Session::createBytesMessage, BytesMessage.class.getSimpleName());
}
@Test
public void testObjectMessageTypeAttribute() throws Exception {
String destinationName = "testObjectMessage";
testMessageTypeAttribute(
destinationName,
Session::createObjectMessage,
ObjectMessage.class.getSimpleName()
);
testMessageTypeAttribute(destinationName, Session::createObjectMessage, ObjectMessage.class.getSimpleName());
}
@Test
public void testStreamMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testStreamMessage",
Session::createStreamMessage,
StreamMessage.class.getSimpleName()
);
testMessageTypeAttribute("testStreamMessage", Session::createStreamMessage, StreamMessage.class.getSimpleName());
}
@Test
public void testMapMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testMapMessage",
Session::createMapMessage,
MapMessage.class.getSimpleName()
);
testMessageTypeAttribute("testMapMessage", Session::createMapMessage, MapMessage.class.getSimpleName());
}
@Test
@ -201,10 +194,7 @@ public class ConsumeJMSIT {
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
sender.jmsTemplate.send("testMapMessage", __ -> createUnsupportedMessage(
"unsupportedMessagePropertyKey",
"unsupportedMessagePropertyValue"
));
sender.jmsTemplate.send("testMapMessage", __ -> createUnsupportedMessage("unsupportedMessagePropertyKey", "unsupportedMessagePropertyValue"));
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
@ -278,7 +268,8 @@ public class ConsumeJMSIT {
* </ul>
* It is expected <tt>C1</tt> receives message <tt>M1</tt>.
* </p>
* @throws Exception unexpected
* @throws Exception
* unexpected
*/
@Test(timeout = 10000)
public void validateNifi6915() throws Exception {
@ -338,6 +329,70 @@ public class ConsumeJMSIT {
runner.run(1, true);
}
/**
* <p>
* This test validates the connection resources are closed if the publisher is marked as invalid.
* </p>
* <p>
* This tests validates the proper resources handling for TCP connections using ActiveMQ (the bug was discovered against ActiveMQ 5.x). In this test, using some ActiveMQ's classes is possible to
* verify if an opened socket is closed. See <a href="https://issues.apache.org/jira/browse/NIFI-7034">NIFI-7034</a>.
* </p>
* @throws Exception
* any error related to the broker.
*/
@Test(timeout = 10000)
public void validateNIFI7034() throws Exception {
class ConsumeJMSForNifi7034 extends ConsumeJMS {
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSConsumer consumer) throws ProcessException {
super.rendezvousWithJms(context, processSession, consumer);
consumer.setValid(false);
}
}
BrokerService broker = new BrokerService();
try {
broker.setPersistent(false);
broker.setBrokerName("nifi7034publisher");
TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0");
int port = connector.getServer().getSocketAddress().getPort();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("validateNIFI7034://127.0.0.1:" + port);
final String destinationName = "nifi7034";
final AtomicReference<TcpTransport> tcpTransport = new AtomicReference<TcpTransport>();
TcpTransportFactory.registerTransportFactory("validateNIFI7034", new TcpTransportFactory() {
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
TcpTransport transport = super.createTcpTransport(wf, socketFactory, location, localLocation);
tcpTransport.set(transport);
return transport;
}
});
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMSForNifi7034());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
try {
runner.run();
fail("Unit test implemented in a way this line must not be called");
} catch (AssertionError e) {
assertFalse("It is expected transport be closed. ", tcpTransport.get().isConnected());
}
} finally {
if (broker != null) {
broker.stop();
}
}
}
private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException {
// Publish a message.
try (Connection conn = cf.createConnection();

View File

@ -16,23 +16,6 @@
*/
package org.apache.nifi.jms.processors;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -41,6 +24,37 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.net.SocketFactory;
public class PublishJMSIT {
@Test(timeout = 10000)
@ -308,4 +322,63 @@ public class PublishJMSIT {
runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
/**
* <p>
* This test validates the connection resources are closed if the publisher is marked as invalid.
* </p>
* <p>
* This tests validates the proper resources handling for TCP connections using ActiveMQ (the bug was discovered against ActiveMQ 5.x). In this test, using some ActiveMQ's classes is possible to
* verify if an opened socket is closed. See <a href="NIFI-7034">https://issues.apache.org/jira/browse/NIFI-7034</a>.
* </p>
* @throws Exception any error related to the broker.
*/
@Test(timeout = 10000)
public void validateNIFI7034() throws Exception {
class PublishJmsForNifi7034 extends PublishJMS {
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
super.rendezvousWithJms(context, processSession, publisher);
publisher.setValid(false);
}
}
BrokerService broker = new BrokerService();
try {
broker.setPersistent(false);
broker.setBrokerName("nifi7034publisher");
TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0");
int port = connector.getServer().getSocketAddress().getPort();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("validateNIFI7034://127.0.0.1:" + port);
final String destinationName = "nifi7034";
final AtomicReference<TcpTransport> tcpTransport = new AtomicReference<TcpTransport>();
TcpTransportFactory.registerTransportFactory("validateNIFI7034", new TcpTransportFactory() {
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
TcpTransport transport = super.createTcpTransport(wf, socketFactory, location, localLocation);
tcpTransport.set(transport);
return transport;
}
});
TestRunner runner = TestRunners.newTestRunner(new PublishJmsForNifi7034());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationName);
runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.TOPIC);
runner.enqueue("hi".getBytes());
runner.run();
assertFalse("It is expected transport be closed. ", tcpTransport.get().isConnected());
} finally {
if (broker != null) {
broker.stop();
}
}
}
}