From 98c2aa433f3373533c39eb12b6446bfc8a556aff Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Thu, 15 Oct 2015 14:39:51 +0100 Subject: [PATCH] ARTEMIS-262 Fix Bridge OOM exception Netty 4.x uses pooled buffers. These buffers can run out of memory when transferring large amounts of data over connection. This was causing an OutOfMemory exception to be thrown on the CoreBridge when tranferring large messages. Netty provides a callback handler to notify listeners when a Connection is writable. This patch adds the ability to register connection writable listeners to the Netty connection and registers the relevant callback from the Bridge to avoid writing when the buffers are full. --- .../client/impl/ClientSessionFactoryImpl.java | 24 ++++ .../impl/ClientSessionFactoryInternal.java | 3 + .../core/client/impl/ClientSessionImpl.java | 6 + .../client/impl/ClientSessionInternal.java | 3 + .../core/client/impl/DelegatingSession.java | 6 + .../remoting/impl/netty/NettyConnector.java | 1 + .../core/server/cluster/impl/BridgeImpl.java | 34 +++++- .../cluster/bridge/BridgeTest.java | 111 +++++++++++++++++- 8 files changed, 185 insertions(+), 3 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index ed588ef364..6e1ccbdf6b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -147,6 +147,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private String liveNodeID; + private Set lifeCycleListeners; + + // We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called. + private boolean connectionReadyForWrites; + + private final Object connectionReadyLock = new Object(); + public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, final TransportConfiguration connectorConfig, final long callTimeout, @@ -214,6 +221,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); + lifeCycleListeners = new HashSet(); + + connectionReadyForWrites = true; } public void disableFinalizeCheck() { @@ -225,6 +235,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C return newFailoverLock; } + @Override + public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { + synchronized (connectionReadyLock) { + lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites); + lifeCycleListeners.add(lifeCycleListener); + } + } + public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws ActiveMQException { // Get the connection @@ -356,6 +374,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } public void connectionReadyForWrites(final Object connectionID, final boolean ready) { + synchronized (connectionReadyLock) { + connectionReadyForWrites = ready; + for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) { + lifeCycleListener.connectionReadyForWrites(connectionID, ready); + } + } } public synchronized int numConnections() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java index ba2dab7694..367161827f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; public interface ClientSessionFactoryInternal extends ClientSessionFactory { @@ -57,4 +58,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory { ConfirmationWindowWarning getConfirmationWindowWarning(); Lock lockFailover(); + + void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 5b028b28f1..acb07256a1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; @@ -631,6 +632,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return sessionFactory.getLiveNodeId(); } + @Override + public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { + sessionFactory.addLifeCycleListener(lifeCycleListener); + } + // ClientSessionInternal implementation // ------------------------------------------------------------ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index 3a8524501c..06d60249d2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; public interface ClientSessionInternal extends ClientSession { @@ -122,4 +123,6 @@ public interface ClientSessionInternal extends ClientSession { boolean isClosing(); String getNodeId(); + + void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java index a4296bce23..2d6f4a4795 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.utils.ConcurrentHashSet; @@ -97,6 +98,11 @@ public class DelegatingSession implements ClientSessionInternal { session.acknowledge(consumer, message); } + @Override + public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { + session.addLifeCycleListener(lifeCycleListener); + } + public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { session.individualAcknowledge(consumer, message); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 7dad1433d4..decac8953e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -924,6 +924,7 @@ public class NettyConnector extends AbstractConnector { } public void connectionReadyForWrites(Object connectionID, boolean ready) { + listener.connectionReadyForWrites(connectionID, ready); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index eb99fcc99d..a8b048c98e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -57,6 +58,8 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.TypedProperties; @@ -66,7 +69,7 @@ import org.apache.activemq.artemis.utils.UUID; * A Core BridgeImpl */ -public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler { +public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener { // Constants ----------------------------------------------------- private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -132,6 +135,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private volatile ClientProducer producer; + private volatile boolean connectionWritable = false; + private volatile boolean started; private volatile boolean stopping = false; @@ -481,7 +486,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } synchronized (this) { - if (!active) { + if (!active || !connectionWritable) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref); } @@ -532,6 +537,29 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override + public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) { + + } + + @Override + public void connectionDestroyed(Object connectionID) { + + } + + @Override + public void connectionException(Object connectionID, ActiveMQException me) { + + } + + @Override + public void connectionReadyForWrites(Object connectionID, boolean ready) { + connectionWritable = ready; + if (connectionWritable) { + queue.deliverAsync(); + } + } + // FailureListener implementation -------------------------------- public void proceedDeliver(MessageReference ref) { @@ -840,6 +868,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled session.setSendAcknowledgementHandler(BridgeImpl.this); + session.addLifeCycleListener(BridgeImpl.this); + afterConnect(); active = true; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index f7592be4be..37338aab83 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -137,6 +137,115 @@ public class BridgeTest extends ActiveMQTestBase { internaltestSimpleBridge(true, true); } + @Test + public void testLargeMessageBridge() throws Exception { + long time = System.currentTimeMillis(); + Map server0Params = new HashMap(); + server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params); + + Map server1Params = new HashMap(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + + // Map connectors = new HashMap(); + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); + + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + + HashMap connectors = new HashMap(); + connectors.put(server1tc.getName(), server1tc); + server0.getConfiguration().setConnectorConfigurations(connectors); + +// final int messageSize = 1024 * 1024 * 5; + final int messageSize = 1024 * 10; + + final int numMessages = 100000; + + ArrayList connectorConfig = new ArrayList(); + connectorConfig.add(server1tc.getName()); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig); + + List bridgeConfigs = new ArrayList(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0); + List queueConfigs0 = new ArrayList(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1); + List queueConfigs1 = new ArrayList(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + + server1.start(); + server0.start(); + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); + + ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); + + ClientSession session0 = sf0.createSession(false, true, true); + + ClientSession session1 = sf1.createSession(false, true, true); + + ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); + + ClientConsumer consumer1 = session1.createConsumer(queueName1); + + session1.start(); + + final byte[] bytes = new byte[messageSize]; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(true); + + message.putIntProperty(propKey, i); + + message.getBodyBuffer().writeBytes(bytes); + + producer0.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer1.receive(500000); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + readLargeMessages(message, 10); + + message.acknowledge(); + } + + Assert.assertNull(consumer1.receiveImmediate()); + + session0.close(); + + session1.close(); + + sf0.close(); + + sf1.close(); + + closeFields(); + if (server0.getConfiguration().isPersistenceEnabled()) { + assertEquals(0, loadQueues(server0).size()); + } + long timeTaken = System.currentTimeMillis() - time; + System.out.println(timeTaken + "ms"); + } + + public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception { Map server0Params = new HashMap(); server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params); @@ -161,7 +270,7 @@ public class BridgeTest extends ActiveMQTestBase { final int messageSize = 1024; - final int numMessages = 10; + final int numMessages = 10000; ArrayList connectorConfig = new ArrayList(); connectorConfig.add(server1tc.getName());