diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java index c164f6cdf2..ad45a5f9e2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java @@ -41,4 +41,13 @@ public interface SendAcknowledgementHandler { * @param message message sent asynchronously */ void sendAcknowledged(Message message); + + default void sendFailed(Message message, Exception e) { + /** + * By default ignore failures to preserve compatibility with existing implementations. + * If the message makes it to the broker and a failure occurs sendAcknowledge() will + * still be invoked just like it always was. + */ + } + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java index bb88e6d22b..e043ac9eb7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java @@ -228,4 +228,7 @@ public interface ActiveMQClientMessageBundle { @Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.") ActiveMQInterruptedException packetTransmissionInterrupted(); + + @Message(id = 119063, value = "Cannot send a packet while response cache is full.") + IllegalStateException cannotSendPacketWhilstResponseCacheFull(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 127a69aecc..56f825959f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -211,6 +211,9 @@ public interface Channel { */ void setCommandConfirmationHandler(CommandConfirmationHandler handler); + void setResponseHandler(ResponseHandler handler); + + /** * flushes any confirmations on to the connection. */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index b6a5d93af5..74d9847ffb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -36,6 +36,11 @@ public interface CoreRemotingConnection extends RemotingConnection { return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION); } + default boolean isVersionBeforeAsyncResponseChange() { + int version = getChannelVersion(); + return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION); + } + /** * Sets the client protocol used on the communication. This will determine if the client has * support for certain packet types diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index 1f40314a6d..b658090be3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -41,6 +41,14 @@ public interface Packet { return INITIAL_PACKET_SIZE; } + boolean isRequiresResponse(); + + boolean isResponseAsync(); + + long getCorrelationID(); + + void setCorrelationID(long correlationID); + /** * Returns the channel id of the channel that should handle this packet. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java new file mode 100644 index 0000000000..f96ef13c10 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core; + +/** + * A ResponseHandler is used by the channel to handle async responses. + */ +public interface ResponseHandler { + + /** + * called by channel after an async response has been received. + * + * @param packet the packet confirmed + */ + void handleResponse(Packet packet, Packet response); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 18227cbd54..7306072ad9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; @@ -99,9 +100,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; @@ -168,7 +171,7 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.setHandler(handler); if (confirmationWindow >= 0) { - sessionChannel.setCommandConfirmationHandler(confirmationHandler); + setHandlers(); } } @@ -185,28 +188,58 @@ public class ActiveMQSessionContext extends SessionContext { this.killed = true; } - private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { + private void setHandlers() { + sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler); + + if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { + sessionChannel.setResponseHandler(responseHandler); + } + } + + private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() { @Override - public void commandConfirmed(final Packet packet) { + public void commandConfirmed(Packet packet) { + responseHandler.handleResponse(packet, null); + } + }; + + private final ResponseHandler responseHandler = new ResponseHandler() { + @Override + public void handleResponse(Packet packet, Packet response) { + final ActiveMQException activeMQException; + if (response != null && response.getType() == PacketImpl.EXCEPTION) { + ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response; + activeMQException = exceptionResponseMessage.getException(); + } else { + activeMQException = null; + } + if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage ssm = (SessionSendMessage) packet; - callSendAck(ssm.getHandler(), ssm.getMessage()); + callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException); } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) { SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; if (!scm.isContinues()) { - callSendAck(scm.getHandler(), scm.getMessage()); + callSendAck(scm.getHandler(), scm.getMessage(), activeMQException); } } } - private void callSendAck(SendAcknowledgementHandler handler, final Message message) { + private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) { if (handler != null) { - handler.sendAcknowledged(message); + if (exception == null) { + handler.sendAcknowledged(message); + } else { + handler.sendFailed(message, exception); + } } else if (sendAckHandler != null) { - sendAckHandler.sendAcknowledged(message); + if (exception == null) { + sendAckHandler.sendAcknowledged(message); + } else { + sendAckHandler.sendFailed(message, exception); + } } } - }; // Failover utility methods @@ -243,7 +276,8 @@ public class ActiveMQSessionContext extends SessionContext { @Override public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { - sessionChannel.setCommandConfirmationHandler(confirmationHandler); + setHandlers(); + this.sendAckHandler = handler; } @@ -472,13 +506,15 @@ public class ActiveMQSessionContext extends SessionContext { boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException { - SessionSendMessage packet; + final SessionSendMessage packet; if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { packet = new SessionSendMessage_1X(msgI, sendBlocking, handler); - } else { + } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { packet = new SessionSendMessage(msgI, sendBlocking, handler); + } else { + boolean responseRequired = confirmationWindow != -1 || sendBlocking; + packet = new SessionSendMessage_V2(msgI, responseRequired, handler); } - if (sendBlocking) { sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); } else { @@ -904,7 +940,7 @@ public class ActiveMQSessionContext extends SessionContext { } } - private static int sendSessionSendContinuationMessage(Channel channel, + private int sendSessionSendContinuationMessage(Channel channel, Message msgI, long messageBodySize, boolean sendBlocking, @@ -912,7 +948,12 @@ public class ActiveMQSessionContext extends SessionContext { byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + final SessionSendContinuationMessage chunkPacket; + if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { + chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + } else { + chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler); + } final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); //perform a weak form of flow control to avoid OOM on tight loops final CoreRemotingConnection connection = channel.getConnection(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 4d73cf82ce..61268d6938 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -96,6 +97,8 @@ public final class ChannelImpl implements Channel { private final java.util.Queue resendCache; + private final ResponseCache responseAsyncCache; + private int firstStoredCommandID; private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1); @@ -138,8 +141,10 @@ public final class ChannelImpl implements Channel { if (confWindowSize != -1) { resendCache = new ConcurrentLinkedQueue<>(); + responseAsyncCache = new ResponseCache(); } else { resendCache = null; + responseAsyncCache = null; } this.interceptors = interceptors; @@ -211,7 +216,11 @@ public final class ChannelImpl implements Channel { lock.lock(); try { - response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause)); + ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause); + if (responseAsyncCache != null) { + responseAsyncCache.errorAll(activeMQException); + } + response = new ActiveMQExceptionMessage(activeMQException); sendCondition.signal(); } finally { @@ -244,6 +253,10 @@ public final class ChannelImpl implements Channel { this.transferring = transferring; } + protected ResponseCache getCache() { + return responseAsyncCache; + } + /** * @param timeoutMsg message to log on blocking call failover timeout */ @@ -270,6 +283,10 @@ public final class ChannelImpl implements Channel { synchronized (sendLock) { packet.setChannelID(id); + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + packet.setCorrelationID(responseAsyncCache.nextCorrelationID()); + } + if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); } @@ -291,6 +308,7 @@ public final class ChannelImpl implements Channel { if (resendCache != null && packet.isRequiresConfirmations()) { addResendPacket(packet); } + } finally { lock.unlock(); } @@ -301,9 +319,30 @@ public final class ChannelImpl implements Channel { checkReconnectID(reconnectID); + //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, + //As the send could block if the response cache cannot add, preventing responses to be handled. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + while (!responseAsyncCache.add(packet)) { + try { + Thread.sleep(1); + } catch (Exception e) { + // Ignore + } + } + } + // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // buffer is full, preventing any incoming buffers being handled and blocking failover - connection.getTransportConnection().write(buffer, flush, batch); + try { + connection.getTransportConnection().write(buffer, flush, batch); + } catch (Throwable t) { + //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full. + //The client would get still know about this as the exception bubbles up the call stack instead. + if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { + responseAsyncCache.remove(packet.getCorrelationID()); + } + throw t; + } return true; } } @@ -391,7 +430,7 @@ public final class ChannelImpl implements Channel { throw new ActiveMQInterruptedException(e); } - if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) { + if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) { ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace")); } @@ -477,6 +516,18 @@ public final class ChannelImpl implements Channel { commandConfirmationHandler = handler; } + @Override + public void setResponseHandler(final ResponseHandler responseHandler) { + if (confWindowSize < 0) { + final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information."; + if (logger.isTraceEnabled()) { + logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg); + } + throw new IllegalStateException(msg); + } + responseAsyncCache.setResponseHandler(responseHandler); + } + @Override public void setHandler(final ChannelHandler handler) { if (logger.isTraceEnabled()) { @@ -595,6 +646,12 @@ public final class ChannelImpl implements Channel { } } + public void handleAsyncResponse(Packet packet) { + if (responseAsyncCache != null && packet.isResponseAsync()) { + responseAsyncCache.handleResponse(packet); + } + } + @Override public void confirm(final Packet packet) { if (resendCache != null && packet.isRequiresConfirmations()) { @@ -647,6 +704,7 @@ public final class ChannelImpl implements Channel { if (packet.isResponse()) { confirm(packet); + handleAsyncResponse(packet); lock.lock(); try { @@ -698,6 +756,9 @@ public final class ChannelImpl implements Channel { if (commandConfirmationHandler != null) { commandConfirmationHandler.commandConfirmed(packet); } + if (responseAsyncCache != null) { + responseAsyncCache.handleResponse(packet); + } } firstStoredCommandID += numberToClear; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 5e46848615..9a8166e31c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping; @@ -71,6 +72,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; @@ -81,6 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; @@ -88,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY; @@ -184,13 +188,25 @@ public abstract class PacketDecoder implements Serializable { break; } case EXCEPTION: { - packet = new ActiveMQExceptionMessage(); + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new ActiveMQExceptionMessage(); + } else { + packet = new ActiveMQExceptionMessage_V2(); + } break; } case PACKETS_CONFIRMED: { packet = new PacketsConfirmedMessage(); break; } + case NULL_RESPONSE: { + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new NullResponseMessage(); + } else { + packet = new NullResponseMessage_V2(); + } + break; + } case CREATESESSION: { packet = new CreateSessionMessage(); break; @@ -316,7 +332,11 @@ public abstract class PacketDecoder implements Serializable { break; } case SESS_XA_RESP: { - packet = new SessionXAResponseMessage(); + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new SessionXAResponseMessage(); + } else { + packet = new SessionXAResponseMessage_V2(); + } break; } case SESS_XA_ROLLBACK: { @@ -383,16 +403,16 @@ public abstract class PacketDecoder implements Serializable { packet = new SessionIndividualAcknowledgeMessage(); break; } - case NULL_RESPONSE: { - packet = new NullResponseMessage(); - break; - } case SESS_RECEIVE_CONTINUATION: { packet = new SessionReceiveContinuationMessage(); break; } case SESS_SEND_CONTINUATION: { - packet = new SessionSendContinuationMessage(); + if (connection.isVersionBeforeAsyncResponseChange()) { + packet = new SessionSendContinuationMessage(); + } else { + packet = new SessionSendContinuationMessage_V2(); + } break; } case SESS_PRODUCER_REQUEST_CREDITS: { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 87ba0c3a75..0168a47c66 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -31,7 +31,9 @@ public class PacketImpl implements Packet { // 2.0.0 public static final int ADDRESSING_CHANGE_VERSION = 129; - public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130; + + // 2.7.0 + public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130; public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); @@ -272,6 +274,7 @@ public class PacketImpl implements Packet { public static final byte SESS_BINDINGQUERY_RESP_V4 = -15; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { @@ -428,7 +431,7 @@ public class PacketImpl implements Packet { } protected String getParentString() { - return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName(); + return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName(); } private int stringEncodeSize(final String str) { @@ -439,5 +442,24 @@ public class PacketImpl implements Packet { return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0); } + @Override + public boolean isRequiresResponse() { + return false; + } + + @Override + public boolean isResponseAsync() { + return false; + } + + @Override + public long getCorrelationID() { + return -1; + } + + @Override + public void setCorrelationID(long correlationID) { + } + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java new file mode 100644 index 0000000000..8ee73d7055 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; + +public class ResponseCache { + + private final AtomicLong sequence = new AtomicLong(0); + + private final ConcurrentLongHashMap store; + private ResponseHandler responseHandler; + + public ResponseCache() { + this.store = new ConcurrentLongHashMap<>(); + } + + public long nextCorrelationID() { + return sequence.incrementAndGet(); + } + + public boolean add(Packet packet) { + this.store.put(packet.getCorrelationID(), packet); + return true; + } + + public Packet remove(long correlationID) { + return store.remove(correlationID); + } + + public void handleResponse(Packet response) { + long correlationID = response.getCorrelationID(); + Packet packet = remove(correlationID); + if (packet != null) { + responseHandler.handleResponse(packet, response); + } + } + + public void errorAll(ActiveMQException exception) { + ConcurrentLongHashSet keys = store.keysLongHashSet(); + keys.forEach(correlationID -> { + handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception)); + }); + } + + public void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + public int size() { + return this.store.size(); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java index da34d2e4a9..51637f3a63 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage.java @@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class ActiveMQExceptionMessage extends PacketImpl { - private ActiveMQException exception; + protected ActiveMQException exception; // Static -------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java new file mode 100644 index 0000000000..661a040259 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ActiveMQExceptionMessage_V2.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.utils.DataConstants; + +public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage { + + private long correlationID; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) { + super(exception); + this.correlationID = correlationID; + } + + public ActiveMQExceptionMessage_V2() { + super(); + } + + // Public -------------------------------------------------------- + + @Override + public boolean isResponse() { + return true; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public final boolean isResponseAsync() { + return true; + } + + @Override + public long getCorrelationID() { + return this.correlationID; + } + + @Override + public String toString() { + return getParentString() + ", exception= " + exception + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof ActiveMQExceptionMessage_V2)) { + return false; + } + ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj; + if (correlationID != other.correlationID) { + return false; + } + return true; + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java index a98f888979..8c84a9bc89 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java @@ -65,6 +65,7 @@ public class CreateAddressMessage extends PacketImpl { return address; } + @Override public boolean isRequiresResponse() { return requiresResponse; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java index 2ebf1474aa..985d5f4340 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage.java @@ -100,6 +100,7 @@ public class CreateQueueMessage extends PacketImpl { return temporary; } + @Override public boolean isRequiresResponse() { return requiresResponse; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java index af25ae9f14..3c072e0bfc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java @@ -80,6 +80,7 @@ public class CreateSharedQueueMessage extends PacketImpl { return filterString; } + @Override public boolean isRequiresResponse() { return requiresResponse; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java new file mode 100644 index 0000000000..e3453af1a8 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.utils.DataConstants; + +public class NullResponseMessage_V2 extends NullResponseMessage { + + private long correlationID; + + public NullResponseMessage_V2(final long correlationID) { + super(); + this.correlationID = correlationID; + } + + public NullResponseMessage_V2() { + super(); + } + + // Public -------------------------------------------------------- + + @Override + public long getCorrelationID() { + return correlationID; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public final boolean isResponse() { + return true; + } + + @Override + public final boolean isResponseAsync() { + return true; + } + + @Override + public String toString() { + return getParentString() + ", correlationID=" + correlationID + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof NullResponseMessage_V2)) { + return false; + } + NullResponseMessage_V2 other = (NullResponseMessage_V2) obj; + if (correlationID != other.correlationID) { + return false; + } + return true; + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java index 542c34cc00..67d9f67f39 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionAcknowledgeMessage.java @@ -51,6 +51,7 @@ public class SessionAcknowledgeMessage extends PacketImpl { return messageID; } + @Override public boolean isRequiresResponse() { return requiresResponse; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java index f09beeb25b..e07b50ca2f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java @@ -71,6 +71,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket { return browseOnly; } + @Override public boolean isRequiresResponse() { return requiresResponse; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java index 7d06081d60..3164c23f11 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionIndividualAcknowledgeMessage.java @@ -60,6 +60,7 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl { return messageID; } + @Override public boolean isRequiresResponse() { return requiresResponse; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index 26eedd76fc..4105b11864 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants; */ public class SessionSendContinuationMessage extends SessionContinuationMessage { - private boolean requiresResponse; + protected boolean requiresResponse; // Used on confirmation handling - private Message message; + protected Message message; /** * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession} *
@@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { /** * to be sent on the last package */ - private long messageBodySize = -1; + protected long messageBodySize = -1; // Static -------------------------------------------------------- @@ -54,6 +54,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { handler = null; } + protected SessionSendContinuationMessage(byte type) { + super(type); + handler = null; + } + /** * @param body * @param continues @@ -72,11 +77,31 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { this.messageBodySize = messageBodySize; } + /** + * @param body + * @param continues + * @param requiresResponse + */ + protected SessionSendContinuationMessage(final byte type, + final Message message, + final byte[] body, + final boolean continues, + final boolean requiresResponse, + final long messageBodySize, + SendAcknowledgementHandler handler) { + super(type, body, continues); + this.requiresResponse = requiresResponse; + this.message = message; + this.handler = handler; + this.messageBodySize = messageBodySize; + } + // Public -------------------------------------------------------- /** * @return the requiresResponse */ + @Override public boolean isRequiresResponse() { return requiresResponse; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java new file mode 100644 index 0000000000..2a3071c930 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.utils.DataConstants; + +/** + * A SessionSendContinuationMessage
+ */ +public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage { + + private long correlationID; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public SessionSendContinuationMessage_V2() { + super(); + } + + /** + * @param body + * @param continues + * @param requiresResponse + */ + public SessionSendContinuationMessage_V2(final Message message, + final byte[] body, + final boolean continues, + final boolean requiresResponse, + final long messageBodySize, + SendAcknowledgementHandler handler) { + super(message, body, continues, requiresResponse, messageBodySize, handler); + } + + // Public -------------------------------------------------------- + + @Override + public int expectedEncodeSize() { + return super.expectedEncodeSize() + DataConstants.SIZE_LONG; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public long getCorrelationID() { + return this.correlationID; + } + + @Override + public void setCorrelationID(long correlationID) { + this.correlationID = correlationID; + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", continues=" + continues); + buff.append(", message=" + message); + buff.append(", messageBodySize=" + messageBodySize); + buff.append(", requiresResponse=" + requiresResponse); + buff.append(", correlationID=" + correlationID); + buff.append("]"); + return buff.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionSendContinuationMessage_V2)) + return false; + SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj; + if (correlationID != other.correlationID) + return false; + return true; + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index b56ae30512..e8dbdc12a7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.DataConstants; public class SessionSendMessage extends MessagePacket { @@ -36,6 +37,22 @@ public class SessionSendMessage extends MessagePacket { */ private final transient SendAcknowledgementHandler handler; + /** This will be using the CoreMessage because it is meant for the core-protocol */ + protected SessionSendMessage(final byte id, + final ICoreMessage message, + final boolean requiresResponse, + final SendAcknowledgementHandler handler) { + super(id, message); + this.handler = handler; + this.requiresResponse = requiresResponse; + } + + protected SessionSendMessage(final byte id, + final CoreMessage message) { + super(id, message); + this.handler = null; + } + /** This will be using the CoreMessage because it is meant for the core-protocol */ public SessionSendMessage(final ICoreMessage message, final boolean requiresResponse, @@ -52,6 +69,7 @@ public class SessionSendMessage extends MessagePacket { // Public -------------------------------------------------------- + @Override public boolean isRequiresResponse() { return requiresResponse; } @@ -62,7 +80,7 @@ public class SessionSendMessage extends MessagePacket { @Override public int expectedEncodeSize() { - return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1; + return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize(); } @Override @@ -75,13 +93,16 @@ public class SessionSendMessage extends MessagePacket { public void decodeRest(final ActiveMQBuffer buffer) { // Buffer comes in after having read standard headers and positioned at Beginning of body part - ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1); + ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize()); receiveMessage(messageBuffer); - buffer.readerIndex(buffer.capacity() - 1); + buffer.readerIndex(buffer.capacity() - fieldsEncodeSize()); requiresResponse = buffer.readBoolean(); + } + protected int fieldsEncodeSize() { + return DataConstants.SIZE_BOOLEAN; } protected void receiveMessage(ByteBuf messageBuffer) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java new file mode 100644 index 0000000000..63c9a344f6 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V2.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.utils.DataConstants; + +public class SessionSendMessage_V2 extends SessionSendMessage { + + private long correlationID; + + /** This will be using the CoreMessage because it is meant for the core-protocol */ + public SessionSendMessage_V2(final ICoreMessage message, + final boolean requiresResponse, + final SendAcknowledgementHandler handler) { + super(SESS_SEND, message, requiresResponse, handler); + } + + public SessionSendMessage_V2(final CoreMessage message) { + super(SESS_SEND, message); + } + + @Override + public void encodeRest(ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + correlationID = buffer.readLong(); + } + + @Override + protected int fieldsEncodeSize() { + return super.fieldsEncodeSize() + DataConstants.SIZE_LONG; + } + + @Override + public long getCorrelationID() { + return this.correlationID; + } + + @Override + public void setCorrelationID(long correlationID) { + this.correlationID = correlationID; + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", correlationID=" + correlationID); + buff.append(", requiresResponse=" + super.isRequiresResponse()); + buff.append("]"); + return buff.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionSendMessage_V2)) + return false; + SessionSendMessage_V2 other = (SessionSendMessage_V2) obj; + if (correlationID != other.correlationID) + return false; + return true; + } + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java index 086b8511a6..f88e0c8055 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage.java @@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class SessionXAResponseMessage extends PacketImpl { - private boolean error; + protected boolean error; - private int responseCode; + protected int responseCode; - private String message; + protected String message; public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) { super(SESS_XA_RESP); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java new file mode 100644 index 0000000000..4e949bd405 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.utils.DataConstants; + +public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage { + + private long correlationID; + + public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) { + super(isError, responseCode, message); + this.correlationID = correlationID; + } + + public SessionXAResponseMessage_V2() { + super(); + } + + // Public -------------------------------------------------------- + + @Override + public long getCorrelationID() { + return correlationID; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } + } + + @Override + public final boolean isResponse() { + return true; + } + + @Override + public final boolean isResponseAsync() { + return true; + } + + @Override + public String toString() { + StringBuffer buff = new StringBuffer(getParentString()); + buff.append(", error=" + error); + buff.append(", message=" + message); + buff.append(", responseCode=" + responseCode); + buff.append(", correlationID=" + correlationID); + buff.append("]"); + return buff.toString(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof SessionXAResponseMessage_V2)) { + return false; + } + SessionXAResponseMessage_V2 other = (SessionXAResponseMessage_V2) obj; + if (correlationID != other.correlationID) { + return false; + } + return true; + } +} diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index a39b4220eb..ff65ff998c 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130 diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java new file mode 100644 index 0000000000..416c911f49 --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -0,0 +1,512 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl; + +import javax.security.auth.Subject; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; +import org.apache.activemq.artemis.core.remoting.CloseListener; +import org.apache.activemq.artemis.core.remoting.FailureListener; +import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ChannelImplTest { + + ChannelImpl channel; + + @Before + public void setUp() { + channel = new ChannelImpl(new CoreRR(), 1, 4000, null); + } + + @Test + public void testCorrelation() { + + AtomicInteger handleResponseCount = new AtomicInteger(); + + RequestPacket requestPacket = new RequestPacket((byte) 1); + setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); + + channel.send(requestPacket); + + assertEquals(1, channel.getCache().size()); + + ResponsePacket responsePacket = new ResponsePacket((byte) 1); + responsePacket.setCorrelationID(requestPacket.getCorrelationID()); + + channel.handlePacket(responsePacket); + + assertEquals(1, handleResponseCount.get()); + assertEquals(0, channel.getCache().size()); + } + + private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) { + channel.setResponseHandler(responseHandler); + channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler)); + } + + private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) { + return new CommandConfirmationHandler() { + @Override + public void commandConfirmed(Packet packet) { + responseHandler.handleResponse(packet, null); + } + }; + } + + @Test + public void testPacketsConfirmedMessage() { + + AtomicInteger handleResponseCount = new AtomicInteger(); + + RequestPacket requestPacket = new RequestPacket((byte) 1); + setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); + + channel.send(requestPacket); + + PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2); + + channel.handlePacket(responsePacket); + + assertEquals(0, channel.getCache().size()); + } + + class RequestPacket extends PacketImpl { + + private long id; + + RequestPacket(byte type) { + super(type); + } + + @Override + public boolean isRequiresResponse() { + return true; + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public long getCorrelationID() { + return id; + } + + @Override + public void setCorrelationID(long id) { + this.id = id; + } + + @Override + public int getPacketSize() { + return 0; + } + } + + class ResponsePacket extends PacketImpl { + + private long id; + + ResponsePacket(byte type) { + super(type); + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public boolean isResponse() { + return true; + } + + @Override + public long getCorrelationID() { + return id; + } + + @Override + public void setCorrelationID(long id) { + this.id = id; + } + + @Override + public int getPacketSize() { + return 0; + } + } + + class CoreRR implements CoreRemotingConnection { + + @Override + public int getChannelVersion() { + return 0; + } + + @Override + public void setChannelVersion(int clientVersion) { + + } + + @Override + public Channel getChannel(long channelID, int confWindowSize) { + return null; + } + + @Override + public void putChannel(long channelID, Channel channel) { + + } + + @Override + public boolean removeChannel(long channelID) { + return false; + } + + @Override + public long generateChannelID() { + return 0; + } + + @Override + public void syncIDGeneratorSequence(long id) { + + } + + @Override + public long getIDGeneratorSequence() { + return 0; + } + + @Override + public long getBlockingCallTimeout() { + return 0; + } + + @Override + public long getBlockingCallFailoverTimeout() { + return 0; + } + + @Override + public Object getTransferLock() { + return null; + } + + @Override + public ActiveMQPrincipal getDefaultActiveMQPrincipal() { + return null; + } + + @Override + public boolean blockUntilWritable(int size, long timeout) { + return false; + } + + @Override + public Object getID() { + return null; + } + + @Override + public long getCreationTime() { + return 0; + } + + @Override + public String getRemoteAddress() { + return null; + } + + @Override + public void scheduledFlush() { + + } + + @Override + public void addFailureListener(FailureListener listener) { + + } + + @Override + public boolean removeFailureListener(FailureListener listener) { + return false; + } + + @Override + public void addCloseListener(CloseListener listener) { + + } + + @Override + public boolean removeCloseListener(CloseListener listener) { + return false; + } + + @Override + public List removeCloseListeners() { + return null; + } + + @Override + public void setCloseListeners(List listeners) { + + } + + @Override + public List getFailureListeners() { + return null; + } + + @Override + public List removeFailureListeners() { + return null; + } + + @Override + public void setFailureListeners(List listeners) { + + } + + @Override + public ActiveMQBuffer createTransportBuffer(int size) { + return new ChannelBufferWrapper(Unpooled.buffer(size)); + } + + @Override + public void fail(ActiveMQException me) { + + } + + @Override + public void fail(ActiveMQException me, String scaleDownTargetNodeID) { + + } + + @Override + public void destroy() { + + } + + @Override + public Connection getTransportConnection() { + return new Connection() { + @Override + public ActiveMQBuffer createTransportBuffer(int size) { + return null; + } + + @Override + public RemotingConnection getProtocolConnection() { + return null; + } + + @Override + public void setProtocolConnection(RemotingConnection connection) { + + } + + @Override + public boolean isWritable(ReadyListener listener) { + return false; + } + + @Override + public void fireReady(boolean ready) { + + } + + @Override + public void setAutoRead(boolean autoRead) { + + } + + @Override + public Object getID() { + return null; + } + + @Override + public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) { + + } + + @Override + public void write(ActiveMQBuffer buffer, + boolean flush, + boolean batched, + ChannelFutureListener futureListener) { + + } + + @Override + public void write(ActiveMQBuffer buffer) { + + } + + @Override + public void forceClose() { + + } + + @Override + public void close() { + + } + + @Override + public String getRemoteAddress() { + return null; + } + + @Override + public String getLocalAddress() { + return null; + } + + @Override + public void checkFlushBatchBuffer() { + + } + + @Override + public TransportConfiguration getConnectorConfig() { + return null; + } + + @Override + public ActiveMQPrincipal getDefaultActiveMQPrincipal() { + return null; + } + + @Override + public boolean isUsingProtocolHandling() { + return false; + } + + @Override + public boolean isSameTarget(TransportConfiguration... configs) { + return false; + } + }; + } + + @Override + public boolean isClient() { + return true; + } + + @Override + public boolean isDestroyed() { + return false; + } + + @Override + public void disconnect(boolean criticalError) { + + } + + @Override + public void disconnect(String scaleDownNodeID, boolean criticalError) { + + } + + @Override + public boolean checkDataReceived() { + return false; + } + + @Override + public void flush() { + + } + + @Override + public boolean isWritable(ReadyListener callback) { + return false; + } + + @Override + public void killMessage(SimpleString nodeID) { + + } + + @Override + public boolean isSupportReconnect() { + return false; + } + + @Override + public boolean isSupportsFlowControl() { + return false; + } + + @Override + public Subject getSubject() { + return null; + } + + @Override + public String getProtocolName() { + return null; + } + + @Override + public void setClientID(String cID) { + + } + + @Override + public String getClientID() { + return null; + } + + @Override + public String getTransportLocalAddress() { + return null; + } + + @Override + public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { + + } + } + +} \ No newline at end of file diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index ae1d270e0d..ee4223cc57 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -34,6 +34,8 @@ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicPublisher; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -563,6 +565,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To private final Message jmsMessage; private final ActiveMQMessageProducer producer; + /** + * It's possible that this SendAcknowledgementHandler might be called twice due to subsequent + * packet confirmations on the same connection. Using this boolean avoids that possibility. + * A new CompletionListenerWrapper is created for each message sent so once it's called once + * it will never be called again. + */ + private AtomicBoolean active = new AtomicBoolean(true); + /** * @param jmsMessage * @param producer @@ -577,26 +587,62 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To @Override public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { - if (jmsMessage instanceof StreamMessage) { - try { - ((StreamMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? + if (active.get()) { + if (jmsMessage instanceof StreamMessage) { + try { + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } } - } - if (jmsMessage instanceof BytesMessage) { - try { - ((BytesMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? + if (jmsMessage instanceof BytesMessage) { + try { + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } } - } - try { - producer.connection.getThreadAwareContext().setCurrentThread(true); - completionListener.onCompletion(jmsMessage); - } finally { - producer.connection.getThreadAwareContext().clearCurrentThread(true); + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + completionListener.onCompletion(jmsMessage); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + active.set(false); + } + } + } + + @Override + public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) { + if (active.get()) { + if (jmsMessage instanceof StreamMessage) { + try { + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } + } + if (jmsMessage instanceof BytesMessage) { + try { + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } + } + + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + if (exception instanceof ActiveMQException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception); + } else if (exception instanceof ActiveMQInterruptedException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); + } + completionListener.onException(jmsMessage, exception); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + active.set(false); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index d38f45f0b1..0428abebd0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -90,8 +91,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder { if (connection.isVersionBeforeAddressChange()) { sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools)); - } else { + } else if (connection.isVersionBeforeAsyncResponseChange()) { sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools)); + } else { + sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools)); } sendMessage.decode(in); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 37564b51a2..16a87d8e8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; @@ -76,11 +77,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; @@ -313,7 +316,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -342,7 +345,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = request.isRequiresResponse(); session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -351,7 +354,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = request.isRequiresResponse(); session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -361,7 +364,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue(), request.isAutoCreated()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -373,7 +376,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); } if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -385,7 +388,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue()); } if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -393,7 +396,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet; session.deleteQueue(request.getQueueName()); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_QUEUEQUERY: { @@ -453,62 +456,62 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_COMMIT: { requiresResponse = true; session.commit(); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_ROLLBACK: { requiresResponse = true; session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered()); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_XA_COMMIT: { requiresResponse = true; SessionXACommitMessage message = (SessionXACommitMessage) packet; session.xaCommit(message.getXid(), message.isOnePhase()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_END: { requiresResponse = true; SessionXAEndMessage message = (SessionXAEndMessage) packet; session.xaEnd(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_FORGET: { requiresResponse = true; SessionXAForgetMessage message = (SessionXAForgetMessage) packet; session.xaForget(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_JOIN: { requiresResponse = true; SessionXAJoinMessage message = (SessionXAJoinMessage) packet; session.xaJoin(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_RESUME: { requiresResponse = true; SessionXAResumeMessage message = (SessionXAResumeMessage) packet; session.xaResume(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_ROLLBACK: { requiresResponse = true; SessionXARollbackMessage message = (SessionXARollbackMessage) packet; session.xaRollback(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_START: { requiresResponse = true; SessionXAStartMessage message = (SessionXAStartMessage) packet; session.xaStart(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_FAILED: { @@ -521,14 +524,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_XA_SUSPEND: { requiresResponse = true; session.xaSuspend(); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_PREPARE: { requiresResponse = true; SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet; session.xaPrepare(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_INDOUBT_XIDS: { @@ -557,14 +560,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_STOP: { requiresResponse = true; session.stop(); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_CLOSE: { requiresResponse = true; session.close(false); // removeConnectionListeners(); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); flush = true; closeChannel = true; break; @@ -574,7 +577,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); session.individualAcknowledge(message.getConsumerID(), message.getMessageID()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -582,7 +585,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet; session.closeConsumer(message.getConsumerID()); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_FORCE_CONSUMER_DELIVERY: { @@ -591,7 +594,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { break; } case PacketImpl.SESS_ADD_METADATA: { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet; session.addMetaData(message.getKey(), message.getData()); break; @@ -600,7 +603,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet; if (message.isRequiresConfirmations()) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } session.addMetaData(message.getKey(), message.getData()); break; @@ -609,7 +612,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet; if (session.addUniqueMetaData(message.getKey(), message.getData())) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } else { response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData())); } @@ -617,15 +620,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, flush, closeChannel); } finally { @@ -633,6 +636,26 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } + private Packet createNullResponseMessage(Packet packet) { + final Packet response; + if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) { + response = new NullResponseMessage(); + } else { + response = new NullResponseMessage_V2(packet.getCorrelationID()); + } + return response; + } + + private Packet createSessionXAResponseMessage(Packet packet) { + Packet response; + if (packet.isResponseAsync()) { + response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null); + } else { + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + } + return response; + } + private void onSessionAcknowledge(Packet packet) { this.storageManager.setContext(session.getSessionContext()); try { @@ -643,18 +666,18 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); this.session.acknowledge(message.getConsumerID(), message.getMessageID()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -672,18 +695,18 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -700,15 +723,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet; session.requestProducerCredits(message.getAddress(), message.getCredits()); } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -725,15 +748,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet; session.receiveConsumerCredits(message.getConsumerID(), message.getCredits()); } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -742,50 +765,68 @@ public class ServerSessionPacketHandler implements ChannelHandler { } - private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e, + private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet, + ActiveMQIOErrorException e, boolean requiresResponse, Packet response, ServerSession session) { session.markTXFailed(e); if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); + response = convertToExceptionPacket(packet, e); } else { ActiveMQServerLogger.LOGGER.caughtException(e); } return response; } - private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e, + private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet, + ActiveMQXAException e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); + if (packet.isResponseAsync()) { + response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage()); + } else { + response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); + } } else { ActiveMQServerLogger.LOGGER.caughtXaException(e); } return response; } - private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e, + private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet, + ActiveMQQueueMaxConsumerLimitReached e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); + response = convertToExceptionPacket(packet, e); } else { ActiveMQServerLogger.LOGGER.caughtException(e); } return response; } - private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e, + private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) { + Packet response; + if (packet.isResponseAsync()) { + response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e); + } else { + response = new ActiveMQExceptionMessage(e); + } + return response; + } + + private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet, + ActiveMQException e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); + response = convertToExceptionPacket(packet, e); } else { if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) { logger.debug("Caught exception", e); @@ -796,7 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { return response; } - private static Packet onCatchThrowableWhileHandlePacket(Throwable t, + private static Packet onCatchThrowableWhileHandlePacket(Packet packet, + Throwable t, boolean requiresResponse, Packet response, ServerSession session) { @@ -805,7 +847,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t); ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); activeMQInternalErrorException.initCause(t); - response = new ActiveMQExceptionMessage(activeMQInternalErrorException); + response = convertToExceptionPacket(packet, activeMQInternalErrorException); } else { ActiveMQServerLogger.LOGGER.caughtException(t); } @@ -827,12 +869,11 @@ public class ServerSessionPacketHandler implements ChannelHandler { public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); - ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage)); - - doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel); + Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage)); + doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel); if (logger.isTraceEnabled()) { - logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage); + logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket); } } @@ -852,7 +893,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { final Packet response, final boolean flush, final boolean closeChannel) { - if (confirmPacket != null) { + // don't confirm if the response is an exception + if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) { channel.confirm(confirmPacket); if (flush) { diff --git a/pom.xml b/pom.xml index 36e5c33f3a..7acfe0d4df 100644 --- a/pom.xml +++ b/pom.xml @@ -126,7 +126,7 @@ 1 0 0 - 129,128,127,126,125,124,123,122 + 130,129,128,127,126,125,124,123,122 ${project.version} ${project.version}(${activemq.version.incrementingVersion}) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index e4afb5be51..c7ed8699c1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; @@ -314,6 +315,11 @@ public class BackupSyncDelay implements Interceptor { } + @Override + public void setResponseHandler(ResponseHandler handler) { + throw new UnsupportedOperationException(); + } + @Override public void flushConfirmations() { throw new UnsupportedOperationException(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java index d3951f26bf..3020310a9e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java @@ -167,7 +167,24 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase { @Override public void onException(Message message, Exception exception) { - // TODO Auto-generated method stub + latch.countDown(); + try { + switch (call) { + case 0: + context.rollback(); + break; + case 1: + context.commit(); + break; + case 2: + context.close(); + break; + default: + throw new IllegalArgumentException("call code " + call); + } + } catch (Exception error1) { + this.error = error1; + } } } diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java index e83d815467..3ba58f0658 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java @@ -16,12 +16,28 @@ */ package org.apache.activemq.artemis.jms.tests; +import static org.junit.Assert.fail; + +import javax.jms.BytesMessage; +import javax.jms.CompletionListener; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; import javax.jms.IllegalStateException; +import javax.jms.JMSContext; +import javax.jms.JMSProducer; import javax.jms.JMSSecurityException; +import javax.jms.Message; +import javax.jms.MessageProducer; import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; @@ -68,9 +84,9 @@ public class SecurityTest extends JMSTestCase { } - /** - * Login with no user, no password Should allow login (equivalent to guest) - */ + /** + * Login with no user, no password Should allow login (equivalent to guest) + */ @Test public void testLoginNoUserNoPassword() throws Exception { createConnection(); @@ -170,6 +186,173 @@ public class SecurityTest extends JMSTestCase { } } + /** + * Login with valid user and password + * But try send to address not authorised - Persistent + * Should not allow and should throw exception + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection connection = connectionFactory.createConnection("guest", "guest"); + Session session = connection.createSession(); + Destination destination = session.createQueue(queueName.toString()); + MessageProducer messageProducer = session.createProducer(destination); + try { + messageProducer.send(session.createTextMessage("hello")); + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + //pass + } + connection.close(); + } + + /** + * Login with valid user and password + * But try send to address not authorised - Non Persistent. + * Should have same behaviour as Persistent with exception on send. + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connectionFactory.setConfirmationWindowSize(100); + connectionFactory.setBlockOnDurableSend(false); + connectionFactory.setBlockOnNonDurableSend(false); + Connection connection = connectionFactory.createConnection("guest", "guest"); + Session session = connection.createSession(); + Destination destination = session.createQueue(queueName.toString()); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + try { + AtomicReference e = new AtomicReference<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + messageProducer.send(session.createTextMessage("hello"), new CompletionListener() { + @Override + public void onCompletion(Message message) { + countDownLatch.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + e.set(exception); + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + if (e.get() != null) { + throw e.get(); + } + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + activeMQSecurityException.printStackTrace(); + } finally { + connection.close(); + } + } + + /** + * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API. + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connectionFactory.setConfirmationWindowSize(100); + connectionFactory.setBlockOnDurableSend(false); + connectionFactory.setBlockOnNonDurableSend(false); + JMSContext context = connectionFactory.createContext("guest", "guest"); + Destination destination = context.createQueue(queueName.toString()); + JMSProducer messageProducer = context.createProducer(); + messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + try { + AtomicReference e = new AtomicReference<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + messageProducer.setAsync(new CompletionListener() { + @Override + public void onCompletion(Message message) { + countDownLatch.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + e.set(exception); + countDownLatch.countDown(); + } + }); + messageProducer.send(destination, context.createTextMessage("hello")); + countDownLatch.await(10, TimeUnit.SECONDS); + if (e.get() != null) { + throw e.get(); + } + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + activeMQSecurityException.printStackTrace(); + } finally { + context.close(); + } + } + + /** + * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message. + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connectionFactory.setConfirmationWindowSize(100); + connectionFactory.setBlockOnDurableSend(false); + connectionFactory.setBlockOnNonDurableSend(false); + connectionFactory.setMinLargeMessageSize(1024); + Connection connection = connectionFactory.createConnection("guest", "guest"); + Session session = connection.createSession(); + Destination destination = session.createQueue(queueName.toString()); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + try { + AtomicReference e = new AtomicReference<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[10 * 1024]); + messageProducer.send(message, new CompletionListener() { + @Override + public void onCompletion(Message message) { + countDownLatch.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + e.set(exception); + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + if (e.get() != null) { + throw e.get(); + } + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + activeMQSecurityException.printStackTrace(); + } + connection.close(); + } + /* Now some client id tests */ /** diff --git a/tests/jms-tests/src/test/resources/broker.xml b/tests/jms-tests/src/test/resources/broker.xml index 733e8c37bc..644ce83ab1 100644 --- a/tests/jms-tests/src/test/resources/broker.xml +++ b/tests/jms-tests/src/test/resources/broker.xml @@ -54,6 +54,16 @@ + + + + + + + + + + \ No newline at end of file