diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index ed588ef364..6e1ccbdf6b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -147,6 +147,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private String liveNodeID; + private Set lifeCycleListeners; + + // We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called. + private boolean connectionReadyForWrites; + + private final Object connectionReadyLock = new Object(); + public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, final TransportConfiguration connectorConfig, final long callTimeout, @@ -214,6 +221,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); + lifeCycleListeners = new HashSet(); + + connectionReadyForWrites = true; } public void disableFinalizeCheck() { @@ -225,6 +235,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C return newFailoverLock; } + @Override + public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { + synchronized (connectionReadyLock) { + lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites); + lifeCycleListeners.add(lifeCycleListener); + } + } + public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws ActiveMQException { // Get the connection @@ -356,6 +374,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } public void connectionReadyForWrites(final Object connectionID, final boolean ready) { + synchronized (connectionReadyLock) { + connectionReadyForWrites = ready; + for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) { + lifeCycleListener.connectionReadyForWrites(connectionID, ready); + } + } } public synchronized int numConnections() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java index ba2dab7694..367161827f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; public interface ClientSessionFactoryInternal extends ClientSessionFactory { @@ -57,4 +58,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory { ConfirmationWindowWarning getConfirmationWindowWarning(); Lock lockFailover(); + + void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 5b028b28f1..acb07256a1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; @@ -631,6 +632,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return sessionFactory.getLiveNodeId(); } + @Override + public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { + sessionFactory.addLifeCycleListener(lifeCycleListener); + } + // ClientSessionInternal implementation // ------------------------------------------------------------ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index 3a8524501c..06d60249d2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; public interface ClientSessionInternal extends ClientSession { @@ -122,4 +123,6 @@ public interface ClientSessionInternal extends ClientSession { boolean isClosing(); String getNodeId(); + + void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java index a4296bce23..2d6f4a4795 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.utils.ConcurrentHashSet; @@ -97,6 +98,11 @@ public class DelegatingSession implements ClientSessionInternal { session.acknowledge(consumer, message); } + @Override + public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { + session.addLifeCycleListener(lifeCycleListener); + } + public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { session.individualAcknowledge(consumer, message); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 7dad1433d4..decac8953e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -924,6 +924,7 @@ public class NettyConnector extends AbstractConnector { } public void connectionReadyForWrites(Object connectionID, boolean ready) { + listener.connectionReadyForWrites(connectionID, ready); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index eb99fcc99d..a8b048c98e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -57,6 +58,8 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.TypedProperties; @@ -66,7 +69,7 @@ import org.apache.activemq.artemis.utils.UUID; * A Core BridgeImpl */ -public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler { +public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener { // Constants ----------------------------------------------------- private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -132,6 +135,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private volatile ClientProducer producer; + private volatile boolean connectionWritable = false; + private volatile boolean started; private volatile boolean stopping = false; @@ -481,7 +486,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } synchronized (this) { - if (!active) { + if (!active || !connectionWritable) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref); } @@ -532,6 +537,29 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override + public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) { + + } + + @Override + public void connectionDestroyed(Object connectionID) { + + } + + @Override + public void connectionException(Object connectionID, ActiveMQException me) { + + } + + @Override + public void connectionReadyForWrites(Object connectionID, boolean ready) { + connectionWritable = ready; + if (connectionWritable) { + queue.deliverAsync(); + } + } + // FailureListener implementation -------------------------------- public void proceedDeliver(MessageReference ref) { @@ -840,6 +868,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled session.setSendAcknowledgementHandler(BridgeImpl.this); + session.addLifeCycleListener(BridgeImpl.this); + afterConnect(); active = true; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index f7592be4be..37338aab83 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -137,6 +137,115 @@ public class BridgeTest extends ActiveMQTestBase { internaltestSimpleBridge(true, true); } + @Test + public void testLargeMessageBridge() throws Exception { + long time = System.currentTimeMillis(); + Map server0Params = new HashMap(); + server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params); + + Map server1Params = new HashMap(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + + // Map connectors = new HashMap(); + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); + + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + + HashMap connectors = new HashMap(); + connectors.put(server1tc.getName(), server1tc); + server0.getConfiguration().setConnectorConfigurations(connectors); + +// final int messageSize = 1024 * 1024 * 5; + final int messageSize = 1024 * 10; + + final int numMessages = 100000; + + ArrayList connectorConfig = new ArrayList(); + connectorConfig.add(server1tc.getName()); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig); + + List bridgeConfigs = new ArrayList(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0); + List queueConfigs0 = new ArrayList(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1); + List queueConfigs1 = new ArrayList(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + + server1.start(); + server0.start(); + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); + + ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); + + ClientSession session0 = sf0.createSession(false, true, true); + + ClientSession session1 = sf1.createSession(false, true, true); + + ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); + + ClientConsumer consumer1 = session1.createConsumer(queueName1); + + session1.start(); + + final byte[] bytes = new byte[messageSize]; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(true); + + message.putIntProperty(propKey, i); + + message.getBodyBuffer().writeBytes(bytes); + + producer0.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer1.receive(500000); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + readLargeMessages(message, 10); + + message.acknowledge(); + } + + Assert.assertNull(consumer1.receiveImmediate()); + + session0.close(); + + session1.close(); + + sf0.close(); + + sf1.close(); + + closeFields(); + if (server0.getConfiguration().isPersistenceEnabled()) { + assertEquals(0, loadQueues(server0).size()); + } + long timeTaken = System.currentTimeMillis() - time; + System.out.println(timeTaken + "ms"); + } + + public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception { Map server0Params = new HashMap(); server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params); @@ -161,7 +270,7 @@ public class BridgeTest extends ActiveMQTestBase { final int messageSize = 1024; - final int numMessages = 10; + final int numMessages = 10000; ArrayList connectorConfig = new ArrayList(); connectorConfig.add(server1tc.getName());