mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-06 09:20:07 +00:00
merge #200 - ARTEMIS-262 Fix Bridge OOM exception
This commit is contained in:
commit
b0b567bc83
@ -147,6 +147,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||
|
||||
private String liveNodeID;
|
||||
|
||||
private Set<ConnectionLifeCycleListener> lifeCycleListeners;
|
||||
|
||||
// We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called.
|
||||
private boolean connectionReadyForWrites;
|
||||
|
||||
private final Object connectionReadyLock = new Object();
|
||||
|
||||
public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
|
||||
final TransportConfiguration connectorConfig,
|
||||
final long callTimeout,
|
||||
@ -214,6 +221,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||
|
||||
confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
|
||||
|
||||
lifeCycleListeners = new HashSet<ConnectionLifeCycleListener>();
|
||||
|
||||
connectionReadyForWrites = true;
|
||||
}
|
||||
|
||||
public void disableFinalizeCheck() {
|
||||
@ -225,6 +235,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||
return newFailoverLock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
|
||||
synchronized (connectionReadyLock) {
|
||||
lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites);
|
||||
lifeCycleListeners.add(lifeCycleListener);
|
||||
}
|
||||
}
|
||||
|
||||
public void connect(final int initialConnectAttempts,
|
||||
final boolean failoverOnInitialConnection) throws ActiveMQException {
|
||||
// Get the connection
|
||||
@ -356,6 +374,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||
}
|
||||
|
||||
public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
|
||||
synchronized (connectionReadyLock) {
|
||||
connectionReadyForWrites = ready;
|
||||
for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) {
|
||||
lifeCycleListener.connectionReadyForWrites(connectionID, ready);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized int numConnections() {
|
||||
|
@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
|
||||
|
||||
public interface ClientSessionFactoryInternal extends ClientSessionFactory {
|
||||
@ -57,4 +58,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
|
||||
ConfirmationWindowWarning getConfirmationWindowWarning();
|
||||
|
||||
Lock lockFailover();
|
||||
|
||||
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
|
||||
}
|
||||
|
@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
|
||||
@ -631,6 +632,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||
return sessionFactory.getLiveNodeId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
|
||||
sessionFactory.addLifeCycleListener(lifeCycleListener);
|
||||
}
|
||||
|
||||
// ClientSessionInternal implementation
|
||||
// ------------------------------------------------------------
|
||||
|
||||
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
||||
|
||||
public interface ClientSessionInternal extends ClientSession {
|
||||
@ -122,4 +123,6 @@ public interface ClientSessionInternal extends ClientSession {
|
||||
boolean isClosing();
|
||||
|
||||
String getNodeId();
|
||||
|
||||
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
|
||||
@ -97,6 +98,11 @@ public class DelegatingSession implements ClientSessionInternal {
|
||||
session.acknowledge(consumer, message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
|
||||
session.addLifeCycleListener(lifeCycleListener);
|
||||
}
|
||||
|
||||
public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {
|
||||
session.individualAcknowledge(consumer, message);
|
||||
}
|
||||
|
@ -924,6 +924,7 @@ public class NettyConnector extends AbstractConnector {
|
||||
}
|
||||
|
||||
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
||||
listener.connectionReadyForWrites(connectionID, ready);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
@ -57,6 +58,8 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
@ -66,7 +69,7 @@ import org.apache.activemq.artemis.utils.UUID;
|
||||
* A Core BridgeImpl
|
||||
*/
|
||||
|
||||
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler {
|
||||
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
|
||||
@ -132,6 +135,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||
|
||||
private volatile ClientProducer producer;
|
||||
|
||||
private volatile boolean connectionWritable = false;
|
||||
|
||||
private volatile boolean started;
|
||||
|
||||
private volatile boolean stopping = false;
|
||||
@ -481,7 +486,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if (!active) {
|
||||
if (!active || !connectionWritable) {
|
||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref);
|
||||
}
|
||||
@ -532,6 +537,29 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(Object connectionID) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionException(Object connectionID, ActiveMQException me) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
||||
connectionWritable = ready;
|
||||
if (connectionWritable) {
|
||||
queue.deliverAsync();
|
||||
}
|
||||
}
|
||||
|
||||
// FailureListener implementation --------------------------------
|
||||
|
||||
public void proceedDeliver(MessageReference ref) {
|
||||
@ -840,6 +868,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||
|
||||
session.setSendAcknowledgementHandler(BridgeImpl.this);
|
||||
|
||||
session.addLifeCycleListener(BridgeImpl.this);
|
||||
|
||||
afterConnect();
|
||||
|
||||
active = true;
|
||||
|
@ -137,6 +137,115 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
internaltestSimpleBridge(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeMessageBridge() throws Exception {
|
||||
long time = System.currentTimeMillis();
|
||||
Map<String, Object> server0Params = new HashMap<String, Object>();
|
||||
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
|
||||
|
||||
Map<String, Object> server1Params = new HashMap<String, Object>();
|
||||
addTargetParameters(server1Params);
|
||||
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
|
||||
|
||||
final String testAddress = "testAddress";
|
||||
final String queueName0 = "queue0";
|
||||
final String forwardAddress = "forwardAddress";
|
||||
final String queueName1 = "queue1";
|
||||
|
||||
// Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
|
||||
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
|
||||
|
||||
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
|
||||
|
||||
HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
|
||||
connectors.put(server1tc.getName(), server1tc);
|
||||
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||
|
||||
// final int messageSize = 1024 * 1024 * 5;
|
||||
final int messageSize = 1024 * 10;
|
||||
|
||||
final int numMessages = 100000;
|
||||
|
||||
ArrayList<String> connectorConfig = new ArrayList<String>();
|
||||
connectorConfig.add(server1tc.getName());
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig);
|
||||
|
||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
|
||||
bridgeConfigs.add(bridgeConfiguration);
|
||||
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
|
||||
|
||||
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
|
||||
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
|
||||
queueConfigs0.add(queueConfig0);
|
||||
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
|
||||
|
||||
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
|
||||
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
|
||||
queueConfigs1.add(queueConfig1);
|
||||
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
|
||||
|
||||
server1.start();
|
||||
server0.start();
|
||||
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
|
||||
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
|
||||
|
||||
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
|
||||
|
||||
ClientSession session0 = sf0.createSession(false, true, true);
|
||||
|
||||
ClientSession session1 = sf1.createSession(false, true, true);
|
||||
|
||||
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
|
||||
|
||||
ClientConsumer consumer1 = session1.createConsumer(queueName1);
|
||||
|
||||
session1.start();
|
||||
|
||||
final byte[] bytes = new byte[messageSize];
|
||||
|
||||
final SimpleString propKey = new SimpleString("testkey");
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session0.createMessage(true);
|
||||
|
||||
message.putIntProperty(propKey, i);
|
||||
|
||||
message.getBodyBuffer().writeBytes(bytes);
|
||||
|
||||
producer0.send(message);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer1.receive(500000);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||
|
||||
readLargeMessages(message, 10);
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer1.receiveImmediate());
|
||||
|
||||
session0.close();
|
||||
|
||||
session1.close();
|
||||
|
||||
sf0.close();
|
||||
|
||||
sf1.close();
|
||||
|
||||
closeFields();
|
||||
if (server0.getConfiguration().isPersistenceEnabled()) {
|
||||
assertEquals(0, loadQueues(server0).size());
|
||||
}
|
||||
long timeTaken = System.currentTimeMillis() - time;
|
||||
System.out.println(timeTaken + "ms");
|
||||
}
|
||||
|
||||
|
||||
public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception {
|
||||
Map<String, Object> server0Params = new HashMap<String, Object>();
|
||||
server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
|
||||
@ -161,7 +270,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
|
||||
final int messageSize = 1024;
|
||||
|
||||
final int numMessages = 10;
|
||||
final int numMessages = 10000;
|
||||
|
||||
ArrayList<String> connectorConfig = new ArrayList<String>();
|
||||
connectorConfig.add(server1tc.getName());
|
||||
|
Loading…
x
Reference in New Issue
Block a user