diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index 58c3511f67..a6913879da 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -88,6 +88,11 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll return bufferDelegate.waitCompletion(timeWait); } + @Override + public LargeMessageControllerImpl.LargeData take() throws InterruptedException { + return bufferDelegate.take(); + } + @Override public int capacity() { return -1; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java index 165a4d61b6..0565c83647 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java @@ -61,4 +61,5 @@ public interface LargeMessageController extends ActiveMQBuffer { */ boolean waitCompletion(long timeWait) throws ActiveMQException; + LargeMessageControllerImpl.LargeData take() throws InterruptedException; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index f91878dad2..be2bb788e1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -318,6 +318,11 @@ public class LargeMessageControllerImpl implements LargeMessageController { } + @Override + public LargeData take() throws InterruptedException { + return largeMessageData.take(); + } + /** * @throws ActiveMQException */ @@ -1328,7 +1333,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } - private static class LargeData { + public static class LargeData { final byte[] chunk; final int flowControlSize; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index d10016b88b..ac75becdc8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1763,6 +1763,11 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 222304, value = "Unable to load message from journal", format = Message.Format.MESSAGE_FORMAT) void unableToLoadMessageFromJournal(@Cause Throwable t); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222305, value = "Error federating message {0}.", + format = Message.Format.MESSAGE_FORMAT) + void federationDispatchError(@Cause Throwable e, String message); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index 0f408a1dd2..c06042770c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -28,12 +28,17 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; +import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData; + public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener { private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class); @@ -174,6 +179,24 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi @Override public void onMessage(ClientMessage clientMessage) { try { + Message message = clientMessage; + if (message instanceof ClientLargeMessageInternal) { + + final StorageManager storageManager = server.getStorageManager(); + LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), message); + + LargeData largeData = null; + do { + // block on reading all pending chunks, ok as we are called from an executor + largeData = ((ClientLargeMessageInternal) clientMessage).getLargeMessageController().take(); + lsm.addBytes(largeData.getChunk()); + } + while (largeData.isContinues()); + + message = lsm.toMessage(); + lsm.releaseResources(true, true); + } + if (server.hasBrokerFederationPlugins()) { try { server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage)); @@ -183,7 +206,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi } } - Message message = transformer == null ? clientMessage : transformer.transform(clientMessage); + message = transformer == null ? message : transformer.transform(message); if (message != null) { server.getPostOffice().route(message, true); } @@ -198,6 +221,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi } } } catch (Exception e) { + ActiveMQServerLogger.LOGGER.federationDispatchError(e, clientMessage.toString()); try { clientSession.rollback(); } catch (ActiveMQException e1) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java index 1b5b814d15..6acbc775e8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java @@ -61,7 +61,7 @@ public class FederatedQueueTest extends FederatedTestBase { @Override protected void configureQueues(ActiveMQServer server) throws Exception { - server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false)); + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false).setDefaultConsumerWindowSize(-1)); createSimpleQueue(server, getName()); } @@ -243,6 +243,33 @@ public class FederatedQueueTest extends FederatedTestBase { } + @Test + public void testWithLargeMessage() throws Exception { + String queueName = getName(); + + FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName); + getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(0).getFederationManager().deploy(); + + ConnectionFactory cf1 = getCF(1); + ConnectionFactory cf0 = getCF(0); + final String payload = new String(new byte[1 * 1024 * 1024]).replace('\0','+'); + try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) { + connection1.start(); + Session session1 = connection1.createSession(); + Queue queue1 = session1.createQueue(queueName); + MessageProducer producer = session1.createProducer(queue1); + producer.send(session1.createTextMessage(payload)); + + connection0.start(); + Session session0 = connection0.createSession(); + Queue queue0 = session0.createQueue(queueName); + MessageConsumer consumer0 = session0.createConsumer(queue0); + + assertNotNull(consumer0.receive(60000)); + } + } + @Test public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist() throws Exception { String queueName = getName();