ARTEMIS-262 Fix Bridge OOM exception

Netty 4.x uses pooled buffers.  These buffers can run out of memory when
transferring large amounts of data over connection.  This was causing an
OutOfMemory exception to be thrown on the CoreBridge when tranferring
large messages.  Netty provides a callback handler to notify listeners
when a Connection is writable.  This patch adds the ability to register
connection writable listeners to the Netty connection and registers the
relevant callback from the Bridge to avoid writing when the buffers are
full.
This commit is contained in:
Andy Taylor 2015-10-15 14:39:51 +01:00 committed by Martyn Taylor
parent 360338a362
commit 98c2aa433f
8 changed files with 185 additions and 3 deletions

View File

@ -147,6 +147,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private String liveNodeID;
private Set<ConnectionLifeCycleListener> lifeCycleListeners;
// We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called.
private boolean connectionReadyForWrites;
private final Object connectionReadyLock = new Object();
public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connectorConfig,
final long callTimeout,
@ -214,6 +221,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
lifeCycleListeners = new HashSet<ConnectionLifeCycleListener>();
connectionReadyForWrites = true;
}
public void disableFinalizeCheck() {
@ -225,6 +235,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
return newFailoverLock;
}
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
synchronized (connectionReadyLock) {
lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites);
lifeCycleListeners.add(lifeCycleListener);
}
}
public void connect(final int initialConnectAttempts,
final boolean failoverOnInitialConnection) throws ActiveMQException {
// Get the connection
@ -356,6 +374,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
synchronized (connectionReadyLock) {
connectionReadyForWrites = ready;
for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) {
lifeCycleListener.connectionReadyForWrites(connectionID, ready);
}
}
}
public synchronized int numConnections() {

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
public interface ClientSessionFactoryInternal extends ClientSessionFactory {
@ -57,4 +58,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
ConfirmationWindowWarning getConfirmationWindowWarning();
Lock lockFailover();
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
}

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
@ -631,6 +632,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return sessionFactory.getLiveNodeId();
}
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
sessionFactory.addLifeCycleListener(lifeCycleListener);
}
// ClientSessionInternal implementation
// ------------------------------------------------------------

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
public interface ClientSessionInternal extends ClientSession {
@ -122,4 +123,6 @@ public interface ClientSessionInternal extends ClientSession {
boolean isClosing();
String getNodeId();
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
@ -97,6 +98,11 @@ public class DelegatingSession implements ClientSessionInternal {
session.acknowledge(consumer, message);
}
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
session.addLifeCycleListener(lifeCycleListener);
}
public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {
session.individualAcknowledge(consumer, message);
}

View File

@ -924,6 +924,7 @@ public class NettyConnector extends AbstractConnector {
}
public void connectionReadyForWrites(Object connectionID, boolean ready) {
listener.connectionReadyForWrites(connectionID, ready);
}
}

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -57,6 +58,8 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TypedProperties;
@ -66,7 +69,7 @@ import org.apache.activemq.artemis.utils.UUID;
* A Core BridgeImpl
*/
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler {
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener {
// Constants -----------------------------------------------------
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -132,6 +135,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private volatile ClientProducer producer;
private volatile boolean connectionWritable = false;
private volatile boolean started;
private volatile boolean stopping = false;
@ -481,7 +486,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
synchronized (this) {
if (!active) {
if (!active || !connectionWritable) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref);
}
@ -532,6 +537,29 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
@Override
public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) {
}
@Override
public void connectionDestroyed(Object connectionID) {
}
@Override
public void connectionException(Object connectionID, ActiveMQException me) {
}
@Override
public void connectionReadyForWrites(Object connectionID, boolean ready) {
connectionWritable = ready;
if (connectionWritable) {
queue.deliverAsync();
}
}
// FailureListener implementation --------------------------------
public void proceedDeliver(MessageReference ref) {
@ -840,6 +868,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
session.setSendAcknowledgementHandler(BridgeImpl.this);
session.addLifeCycleListener(BridgeImpl.this);
afterConnect();
active = true;

View File

@ -137,6 +137,115 @@ public class BridgeTest extends ActiveMQTestBase {
internaltestSimpleBridge(true, true);
}
@Test
public void testLargeMessageBridge() throws Exception {
long time = System.currentTimeMillis();
Map<String, Object> server0Params = new HashMap<String, Object>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
// Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
// final int messageSize = 1024 * 1024 * 5;
final int messageSize = 1024 * 10;
final int numMessages = 100000;
ArrayList<String> connectorConfig = new ArrayList<String>();
connectorConfig.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
server0.start();
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
ClientSession session0 = sf0.createSession(false, true, true);
ClientSession session1 = sf1.createSession(false, true, true);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session1.createConsumer(queueName1);
session1.start();
final byte[] bytes = new byte[messageSize];
final SimpleString propKey = new SimpleString("testkey");
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session0.createMessage(true);
message.putIntProperty(propKey, i);
message.getBodyBuffer().writeBytes(bytes);
producer0.send(message);
}
for (int i = 0; i < numMessages; i++) {
ClientMessage message = consumer1.receive(500000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey));
readLargeMessages(message, 10);
message.acknowledge();
}
Assert.assertNull(consumer1.receiveImmediate());
session0.close();
session1.close();
sf0.close();
sf1.close();
closeFields();
if (server0.getConfiguration().isPersistenceEnabled()) {
assertEquals(0, loadQueues(server0).size());
}
long timeTaken = System.currentTimeMillis() - time;
System.out.println(timeTaken + "ms");
}
public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception {
Map<String, Object> server0Params = new HashMap<String, Object>();
server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
@ -161,7 +270,7 @@ public class BridgeTest extends ActiveMQTestBase {
final int messageSize = 1024;
final int numMessages = 10;
final int numMessages = 10000;
ArrayList<String> connectorConfig = new ArrayList<String>();
connectorConfig.add(server1tc.getName());