This closes #2187
This commit is contained in:
commit
94be096861
|
@ -41,4 +41,13 @@ public interface SendAcknowledgementHandler {
|
|||
* @param message message sent asynchronously
|
||||
*/
|
||||
void sendAcknowledged(Message message);
|
||||
|
||||
default void sendFailed(Message message, Exception e) {
|
||||
/**
|
||||
* By default ignore failures to preserve compatibility with existing implementations.
|
||||
* If the message makes it to the broker and a failure occurs sendAcknowledge() will
|
||||
* still be invoked just like it always was.
|
||||
*/
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -228,4 +228,7 @@ public interface ActiveMQClientMessageBundle {
|
|||
|
||||
@Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
|
||||
ActiveMQInterruptedException packetTransmissionInterrupted();
|
||||
|
||||
@Message(id = 119063, value = "Cannot send a packet while response cache is full.")
|
||||
IllegalStateException cannotSendPacketWhilstResponseCacheFull();
|
||||
}
|
||||
|
|
|
@ -211,6 +211,9 @@ public interface Channel {
|
|||
*/
|
||||
void setCommandConfirmationHandler(CommandConfirmationHandler handler);
|
||||
|
||||
void setResponseHandler(ResponseHandler handler);
|
||||
|
||||
|
||||
/**
|
||||
* flushes any confirmations on to the connection.
|
||||
*/
|
||||
|
|
|
@ -36,6 +36,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
|||
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
|
||||
}
|
||||
|
||||
default boolean isVersionBeforeAsyncResponseChange() {
|
||||
int version = getChannelVersion();
|
||||
return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the client protocol used on the communication. This will determine if the client has
|
||||
* support for certain packet types
|
||||
|
|
|
@ -41,6 +41,14 @@ public interface Packet {
|
|||
return INITIAL_PACKET_SIZE;
|
||||
}
|
||||
|
||||
boolean isRequiresResponse();
|
||||
|
||||
boolean isResponseAsync();
|
||||
|
||||
long getCorrelationID();
|
||||
|
||||
void setCorrelationID(long correlationID);
|
||||
|
||||
/**
|
||||
* Returns the channel id of the channel that should handle this packet.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core;
|
||||
|
||||
/**
|
||||
* A ResponseHandler is used by the channel to handle async responses.
|
||||
*/
|
||||
public interface ResponseHandler {
|
||||
|
||||
/**
|
||||
* called by channel after an async response has been received.
|
||||
*
|
||||
* @param packet the packet confirmed
|
||||
*/
|
||||
void handleResponse(Packet packet, Packet response);
|
||||
}
|
|
@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
|||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
|
||||
|
@ -99,9 +100,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
||||
|
@ -168,7 +171,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
sessionChannel.setHandler(handler);
|
||||
|
||||
if (confirmationWindow >= 0) {
|
||||
sessionChannel.setCommandConfirmationHandler(confirmationHandler);
|
||||
setHandlers();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,28 +188,58 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
this.killed = true;
|
||||
}
|
||||
|
||||
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
|
||||
private void setHandlers() {
|
||||
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
|
||||
|
||||
if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
||||
sessionChannel.setResponseHandler(responseHandler);
|
||||
}
|
||||
}
|
||||
|
||||
private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
|
||||
@Override
|
||||
public void commandConfirmed(final Packet packet) {
|
||||
public void commandConfirmed(Packet packet) {
|
||||
responseHandler.handleResponse(packet, null);
|
||||
}
|
||||
};
|
||||
|
||||
private final ResponseHandler responseHandler = new ResponseHandler() {
|
||||
@Override
|
||||
public void handleResponse(Packet packet, Packet response) {
|
||||
final ActiveMQException activeMQException;
|
||||
if (response != null && response.getType() == PacketImpl.EXCEPTION) {
|
||||
ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
|
||||
activeMQException = exceptionResponseMessage.getException();
|
||||
} else {
|
||||
activeMQException = null;
|
||||
}
|
||||
|
||||
if (packet.getType() == PacketImpl.SESS_SEND) {
|
||||
SessionSendMessage ssm = (SessionSendMessage) packet;
|
||||
callSendAck(ssm.getHandler(), ssm.getMessage());
|
||||
callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
|
||||
} else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
|
||||
SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
|
||||
if (!scm.isContinues()) {
|
||||
callSendAck(scm.getHandler(), scm.getMessage());
|
||||
callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void callSendAck(SendAcknowledgementHandler handler, final Message message) {
|
||||
private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
|
||||
if (handler != null) {
|
||||
handler.sendAcknowledged(message);
|
||||
if (exception == null) {
|
||||
handler.sendAcknowledged(message);
|
||||
} else {
|
||||
handler.sendFailed(message, exception);
|
||||
}
|
||||
} else if (sendAckHandler != null) {
|
||||
sendAckHandler.sendAcknowledged(message);
|
||||
if (exception == null) {
|
||||
sendAckHandler.sendAcknowledged(message);
|
||||
} else {
|
||||
sendAckHandler.sendFailed(message, exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
// Failover utility methods
|
||||
|
@ -243,7 +276,8 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
|
||||
@Override
|
||||
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
|
||||
sessionChannel.setCommandConfirmationHandler(confirmationHandler);
|
||||
setHandlers();
|
||||
|
||||
this.sendAckHandler = handler;
|
||||
}
|
||||
|
||||
|
@ -472,13 +506,15 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean sendBlocking,
|
||||
SendAcknowledgementHandler handler,
|
||||
SimpleString defaultAddress) throws ActiveMQException {
|
||||
SessionSendMessage packet;
|
||||
final SessionSendMessage packet;
|
||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
|
||||
} else {
|
||||
} else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
||||
packet = new SessionSendMessage(msgI, sendBlocking, handler);
|
||||
} else {
|
||||
boolean responseRequired = confirmationWindow != -1 || sendBlocking;
|
||||
packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
|
||||
}
|
||||
|
||||
if (sendBlocking) {
|
||||
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
|
||||
} else {
|
||||
|
@ -904,7 +940,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
}
|
||||
}
|
||||
|
||||
private static int sendSessionSendContinuationMessage(Channel channel,
|
||||
private int sendSessionSendContinuationMessage(Channel channel,
|
||||
Message msgI,
|
||||
long messageBodySize,
|
||||
boolean sendBlocking,
|
||||
|
@ -912,7 +948,12 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
byte[] chunk,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
final SessionSendContinuationMessage chunkPacket;
|
||||
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
||||
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
} else {
|
||||
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
|
||||
}
|
||||
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
|
||||
//perform a weak form of flow control to avoid OOM on tight loops
|
||||
final CoreRemotingConnection connection = channel.getConnection();
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
|||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
@ -96,6 +97,8 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
private final java.util.Queue<Packet> resendCache;
|
||||
|
||||
private final ResponseCache responseAsyncCache;
|
||||
|
||||
private int firstStoredCommandID;
|
||||
|
||||
private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
|
||||
|
@ -138,8 +141,10 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
if (confWindowSize != -1) {
|
||||
resendCache = new ConcurrentLinkedQueue<>();
|
||||
responseAsyncCache = new ResponseCache();
|
||||
} else {
|
||||
resendCache = null;
|
||||
responseAsyncCache = null;
|
||||
}
|
||||
|
||||
this.interceptors = interceptors;
|
||||
|
@ -211,7 +216,11 @@ public final class ChannelImpl implements Channel {
|
|||
lock.lock();
|
||||
|
||||
try {
|
||||
response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause));
|
||||
ActiveMQException activeMQException = ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
|
||||
if (responseAsyncCache != null) {
|
||||
responseAsyncCache.errorAll(activeMQException);
|
||||
}
|
||||
response = new ActiveMQExceptionMessage(activeMQException);
|
||||
|
||||
sendCondition.signal();
|
||||
} finally {
|
||||
|
@ -244,6 +253,10 @@ public final class ChannelImpl implements Channel {
|
|||
this.transferring = transferring;
|
||||
}
|
||||
|
||||
protected ResponseCache getCache() {
|
||||
return responseAsyncCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param timeoutMsg message to log on blocking call failover timeout
|
||||
*/
|
||||
|
@ -270,6 +283,10 @@ public final class ChannelImpl implements Channel {
|
|||
synchronized (sendLock) {
|
||||
packet.setChannelID(id);
|
||||
|
||||
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
|
||||
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
|
||||
}
|
||||
|
@ -291,6 +308,7 @@ public final class ChannelImpl implements Channel {
|
|||
if (resendCache != null && packet.isRequiresConfirmations()) {
|
||||
addResendPacket(packet);
|
||||
}
|
||||
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -301,9 +319,30 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
checkReconnectID(reconnectID);
|
||||
|
||||
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
|
||||
//As the send could block if the response cache cannot add, preventing responses to be handled.
|
||||
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
|
||||
while (!responseAsyncCache.add(packet)) {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
|
||||
// buffer is full, preventing any incoming buffers being handled and blocking failover
|
||||
connection.getTransportConnection().write(buffer, flush, batch);
|
||||
try {
|
||||
connection.getTransportConnection().write(buffer, flush, batch);
|
||||
} catch (Throwable t) {
|
||||
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
|
||||
//The client would get still know about this as the exception bubbles up the call stack instead.
|
||||
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
|
||||
responseAsyncCache.remove(packet.getCorrelationID());
|
||||
}
|
||||
throw t;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -391,7 +430,7 @@ public final class ChannelImpl implements Channel {
|
|||
throw new ActiveMQInterruptedException(e);
|
||||
}
|
||||
|
||||
if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) {
|
||||
if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
|
||||
ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
|
||||
}
|
||||
|
||||
|
@ -477,6 +516,18 @@ public final class ChannelImpl implements Channel {
|
|||
commandConfirmationHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResponseHandler(final ResponseHandler responseHandler) {
|
||||
if (confWindowSize < 0) {
|
||||
final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information.";
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg);
|
||||
}
|
||||
throw new IllegalStateException(msg);
|
||||
}
|
||||
responseAsyncCache.setResponseHandler(responseHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setHandler(final ChannelHandler handler) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -595,6 +646,12 @@ public final class ChannelImpl implements Channel {
|
|||
}
|
||||
}
|
||||
|
||||
public void handleAsyncResponse(Packet packet) {
|
||||
if (responseAsyncCache != null && packet.isResponseAsync()) {
|
||||
responseAsyncCache.handleResponse(packet);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void confirm(final Packet packet) {
|
||||
if (resendCache != null && packet.isRequiresConfirmations()) {
|
||||
|
@ -647,6 +704,7 @@ public final class ChannelImpl implements Channel {
|
|||
if (packet.isResponse()) {
|
||||
confirm(packet);
|
||||
|
||||
handleAsyncResponse(packet);
|
||||
lock.lock();
|
||||
|
||||
try {
|
||||
|
@ -698,6 +756,9 @@ public final class ChannelImpl implements Channel {
|
|||
if (commandConfirmationHandler != null) {
|
||||
commandConfirmationHandler.commandConfirmed(packet);
|
||||
}
|
||||
if (responseAsyncCache != null) {
|
||||
responseAsyncCache.handleResponse(packet);
|
||||
}
|
||||
}
|
||||
|
||||
firstStoredCommandID += numberToClear;
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
|
||||
|
@ -71,6 +72,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
||||
|
@ -81,6 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
|
||||
|
@ -88,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
|
||||
|
@ -184,13 +188,25 @@ public abstract class PacketDecoder implements Serializable {
|
|||
break;
|
||||
}
|
||||
case EXCEPTION: {
|
||||
packet = new ActiveMQExceptionMessage();
|
||||
if (connection.isVersionBeforeAsyncResponseChange()) {
|
||||
packet = new ActiveMQExceptionMessage();
|
||||
} else {
|
||||
packet = new ActiveMQExceptionMessage_V2();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case PACKETS_CONFIRMED: {
|
||||
packet = new PacketsConfirmedMessage();
|
||||
break;
|
||||
}
|
||||
case NULL_RESPONSE: {
|
||||
if (connection.isVersionBeforeAsyncResponseChange()) {
|
||||
packet = new NullResponseMessage();
|
||||
} else {
|
||||
packet = new NullResponseMessage_V2();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CREATESESSION: {
|
||||
packet = new CreateSessionMessage();
|
||||
break;
|
||||
|
@ -316,7 +332,11 @@ public abstract class PacketDecoder implements Serializable {
|
|||
break;
|
||||
}
|
||||
case SESS_XA_RESP: {
|
||||
packet = new SessionXAResponseMessage();
|
||||
if (connection.isVersionBeforeAsyncResponseChange()) {
|
||||
packet = new SessionXAResponseMessage();
|
||||
} else {
|
||||
packet = new SessionXAResponseMessage_V2();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case SESS_XA_ROLLBACK: {
|
||||
|
@ -383,16 +403,16 @@ public abstract class PacketDecoder implements Serializable {
|
|||
packet = new SessionIndividualAcknowledgeMessage();
|
||||
break;
|
||||
}
|
||||
case NULL_RESPONSE: {
|
||||
packet = new NullResponseMessage();
|
||||
break;
|
||||
}
|
||||
case SESS_RECEIVE_CONTINUATION: {
|
||||
packet = new SessionReceiveContinuationMessage();
|
||||
break;
|
||||
}
|
||||
case SESS_SEND_CONTINUATION: {
|
||||
packet = new SessionSendContinuationMessage();
|
||||
if (connection.isVersionBeforeAsyncResponseChange()) {
|
||||
packet = new SessionSendContinuationMessage();
|
||||
} else {
|
||||
packet = new SessionSendContinuationMessage_V2();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case SESS_PRODUCER_REQUEST_CREDITS: {
|
||||
|
|
|
@ -31,7 +31,9 @@ public class PacketImpl implements Packet {
|
|||
|
||||
// 2.0.0
|
||||
public static final int ADDRESSING_CHANGE_VERSION = 129;
|
||||
public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
|
||||
|
||||
// 2.7.0
|
||||
public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
|
||||
|
||||
|
||||
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
||||
|
@ -272,6 +274,7 @@ public class PacketImpl implements Packet {
|
|||
|
||||
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
|
||||
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
public PacketImpl(final byte type) {
|
||||
|
@ -428,7 +431,7 @@ public class PacketImpl implements Packet {
|
|||
}
|
||||
|
||||
protected String getParentString() {
|
||||
return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
|
||||
return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName();
|
||||
}
|
||||
|
||||
private int stringEncodeSize(final String str) {
|
||||
|
@ -439,5 +442,24 @@ public class PacketImpl implements Packet {
|
|||
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResponseAsync() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCorrelationID(long correlationID) {
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
|
||||
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
|
||||
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
|
||||
|
||||
public class ResponseCache {
|
||||
|
||||
private final AtomicLong sequence = new AtomicLong(0);
|
||||
|
||||
private final ConcurrentLongHashMap<Packet> store;
|
||||
private ResponseHandler responseHandler;
|
||||
|
||||
public ResponseCache() {
|
||||
this.store = new ConcurrentLongHashMap<>();
|
||||
}
|
||||
|
||||
public long nextCorrelationID() {
|
||||
return sequence.incrementAndGet();
|
||||
}
|
||||
|
||||
public boolean add(Packet packet) {
|
||||
this.store.put(packet.getCorrelationID(), packet);
|
||||
return true;
|
||||
}
|
||||
|
||||
public Packet remove(long correlationID) {
|
||||
return store.remove(correlationID);
|
||||
}
|
||||
|
||||
public void handleResponse(Packet response) {
|
||||
long correlationID = response.getCorrelationID();
|
||||
Packet packet = remove(correlationID);
|
||||
if (packet != null) {
|
||||
responseHandler.handleResponse(packet, response);
|
||||
}
|
||||
}
|
||||
|
||||
public void errorAll(ActiveMQException exception) {
|
||||
ConcurrentLongHashSet keys = store.keysLongHashSet();
|
||||
keys.forEach(correlationID -> {
|
||||
handleResponse(new ActiveMQExceptionMessage_V2(correlationID, exception));
|
||||
});
|
||||
}
|
||||
|
||||
public void setResponseHandler(ResponseHandler responseHandler) {
|
||||
this.responseHandler = responseHandler;
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return this.store.size();
|
||||
}
|
||||
}
|
|
@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
|||
|
||||
public class ActiveMQExceptionMessage extends PacketImpl {
|
||||
|
||||
private ActiveMQException exception;
|
||||
protected ActiveMQException exception;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class ActiveMQExceptionMessage_V2 extends ActiveMQExceptionMessage {
|
||||
|
||||
private long correlationID;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
|
||||
public ActiveMQExceptionMessage_V2(final long correlationID, final ActiveMQException exception) {
|
||||
super(exception);
|
||||
this.correlationID = correlationID;
|
||||
}
|
||||
|
||||
public ActiveMQExceptionMessage_V2() {
|
||||
super();
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public boolean isResponse() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeLong(correlationID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||
correlationID = buffer.readLong();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isResponseAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return this.correlationID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getParentString() + ", exception= " + exception + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!super.equals(obj)) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof ActiveMQExceptionMessage_V2)) {
|
||||
return false;
|
||||
}
|
||||
ActiveMQExceptionMessage_V2 other = (ActiveMQExceptionMessage_V2) obj;
|
||||
if (correlationID != other.correlationID) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -65,6 +65,7 @@ public class CreateAddressMessage extends PacketImpl {
|
|||
return address;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
|
|
@ -100,6 +100,7 @@ public class CreateQueueMessage extends PacketImpl {
|
|||
return temporary;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ public class CreateSharedQueueMessage extends PacketImpl {
|
|||
return filterString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class NullResponseMessage_V2 extends NullResponseMessage {
|
||||
|
||||
private long correlationID;
|
||||
|
||||
public NullResponseMessage_V2(final long correlationID) {
|
||||
super();
|
||||
this.correlationID = correlationID;
|
||||
}
|
||||
|
||||
public NullResponseMessage_V2() {
|
||||
super();
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return correlationID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeLong(correlationID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||
correlationID = buffer.readLong();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isResponse() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isResponseAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getParentString() + ", correlationID=" + correlationID + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!super.equals(obj)) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof NullResponseMessage_V2)) {
|
||||
return false;
|
||||
}
|
||||
NullResponseMessage_V2 other = (NullResponseMessage_V2) obj;
|
||||
if (correlationID != other.correlationID) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -51,6 +51,7 @@ public class SessionAcknowledgeMessage extends PacketImpl {
|
|||
return messageID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
|
|
@ -71,6 +71,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
|
|||
return browseOnly;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl {
|
|||
return messageID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
|
|
@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
|
|||
*/
|
||||
public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
||||
|
||||
private boolean requiresResponse;
|
||||
protected boolean requiresResponse;
|
||||
|
||||
// Used on confirmation handling
|
||||
private Message message;
|
||||
protected Message message;
|
||||
/**
|
||||
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
|
||||
* <br>
|
||||
|
@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
|||
/**
|
||||
* to be sent on the last package
|
||||
*/
|
||||
private long messageBodySize = -1;
|
||||
protected long messageBodySize = -1;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
|
@ -54,6 +54,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
|||
handler = null;
|
||||
}
|
||||
|
||||
protected SessionSendContinuationMessage(byte type) {
|
||||
super(type);
|
||||
handler = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param body
|
||||
* @param continues
|
||||
|
@ -72,11 +77,31 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
|||
this.messageBodySize = messageBodySize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param body
|
||||
* @param continues
|
||||
* @param requiresResponse
|
||||
*/
|
||||
protected SessionSendContinuationMessage(final byte type,
|
||||
final Message message,
|
||||
final byte[] body,
|
||||
final boolean continues,
|
||||
final boolean requiresResponse,
|
||||
final long messageBodySize,
|
||||
SendAcknowledgementHandler handler) {
|
||||
super(type, body, continues);
|
||||
this.requiresResponse = requiresResponse;
|
||||
this.message = message;
|
||||
this.handler = handler;
|
||||
this.messageBodySize = messageBodySize;
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
/**
|
||||
* @return the requiresResponse
|
||||
*/
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
/**
|
||||
* A SessionSendContinuationMessage<br>
|
||||
*/
|
||||
public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage {
|
||||
|
||||
private long correlationID;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
|
||||
public SessionSendContinuationMessage_V2() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param body
|
||||
* @param continues
|
||||
* @param requiresResponse
|
||||
*/
|
||||
public SessionSendContinuationMessage_V2(final Message message,
|
||||
final byte[] body,
|
||||
final boolean continues,
|
||||
final boolean requiresResponse,
|
||||
final long messageBodySize,
|
||||
SendAcknowledgementHandler handler) {
|
||||
super(message, body, continues, requiresResponse, messageBodySize, handler);
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public int expectedEncodeSize() {
|
||||
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeLong(correlationID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||
correlationID = buffer.readLong();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return this.correlationID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCorrelationID(long correlationID) {
|
||||
this.correlationID = correlationID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResponseAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append(", continues=" + continues);
|
||||
buff.append(", message=" + message);
|
||||
buff.append(", messageBodySize=" + messageBodySize);
|
||||
buff.append(", requiresResponse=" + requiresResponse);
|
||||
buff.append(", correlationID=" + correlationID);
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (!super.equals(obj))
|
||||
return false;
|
||||
if (!(obj instanceof SessionSendContinuationMessage_V2))
|
||||
return false;
|
||||
SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj;
|
||||
if (correlationID != other.correlationID)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class SessionSendMessage extends MessagePacket {
|
||||
|
||||
|
@ -36,6 +37,22 @@ public class SessionSendMessage extends MessagePacket {
|
|||
*/
|
||||
private final transient SendAcknowledgementHandler handler;
|
||||
|
||||
/** This will be using the CoreMessage because it is meant for the core-protocol */
|
||||
protected SessionSendMessage(final byte id,
|
||||
final ICoreMessage message,
|
||||
final boolean requiresResponse,
|
||||
final SendAcknowledgementHandler handler) {
|
||||
super(id, message);
|
||||
this.handler = handler;
|
||||
this.requiresResponse = requiresResponse;
|
||||
}
|
||||
|
||||
protected SessionSendMessage(final byte id,
|
||||
final CoreMessage message) {
|
||||
super(id, message);
|
||||
this.handler = null;
|
||||
}
|
||||
|
||||
/** This will be using the CoreMessage because it is meant for the core-protocol */
|
||||
public SessionSendMessage(final ICoreMessage message,
|
||||
final boolean requiresResponse,
|
||||
|
@ -52,6 +69,7 @@ public class SessionSendMessage extends MessagePacket {
|
|||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return requiresResponse;
|
||||
}
|
||||
|
@ -62,7 +80,7 @@ public class SessionSendMessage extends MessagePacket {
|
|||
|
||||
@Override
|
||||
public int expectedEncodeSize() {
|
||||
return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1;
|
||||
return message.getEncodeSize() + PACKET_HEADERS_SIZE + fieldsEncodeSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,13 +93,16 @@ public class SessionSendMessage extends MessagePacket {
|
|||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
// Buffer comes in after having read standard headers and positioned at Beginning of body part
|
||||
|
||||
ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
|
||||
ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), fieldsEncodeSize());
|
||||
receiveMessage(messageBuffer);
|
||||
|
||||
buffer.readerIndex(buffer.capacity() - 1);
|
||||
buffer.readerIndex(buffer.capacity() - fieldsEncodeSize());
|
||||
|
||||
requiresResponse = buffer.readBoolean();
|
||||
}
|
||||
|
||||
protected int fieldsEncodeSize() {
|
||||
return DataConstants.SIZE_BOOLEAN;
|
||||
}
|
||||
|
||||
protected void receiveMessage(ByteBuf messageBuffer) {
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class SessionSendMessage_V2 extends SessionSendMessage {
|
||||
|
||||
private long correlationID;
|
||||
|
||||
/** This will be using the CoreMessage because it is meant for the core-protocol */
|
||||
public SessionSendMessage_V2(final ICoreMessage message,
|
||||
final boolean requiresResponse,
|
||||
final SendAcknowledgementHandler handler) {
|
||||
super(SESS_SEND, message, requiresResponse, handler);
|
||||
}
|
||||
|
||||
public SessionSendMessage_V2(final CoreMessage message) {
|
||||
super(SESS_SEND, message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeLong(correlationID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
correlationID = buffer.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int fieldsEncodeSize() {
|
||||
return super.fieldsEncodeSize() + DataConstants.SIZE_LONG;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return this.correlationID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCorrelationID(long correlationID) {
|
||||
this.correlationID = correlationID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResponseAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append(", correlationID=" + correlationID);
|
||||
buff.append(", requiresResponse=" + super.isRequiresResponse());
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (!super.equals(obj))
|
||||
return false;
|
||||
if (!(obj instanceof SessionSendMessage_V2))
|
||||
return false;
|
||||
SessionSendMessage_V2 other = (SessionSendMessage_V2) obj;
|
||||
if (correlationID != other.correlationID)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
|
@ -21,11 +21,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
|||
|
||||
public class SessionXAResponseMessage extends PacketImpl {
|
||||
|
||||
private boolean error;
|
||||
protected boolean error;
|
||||
|
||||
private int responseCode;
|
||||
protected int responseCode;
|
||||
|
||||
private String message;
|
||||
protected String message;
|
||||
|
||||
public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message) {
|
||||
super(SESS_XA_RESP);
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage {
|
||||
|
||||
private long correlationID;
|
||||
|
||||
public SessionXAResponseMessage_V2(final long correlationID, final boolean isError, final int responseCode, final String message) {
|
||||
super(isError, responseCode, message);
|
||||
this.correlationID = correlationID;
|
||||
}
|
||||
|
||||
public SessionXAResponseMessage_V2() {
|
||||
super();
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return correlationID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeLong(correlationID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||
correlationID = buffer.readLong();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isResponse() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isResponseAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append(", error=" + error);
|
||||
buff.append(", message=" + message);
|
||||
buff.append(", responseCode=" + responseCode);
|
||||
buff.append(", correlationID=" + correlationID);
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!super.equals(obj)) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof SessionXAResponseMessage_V2)) {
|
||||
return false;
|
||||
}
|
||||
SessionXAResponseMessage_V2 other = (SessionXAResponseMessage_V2) obj;
|
||||
if (correlationID != other.correlationID) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
|
|||
activemq.version.microVersion=${activemq.version.microVersion}
|
||||
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
||||
activemq.version.versionTag=${activemq.version.versionTag}
|
||||
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129
|
||||
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130
|
||||
|
|
|
@ -0,0 +1,512 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ChannelImplTest {
|
||||
|
||||
ChannelImpl channel;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
channel = new ChannelImpl(new CoreRR(), 1, 4000, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCorrelation() {
|
||||
|
||||
AtomicInteger handleResponseCount = new AtomicInteger();
|
||||
|
||||
RequestPacket requestPacket = new RequestPacket((byte) 1);
|
||||
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
|
||||
|
||||
channel.send(requestPacket);
|
||||
|
||||
assertEquals(1, channel.getCache().size());
|
||||
|
||||
ResponsePacket responsePacket = new ResponsePacket((byte) 1);
|
||||
responsePacket.setCorrelationID(requestPacket.getCorrelationID());
|
||||
|
||||
channel.handlePacket(responsePacket);
|
||||
|
||||
assertEquals(1, handleResponseCount.get());
|
||||
assertEquals(0, channel.getCache().size());
|
||||
}
|
||||
|
||||
private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
|
||||
channel.setResponseHandler(responseHandler);
|
||||
channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler));
|
||||
}
|
||||
|
||||
private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
|
||||
return new CommandConfirmationHandler() {
|
||||
@Override
|
||||
public void commandConfirmed(Packet packet) {
|
||||
responseHandler.handleResponse(packet, null);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPacketsConfirmedMessage() {
|
||||
|
||||
AtomicInteger handleResponseCount = new AtomicInteger();
|
||||
|
||||
RequestPacket requestPacket = new RequestPacket((byte) 1);
|
||||
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
|
||||
|
||||
channel.send(requestPacket);
|
||||
|
||||
PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2);
|
||||
|
||||
channel.handlePacket(responsePacket);
|
||||
|
||||
assertEquals(0, channel.getCache().size());
|
||||
}
|
||||
|
||||
class RequestPacket extends PacketImpl {
|
||||
|
||||
private long id;
|
||||
|
||||
RequestPacket(byte type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRequiresResponse() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResponseAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCorrelationID(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPacketSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
class ResponsePacket extends PacketImpl {
|
||||
|
||||
private long id;
|
||||
|
||||
ResponsePacket(byte type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResponseAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResponse() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCorrelationID() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCorrelationID(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPacketSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
class CoreRR implements CoreRemotingConnection {
|
||||
|
||||
@Override
|
||||
public int getChannelVersion() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setChannelVersion(int clientVersion) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Channel getChannel(long channelID, int confWindowSize) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putChannel(long channelID, Channel channel) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeChannel(long channelID) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long generateChannelID() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void syncIDGeneratorSequence(long id) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getIDGeneratorSequence() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBlockingCallTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBlockingCallFailoverTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getTransferLock() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean blockUntilWritable(int size, long timeout) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getID() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreationTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteAddress() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scheduledFlush() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addFailureListener(FailureListener listener) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeFailureListener(FailureListener listener) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addCloseListener(CloseListener listener) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeCloseListener(CloseListener listener) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CloseListener> removeCloseListeners() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCloseListeners(List<CloseListener> listeners) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FailureListener> getFailureListeners() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FailureListener> removeFailureListeners() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFailureListeners(List<FailureListener> listeners) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActiveMQBuffer createTransportBuffer(int size) {
|
||||
return new ChannelBufferWrapper(Unpooled.buffer(size));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(ActiveMQException me) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getTransportConnection() {
|
||||
return new Connection() {
|
||||
@Override
|
||||
public ActiveMQBuffer createTransportBuffer(int size) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemotingConnection getProtocolConnection() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProtocolConnection(RemotingConnection connection) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener listener) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fireReady(boolean ready) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAutoRead(boolean autoRead) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getID() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer buffer,
|
||||
boolean flush,
|
||||
boolean batched,
|
||||
ChannelFutureListener futureListener) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceClose() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteAddress() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLocalAddress() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkFlushBatchBuffer() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransportConfiguration getConnectorConfig() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUsingProtocolHandling() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSameTarget(TransportConfiguration... configs) {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClient() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDestroyed() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect(boolean criticalError) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect(String scaleDownNodeID, boolean criticalError) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkDataReceived() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable(ReadyListener callback) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void killMessage(SimpleString nodeID) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupportReconnect() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupportsFlowControl() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Subject getSubject() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClientID(String cID) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClientID() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransportLocalAddress() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -34,6 +34,8 @@ import javax.jms.TextMessage;
|
|||
import javax.jms.Topic;
|
||||
import javax.jms.TopicPublisher;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
|
@ -563,6 +565,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
|||
private final Message jmsMessage;
|
||||
private final ActiveMQMessageProducer producer;
|
||||
|
||||
/**
|
||||
* It's possible that this SendAcknowledgementHandler might be called twice due to subsequent
|
||||
* packet confirmations on the same connection. Using this boolean avoids that possibility.
|
||||
* A new CompletionListenerWrapper is created for each message sent so once it's called once
|
||||
* it will never be called again.
|
||||
*/
|
||||
private AtomicBoolean active = new AtomicBoolean(true);
|
||||
|
||||
/**
|
||||
* @param jmsMessage
|
||||
* @param producer
|
||||
|
@ -577,26 +587,62 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
|||
|
||||
@Override
|
||||
public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
|
||||
if (jmsMessage instanceof StreamMessage) {
|
||||
try {
|
||||
((StreamMessage) jmsMessage).reset();
|
||||
} catch (JMSException e) {
|
||||
// HORNETQ-1209 XXX ignore?
|
||||
if (active.get()) {
|
||||
if (jmsMessage instanceof StreamMessage) {
|
||||
try {
|
||||
((StreamMessage) jmsMessage).reset();
|
||||
} catch (JMSException e) {
|
||||
// HORNETQ-1209 XXX ignore?
|
||||
}
|
||||
}
|
||||
}
|
||||
if (jmsMessage instanceof BytesMessage) {
|
||||
try {
|
||||
((BytesMessage) jmsMessage).reset();
|
||||
} catch (JMSException e) {
|
||||
// HORNETQ-1209 XXX ignore?
|
||||
if (jmsMessage instanceof BytesMessage) {
|
||||
try {
|
||||
((BytesMessage) jmsMessage).reset();
|
||||
} catch (JMSException e) {
|
||||
// HORNETQ-1209 XXX ignore?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
producer.connection.getThreadAwareContext().setCurrentThread(true);
|
||||
completionListener.onCompletion(jmsMessage);
|
||||
} finally {
|
||||
producer.connection.getThreadAwareContext().clearCurrentThread(true);
|
||||
try {
|
||||
producer.connection.getThreadAwareContext().setCurrentThread(true);
|
||||
completionListener.onCompletion(jmsMessage);
|
||||
} finally {
|
||||
producer.connection.getThreadAwareContext().clearCurrentThread(true);
|
||||
active.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
|
||||
if (active.get()) {
|
||||
if (jmsMessage instanceof StreamMessage) {
|
||||
try {
|
||||
((StreamMessage) jmsMessage).reset();
|
||||
} catch (JMSException e) {
|
||||
// HORNETQ-1209 XXX ignore?
|
||||
}
|
||||
}
|
||||
if (jmsMessage instanceof BytesMessage) {
|
||||
try {
|
||||
((BytesMessage) jmsMessage).reset();
|
||||
} catch (JMSException e) {
|
||||
// HORNETQ-1209 XXX ignore?
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
producer.connection.getThreadAwareContext().setCurrentThread(true);
|
||||
if (exception instanceof ActiveMQException) {
|
||||
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
|
||||
} else if (exception instanceof ActiveMQInterruptedException) {
|
||||
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
|
||||
}
|
||||
completionListener.onException(jmsMessage, exception);
|
||||
} finally {
|
||||
producer.connection.getThreadAwareContext().clearCurrentThread(true);
|
||||
active.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
|
||||
|
@ -90,8 +91,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
|
||||
if (connection.isVersionBeforeAddressChange()) {
|
||||
sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
|
||||
} else {
|
||||
} else if (connection.isVersionBeforeAsyncResponseChange()) {
|
||||
sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
|
||||
} else {
|
||||
sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools));
|
||||
}
|
||||
|
||||
sendMessage.decode(in);
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
|
||||
|
@ -76,11 +77,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
|
||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||
|
@ -313,7 +316,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = message.isRequiresResponse();
|
||||
sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -342,7 +345,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = request.isRequiresResponse();
|
||||
session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated());
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -351,7 +354,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = request.isRequiresResponse();
|
||||
session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable());
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -361,7 +364,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
|
||||
request.isExclusive(), request.isLastValue(), request.isAutoCreated());
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -373,7 +376,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
|
||||
}
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -385,7 +388,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
|
||||
}
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -393,7 +396,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = true;
|
||||
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
|
||||
session.deleteQueue(request.getQueueName());
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_QUEUEQUERY: {
|
||||
|
@ -453,62 +456,62 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_COMMIT: {
|
||||
requiresResponse = true;
|
||||
session.commit();
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_ROLLBACK: {
|
||||
requiresResponse = true;
|
||||
session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_COMMIT: {
|
||||
requiresResponse = true;
|
||||
SessionXACommitMessage message = (SessionXACommitMessage) packet;
|
||||
session.xaCommit(message.getXid(), message.isOnePhase());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_END: {
|
||||
requiresResponse = true;
|
||||
SessionXAEndMessage message = (SessionXAEndMessage) packet;
|
||||
session.xaEnd(message.getXid());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_FORGET: {
|
||||
requiresResponse = true;
|
||||
SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
|
||||
session.xaForget(message.getXid());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_JOIN: {
|
||||
requiresResponse = true;
|
||||
SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
|
||||
session.xaJoin(message.getXid());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_RESUME: {
|
||||
requiresResponse = true;
|
||||
SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
|
||||
session.xaResume(message.getXid());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_ROLLBACK: {
|
||||
requiresResponse = true;
|
||||
SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
|
||||
session.xaRollback(message.getXid());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_START: {
|
||||
requiresResponse = true;
|
||||
SessionXAStartMessage message = (SessionXAStartMessage) packet;
|
||||
session.xaStart(message.getXid());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_FAILED: {
|
||||
|
@ -521,14 +524,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_XA_SUSPEND: {
|
||||
requiresResponse = true;
|
||||
session.xaSuspend();
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_PREPARE: {
|
||||
requiresResponse = true;
|
||||
SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
|
||||
session.xaPrepare(message.getXid());
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
response = createSessionXAResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_XA_INDOUBT_XIDS: {
|
||||
|
@ -557,14 +560,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_STOP: {
|
||||
requiresResponse = true;
|
||||
session.stop();
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_CLOSE: {
|
||||
requiresResponse = true;
|
||||
session.close(false);
|
||||
// removeConnectionListeners();
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
flush = true;
|
||||
closeChannel = true;
|
||||
break;
|
||||
|
@ -574,7 +577,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = message.isRequiresResponse();
|
||||
session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -582,7 +585,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = true;
|
||||
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
|
||||
session.closeConsumer(message.getConsumerID());
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
break;
|
||||
}
|
||||
case SESS_FORCE_CONSUMER_DELIVERY: {
|
||||
|
@ -591,7 +594,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
break;
|
||||
}
|
||||
case PacketImpl.SESS_ADD_METADATA: {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
|
||||
session.addMetaData(message.getKey(), message.getData());
|
||||
break;
|
||||
|
@ -600,7 +603,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = true;
|
||||
SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
|
||||
if (message.isRequiresConfirmations()) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
session.addMetaData(message.getKey(), message.getData());
|
||||
break;
|
||||
|
@ -609,7 +612,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = true;
|
||||
SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
|
||||
if (session.addUniqueMetaData(message.getKey(), message.getData())) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
} else {
|
||||
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
|
||||
}
|
||||
|
@ -617,15 +620,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
}
|
||||
}
|
||||
} catch (ActiveMQIOErrorException e) {
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
||||
} catch (ActiveMQXAException e) {
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQException e) {
|
||||
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (Throwable t) {
|
||||
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
||||
}
|
||||
sendResponse(packet, response, flush, closeChannel);
|
||||
} finally {
|
||||
|
@ -633,6 +636,26 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
}
|
||||
}
|
||||
|
||||
private Packet createNullResponseMessage(Packet packet) {
|
||||
final Packet response;
|
||||
if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
||||
response = new NullResponseMessage();
|
||||
} else {
|
||||
response = new NullResponseMessage_V2(packet.getCorrelationID());
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private Packet createSessionXAResponseMessage(Packet packet) {
|
||||
Packet response;
|
||||
if (packet.isResponseAsync()) {
|
||||
response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null);
|
||||
} else {
|
||||
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private void onSessionAcknowledge(Packet packet) {
|
||||
this.storageManager.setContext(session.getSessionContext());
|
||||
try {
|
||||
|
@ -643,18 +666,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = message.isRequiresResponse();
|
||||
this.session.acknowledge(message.getConsumerID(), message.getMessageID());
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
} catch (ActiveMQIOErrorException e) {
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
||||
} catch (ActiveMQXAException e) {
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQException e) {
|
||||
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (Throwable t) {
|
||||
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
||||
}
|
||||
sendResponse(packet, response, false, false);
|
||||
} finally {
|
||||
|
@ -672,18 +695,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = message.isRequiresResponse();
|
||||
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
|
||||
if (requiresResponse) {
|
||||
response = new NullResponseMessage();
|
||||
response = createNullResponseMessage(packet);
|
||||
}
|
||||
} catch (ActiveMQIOErrorException e) {
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
||||
} catch (ActiveMQXAException e) {
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQException e) {
|
||||
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (Throwable t) {
|
||||
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
||||
}
|
||||
sendResponse(packet, response, false, false);
|
||||
} finally {
|
||||
|
@ -700,15 +723,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
|
||||
session.requestProducerCredits(message.getAddress(), message.getCredits());
|
||||
} catch (ActiveMQIOErrorException e) {
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
||||
} catch (ActiveMQXAException e) {
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQException e) {
|
||||
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (Throwable t) {
|
||||
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
||||
}
|
||||
sendResponse(packet, response, false, false);
|
||||
} finally {
|
||||
|
@ -725,15 +748,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
|
||||
session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
|
||||
} catch (ActiveMQIOErrorException e) {
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
||||
} catch (ActiveMQXAException e) {
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (ActiveMQException e) {
|
||||
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
||||
} catch (Throwable t) {
|
||||
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
||||
}
|
||||
sendResponse(packet, response, false, false);
|
||||
} finally {
|
||||
|
@ -742,50 +765,68 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
}
|
||||
|
||||
|
||||
private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e,
|
||||
private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet,
|
||||
ActiveMQIOErrorException e,
|
||||
boolean requiresResponse,
|
||||
Packet response,
|
||||
ServerSession session) {
|
||||
session.markTXFailed(e);
|
||||
if (requiresResponse) {
|
||||
logger.debug("Sending exception to client", e);
|
||||
response = new ActiveMQExceptionMessage(e);
|
||||
response = convertToExceptionPacket(packet, e);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.caughtException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e,
|
||||
private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet,
|
||||
ActiveMQXAException e,
|
||||
boolean requiresResponse,
|
||||
Packet response) {
|
||||
if (requiresResponse) {
|
||||
logger.debug("Sending exception to client", e);
|
||||
response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
|
||||
if (packet.isResponseAsync()) {
|
||||
response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage());
|
||||
} else {
|
||||
response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
|
||||
}
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.caughtXaException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e,
|
||||
private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet,
|
||||
ActiveMQQueueMaxConsumerLimitReached e,
|
||||
boolean requiresResponse,
|
||||
Packet response) {
|
||||
if (requiresResponse) {
|
||||
logger.debug("Sending exception to client", e);
|
||||
response = new ActiveMQExceptionMessage(e);
|
||||
response = convertToExceptionPacket(packet, e);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.caughtException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e,
|
||||
private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) {
|
||||
Packet response;
|
||||
if (packet.isResponseAsync()) {
|
||||
response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e);
|
||||
} else {
|
||||
response = new ActiveMQExceptionMessage(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet,
|
||||
ActiveMQException e,
|
||||
boolean requiresResponse,
|
||||
Packet response) {
|
||||
if (requiresResponse) {
|
||||
logger.debug("Sending exception to client", e);
|
||||
response = new ActiveMQExceptionMessage(e);
|
||||
response = convertToExceptionPacket(packet, e);
|
||||
} else {
|
||||
if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
|
||||
logger.debug("Caught exception", e);
|
||||
|
@ -796,7 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
return response;
|
||||
}
|
||||
|
||||
private static Packet onCatchThrowableWhileHandlePacket(Throwable t,
|
||||
private static Packet onCatchThrowableWhileHandlePacket(Packet packet,
|
||||
Throwable t,
|
||||
boolean requiresResponse,
|
||||
Packet response,
|
||||
ServerSession session) {
|
||||
|
@ -805,7 +847,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
|
||||
ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
|
||||
activeMQInternalErrorException.initCause(t);
|
||||
response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
|
||||
response = convertToExceptionPacket(packet, activeMQInternalErrorException);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.caughtException(t);
|
||||
}
|
||||
|
@ -827,12 +869,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
public void onError(final int errorCode, final String errorMessage) {
|
||||
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
|
||||
|
||||
ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage));
|
||||
|
||||
doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
|
||||
Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage));
|
||||
doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage);
|
||||
logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -852,7 +893,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
final Packet response,
|
||||
final boolean flush,
|
||||
final boolean closeChannel) {
|
||||
if (confirmPacket != null) {
|
||||
// don't confirm if the response is an exception
|
||||
if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
|
||||
channel.confirm(confirmPacket);
|
||||
|
||||
if (flush) {
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -126,7 +126,7 @@
|
|||
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
||||
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
||||
<activemq.version.microVersion>0</activemq.version.microVersion>
|
||||
<activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
|
||||
<activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
|
||||
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
||||
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
|||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
||||
|
@ -314,6 +315,11 @@ public class BackupSyncDelay implements Interceptor {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResponseHandler(ResponseHandler handler) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushConfirmations() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
|
@ -167,7 +167,24 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
|
|||
|
||||
@Override
|
||||
public void onException(Message message, Exception exception) {
|
||||
// TODO Auto-generated method stub
|
||||
latch.countDown();
|
||||
try {
|
||||
switch (call) {
|
||||
case 0:
|
||||
context.rollback();
|
||||
break;
|
||||
case 1:
|
||||
context.commit();
|
||||
break;
|
||||
case 2:
|
||||
context.close();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("call code " + call);
|
||||
}
|
||||
} catch (Exception error1) {
|
||||
this.error = error1;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,12 +16,28 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.jms.tests;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.CompletionListener;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.IllegalStateException;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSProducer;
|
||||
import javax.jms.JMSSecurityException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
|
||||
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
|
||||
|
@ -68,9 +84,9 @@ public class SecurityTest extends JMSTestCase {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Login with no user, no password Should allow login (equivalent to guest)
|
||||
*/
|
||||
/**
|
||||
* Login with no user, no password Should allow login (equivalent to guest)
|
||||
*/
|
||||
@Test
|
||||
public void testLoginNoUserNoPassword() throws Exception {
|
||||
createConnection();
|
||||
|
@ -170,6 +186,173 @@ public class SecurityTest extends JMSTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Login with valid user and password
|
||||
* But try send to address not authorised - Persistent
|
||||
* Should not allow and should throw exception
|
||||
*/
|
||||
@Test
|
||||
public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
|
||||
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
|
||||
if (getJmsServer().locateQueue(queueName) == null) {
|
||||
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
}
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
Connection connection = connectionFactory.createConnection("guest", "guest");
|
||||
Session session = connection.createSession();
|
||||
Destination destination = session.createQueue(queueName.toString());
|
||||
MessageProducer messageProducer = session.createProducer(destination);
|
||||
try {
|
||||
messageProducer.send(session.createTextMessage("hello"));
|
||||
fail("JMSSecurityException expected as guest is not allowed to send");
|
||||
} catch (JMSSecurityException activeMQSecurityException) {
|
||||
//pass
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Login with valid user and password
|
||||
* But try send to address not authorised - Non Persistent.
|
||||
* Should have same behaviour as Persistent with exception on send.
|
||||
*/
|
||||
@Test
|
||||
public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
|
||||
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
|
||||
if (getJmsServer().locateQueue(queueName) == null) {
|
||||
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
}
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
connectionFactory.setConfirmationWindowSize(100);
|
||||
connectionFactory.setBlockOnDurableSend(false);
|
||||
connectionFactory.setBlockOnNonDurableSend(false);
|
||||
Connection connection = connectionFactory.createConnection("guest", "guest");
|
||||
Session session = connection.createSession();
|
||||
Destination destination = session.createQueue(queueName.toString());
|
||||
MessageProducer messageProducer = session.createProducer(destination);
|
||||
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
try {
|
||||
AtomicReference<Exception> e = new AtomicReference<>();
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
|
||||
@Override
|
||||
public void onCompletion(Message message) {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Message message, Exception exception) {
|
||||
e.set(exception);
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
countDownLatch.await(10, TimeUnit.SECONDS);
|
||||
if (e.get() != null) {
|
||||
throw e.get();
|
||||
}
|
||||
fail("JMSSecurityException expected as guest is not allowed to send");
|
||||
} catch (JMSSecurityException activeMQSecurityException) {
|
||||
activeMQSecurityException.printStackTrace();
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API.
|
||||
*/
|
||||
@Test
|
||||
public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception {
|
||||
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
|
||||
if (getJmsServer().locateQueue(queueName) == null) {
|
||||
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
}
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
connectionFactory.setConfirmationWindowSize(100);
|
||||
connectionFactory.setBlockOnDurableSend(false);
|
||||
connectionFactory.setBlockOnNonDurableSend(false);
|
||||
JMSContext context = connectionFactory.createContext("guest", "guest");
|
||||
Destination destination = context.createQueue(queueName.toString());
|
||||
JMSProducer messageProducer = context.createProducer();
|
||||
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
try {
|
||||
AtomicReference<Exception> e = new AtomicReference<>();
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
messageProducer.setAsync(new CompletionListener() {
|
||||
@Override
|
||||
public void onCompletion(Message message) {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Message message, Exception exception) {
|
||||
e.set(exception);
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
messageProducer.send(destination, context.createTextMessage("hello"));
|
||||
countDownLatch.await(10, TimeUnit.SECONDS);
|
||||
if (e.get() != null) {
|
||||
throw e.get();
|
||||
}
|
||||
fail("JMSSecurityException expected as guest is not allowed to send");
|
||||
} catch (JMSSecurityException activeMQSecurityException) {
|
||||
activeMQSecurityException.printStackTrace();
|
||||
} finally {
|
||||
context.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message.
|
||||
*/
|
||||
@Test
|
||||
public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception {
|
||||
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
|
||||
if (getJmsServer().locateQueue(queueName) == null) {
|
||||
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
}
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
connectionFactory.setConfirmationWindowSize(100);
|
||||
connectionFactory.setBlockOnDurableSend(false);
|
||||
connectionFactory.setBlockOnNonDurableSend(false);
|
||||
connectionFactory.setMinLargeMessageSize(1024);
|
||||
Connection connection = connectionFactory.createConnection("guest", "guest");
|
||||
Session session = connection.createSession();
|
||||
Destination destination = session.createQueue(queueName.toString());
|
||||
MessageProducer messageProducer = session.createProducer(destination);
|
||||
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
try {
|
||||
AtomicReference<Exception> e = new AtomicReference<>();
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
BytesMessage message = session.createBytesMessage();
|
||||
message.writeBytes(new byte[10 * 1024]);
|
||||
messageProducer.send(message, new CompletionListener() {
|
||||
@Override
|
||||
public void onCompletion(Message message) {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Message message, Exception exception) {
|
||||
e.set(exception);
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
countDownLatch.await(10, TimeUnit.SECONDS);
|
||||
if (e.get() != null) {
|
||||
throw e.get();
|
||||
}
|
||||
fail("JMSSecurityException expected as guest is not allowed to send");
|
||||
} catch (JMSSecurityException activeMQSecurityException) {
|
||||
activeMQSecurityException.printStackTrace();
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
|
||||
/* Now some client id tests */
|
||||
|
||||
/**
|
||||
|
|
|
@ -54,6 +54,16 @@
|
|||
<permission type="browse" roles="guest,def"/>
|
||||
<permission type="send" roles="guest,def"/>
|
||||
</security-setting>
|
||||
|
||||
<security-setting match="guest.cannot.send">
|
||||
<permission type="createDurableQueue" roles="guest,def"/>
|
||||
<permission type="deleteDurableQueue" roles="guest,def"/>
|
||||
<permission type="createNonDurableQueue" roles="guest,def"/>
|
||||
<permission type="deleteNonDurableQueue" roles="guest,def"/>
|
||||
<permission type="consume" roles="guest,def"/>
|
||||
<permission type="browse" roles="guest,def"/>
|
||||
<permission type="send" roles="def"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
</core>
|
||||
</configuration>
|
Loading…
Reference in New Issue