ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions

This commit is contained in:
Michael André Pearce 2017-12-14 07:47:30 +00:00 committed by Clebert Suconic
parent 515a2e064c
commit e4ba48a311
35 changed files with 1134 additions and 109 deletions

View File

@ -41,4 +41,11 @@ public interface SendAcknowledgementHandler {
* @param message message sent asynchronously * @param message message sent asynchronously
*/ */
void sendAcknowledged(Message message); void sendAcknowledged(Message message);
default void sendFailed(Message message, Exception e) {
//This is to keep old behaviour that would ack even if error,
// if anyone custom implemented this interface but doesnt update.
sendAcknowledged(message);
}
} }

View File

@ -228,4 +228,7 @@ public interface ActiveMQClientMessageBundle {
@Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.") @Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
ActiveMQInterruptedException packetTransmissionInterrupted(); ActiveMQInterruptedException packetTransmissionInterrupted();
@Message(id = 119063, value = "Cannot send a packet while response cache is full.")
IllegalStateException cannotSendPacketWhilstResponseCacheFull();
} }

View File

@ -211,6 +211,9 @@ public interface Channel {
*/ */
void setCommandConfirmationHandler(CommandConfirmationHandler handler); void setCommandConfirmationHandler(CommandConfirmationHandler handler);
void setResponseHandler(ResponseHandler handler);
/** /**
* flushes any confirmations on to the connection. * flushes any confirmations on to the connection.
*/ */

View File

@ -36,6 +36,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION); return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
} }
default boolean isVersionBeforeAsyncResponseChange() {
int version = getChannelVersion();
return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
}
/** /**
* Sets the client protocol used on the communication. This will determine if the client has * Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types * support for certain packet types

View File

@ -41,6 +41,14 @@ public interface Packet {
return INITIAL_PACKET_SIZE; return INITIAL_PACKET_SIZE;
} }
boolean isRequiresResponse();
boolean isResponseAsync();
long getCorrelationID();
void setCorrelationID(long correlationID);
/** /**
* Returns the channel id of the channel that should handle this packet. * Returns the channel id of the channel that should handle this packet.
* *

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core;
/**
* A CommandConfirmationHandler is used by the channel to confirm confirmations of packets.
*/
public interface ResponseHandler {
/**
* called by channel after a confirmation has been received.
*
* @param packet the packet confirmed
*/
void responseHandler(Packet packet, Packet response);
}

View File

@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
@ -99,9 +100,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@ -168,7 +171,11 @@ public class ActiveMQSessionContext extends SessionContext {
sessionChannel.setHandler(handler); sessionChannel.setHandler(handler);
if (confirmationWindow >= 0) { if (confirmationWindow >= 0) {
sessionChannel.setCommandConfirmationHandler(confirmationHandler); if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
} else {
sessionChannel.setResponseHandler(responseHandler);
}
} }
} }
@ -185,28 +192,50 @@ public class ActiveMQSessionContext extends SessionContext {
this.killed = true; this.killed = true;
} }
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
@Override @Override
public void commandConfirmed(final Packet packet) { public void commandConfirmed(Packet packet) {
responseHandler.responseHandler(packet, null);
}
};
private final ResponseHandler responseHandler = new ResponseHandler() {
@Override
public void responseHandler(Packet packet, Packet response) {
final ActiveMQException activeMQException;
if (response != null && response.getType() == PacketImpl.EXCEPTION) {
ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
activeMQException = exceptionResponseMessage.getException();
} else {
activeMQException = null;
}
if (packet.getType() == PacketImpl.SESS_SEND) { if (packet.getType() == PacketImpl.SESS_SEND) {
SessionSendMessage ssm = (SessionSendMessage) packet; SessionSendMessage ssm = (SessionSendMessage) packet;
callSendAck(ssm.getHandler(), ssm.getMessage()); callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
} else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) { } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
if (!scm.isContinues()) { if (!scm.isContinues()) {
callSendAck(scm.getHandler(), scm.getMessage()); callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
} }
} }
} }
private void callSendAck(SendAcknowledgementHandler handler, final Message message) { private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
if (handler != null) { if (handler != null) {
handler.sendAcknowledged(message); if (exception == null) {
handler.sendAcknowledged(message);
} else {
handler.sendFailed(message, exception);
}
} else if (sendAckHandler != null) { } else if (sendAckHandler != null) {
sendAckHandler.sendAcknowledged(message); if (exception == null) {
sendAckHandler.sendAcknowledged(message);
} else {
handler.sendFailed(message, exception);
}
} }
} }
}; };
// Failover utility methods // Failover utility methods
@ -243,7 +272,11 @@ public class ActiveMQSessionContext extends SessionContext {
@Override @Override
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
sessionChannel.setCommandConfirmationHandler(confirmationHandler); if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
} else {
sessionChannel.setResponseHandler(responseHandler);
}
this.sendAckHandler = handler; this.sendAckHandler = handler;
} }
@ -472,13 +505,15 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking, boolean sendBlocking,
SendAcknowledgementHandler handler, SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException { SimpleString defaultAddress) throws ActiveMQException {
SessionSendMessage packet; final SessionSendMessage packet;
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler); packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
} else { } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendMessage(msgI, sendBlocking, handler); packet = new SessionSendMessage(msgI, sendBlocking, handler);
} else {
boolean responseRequired = confirmationWindow != -1 || sendBlocking;
packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
} }
if (sendBlocking) { if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
} else { } else {
@ -904,15 +939,20 @@ public class ActiveMQSessionContext extends SessionContext {
} }
} }
private static int sendSessionSendContinuationMessage(Channel channel, private int sendSessionSendContinuationMessage(Channel channel,
Message msgI, Message msgI,
long messageBodySize, long messageBodySize,
boolean sendBlocking, boolean sendBlocking,
boolean lastChunk, boolean lastChunk,
byte[] chunk, byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException { SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking; final boolean requiresResponse = lastChunk || confirmationWindow != -1;
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); final SessionSendContinuationMessage chunkPacket;
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
} else {
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
}
final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops //perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection(); final CoreRemotingConnection connection = channel.getConnection();
@ -929,7 +969,11 @@ public class ActiveMQSessionContext extends SessionContext {
} }
if (requiresResponse) { if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking. // When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); if (sendBlocking) {
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
} else {
channel.send(chunkPacket);
}
} else { } else {
channel.send(chunkPacket); channel.send(chunkPacket);
} }

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -96,6 +97,8 @@ public final class ChannelImpl implements Channel {
private final java.util.Queue<Packet> resendCache; private final java.util.Queue<Packet> resendCache;
private final ResponseCache responseAsyncCache;
private int firstStoredCommandID; private int firstStoredCommandID;
private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1); private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
@ -138,8 +141,10 @@ public final class ChannelImpl implements Channel {
if (confWindowSize != -1) { if (confWindowSize != -1) {
resendCache = new ConcurrentLinkedQueue<>(); resendCache = new ConcurrentLinkedQueue<>();
responseAsyncCache = new ResponseCache();
} else { } else {
resendCache = null; resendCache = null;
responseAsyncCache = null;
} }
this.interceptors = interceptors; this.interceptors = interceptors;
@ -211,7 +216,11 @@ public final class ChannelImpl implements Channel {
lock.lock(); lock.lock();
try { try {
response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause)); ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
if (responseAsyncCache != null) {
responseAsyncCache.errorAll(activeMQException);
}
response = new ActiveMQExceptionMessage(activeMQException);
sendCondition.signal(); sendCondition.signal();
} finally { } finally {
@ -270,6 +279,10 @@ public final class ChannelImpl implements Channel {
synchronized (sendLock) { synchronized (sendLock) {
packet.setChannelID(id); packet.setChannelID(id);
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id); logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
} }
@ -291,6 +304,7 @@ public final class ChannelImpl implements Channel {
if (resendCache != null && packet.isRequiresConfirmations()) { if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet); addResendPacket(packet);
} }
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -301,9 +315,30 @@ public final class ChannelImpl implements Channel {
checkReconnectID(reconnectID); checkReconnectID(reconnectID);
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache is cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
}
}
}
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover // buffer is full, preventing any incoming buffers being handled and blocking failover
connection.getTransportConnection().write(buffer, flush, batch); try {
connection.getTransportConnection().write(buffer, flush, batch);
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
//The client would get still know about this as the exception bubbles up the call stack instead.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
}
throw t;
}
return true; return true;
} }
} }
@ -477,6 +512,18 @@ public final class ChannelImpl implements Channel {
commandConfirmationHandler = handler; commandConfirmationHandler = handler;
} }
@Override
public void setResponseHandler(final ResponseHandler responseHandler) {
if (confWindowSize < 0) {
final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information.";
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg);
}
throw new IllegalStateException(msg);
}
responseAsyncCache.setResponseHandler(responseHandler);
}
@Override @Override
public void setHandler(final ChannelHandler handler) { public void setHandler(final ChannelHandler handler) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -595,6 +642,12 @@ public final class ChannelImpl implements Channel {
} }
} }
public void handleResponse(Packet packet) {
if (responseAsyncCache != null && packet.isResponseAsync()) {
responseAsyncCache.handleResponse(packet);
}
}
@Override @Override
public void confirm(final Packet packet) { public void confirm(final Packet packet) {
if (resendCache != null && packet.isRequiresConfirmations()) { if (resendCache != null && packet.isRequiresConfirmations()) {
@ -647,6 +700,7 @@ public final class ChannelImpl implements Channel {
if (packet.isResponse()) { if (packet.isResponse()) {
confirm(packet); confirm(packet);
handleResponse(packet);
lock.lock(); lock.lock();
try { try {

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
@ -71,6 +72,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@ -81,6 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
@ -88,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
@ -184,13 +188,25 @@ public abstract class PacketDecoder implements Serializable {
break; break;
} }
case EXCEPTION: { case EXCEPTION: {
packet = new ActiveMQExceptionMessage(); if (connection.isVersionBeforeAsyncResponseChange()) {
packet = new ActiveMQExceptionMessage();
} else {
packet = new ActiveMQExceptionMessage_V2();
}
break; break;
} }
case PACKETS_CONFIRMED: { case PACKETS_CONFIRMED: {
packet = new PacketsConfirmedMessage(); packet = new PacketsConfirmedMessage();
break; break;
} }
case NULL_RESPONSE: {
if (connection.isVersionBeforeAsyncResponseChange()) {
packet = new NullResponseMessage();
} else {
packet = new NullResponseMessage_V2();
}
break;
}
case CREATESESSION: { case CREATESESSION: {
packet = new CreateSessionMessage(); packet = new CreateSessionMessage();
break; break;
@ -316,7 +332,11 @@ public abstract class PacketDecoder implements Serializable {
break; break;
} }
case SESS_XA_RESP: { case SESS_XA_RESP: {
packet = new SessionXAResponseMessage(); if (connection.isVersionBeforeAsyncResponseChange()) {
packet = new SessionXAResponseMessage();
} else {
packet = new SessionXAResponseMessage_V2();
}
break; break;
} }
case SESS_XA_ROLLBACK: { case SESS_XA_ROLLBACK: {
@ -383,16 +403,16 @@ public abstract class PacketDecoder implements Serializable {
packet = new SessionIndividualAcknowledgeMessage(); packet = new SessionIndividualAcknowledgeMessage();
break; break;
} }
case NULL_RESPONSE: {
packet = new NullResponseMessage();
break;
}
case SESS_RECEIVE_CONTINUATION: { case SESS_RECEIVE_CONTINUATION: {
packet = new SessionReceiveContinuationMessage(); packet = new SessionReceiveContinuationMessage();
break; break;
} }
case SESS_SEND_CONTINUATION: { case SESS_SEND_CONTINUATION: {
packet = new SessionSendContinuationMessage(); if (connection.isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendContinuationMessage();
} else {
packet = new SessionSendContinuationMessage_V2();
}
break; break;
} }
case SESS_PRODUCER_REQUEST_CREDITS: { case SESS_PRODUCER_REQUEST_CREDITS: {

View File

@ -32,6 +32,7 @@ public class PacketImpl implements Packet {
// 2.0.0 // 2.0.0
public static final int ADDRESSING_CHANGE_VERSION = 129; public static final int ADDRESSING_CHANGE_VERSION = 129;
public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130; public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@ -272,6 +273,7 @@ public class PacketImpl implements Packet {
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15; public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
public PacketImpl(final byte type) { public PacketImpl(final byte type) {
@ -439,5 +441,24 @@ public class PacketImpl implements Packet {
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0); return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
} }
@Override
public boolean isRequiresResponse() {
return false;
}
@Override
public boolean isResponseAsync() {
return false;
}
@Override
public long getCorrelationID() {
return -1;
}
@Override
public void setCorrelationID(long correlationID) {
}
} }

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
public class ResponseCache {
private final AtomicLong sequence = new AtomicLong(0);
private final ConcurrentLongHashMap<Packet> store;
private ResponseHandler responseHandler;
public ResponseCache() {
this.store = new ConcurrentLongHashMap<>();
}
public long nextCorrelationID() {
return sequence.incrementAndGet();
}
public boolean add(Packet packet) {
this.store.put(packet.getCorrelationID(), packet);
return true;
}
public Packet remove(long correlationID) {
return store.remove(correlationID);
}
public void handleResponse(Packet response) {
long correlationID = response.getCorrelationID();
Packet packet = remove(correlationID);
if (packet != null) {
responseHandler.responseHandler(packet, response);
}
}
public void errorAll(ActiveMQException exception) {
ConcurrentLongHashSet keys = store.keysLongHashSet();
keys.forEach(correlationID -> {
handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception));
});
}
public void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}
}

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class ActiveMQExceptionMessage extends PacketImpl { public class ActiveMQExceptionMessage extends PacketImpl {
private ActiveMQException exception; protected ActiveMQException exception;
// Static -------------------------------------------------------- // Static --------------------------------------------------------

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.utils.DataConstants;
public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage {
private long correlationID;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) {
super(exception);
this.correlationID = correlationID;
}
public ActiveMQExceptionMessage_V2() {
super();
}
// Public --------------------------------------------------------
@Override
public boolean isResponse() {
return true;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeLong(correlationID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
}
}
@Override
public final boolean isResponseAsync() {
return true;
}
@Override
public long getCorrelationID() {
return this.correlationID;
}
@Override
public String toString() {
return getParentString() + ", exception= " + exception + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof ActiveMQExceptionMessage_V2)) {
return false;
}
ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj;
if (correlationID != other.correlationID) {
return false;
}
return true;
}
}

View File

@ -65,6 +65,7 @@ public class CreateAddressMessage extends PacketImpl {
return address; return address;
} }
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }

View File

@ -100,6 +100,7 @@ public class CreateQueueMessage extends PacketImpl {
return temporary; return temporary;
} }
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }

View File

@ -80,6 +80,7 @@ public class CreateSharedQueueMessage extends PacketImpl {
return filterString; return filterString;
} }
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.utils.DataConstants;
public class NullResponseMessage_V2 extends NullResponseMessage {
private long correlationID;
public NullResponseMessage_V2(final long correlationID) {
super();
this.correlationID = correlationID;
}
public NullResponseMessage_V2() {
super();
}
// Public --------------------------------------------------------
@Override
public long getCorrelationID() {
return correlationID;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeLong(correlationID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
}
}
@Override
public final boolean isResponse() {
return true;
}
@Override
public final boolean isResponseAsync() {
return true;
}
@Override
public String toString() {
return getParentString() + ", correlationID=" + correlationID + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof NullResponseMessage_V2)) {
return false;
}
NullResponseMessage_V2 other = (NullResponseMessage_V2) obj;
if (correlationID != other.correlationID) {
return false;
}
return true;
}
}

View File

@ -51,6 +51,7 @@ public class SessionAcknowledgeMessage extends PacketImpl {
return messageID; return messageID;
} }
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }

View File

@ -71,6 +71,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
return browseOnly; return browseOnly;
} }
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }

View File

@ -60,6 +60,7 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl {
return messageID; return messageID;
} }
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }

View File

@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/ */
public class SessionSendContinuationMessage extends SessionContinuationMessage { public class SessionSendContinuationMessage extends SessionContinuationMessage {
private boolean requiresResponse; protected boolean requiresResponse;
// Used on confirmation handling // Used on confirmation handling
private Message message; protected Message message;
/** /**
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession} * In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
* <br> * <br>
@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
/** /**
* to be sent on the last package * to be sent on the last package
*/ */
private long messageBodySize = -1; protected long messageBodySize = -1;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
@ -54,6 +54,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
handler = null; handler = null;
} }
protected SessionSendContinuationMessage(byte type) {
super(type);
handler = null;
}
/** /**
* @param body * @param body
* @param continues * @param continues
@ -72,11 +77,31 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
this.messageBodySize = messageBodySize; this.messageBodySize = messageBodySize;
} }
/**
* @param body
* @param continues
* @param requiresResponse
*/
protected SessionSendContinuationMessage(final byte type,
final Message message,
final byte[] body,
final boolean continues,
final boolean requiresResponse,
final long messageBodySize,
SendAcknowledgementHandler handler) {
super(type, body, continues);
this.requiresResponse = requiresResponse;
this.message = message;
this.handler = handler;
this.messageBodySize = messageBodySize;
}
// Public -------------------------------------------------------- // Public --------------------------------------------------------
/** /**
* @return the requiresResponse * @return the requiresResponse
*/ */
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.utils.DataConstants;
/**
* A SessionSendContinuationMessage<br>
*/
public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage {
private long correlationID;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public SessionSendContinuationMessage_V2() {
super();
}
/**
* @param body
* @param continues
* @param requiresResponse
*/
public SessionSendContinuationMessage_V2(final Message message,
final byte[] body,
final boolean continues,
final boolean requiresResponse,
final long messageBodySize,
SendAcknowledgementHandler handler) {
super(message, body, continues, requiresResponse, messageBodySize, handler);
}
// Public --------------------------------------------------------
@Override
public int expectedEncodeSize() {
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeLong(correlationID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
}
}
@Override
public long getCorrelationID() {
return this.correlationID;
}
@Override
public void setCorrelationID(long correlationID) {
this.correlationID = correlationID;
}
@Override
public boolean isResponseAsync() {
return true;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
return result;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", continues=" + continues);
buff.append(", message=" + message);
buff.append(", messageBodySize=" + messageBodySize);
buff.append(", requiresResponse=" + requiresResponse);
buff.append(", correlationID=" + correlationID);
buff.append("]");
return buff.toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionSendContinuationMessage_V2))
return false;
SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj;
if (correlationID != other.correlationID)
return false;
return true;
}
}

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.utils.DataConstants;
public class SessionSendMessage extends MessagePacket { public class SessionSendMessage extends MessagePacket {
@ -36,6 +37,22 @@ public class SessionSendMessage extends MessagePacket {
*/ */
private final transient SendAcknowledgementHandler handler; private final transient SendAcknowledgementHandler handler;
/** This will be using the CoreMessage because it is meant for the core-protocol */
protected SessionSendMessage(final byte id,
final ICoreMessage message,
final boolean requiresResponse,
final SendAcknowledgementHandler handler) {
super(id, message);
this.handler = handler;
this.requiresResponse = requiresResponse;
}
protected SessionSendMessage(final byte id,
final CoreMessage message) {
super(id, message);
this.handler = null;
}
/** This will be using the CoreMessage because it is meant for the core-protocol */ /** This will be using the CoreMessage because it is meant for the core-protocol */
public SessionSendMessage(final ICoreMessage message, public SessionSendMessage(final ICoreMessage message,
final boolean requiresResponse, final boolean requiresResponse,
@ -52,6 +69,7 @@ public class SessionSendMessage extends MessagePacket {
// Public -------------------------------------------------------- // Public --------------------------------------------------------
@Override
public boolean isRequiresResponse() { public boolean isRequiresResponse() {
return requiresResponse; return requiresResponse;
} }
@ -62,7 +80,7 @@ public class SessionSendMessage extends MessagePacket {
@Override @Override
public int expectedEncodeSize() { public int expectedEncodeSize() {
return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1; return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize();
} }
@Override @Override
@ -75,13 +93,16 @@ public class SessionSendMessage extends MessagePacket {
public void decodeRest(final ActiveMQBuffer buffer) { public void decodeRest(final ActiveMQBuffer buffer) {
// Buffer comes in after having read standard headers and positioned at Beginning of body part // Buffer comes in after having read standard headers and positioned at Beginning of body part
ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1); ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize());
receiveMessage(messageBuffer); receiveMessage(messageBuffer);
buffer.readerIndex(buffer.capacity() - 1); buffer.readerIndex(buffer.capacity() - fieldsEncodeSize());
requiresResponse = buffer.readBoolean(); requiresResponse = buffer.readBoolean();
}
protected int fieldsEncodeSize() {
return DataConstants.SIZE_BOOLEAN;
} }
protected void receiveMessage(ByteBuf messageBuffer) { protected void receiveMessage(ByteBuf messageBuffer) {

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.utils.DataConstants;
public class SessionSendMessage_V2 extends SessionSendMessage {
private long correlationID;
/** This will be using the CoreMessage because it is meant for the core-protocol */
public SessionSendMessage_V2(final ICoreMessage message,
final boolean requiresResponse,
final SendAcknowledgementHandler handler) {
super(SESS_SEND, message, requiresResponse, handler);
}
public SessionSendMessage_V2(final CoreMessage message) {
super(SESS_SEND, message);
}
@Override
public void encodeRest(ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeLong(correlationID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
correlationID = buffer.readLong();
}
@Override
protected int fieldsEncodeSize() {
return super.fieldsEncodeSize() + DataConstants.SIZE_LONG;
}
@Override
public long getCorrelationID() {
return this.correlationID;
}
@Override
public void setCorrelationID(long correlationID) {
this.correlationID = correlationID;
}
@Override
public boolean isResponseAsync() {
return true;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
return result;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", correlationID=" + correlationID);
buff.append(", requiresResponse=" + super.isRequiresResponse());
buff.append("]");
return buff.toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionSendMessage_V2))
return false;
SessionSendMessage_V2 other = (SessionSendMessage_V2) obj;
if (correlationID != other.correlationID)
return false;
return true;
}
}

View File

@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class SessionXAResponseMessage extends PacketImpl { public class SessionXAResponseMessage extends PacketImpl {
private boolean error; protected boolean error;
private int responseCode; protected int responseCode;
private String message; protected String message;
public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) { public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) {
super(SESS_XA_RESP); super(SESS_XA_RESP);

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.utils.DataConstants;
public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage {
private long correlationID;
public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) {
super(isError, responseCode, message);
this.correlationID = correlationID;
}
public SessionXAResponseMessage_V2() {
super();
}
// Public --------------------------------------------------------
@Override
public long getCorrelationID() {
return correlationID;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeLong(correlationID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
correlationID = buffer.readLong();
}
}
@Override
public final boolean isResponse() {
return true;
}
@Override
public final boolean isResponseAsync() {
return true;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", error=" + error);
buff.append(", message=" + message);
buff.append(", responseCode=" + responseCode);
buff.append(", correlationID=" + correlationID);
buff.append("]");
return buff.toString();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof SessionXAResponseMessage_V2)) {
return false;
}
SessionXAResponseMessage_V2 other = (SessionXAResponseMessage_V2) obj;
if (correlationID != other.correlationID) {
return false;
}
return true;
}
}

View File

@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion} activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag} activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129 activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130

View File

@ -600,6 +600,36 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
} }
} }
@Override
public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
if (jmsMessage instanceof StreamMessage) {
try {
((StreamMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
if (jmsMessage instanceof BytesMessage) {
try {
((BytesMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
try {
producer.connection.getThreadAwareContext().setCurrentThread(true);
if (exception instanceof ActiveMQException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception);
} else if (exception instanceof ActiveMQInterruptedException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
}
completionListener.onException(jmsMessage, exception);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
}
}
@Override @Override
public String toString() { public String toString() {
return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")"; return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";

View File

@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@ -90,8 +91,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
if (connection.isVersionBeforeAddressChange()) { if (connection.isVersionBeforeAddressChange()) {
sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools)); sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
} else { } else if (connection.isVersionBeforeAsyncResponseChange()) {
sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools)); sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
} else {
sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools));
} }
sendMessage.decode(in); sendMessage.decode(in);

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
@ -76,11 +77,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
@ -313,7 +316,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse(); requiresResponse = message.isRequiresResponse();
sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues()); sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
break; break;
} }
@ -342,7 +345,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = request.isRequiresResponse(); requiresResponse = request.isRequiresResponse();
session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated()); session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated());
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
break; break;
} }
@ -351,7 +354,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = request.isRequiresResponse(); requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable()); session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable());
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
break; break;
} }
@ -361,7 +364,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
request.isExclusive(), request.isLastValue(), request.isAutoCreated()); request.isExclusive(), request.isLastValue(), request.isAutoCreated());
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
break; break;
} }
@ -373,7 +376,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
} }
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
break; break;
} }
@ -385,7 +388,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue()); session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
} }
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
break; break;
} }
@ -393,7 +396,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true; requiresResponse = true;
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet; SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
session.deleteQueue(request.getQueueName()); session.deleteQueue(request.getQueueName());
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
break; break;
} }
case SESS_QUEUEQUERY: { case SESS_QUEUEQUERY: {
@ -453,62 +456,62 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_COMMIT: { case SESS_COMMIT: {
requiresResponse = true; requiresResponse = true;
session.commit(); session.commit();
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
break; break;
} }
case SESS_ROLLBACK: { case SESS_ROLLBACK: {
requiresResponse = true; requiresResponse = true;
session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered()); session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
break; break;
} }
case SESS_XA_COMMIT: { case SESS_XA_COMMIT: {
requiresResponse = true; requiresResponse = true;
SessionXACommitMessage message = (SessionXACommitMessage) packet; SessionXACommitMessage message = (SessionXACommitMessage) packet;
session.xaCommit(message.getXid(), message.isOnePhase()); session.xaCommit(message.getXid(), message.isOnePhase());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_END: { case SESS_XA_END: {
requiresResponse = true; requiresResponse = true;
SessionXAEndMessage message = (SessionXAEndMessage) packet; SessionXAEndMessage message = (SessionXAEndMessage) packet;
session.xaEnd(message.getXid()); session.xaEnd(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_FORGET: { case SESS_XA_FORGET: {
requiresResponse = true; requiresResponse = true;
SessionXAForgetMessage message = (SessionXAForgetMessage) packet; SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
session.xaForget(message.getXid()); session.xaForget(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_JOIN: { case SESS_XA_JOIN: {
requiresResponse = true; requiresResponse = true;
SessionXAJoinMessage message = (SessionXAJoinMessage) packet; SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
session.xaJoin(message.getXid()); session.xaJoin(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_RESUME: { case SESS_XA_RESUME: {
requiresResponse = true; requiresResponse = true;
SessionXAResumeMessage message = (SessionXAResumeMessage) packet; SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
session.xaResume(message.getXid()); session.xaResume(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_ROLLBACK: { case SESS_XA_ROLLBACK: {
requiresResponse = true; requiresResponse = true;
SessionXARollbackMessage message = (SessionXARollbackMessage) packet; SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
session.xaRollback(message.getXid()); session.xaRollback(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_START: { case SESS_XA_START: {
requiresResponse = true; requiresResponse = true;
SessionXAStartMessage message = (SessionXAStartMessage) packet; SessionXAStartMessage message = (SessionXAStartMessage) packet;
session.xaStart(message.getXid()); session.xaStart(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_FAILED: { case SESS_XA_FAILED: {
@ -521,14 +524,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_XA_SUSPEND: { case SESS_XA_SUSPEND: {
requiresResponse = true; requiresResponse = true;
session.xaSuspend(); session.xaSuspend();
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_PREPARE: { case SESS_XA_PREPARE: {
requiresResponse = true; requiresResponse = true;
SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet; SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
session.xaPrepare(message.getXid()); session.xaPrepare(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); response = createSessionXAResponseMessage(packet);
break; break;
} }
case SESS_XA_INDOUBT_XIDS: { case SESS_XA_INDOUBT_XIDS: {
@ -557,14 +560,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_STOP: { case SESS_STOP: {
requiresResponse = true; requiresResponse = true;
session.stop(); session.stop();
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
break; break;
} }
case SESS_CLOSE: { case SESS_CLOSE: {
requiresResponse = true; requiresResponse = true;
session.close(false); session.close(false);
// removeConnectionListeners(); // removeConnectionListeners();
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
flush = true; flush = true;
closeChannel = true; closeChannel = true;
break; break;
@ -574,7 +577,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse(); requiresResponse = message.isRequiresResponse();
session.individualAcknowledge(message.getConsumerID(), message.getMessageID()); session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
break; break;
} }
@ -582,7 +585,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true; requiresResponse = true;
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet; SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
session.closeConsumer(message.getConsumerID()); session.closeConsumer(message.getConsumerID());
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
break; break;
} }
case SESS_FORCE_CONSUMER_DELIVERY: { case SESS_FORCE_CONSUMER_DELIVERY: {
@ -591,7 +594,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
break; break;
} }
case PacketImpl.SESS_ADD_METADATA: { case PacketImpl.SESS_ADD_METADATA: {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet; SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
session.addMetaData(message.getKey(), message.getData()); session.addMetaData(message.getKey(), message.getData());
break; break;
@ -600,7 +603,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true; requiresResponse = true;
SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet; SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
if (message.isRequiresConfirmations()) { if (message.isRequiresConfirmations()) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
session.addMetaData(message.getKey(), message.getData()); session.addMetaData(message.getKey(), message.getData());
break; break;
@ -609,7 +612,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true; requiresResponse = true;
SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet; SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
if (session.addUniqueMetaData(message.getKey(), message.getData())) { if (session.addUniqueMetaData(message.getKey(), message.getData())) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} else { } else {
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData())); response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
} }
@ -617,15 +620,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} }
} }
} catch (ActiveMQIOErrorException e) { } catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) { } catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) { } catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) { } catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
} }
sendResponse(packet, response, flush, closeChannel); sendResponse(packet, response, flush, closeChannel);
} finally { } finally {
@ -633,6 +636,26 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} }
} }
private Packet createNullResponseMessage(Packet packet) {
final Packet response;
if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) {
response = new NullResponseMessage();
} else {
response = new NullResponseMessage_V2(packet.getCorrelationID());
}
return response;
}
private Packet createSessionXAResponseMessage(Packet packet) {
Packet response;
if (packet.isResponseAsync()) {
response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null);
} else {
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
return response;
}
private void onSessionAcknowledge(Packet packet) { private void onSessionAcknowledge(Packet packet) {
this.storageManager.setContext(session.getSessionContext()); this.storageManager.setContext(session.getSessionContext());
try { try {
@ -643,18 +666,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse(); requiresResponse = message.isRequiresResponse();
this.session.acknowledge(message.getConsumerID(), message.getMessageID()); this.session.acknowledge(message.getConsumerID(), message.getMessageID());
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
} catch (ActiveMQIOErrorException e) { } catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) { } catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) { } catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) { } catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
} }
sendResponse(packet, response, false, false); sendResponse(packet, response, false, false);
} finally { } finally {
@ -672,18 +695,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = message.isRequiresResponse(); requiresResponse = message.isRequiresResponse();
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct); this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
if (requiresResponse) { if (requiresResponse) {
response = new NullResponseMessage(); response = createNullResponseMessage(packet);
} }
} catch (ActiveMQIOErrorException e) { } catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) { } catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) { } catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) { } catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
} }
sendResponse(packet, response, false, false); sendResponse(packet, response, false, false);
} finally { } finally {
@ -700,15 +723,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet; SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
session.requestProducerCredits(message.getAddress(), message.getCredits()); session.requestProducerCredits(message.getAddress(), message.getCredits());
} catch (ActiveMQIOErrorException e) { } catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) { } catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) { } catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) { } catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
} }
sendResponse(packet, response, false, false); sendResponse(packet, response, false, false);
} finally { } finally {
@ -725,15 +748,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet; SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
session.receiveConsumerCredits(message.getConsumerID(), message.getCredits()); session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
} catch (ActiveMQIOErrorException e) { } catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) { } catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) { } catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) { } catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
} }
sendResponse(packet, response, false, false); sendResponse(packet, response, false, false);
} finally { } finally {
@ -742,50 +765,68 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} }
private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e, private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet,
ActiveMQIOErrorException e,
boolean requiresResponse, boolean requiresResponse,
Packet response, Packet response,
ServerSession session) { ServerSession session) {
session.markTXFailed(e); session.markTXFailed(e);
if (requiresResponse) { if (requiresResponse) {
logger.debug("Sending exception to client", e); logger.debug("Sending exception to client", e);
response = new ActiveMQExceptionMessage(e); response = convertToExceptionPacket(packet, e);
} else { } else {
ActiveMQServerLogger.LOGGER.caughtException(e); ActiveMQServerLogger.LOGGER.caughtException(e);
} }
return response; return response;
} }
private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e, private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet,
ActiveMQXAException e,
boolean requiresResponse, boolean requiresResponse,
Packet response) { Packet response) {
if (requiresResponse) { if (requiresResponse) {
logger.debug("Sending exception to client", e); logger.debug("Sending exception to client", e);
response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); if (packet.isResponseAsync()) {
response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage());
} else {
response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
}
} else { } else {
ActiveMQServerLogger.LOGGER.caughtXaException(e); ActiveMQServerLogger.LOGGER.caughtXaException(e);
} }
return response; return response;
} }
private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e, private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet,
ActiveMQQueueMaxConsumerLimitReached e,
boolean requiresResponse, boolean requiresResponse,
Packet response) { Packet response) {
if (requiresResponse) { if (requiresResponse) {
logger.debug("Sending exception to client", e); logger.debug("Sending exception to client", e);
response = new ActiveMQExceptionMessage(e); response = convertToExceptionPacket(packet, e);
} else { } else {
ActiveMQServerLogger.LOGGER.caughtException(e); ActiveMQServerLogger.LOGGER.caughtException(e);
} }
return response; return response;
} }
private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e, private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) {
Packet response;
if (packet.isResponseAsync()) {
response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e);
} else {
response = new ActiveMQExceptionMessage(e);
}
return response;
}
private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet,
ActiveMQException e,
boolean requiresResponse, boolean requiresResponse,
Packet response) { Packet response) {
if (requiresResponse) { if (requiresResponse) {
logger.debug("Sending exception to client", e); logger.debug("Sending exception to client", e);
response = new ActiveMQExceptionMessage(e); response = convertToExceptionPacket(packet, e);
} else { } else {
if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) { if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
logger.debug("Caught exception", e); logger.debug("Caught exception", e);
@ -796,7 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
return response; return response;
} }
private static Packet onCatchThrowableWhileHandlePacket(Throwable t, private static Packet onCatchThrowableWhileHandlePacket(Packet packet,
Throwable t,
boolean requiresResponse, boolean requiresResponse,
Packet response, Packet response,
ServerSession session) { ServerSession session) {
@ -805,7 +847,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t); ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
activeMQInternalErrorException.initCause(t); activeMQInternalErrorException.initCause(t);
response = new ActiveMQExceptionMessage(activeMQInternalErrorException); response = convertToExceptionPacket(packet, activeMQInternalErrorException);
} else { } else {
ActiveMQServerLogger.LOGGER.caughtException(t); ActiveMQServerLogger.LOGGER.caughtException(t);
} }
@ -827,12 +869,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
public void onError(final int errorCode, final String errorMessage) { public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage)); Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage));
doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel);
doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage); logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket);
} }
} }

View File

@ -126,7 +126,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion> <activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion> <activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion> <activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion> <activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag> <activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version> <ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
@ -314,6 +315,11 @@ public class BackupSyncDelay implements Interceptor {
} }
@Override
public void setResponseHandler(ResponseHandler handler) {
throw new UnsupportedOperationException();
}
@Override @Override
public void flushConfirmations() { public void flushConfirmations() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View File

@ -167,7 +167,24 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
@Override @Override
public void onException(Message message, Exception exception) { public void onException(Message message, Exception exception) {
// TODO Auto-generated method stub latch.countDown();
try {
switch (call) {
case 0:
context.rollback();
break;
case 1:
context.commit();
break;
case 2:
context.close();
break;
default:
throw new IllegalArgumentException("call code " + call);
}
} catch (Exception error1) {
this.error = error1;
}
} }
} }

View File

@ -16,12 +16,23 @@
*/ */
package org.apache.activemq.artemis.jms.tests; package org.apache.activemq.artemis.jms.tests;
import static org.junit.Assert.fail;
import javax.jms.CompletionListener;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.JMSSecurityException; import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@ -68,9 +79,9 @@ public class SecurityTest extends JMSTestCase {
} }
/** /**
* Login with no user, no password Should allow login (equivalent to guest) * Login with no user, no password Should allow login (equivalent to guest)
*/ */
@Test @Test
public void testLoginNoUserNoPassword() throws Exception { public void testLoginNoUserNoPassword() throws Exception {
createConnection(); createConnection();
@ -170,6 +181,71 @@ public class SecurityTest extends JMSTestCase {
} }
} }
/**
* Login with valid user and password
* But try send to address not authorised - Persistent
* Should not allow and should throw exception
*/
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection("guest", "guest");
Session session = connection.createSession();
Destination destination = session.createQueue("guest.cannot.send");
MessageProducer messageProducer = session.createProducer(destination);
try {
messageProducer.send(session.createTextMessage("hello"));
fail("JMSSecurityException expected as guest is not allowed to send");
} catch (JMSSecurityException activeMQSecurityException) {
//pass
}
connection.close();
}
/**
* Login with valid user and password
* But try send to address not authorised - Non Persistent.
* Should have same behaviour as Persistent with exception on send.
*/
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory.setConfirmationWindowSize(100);
connectionFactory.setBlockOnDurableSend(false);
connectionFactory.setBlockOnNonDurableSend(false);
Connection connection = connectionFactory.createConnection("guest", "guest");
Session session = connection.createSession();
Destination destination = session.createQueue("guest.cannot.send");
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
try {
AtomicReference<Exception> e = new AtomicReference<>();
// messageProducer.send(session.createTextMessage("hello"));
CountDownLatch countDownLatch = new CountDownLatch(1);
messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
@Override
public void onCompletion(Message message) {
countDownLatch.countDown();
}
@Override
public void onException(Message message, Exception exception) {
e.set(exception);
countDownLatch.countDown();
}
});
countDownLatch.await(10, TimeUnit.SECONDS);
if (e.get() != null) {
throw e.get();
}
fail("JMSSecurityException expected as guest is not allowed to send");
} catch (JMSSecurityException activeMQSecurityException) {
activeMQSecurityException.printStackTrace();
}
connection.close();
}
/* Now some client id tests */ /* Now some client id tests */
/** /**

View File

@ -54,6 +54,16 @@
<permission type="browse" roles="guest,def"/> <permission type="browse" roles="guest,def"/>
<permission type="send" roles="guest,def"/> <permission type="send" roles="guest,def"/>
</security-setting> </security-setting>
<security-setting match="guest.cannot.send">
<permission type="createDurableQueue" roles="guest,def"/>
<permission type="deleteDurableQueue" roles="guest,def"/>
<permission type="createNonDurableQueue" roles="guest,def"/>
<permission type="deleteNonDurableQueue" roles="guest,def"/>
<permission type="consume" roles="guest,def"/>
<permission type="browse" roles="guest,def"/>
<permission type="send" roles="def"/>
</security-setting>
</security-settings> </security-settings>
</core> </core>
</configuration> </configuration>