ARTEMIS-3308 - support federation of large messages

This commit is contained in:
gtully 2021-10-19 11:19:36 +01:00 committed by Gary Tully
parent 1db3ae1dc0
commit cf85d35355
6 changed files with 70 additions and 3 deletions

View File

@ -88,6 +88,11 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
return bufferDelegate.waitCompletion(timeWait); return bufferDelegate.waitCompletion(timeWait);
} }
@Override
public LargeMessageControllerImpl.LargeData take() throws InterruptedException {
return bufferDelegate.take();
}
@Override @Override
public int capacity() { public int capacity() {
return -1; return -1;

View File

@ -61,4 +61,5 @@ public interface LargeMessageController extends ActiveMQBuffer {
*/ */
boolean waitCompletion(long timeWait) throws ActiveMQException; boolean waitCompletion(long timeWait) throws ActiveMQException;
LargeMessageControllerImpl.LargeData take() throws InterruptedException;
} }

View File

@ -318,6 +318,11 @@ public class LargeMessageControllerImpl implements LargeMessageController {
} }
@Override
public LargeData take() throws InterruptedException {
return largeMessageData.take();
}
/** /**
* @throws ActiveMQException * @throws ActiveMQException
*/ */
@ -1328,7 +1333,7 @@ public class LargeMessageControllerImpl implements LargeMessageController {
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
} }
private static class LargeData { public static class LargeData {
final byte[] chunk; final byte[] chunk;
final int flowControlSize; final int flowControlSize;

View File

@ -1763,6 +1763,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222304, value = "Unable to load message from journal", format = Message.Format.MESSAGE_FORMAT) @Message(id = 222304, value = "Unable to load message from journal", format = Message.Format.MESSAGE_FORMAT)
void unableToLoadMessageFromJournal(@Cause Throwable t); void unableToLoadMessageFromJournal(@Cause Throwable t);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222305, value = "Error federating message {0}.",
format = Message.Format.MESSAGE_FORMAT)
void federationDispatchError(@Cause Throwable e, String message);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e); void initializationError(@Cause Throwable e);

View File

@ -28,12 +28,17 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData;
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener { public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {
private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class); private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
@ -174,6 +179,24 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
@Override @Override
public void onMessage(ClientMessage clientMessage) { public void onMessage(ClientMessage clientMessage) {
try { try {
Message message = clientMessage;
if (message instanceof ClientLargeMessageInternal) {
final StorageManager storageManager = server.getStorageManager();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), message);
LargeData largeData = null;
do {
// block on reading all pending chunks, ok as we are called from an executor
largeData = ((ClientLargeMessageInternal) clientMessage).getLargeMessageController().take();
lsm.addBytes(largeData.getChunk());
}
while (largeData.isContinues());
message = lsm.toMessage();
lsm.releaseResources(true, true);
}
if (server.hasBrokerFederationPlugins()) { if (server.hasBrokerFederationPlugins()) {
try { try {
server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage)); server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
@ -183,7 +206,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
} }
} }
Message message = transformer == null ? clientMessage : transformer.transform(clientMessage); message = transformer == null ? message : transformer.transform(message);
if (message != null) { if (message != null) {
server.getPostOffice().route(message, true); server.getPostOffice().route(message, true);
} }
@ -198,6 +221,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
} }
} }
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationDispatchError(e, clientMessage.toString());
try { try {
clientSession.rollback(); clientSession.rollback();
} catch (ActiveMQException e1) { } catch (ActiveMQException e1) {

View File

@ -61,7 +61,7 @@ public class FederatedQueueTest extends FederatedTestBase {
@Override @Override
protected void configureQueues(ActiveMQServer server) throws Exception { protected void configureQueues(ActiveMQServer server) throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false)); server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false).setDefaultConsumerWindowSize(-1));
createSimpleQueue(server, getName()); createSimpleQueue(server, getName());
} }
@ -243,6 +243,33 @@ public class FederatedQueueTest extends FederatedTestBase {
} }
@Test
public void testWithLargeMessage() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
final String payload = new String(new byte[1 * 1024 * 1024]).replace('\0','+');
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection1.start();
Session session1 = connection1.createSession();
Queue queue1 = session1.createQueue(queueName);
MessageProducer producer = session1.createProducer(queue1);
producer.send(session1.createTextMessage(payload));
connection0.start();
Session session0 = connection0.createSession();
Queue queue0 = session0.createQueue(queueName);
MessageConsumer consumer0 = session0.createConsumer(queue0);
assertNotNull(consumer0.receive(60000));
}
}
@Test @Test
public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist() throws Exception { public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist() throws Exception {
String queueName = getName(); String queueName = getName();