ARTEMIS-437 Large Message send should be interrupted during failover
This commit is contained in:
parent
212c16867c
commit
26fe21baa4
|
@ -26,4 +26,8 @@ public final class ActiveMQInterruptedException extends RuntimeException {
|
||||||
public ActiveMQInterruptedException(Throwable cause) {
|
public ActiveMQInterruptedException(Throwable cause) {
|
||||||
super(cause);
|
super(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ActiveMQInterruptedException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterceptorRejectedPacketException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterceptorRejectedPacketException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageException;
|
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
|
||||||
|
@ -230,4 +231,7 @@ public interface ActiveMQClientMessageBundle {
|
||||||
|
|
||||||
@Message(id = 119061, value = "Cannot send a packet while channel is failing over.")
|
@Message(id = 119061, value = "Cannot send a packet while channel is failing over.")
|
||||||
IllegalStateException cannotSendPacketDuringFailover();
|
IllegalStateException cannotSendPacketDuringFailover();
|
||||||
|
|
||||||
|
@Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
|
||||||
|
ActiveMQInterruptedException packetTransmissionInterrupted();
|
||||||
}
|
}
|
||||||
|
|
|
@ -171,7 +171,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
|
||||||
static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
|
static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void acquireCredits(int credits) throws InterruptedException {
|
public void acquireCredits(int credits) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -21,7 +21,7 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
|
|
||||||
public interface ClientProducerCredits {
|
public interface ClientProducerCredits {
|
||||||
|
|
||||||
void acquireCredits(int credits) throws InterruptedException, ActiveMQException;
|
void acquireCredits(int credits) throws ActiveMQException;
|
||||||
|
|
||||||
void receiveCredits(int credits);
|
void receiveCredits(int credits);
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.artemis.core.client.impl;
|
package org.apache.activemq.artemis.core.client.impl;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||||
|
@ -75,7 +76,7 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void acquireCredits(final int credits) throws InterruptedException, ActiveMQException {
|
public void acquireCredits(final int credits) throws ActiveMQException {
|
||||||
checkCredits(credits);
|
checkCredits(credits);
|
||||||
|
|
||||||
boolean tryAcquire;
|
boolean tryAcquire;
|
||||||
|
@ -94,6 +95,10 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
|
||||||
ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
|
ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
catch (InterruptedException interrupted) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new ActiveMQInterruptedException(interrupted);
|
||||||
|
}
|
||||||
finally {
|
finally {
|
||||||
this.blocked = false;
|
this.blocked = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,14 +23,12 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.message.BodyEncoder;
|
import org.apache.activemq.artemis.core.message.BodyEncoder;
|
||||||
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
import org.apache.activemq.artemis.utils.DeflaterReader;
|
import org.apache.activemq.artemis.utils.DeflaterReader;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
|
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
|
||||||
|
@ -286,20 +284,15 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
final boolean sendBlocking,
|
final boolean sendBlocking,
|
||||||
final ClientProducerCredits theCredits,
|
final ClientProducerCredits theCredits,
|
||||||
final SendAcknowledgementHandler handler) throws ActiveMQException {
|
final SendAcknowledgementHandler handler) throws ActiveMQException {
|
||||||
try {
|
// This will block if credits are not available
|
||||||
// This will block if credits are not available
|
|
||||||
|
|
||||||
// Note, that for a large message, the encode size only includes the properties + headers
|
// Note, that for a large message, the encode size only includes the properties + headers
|
||||||
// Not the continuations, but this is ok since we are only interested in limiting the amount of
|
// Not the continuations, but this is ok since we are only interested in limiting the amount of
|
||||||
// data in *memory* and continuations go straight to the disk
|
// data in *memory* and continuations go straight to the disk
|
||||||
|
|
||||||
int creditSize = sessionContext.getCreditsOnSendingFull(msgI);
|
int creditSize = sessionContext.getCreditsOnSendingFull(msgI);
|
||||||
|
|
||||||
theCredits.acquireCredits(creditSize);
|
theCredits.acquireCredits(creditSize);
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
throw new ActiveMQInterruptedException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
|
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
|
||||||
}
|
}
|
||||||
|
@ -352,12 +345,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
// On the case of large messages we tried to send credits before but we would starve otherwise
|
// On the case of large messages we tried to send credits before but we would starve otherwise
|
||||||
// we may find a way to improve the logic and always acquire the credits before
|
// we may find a way to improve the logic and always acquire the credits before
|
||||||
// but that's the way it's been tested and been working ATM
|
// but that's the way it's been tested and been working ATM
|
||||||
try {
|
credits.acquireCredits(creditsUsed);
|
||||||
credits.acquireCredits(creditsUsed);
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
throw new ActiveMQInterruptedException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -379,6 +367,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
|
|
||||||
final long bodySize = context.getLargeBodySize();
|
final long bodySize = context.getLargeBodySize();
|
||||||
|
|
||||||
|
final int reconnectID = sessionContext.getReconnectID();
|
||||||
|
|
||||||
context.open();
|
context.open();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -396,14 +386,9 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
lastChunk = pos >= bodySize;
|
lastChunk = pos >= bodySize;
|
||||||
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
|
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
|
||||||
|
|
||||||
int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
|
int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, messageHandler);
|
||||||
|
|
||||||
try {
|
credits.acquireCredits(creditsUsed);
|
||||||
credits.acquireCredits(creditsUsed);
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
throw new ActiveMQInterruptedException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -457,6 +442,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
|
|
||||||
boolean headerSent = false;
|
boolean headerSent = false;
|
||||||
|
|
||||||
|
|
||||||
|
int reconnectID = sessionContext.getReconnectID();
|
||||||
while (!lastPacket) {
|
while (!lastPacket) {
|
||||||
byte[] buff = new byte[minLargeMessageSize];
|
byte[] buff = new byte[minLargeMessageSize];
|
||||||
|
|
||||||
|
@ -485,8 +472,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
|
|
||||||
totalSize += pos;
|
totalSize += pos;
|
||||||
|
|
||||||
final SessionSendContinuationMessage chunk;
|
|
||||||
|
|
||||||
if (lastPacket) {
|
if (lastPacket) {
|
||||||
if (!session.isCompressLargeMessages()) {
|
if (!session.isCompressLargeMessages()) {
|
||||||
messageSize.set(totalSize);
|
messageSize.set(totalSize);
|
||||||
|
@ -514,13 +499,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
headerSent = true;
|
headerSent = true;
|
||||||
sendInitialLargeMessageHeader(msgI, credits);
|
sendInitialLargeMessageHeader(msgI, credits);
|
||||||
}
|
}
|
||||||
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler);
|
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
|
||||||
try {
|
credits.acquireCredits(creditsSent);
|
||||||
credits.acquireCredits(creditsSent);
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
throw new ActiveMQInterruptedException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -529,13 +509,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||||
sendInitialLargeMessageHeader(msgI, credits);
|
sendInitialLargeMessageHeader(msgI, credits);
|
||||||
}
|
}
|
||||||
|
|
||||||
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler);
|
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
|
||||||
try {
|
credits.acquireCredits(creditsSent);
|
||||||
credits.acquireCredits(creditsSent);
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
throw new ActiveMQInterruptedException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||||
|
@ -336,7 +337,19 @@ public class LargeMessageControllerImpl implements LargeMessageController {
|
||||||
// once the exception is set, the controller is pretty much useless
|
// once the exception is set, the controller is pretty much useless
|
||||||
if (handledException != null) {
|
if (handledException != null) {
|
||||||
if (handledException instanceof ActiveMQException) {
|
if (handledException instanceof ActiveMQException) {
|
||||||
throw (ActiveMQException) handledException;
|
ActiveMQException nestedException;
|
||||||
|
|
||||||
|
// This is just to be user friendly and give the user a proper exception trace,
|
||||||
|
// instead to just where it was canceled.
|
||||||
|
if (handledException instanceof ActiveMQLargeMessageInterruptedException) {
|
||||||
|
nestedException = new ActiveMQLargeMessageInterruptedException(handledException.getMessage());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
nestedException = new ActiveMQException(((ActiveMQException) handledException).getType(), handledException.getMessage());
|
||||||
|
}
|
||||||
|
nestedException.initCause(handledException);
|
||||||
|
|
||||||
|
throw nestedException;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY, "Error on saving LargeMessageBufferImpl", handledException);
|
throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY, "Error on saving LargeMessageBufferImpl", handledException);
|
||||||
|
|
|
@ -39,6 +39,13 @@ public interface Channel {
|
||||||
*/
|
*/
|
||||||
long getID();
|
long getID();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This number increases every time the channel reconnects succesfully.
|
||||||
|
* This is used to guarantee the integrity of the channel on sequential commands such as large messages.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int getReconnectID();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For protocol check
|
* For protocol check
|
||||||
*/
|
*/
|
||||||
|
@ -53,6 +60,15 @@ public interface Channel {
|
||||||
*/
|
*/
|
||||||
boolean send(Packet packet);
|
boolean send(Packet packet);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends a packet on this channel.
|
||||||
|
*
|
||||||
|
* @param packet the packet to send
|
||||||
|
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
|
||||||
|
* successful
|
||||||
|
*/
|
||||||
|
boolean send(Packet packet, final int reconnectID);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a packet on this channel using batching algorithm if appropriate
|
* Sends a packet on this channel using batching algorithm if appropriate
|
||||||
*
|
*
|
||||||
|
@ -82,6 +98,17 @@ public interface Channel {
|
||||||
*/
|
*/
|
||||||
Packet sendBlocking(Packet packet, byte expectedPacket) throws ActiveMQException;
|
Packet sendBlocking(Packet packet, byte expectedPacket) throws ActiveMQException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends a packet on this channel and then blocks until a response is received or a timeout
|
||||||
|
* occurs.
|
||||||
|
*
|
||||||
|
* @param packet the packet to send
|
||||||
|
* @param expectedPacket the packet being expected.
|
||||||
|
* @return the response
|
||||||
|
* @throws ActiveMQException if an error occurs during the send
|
||||||
|
*/
|
||||||
|
Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should
|
* Sets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should
|
||||||
* forward received packets to.
|
* forward received packets to.
|
||||||
|
|
|
@ -154,6 +154,10 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getReconnectID() {
|
||||||
|
return sessionChannel.getReconnectID();
|
||||||
|
}
|
||||||
|
|
||||||
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
|
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
|
||||||
@Override
|
@Override
|
||||||
public void commandConfirmed(final Packet packet) {
|
public void commandConfirmed(final Packet packet) {
|
||||||
|
@ -413,16 +417,17 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
boolean sendBlocking,
|
boolean sendBlocking,
|
||||||
boolean lastChunk,
|
boolean lastChunk,
|
||||||
byte[] chunk,
|
byte[] chunk,
|
||||||
|
int reconnectID,
|
||||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||||
|
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
// When sending it blocking, only the last chunk will be blocking.
|
// When sending it blocking, only the last chunk will be blocking.
|
||||||
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
|
sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
sessionChannel.send(chunkPacket);
|
sessionChannel.send(chunkPacket, reconnectID);
|
||||||
}
|
}
|
||||||
|
|
||||||
return chunkPacket.getPacketSize();
|
return chunkPacket.getPacketSize();
|
||||||
|
|
|
@ -83,6 +83,9 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
private volatile long id;
|
private volatile long id;
|
||||||
|
|
||||||
|
/** This is used in */
|
||||||
|
private final AtomicInteger reconnectID = new AtomicInteger(0);
|
||||||
|
|
||||||
private ChannelHandler handler;
|
private ChannelHandler handler;
|
||||||
|
|
||||||
private Packet response;
|
private Packet response;
|
||||||
|
@ -139,6 +142,10 @@ public final class ChannelImpl implements Channel {
|
||||||
this.interceptors = interceptors;
|
this.interceptors = interceptors;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getReconnectID() {
|
||||||
|
return reconnectID.get();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(final byte packetType) {
|
public boolean supports(final byte packetType) {
|
||||||
int version = connection.getClientVersion();
|
int version = connection.getClientVersion();
|
||||||
|
@ -202,17 +209,21 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean sendAndFlush(final Packet packet) {
|
public boolean sendAndFlush(final Packet packet) {
|
||||||
return send(packet, true, false);
|
return send(packet, -1, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean send(final Packet packet) {
|
public boolean send(final Packet packet) {
|
||||||
return send(packet, false, false);
|
return send(packet, -1, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean send(Packet packet, final int reconnectID) {
|
||||||
|
return send(packet, reconnectID, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean sendBatched(final Packet packet) {
|
public boolean sendBatched(final Packet packet) {
|
||||||
return send(packet, false, true);
|
return send(packet, -1, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -221,7 +232,7 @@ public final class ChannelImpl implements Channel {
|
||||||
}
|
}
|
||||||
|
|
||||||
// This must never called by more than one thread concurrently
|
// This must never called by more than one thread concurrently
|
||||||
public boolean send(final Packet packet, final boolean flush, final boolean batch) {
|
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
|
||||||
if (invokeInterceptors(packet, interceptors, connection) != null) {
|
if (invokeInterceptors(packet, interceptors, connection) != null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -271,6 +282,8 @@ public final class ChannelImpl implements Channel {
|
||||||
ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
|
ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkReconnectID(reconnectID);
|
||||||
|
|
||||||
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
|
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
|
||||||
// buffer is full, preventing any incoming buffers being handled and blocking failover
|
// buffer is full, preventing any incoming buffers being handled and blocking failover
|
||||||
connection.getTransportConnection().write(buffer, flush, batch);
|
connection.getTransportConnection().write(buffer, flush, batch);
|
||||||
|
@ -279,13 +292,24 @@ public final class ChannelImpl implements Channel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkReconnectID(int reconnectID) {
|
||||||
|
if (reconnectID >= 0 && reconnectID != this.reconnectID.get()) {
|
||||||
|
throw ActiveMQClientMessageBundle.BUNDLE.packetTransmissionInterrupted();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException {
|
||||||
|
return sendBlocking(packet, -1, expectedPacket);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Due to networking issues or server issues the server may take longer to answer than expected.. the client may timeout the call throwing an exception
|
* Due to networking issues or server issues the server may take longer to answer than expected.. the client may timeout the call throwing an exception
|
||||||
* and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception.
|
* and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception.
|
||||||
* The expectedPacket will be used to filter out undesirable packets that would belong to previous calls.
|
* The expectedPacket will be used to filter out undesirable packets that would belong to previous calls.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Packet sendBlocking(final Packet packet, byte expectedPacket) throws ActiveMQException {
|
public Packet sendBlocking(final Packet packet, final int reconnectID, byte expectedPacket) throws ActiveMQException {
|
||||||
String interceptionResult = invokeInterceptors(packet, interceptors, connection);
|
String interceptionResult = invokeInterceptors(packet, interceptors, connection);
|
||||||
|
|
||||||
if (interceptionResult != null) {
|
if (interceptionResult != null) {
|
||||||
|
@ -335,6 +359,8 @@ public final class ChannelImpl implements Channel {
|
||||||
addResendPacket(packet);
|
addResendPacket(packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkReconnectID(reconnectID);
|
||||||
|
|
||||||
connection.getTransportConnection().write(buffer, false, false);
|
connection.getTransportConnection().write(buffer, false, false);
|
||||||
|
|
||||||
long toWait = connection.getBlockingCallTimeout();
|
long toWait = connection.getBlockingCallTimeout();
|
||||||
|
@ -492,6 +518,8 @@ public final class ChannelImpl implements Channel {
|
||||||
public void lock() {
|
public void lock() {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
|
|
||||||
|
reconnectID.incrementAndGet();
|
||||||
|
|
||||||
failingOver = true;
|
failingOver = true;
|
||||||
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
|
@ -61,6 +61,8 @@ public abstract class SessionContext {
|
||||||
|
|
||||||
public abstract void resetName(String name);
|
public abstract void resetName(String name);
|
||||||
|
|
||||||
|
public abstract int getReconnectID();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* it will eather reattach or reconnect, preferably reattaching it.
|
* it will eather reattach or reconnect, preferably reattaching it.
|
||||||
*
|
*
|
||||||
|
@ -145,6 +147,7 @@ public abstract class SessionContext {
|
||||||
boolean sendBlocking,
|
boolean sendBlocking,
|
||||||
boolean lastChunk,
|
boolean lastChunk,
|
||||||
byte[] chunk,
|
byte[] chunk,
|
||||||
|
int reconnectID,
|
||||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
|
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
|
||||||
|
|
||||||
public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
|
public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
|
||||||
|
|
|
@ -625,6 +625,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClientSessionFactory getSessionFactory() {
|
||||||
|
return sessionFactory;
|
||||||
|
}
|
||||||
|
|
||||||
// Private --------------------------------------------------------------------------------------
|
// Private --------------------------------------------------------------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -35,6 +35,7 @@ import javax.jms.Topic;
|
||||||
import javax.jms.TopicPublisher;
|
import javax.jms.TopicPublisher;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
@ -500,6 +501,11 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
||||||
clientProducer.send(address, coreMessage);
|
clientProducer.send(address, coreMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
catch (ActiveMQInterruptedException e) {
|
||||||
|
JMSException jmsException = new JMSException(e.getMessage());
|
||||||
|
jmsException.initCause(e);
|
||||||
|
throw jmsException;
|
||||||
|
}
|
||||||
catch (ActiveMQException e) {
|
catch (ActiveMQException e) {
|
||||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,264 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.byteman;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
|
||||||
|
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
import org.apache.qpid.transport.util.Logger;
|
||||||
|
import org.jboss.byteman.contrib.bmunit.BMRule;
|
||||||
|
import org.jboss.byteman.contrib.bmunit.BMRules;
|
||||||
|
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
@RunWith(BMUnitRunner.class)
|
||||||
|
public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
public static int messageChunkCount = 0;
|
||||||
|
|
||||||
|
private static final ReusableLatch ruleFired = new ReusableLatch(1);
|
||||||
|
private static ActiveMQServer backupServer;
|
||||||
|
private static ActiveMQServer liveServer;
|
||||||
|
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
|
||||||
|
ActiveMQConnection connection;
|
||||||
|
Session session;
|
||||||
|
Queue queue;
|
||||||
|
MessageProducer producer;
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
ruleFired.setCount(1);
|
||||||
|
messageChunkCount = 0;
|
||||||
|
|
||||||
|
TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
|
||||||
|
TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
|
||||||
|
TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
|
||||||
|
TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
|
||||||
|
|
||||||
|
Configuration backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true));
|
||||||
|
|
||||||
|
Configuration liveConfig = createDefaultInVMConfig();
|
||||||
|
|
||||||
|
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
|
||||||
|
|
||||||
|
liveServer = createServer(liveConfig);
|
||||||
|
liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue"));
|
||||||
|
liveServer.start();
|
||||||
|
|
||||||
|
waitForServerToStart(liveServer);
|
||||||
|
|
||||||
|
backupServer = createServer(backupConfig);
|
||||||
|
backupServer.start();
|
||||||
|
|
||||||
|
waitForServerToStart(backupServer);
|
||||||
|
|
||||||
|
|
||||||
|
// Just to make sure the expression worked
|
||||||
|
Assert.assertEquals(10000, factory.getMinLargeMessageSize());
|
||||||
|
Assert.assertEquals(10000, factory.getProducerWindowSize());
|
||||||
|
Assert.assertEquals(100, factory.getRetryInterval());
|
||||||
|
Assert.assertEquals(-1, factory.getReconnectAttempts());
|
||||||
|
Assert.assertTrue(factory.isHA());
|
||||||
|
|
||||||
|
connection = (ActiveMQConnection) factory.createConnection();
|
||||||
|
|
||||||
|
waitForRemoteBackup(connection.getSessionFactory(), 30);
|
||||||
|
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
queue = session.createQueue("jms.queue.Queue");
|
||||||
|
producer = session.createProducer(queue);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopServers() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (backupServer != null) {
|
||||||
|
backupServer.stop();
|
||||||
|
backupServer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (liveServer != null) {
|
||||||
|
liveServer.stop();
|
||||||
|
liveServer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
backupServer = liveServer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* simple test to induce a potential race condition where the server's acceptors are active, but the server's
|
||||||
|
* state != STARTED
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@BMRules(
|
||||||
|
rules = {@BMRule(
|
||||||
|
name = "InterruptSending",
|
||||||
|
targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext",
|
||||||
|
targetMethod = "sendLargeMessageChunk",
|
||||||
|
targetLocation = "ENTRY",
|
||||||
|
action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkSent();")})
|
||||||
|
public void testSendLargeMessage() throws Exception {
|
||||||
|
|
||||||
|
MapMessage message = createLargeMessage();
|
||||||
|
|
||||||
|
try {
|
||||||
|
producer.send(message);
|
||||||
|
Assert.fail("expected an exception");
|
||||||
|
// session.commit();
|
||||||
|
}
|
||||||
|
catch (JMSException expected) {
|
||||||
|
}
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
producer.send(message);
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
MapMessage messageRec = (MapMessage) consumer.receive(5000);
|
||||||
|
Assert.assertNotNull(messageRec);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@BMRules(
|
||||||
|
rules = {@BMRule(
|
||||||
|
name = "InterruptReceive",
|
||||||
|
targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback",
|
||||||
|
targetMethod = "sendLargeMessageContinuation",
|
||||||
|
targetLocation = "ENTRY",
|
||||||
|
action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkReceived();")})
|
||||||
|
public void testReceiveLargeMessage() throws Exception {
|
||||||
|
|
||||||
|
MapMessage message = createLargeMessage();
|
||||||
|
|
||||||
|
producer.send(message);
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
MapMessage messageRec = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
consumer.receive(5000);
|
||||||
|
Assert.fail("Expected a failure here");
|
||||||
|
}
|
||||||
|
catch (JMSException expected) {
|
||||||
|
}
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
messageRec = (MapMessage) consumer.receive(5000);
|
||||||
|
Assert.assertNotNull(messageRec);
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void messageChunkReceived() {
|
||||||
|
messageChunkCount++;
|
||||||
|
|
||||||
|
if (messageChunkCount == 1000) {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
latch.countDown();
|
||||||
|
liveServer.stop();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.start();
|
||||||
|
try {
|
||||||
|
// just to make sure it's about to be stopped
|
||||||
|
// avoiding bootstrapping the thread as a delay
|
||||||
|
latch.await(1, TimeUnit.MINUTES);
|
||||||
|
}
|
||||||
|
catch (Throwable ignored ) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void messageChunkSent() {
|
||||||
|
messageChunkCount++;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (messageChunkCount == 10) {
|
||||||
|
liveServer.stop(true);
|
||||||
|
|
||||||
|
System.err.println("activating");
|
||||||
|
if (!backupServer.waitForActivation(1, TimeUnit.MINUTES)) {
|
||||||
|
Logger.get(LargeMessageOverReplicationTest.class).warn("Can't failover server");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MapMessage createLargeMessage() throws JMSException {
|
||||||
|
MapMessage message = session.createMapMessage();
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
message.setBytes("test" + i, new byte[1024 * 1024]);
|
||||||
|
}
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -250,6 +250,21 @@ public class BackupSyncDelay implements Interceptor {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReconnectID() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean send(Packet packet, int reconnectID) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket) throws ActiveMQException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void replayCommands(int lastConfirmedCommandID) {
|
public void replayCommands(int lastConfirmedCommandID) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|
Loading…
Reference in New Issue