ARTEMIS-3308 - support federation of large messages
This commit is contained in:
parent
1db3ae1dc0
commit
cf85d35355
|
@ -88,6 +88,11 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
|
||||||
return bufferDelegate.waitCompletion(timeWait);
|
return bufferDelegate.waitCompletion(timeWait);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LargeMessageControllerImpl.LargeData take() throws InterruptedException {
|
||||||
|
return bufferDelegate.take();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int capacity() {
|
public int capacity() {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -61,4 +61,5 @@ public interface LargeMessageController extends ActiveMQBuffer {
|
||||||
*/
|
*/
|
||||||
boolean waitCompletion(long timeWait) throws ActiveMQException;
|
boolean waitCompletion(long timeWait) throws ActiveMQException;
|
||||||
|
|
||||||
|
LargeMessageControllerImpl.LargeData take() throws InterruptedException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -318,6 +318,11 @@ public class LargeMessageControllerImpl implements LargeMessageController {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LargeData take() throws InterruptedException {
|
||||||
|
return largeMessageData.take();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws ActiveMQException
|
* @throws ActiveMQException
|
||||||
*/
|
*/
|
||||||
|
@ -1328,7 +1333,7 @@ public class LargeMessageControllerImpl implements LargeMessageController {
|
||||||
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
|
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class LargeData {
|
public static class LargeData {
|
||||||
|
|
||||||
final byte[] chunk;
|
final byte[] chunk;
|
||||||
final int flowControlSize;
|
final int flowControlSize;
|
||||||
|
|
|
@ -1763,6 +1763,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
@Message(id = 222304, value = "Unable to load message from journal", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 222304, value = "Unable to load message from journal", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void unableToLoadMessageFromJournal(@Cause Throwable t);
|
void unableToLoadMessageFromJournal(@Cause Throwable t);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222305, value = "Error federating message {0}.",
|
||||||
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void federationDispatchError(@Cause Throwable e, String message);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.ERROR)
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void initializationError(@Cause Throwable e);
|
void initializationError(@Cause Throwable e);
|
||||||
|
|
|
@ -28,12 +28,17 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData;
|
||||||
|
|
||||||
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {
|
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
|
private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
|
||||||
|
@ -174,6 +179,24 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(ClientMessage clientMessage) {
|
public void onMessage(ClientMessage clientMessage) {
|
||||||
try {
|
try {
|
||||||
|
Message message = clientMessage;
|
||||||
|
if (message instanceof ClientLargeMessageInternal) {
|
||||||
|
|
||||||
|
final StorageManager storageManager = server.getStorageManager();
|
||||||
|
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), message);
|
||||||
|
|
||||||
|
LargeData largeData = null;
|
||||||
|
do {
|
||||||
|
// block on reading all pending chunks, ok as we are called from an executor
|
||||||
|
largeData = ((ClientLargeMessageInternal) clientMessage).getLargeMessageController().take();
|
||||||
|
lsm.addBytes(largeData.getChunk());
|
||||||
|
}
|
||||||
|
while (largeData.isContinues());
|
||||||
|
|
||||||
|
message = lsm.toMessage();
|
||||||
|
lsm.releaseResources(true, true);
|
||||||
|
}
|
||||||
|
|
||||||
if (server.hasBrokerFederationPlugins()) {
|
if (server.hasBrokerFederationPlugins()) {
|
||||||
try {
|
try {
|
||||||
server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
|
server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
|
||||||
|
@ -183,7 +206,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Message message = transformer == null ? clientMessage : transformer.transform(clientMessage);
|
message = transformer == null ? message : transformer.transform(message);
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
server.getPostOffice().route(message, true);
|
server.getPostOffice().route(message, true);
|
||||||
}
|
}
|
||||||
|
@ -198,6 +221,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.federationDispatchError(e, clientMessage.toString());
|
||||||
try {
|
try {
|
||||||
clientSession.rollback();
|
clientSession.rollback();
|
||||||
} catch (ActiveMQException e1) {
|
} catch (ActiveMQException e1) {
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class FederatedQueueTest extends FederatedTestBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configureQueues(ActiveMQServer server) throws Exception {
|
protected void configureQueues(ActiveMQServer server) throws Exception {
|
||||||
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false).setDefaultConsumerWindowSize(-1));
|
||||||
createSimpleQueue(server, getName());
|
createSimpleQueue(server, getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,6 +243,33 @@ public class FederatedQueueTest extends FederatedTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithLargeMessage() throws Exception {
|
||||||
|
String queueName = getName();
|
||||||
|
|
||||||
|
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
|
||||||
|
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
|
||||||
|
getServer(0).getFederationManager().deploy();
|
||||||
|
|
||||||
|
ConnectionFactory cf1 = getCF(1);
|
||||||
|
ConnectionFactory cf0 = getCF(0);
|
||||||
|
final String payload = new String(new byte[1 * 1024 * 1024]).replace('\0','+');
|
||||||
|
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
|
||||||
|
connection1.start();
|
||||||
|
Session session1 = connection1.createSession();
|
||||||
|
Queue queue1 = session1.createQueue(queueName);
|
||||||
|
MessageProducer producer = session1.createProducer(queue1);
|
||||||
|
producer.send(session1.createTextMessage(payload));
|
||||||
|
|
||||||
|
connection0.start();
|
||||||
|
Session session0 = connection0.createSession();
|
||||||
|
Queue queue0 = session0.createQueue(queueName);
|
||||||
|
MessageConsumer consumer0 = session0.createConsumer(queue0);
|
||||||
|
|
||||||
|
assertNotNull(consumer0.receive(60000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist() throws Exception {
|
public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist() throws Exception {
|
||||||
String queueName = getName();
|
String queueName = getName();
|
||||||
|
|
Loading…
Reference in New Issue