diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java index 9a3ff7fc49..c8fd80b13f 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInterruptedException.java @@ -26,4 +26,8 @@ public final class ActiveMQInterruptedException extends RuntimeException { public ActiveMQInterruptedException(Throwable cause) { super(cause); } + + public ActiveMQInterruptedException(String message) { + super(message); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java index c23e9bbd12..08f51ae356 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQInterceptorRejectedPacketException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageException; import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; @@ -230,4 +231,7 @@ public interface ActiveMQClientMessageBundle { @Message(id = 119061, value = "Cannot send a packet while channel is failing over.") IllegalStateException cannotSendPacketDuringFailover(); + + @Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.") + ActiveMQInterruptedException packetTransmissionInterrupted(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java index 982ce29df9..32ada4ffde 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java @@ -171,7 +171,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl(); @Override - public void acquireCredits(int credits) throws InterruptedException { + public void acquireCredits(int credits) { } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java index 443d7e5254..a97df92ee2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java @@ -21,7 +21,7 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext; public interface ClientProducerCredits { - void acquireCredits(int credits) throws InterruptedException, ActiveMQException; + void acquireCredits(int credits) throws ActiveMQException; void receiveCredits(int credits); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java index f7cf98fbd2..70fda674e6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.client.impl; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; @@ -75,7 +76,7 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { } @Override - public void acquireCredits(final int credits) throws InterruptedException, ActiveMQException { + public void acquireCredits(final int credits) throws ActiveMQException { checkCredits(credits); boolean tryAcquire; @@ -94,6 +95,10 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address); } } + catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new ActiveMQInterruptedException(interrupted); + } finally { this.blocked = false; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index b963aac67f..b4aa196082 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -23,14 +23,12 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.DeflaterReader; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; @@ -286,20 +284,15 @@ public class ClientProducerImpl implements ClientProducerInternal { final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws ActiveMQException { - try { - // This will block if credits are not available + // This will block if credits are not available - // Note, that for a large message, the encode size only includes the properties + headers - // Not the continuations, but this is ok since we are only interested in limiting the amount of - // data in *memory* and continuations go straight to the disk + // Note, that for a large message, the encode size only includes the properties + headers + // Not the continuations, but this is ok since we are only interested in limiting the amount of + // data in *memory* and continuations go straight to the disk - int creditSize = sessionContext.getCreditsOnSendingFull(msgI); + int creditSize = sessionContext.getCreditsOnSendingFull(msgI); - theCredits.acquireCredits(creditSize); - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + theCredits.acquireCredits(creditSize); sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); } @@ -352,12 +345,7 @@ public class ClientProducerImpl implements ClientProducerInternal { // On the case of large messages we tried to send credits before but we would starve otherwise // we may find a way to improve the logic and always acquire the credits before // but that's the way it's been tested and been working ATM - try { - credits.acquireCredits(creditsUsed); - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + credits.acquireCredits(creditsUsed); } /** @@ -379,6 +367,8 @@ public class ClientProducerImpl implements ClientProducerInternal { final long bodySize = context.getLargeBodySize(); + final int reconnectID = sessionContext.getReconnectID(); + context.open(); try { @@ -396,14 +386,9 @@ public class ClientProducerImpl implements ClientProducerInternal { lastChunk = pos >= bodySize; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; - int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); + int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, messageHandler); - try { - credits.acquireCredits(creditsUsed); - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + credits.acquireCredits(creditsUsed); } } finally { @@ -457,6 +442,8 @@ public class ClientProducerImpl implements ClientProducerInternal { boolean headerSent = false; + + int reconnectID = sessionContext.getReconnectID(); while (!lastPacket) { byte[] buff = new byte[minLargeMessageSize]; @@ -485,8 +472,6 @@ public class ClientProducerImpl implements ClientProducerInternal { totalSize += pos; - final SessionSendContinuationMessage chunk; - if (lastPacket) { if (!session.isCompressLargeMessages()) { messageSize.set(totalSize); @@ -514,13 +499,8 @@ public class ClientProducerImpl implements ClientProducerInternal { headerSent = true; sendInitialLargeMessageHeader(msgI, credits); } - int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler); - try { - credits.acquireCredits(creditsSent); - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler); + credits.acquireCredits(creditsSent); } } else { @@ -529,13 +509,8 @@ public class ClientProducerImpl implements ClientProducerInternal { sendInitialLargeMessageHeader(msgI, credits); } - int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler); - try { - credits.acquireCredits(creditsSent); - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler); + credits.acquireCredits(creditsSent); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index fb5b687896..289181e825 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; @@ -336,7 +337,19 @@ public class LargeMessageControllerImpl implements LargeMessageController { // once the exception is set, the controller is pretty much useless if (handledException != null) { if (handledException instanceof ActiveMQException) { - throw (ActiveMQException) handledException; + ActiveMQException nestedException; + + // This is just to be user friendly and give the user a proper exception trace, + // instead to just where it was canceled. + if (handledException instanceof ActiveMQLargeMessageInterruptedException) { + nestedException = new ActiveMQLargeMessageInterruptedException(handledException.getMessage()); + } + else { + nestedException = new ActiveMQException(((ActiveMQException) handledException).getType(), handledException.getMessage()); + } + nestedException.initCause(handledException); + + throw nestedException; } else { throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY, "Error on saving LargeMessageBufferImpl", handledException); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index fba1a1c3c9..4c59174a20 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -39,6 +39,13 @@ public interface Channel { */ long getID(); + /** + * This number increases every time the channel reconnects succesfully. + * This is used to guarantee the integrity of the channel on sequential commands such as large messages. + * @return + */ + int getReconnectID(); + /** * For protocol check */ @@ -53,6 +60,15 @@ public interface Channel { */ boolean send(Packet packet); + /** + * Sends a packet on this channel. + * + * @param packet the packet to send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet, final int reconnectID); + /** * Sends a packet on this channel using batching algorithm if appropriate * @@ -82,6 +98,17 @@ public interface Channel { */ Packet sendBlocking(Packet packet, byte expectedPacket) throws ActiveMQException; + /** + * Sends a packet on this channel and then blocks until a response is received or a timeout + * occurs. + * + * @param packet the packet to send + * @param expectedPacket the packet being expected. + * @return the response + * @throws ActiveMQException if an error occurs during the send + */ + Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException; + /** * Sets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should * forward received packets to. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index f72380288a..ff61c21be1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -154,6 +154,10 @@ public class ActiveMQSessionContext extends SessionContext { } } + public int getReconnectID() { + return sessionChannel.getReconnectID(); + } + private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { @Override public void commandConfirmed(final Packet packet) { @@ -413,16 +417,17 @@ public class ActiveMQSessionContext extends SessionContext { boolean sendBlocking, boolean lastChunk, byte[] chunk, + int reconnectID, SendAcknowledgementHandler messageHandler) throws ActiveMQException { final boolean requiresResponse = lastChunk && sendBlocking; final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); if (requiresResponse) { // When sending it blocking, only the last chunk will be blocking. - sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE); } else { - sessionChannel.send(chunkPacket); + sessionChannel.send(chunkPacket, reconnectID); } return chunkPacket.getPacketSize(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 4ef61049f9..a7cb659d51 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -83,6 +83,9 @@ public final class ChannelImpl implements Channel { private volatile long id; + /** This is used in */ + private final AtomicInteger reconnectID = new AtomicInteger(0); + private ChannelHandler handler; private Packet response; @@ -139,6 +142,10 @@ public final class ChannelImpl implements Channel { this.interceptors = interceptors; } + public int getReconnectID() { + return reconnectID.get(); + } + @Override public boolean supports(final byte packetType) { int version = connection.getClientVersion(); @@ -202,17 +209,21 @@ public final class ChannelImpl implements Channel { @Override public boolean sendAndFlush(final Packet packet) { - return send(packet, true, false); + return send(packet, -1, true, false); } @Override public boolean send(final Packet packet) { - return send(packet, false, false); + return send(packet, -1, false, false); + } + + public boolean send(Packet packet, final int reconnectID) { + return send(packet, reconnectID, false, false); } @Override public boolean sendBatched(final Packet packet) { - return send(packet, false, true); + return send(packet, -1, false, true); } @Override @@ -221,7 +232,7 @@ public final class ChannelImpl implements Channel { } // This must never called by more than one thread concurrently - public boolean send(final Packet packet, final boolean flush, final boolean batch) { + private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { if (invokeInterceptors(packet, interceptors, connection) != null) { return false; } @@ -271,6 +282,8 @@ public final class ChannelImpl implements Channel { ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id); } + checkReconnectID(reconnectID); + // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover connection.getTransportConnection().write(buffer, flush, batch); @@ -279,13 +292,24 @@ public final class ChannelImpl implements Channel { } } + private void checkReconnectID(int reconnectID) { + if (reconnectID >= 0 && reconnectID != this.reconnectID.get()) { + throw ActiveMQClientMessageBundle.BUNDLE.packetTransmissionInterrupted(); + } + } + + @Override + public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException { + return sendBlocking(packet, -1, expectedPacket); + } + /** * Due to networking issues or server issues the server may take longer to answer than expected.. the client may timeout the call throwing an exception * and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception. * The expectedPacket will be used to filter out undesirable packets that would belong to previous calls. */ @Override - public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException { + public Packet sendBlocking(final Packet packet, final int reconnectID, byte expectedPacket) throws ActiveMQException { String interceptionResult = invokeInterceptors(packet, interceptors, connection); if (interceptionResult != null) { @@ -335,6 +359,8 @@ public final class ChannelImpl implements Channel { addResendPacket(packet); } + checkReconnectID(reconnectID); + connection.getTransportConnection().write(buffer, false, false); long toWait = connection.getBlockingCallTimeout(); @@ -492,6 +518,8 @@ public final class ChannelImpl implements Channel { public void lock() { lock.lock(); + reconnectID.incrementAndGet(); + failingOver = true; lock.unlock(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index f766e48f00..774dbfe2df 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -61,6 +61,8 @@ public abstract class SessionContext { public abstract void resetName(String name); + public abstract int getReconnectID(); + /** * it will eather reattach or reconnect, preferably reattaching it. * @@ -145,6 +147,7 @@ public abstract class SessionContext { boolean sendBlocking, boolean lastChunk, byte[] chunk, + int reconnectID, SendAcknowledgementHandler messageHandler) throws ActiveMQException; public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index 19e5b677dd..e8122d08f3 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -625,6 +625,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme } } + public ClientSessionFactory getSessionFactory() { + return sessionFactory; + } + // Private -------------------------------------------------------------------------------------- /** diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 9c6c4970fe..0f33c0499c 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -35,6 +35,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -500,6 +501,11 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To clientProducer.send(address, coreMessage); } } + catch (ActiveMQInterruptedException e) { + JMSException jmsException = new JMSException(e.getMessage()); + jmsException.initCause(e); + throw jmsException; + } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java new file mode 100644 index 0000000000..5f4d2e45f4 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOverReplicationTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.qpid.transport.util.Logger; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class LargeMessageOverReplicationTest extends ActiveMQTestBase { + + public static int messageChunkCount = 0; + + private static final ReusableLatch ruleFired = new ReusableLatch(1); + private static ActiveMQServer backupServer; + private static ActiveMQServer liveServer; + + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000"); + ActiveMQConnection connection; + Session session; + Queue queue; + MessageProducer producer; + + + @Before + public void setUp() throws Exception { + super.setUp(); + ruleFired.setCount(1); + messageChunkCount = 0; + + TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); + TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0); + TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0); + TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0); + + Configuration backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)); + + Configuration liveConfig = createDefaultInVMConfig(); + + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + + liveServer = createServer(liveConfig); + liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue")); + liveServer.start(); + + waitForServerToStart(liveServer); + + backupServer = createServer(backupConfig); + backupServer.start(); + + waitForServerToStart(backupServer); + + + // Just to make sure the expression worked + Assert.assertEquals(10000, factory.getMinLargeMessageSize()); + Assert.assertEquals(10000, factory.getProducerWindowSize()); + Assert.assertEquals(100, factory.getRetryInterval()); + Assert.assertEquals(-1, factory.getReconnectAttempts()); + Assert.assertTrue(factory.isHA()); + + connection = (ActiveMQConnection) factory.createConnection(); + + waitForRemoteBackup(connection.getSessionFactory(), 30); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + queue = session.createQueue("jms.queue.Queue"); + producer = session.createProducer(queue); + + } + + @After + public void stopServers() throws Exception { + if (connection != null) { + try { + connection.close(); + } + catch (Exception e) { + } + } + if (backupServer != null) { + backupServer.stop(); + backupServer = null; + } + + if (liveServer != null) { + liveServer.stop(); + liveServer = null; + } + + backupServer = liveServer = null; + } + + /* + * simple test to induce a potential race condition where the server's acceptors are active, but the server's + * state != STARTED + */ + @Test + @BMRules( + rules = {@BMRule( + name = "InterruptSending", + targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext", + targetMethod = "sendLargeMessageChunk", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkSent();")}) + public void testSendLargeMessage() throws Exception { + + MapMessage message = createLargeMessage(); + + try { + producer.send(message); + Assert.fail("expected an exception"); + // session.commit(); + } + catch (JMSException expected) { + } + + session.rollback(); + + producer.send(message); + session.commit(); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + MapMessage messageRec = (MapMessage) consumer.receive(5000); + Assert.assertNotNull(messageRec); + + for (int i = 0; i < 10; i++) { + Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length); + } + } + + @Test + @BMRules( + rules = {@BMRule( + name = "InterruptReceive", + targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback", + targetMethod = "sendLargeMessageContinuation", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkReceived();")}) + public void testReceiveLargeMessage() throws Exception { + + MapMessage message = createLargeMessage(); + + producer.send(message); + session.commit(); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + MapMessage messageRec = null; + + try { + consumer.receive(5000); + Assert.fail("Expected a failure here"); + } + catch (JMSException expected) { + } + + session.rollback(); + + messageRec = (MapMessage) consumer.receive(5000); + Assert.assertNotNull(messageRec); + session.commit(); + + for (int i = 0; i < 10; i++) { + Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length); + } + } + + public static void messageChunkReceived() { + messageChunkCount++; + + if (messageChunkCount == 1000) { + final CountDownLatch latch = new CountDownLatch(1); + new Thread() { + public void run() { + try { + latch.countDown(); + liveServer.stop(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + try { + // just to make sure it's about to be stopped + // avoiding bootstrapping the thread as a delay + latch.await(1, TimeUnit.MINUTES); + } + catch (Throwable ignored ) { + } + } + } + + public static void messageChunkSent() { + messageChunkCount++; + + try { + if (messageChunkCount == 10) { + liveServer.stop(true); + + System.err.println("activating"); + if (!backupServer.waitForActivation(1, TimeUnit.MINUTES)) { + Logger.get(LargeMessageOverReplicationTest.class).warn("Can't failover server"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + + private MapMessage createLargeMessage() throws JMSException { + MapMessage message = session.createMapMessage(); + + for (int i = 0; i < 10; i++) { + message.setBytes("test" + i, new byte[1024 * 1024]); + } + return message; + } + +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index 7e04b91419..c379fcd79f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -250,6 +250,21 @@ public class BackupSyncDelay implements Interceptor { } + @Override + public int getReconnectID() { + return 0; + } + + @Override + public boolean send(Packet packet, int reconnectID) { + return false; + } + + @Override + public Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException { + return null; + } + @Override public void replayCommands(int lastConfirmedCommandID) { throw new UnsupportedOperationException();