Revert "ARTEMIS-1545 Support JMS 2.0 Completion Listener for Exceptions"
Revert "ARTEMIS-1545 Adding HornetQ 2.4.7 on the mesh to validate send-acks" I'm reverting this as the testsuite is broken.. We will send it back once worked out. This reverts commit8f5b7a1e73
. This reverts commit9b982b3e30
.
This commit is contained in:
parent
386f090372
commit
7514e91ed1
|
@ -41,11 +41,4 @@ public interface SendAcknowledgementHandler {
|
||||||
* @param message message sent asynchronously
|
* @param message message sent asynchronously
|
||||||
*/
|
*/
|
||||||
void sendAcknowledged(Message message);
|
void sendAcknowledged(Message message);
|
||||||
|
|
||||||
default void sendFailed(Message message, Exception e) {
|
|
||||||
//This is to keep old behaviour that would ack even if error,
|
|
||||||
// if anyone custom implemented this interface but doesnt update.
|
|
||||||
sendAcknowledged(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -234,7 +234,4 @@ public interface ActiveMQClientMessageBundle {
|
||||||
|
|
||||||
@Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
|
@Message(id = 119062, value = "Multi-packet transmission (e.g. Large Messages) interrupted because of a reconnection.")
|
||||||
ActiveMQInterruptedException packetTransmissionInterrupted();
|
ActiveMQInterruptedException packetTransmissionInterrupted();
|
||||||
|
|
||||||
@Message(id = 119063, value = "Cannot send a packet while response cache is full.")
|
|
||||||
IllegalStateException cannotSendPacketWhilstResponseCacheFull();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,9 +211,6 @@ public interface Channel {
|
||||||
*/
|
*/
|
||||||
void setCommandConfirmationHandler(CommandConfirmationHandler handler);
|
void setCommandConfirmationHandler(CommandConfirmationHandler handler);
|
||||||
|
|
||||||
void setResponseHandler(ResponseHandler handler);
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* flushes any confirmations on to the connection.
|
* flushes any confirmations on to the connection.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -36,11 +36,6 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
||||||
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
|
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
default boolean isVersionBeforeAsyncResponseChange() {
|
|
||||||
int version = getChannelVersion();
|
|
||||||
return (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the client protocol used on the communication. This will determine if the client has
|
* Sets the client protocol used on the communication. This will determine if the client has
|
||||||
* support for certain packet types
|
* support for certain packet types
|
||||||
|
|
|
@ -41,14 +41,6 @@ public interface Packet {
|
||||||
return INITIAL_PACKET_SIZE;
|
return INITIAL_PACKET_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isRequiresResponse();
|
|
||||||
|
|
||||||
boolean isResponseAsync();
|
|
||||||
|
|
||||||
long getCorrelationID();
|
|
||||||
|
|
||||||
void setCorrelationID(long correlationID);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the channel id of the channel that should handle this packet.
|
* Returns the channel id of the channel that should handle this packet.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1,30 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.protocol.core;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A CommandConfirmationHandler is used by the channel to confirm confirmations of packets.
|
|
||||||
*/
|
|
||||||
public interface ResponseHandler {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* called by channel after a confirmation has been received.
|
|
||||||
*
|
|
||||||
* @param packet the packet confirmed
|
|
||||||
*/
|
|
||||||
void responseHandler(Packet packet, Packet response);
|
|
||||||
}
|
|
|
@ -54,7 +54,6 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
|
||||||
|
@ -63,7 +62,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSess
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ExceptionResponseMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
||||||
|
@ -91,11 +89,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
||||||
|
@ -168,11 +164,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
sessionChannel.setHandler(handler);
|
sessionChannel.setHandler(handler);
|
||||||
|
|
||||||
if (confirmationWindow >= 0) {
|
if (confirmationWindow >= 0) {
|
||||||
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
sessionChannel.setCommandConfirmationHandler(confirmationHandler);
|
||||||
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
|
|
||||||
} else {
|
|
||||||
sessionChannel.setResponseHandler(responseHandler);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,50 +181,28 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
this.killed = true;
|
this.killed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
|
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
|
||||||
@Override
|
@Override
|
||||||
public void commandConfirmed(Packet packet) {
|
public void commandConfirmed(final Packet packet) {
|
||||||
responseHandler.responseHandler(packet, null);
|
if (packet.getType() == PacketImpl.SESS_SEND) {
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private final ResponseHandler responseHandler = new ResponseHandler() {
|
|
||||||
@Override
|
|
||||||
public void responseHandler(Packet packet, Packet response) {
|
|
||||||
final ActiveMQException activeMQException;
|
|
||||||
if (response != null && response.getType() == PacketImpl.EXCEPTION_RESPONSE) {
|
|
||||||
ExceptionResponseMessage exceptionResponseMessage = (ExceptionResponseMessage) response;
|
|
||||||
activeMQException = exceptionResponseMessage.getException();
|
|
||||||
} else {
|
|
||||||
activeMQException = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (packet.getType() == PacketImpl.SESS_SEND || packet.getType() == PacketImpl.SESS_SEND_V2) {
|
|
||||||
SessionSendMessage ssm = (SessionSendMessage) packet;
|
SessionSendMessage ssm = (SessionSendMessage) packet;
|
||||||
callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
|
callSendAck(ssm.getHandler(), ssm.getMessage());
|
||||||
} else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION || packet.getType() == PacketImpl.SESS_SEND_CONTINUATION_V2) {
|
} else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
|
||||||
SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
|
SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
|
||||||
if (!scm.isContinues()) {
|
if (!scm.isContinues()) {
|
||||||
callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
|
callSendAck(scm.getHandler(), scm.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
|
private void callSendAck(SendAcknowledgementHandler handler, final Message message) {
|
||||||
if (handler != null) {
|
if (handler != null) {
|
||||||
if (exception == null) {
|
|
||||||
handler.sendAcknowledged(message);
|
handler.sendAcknowledged(message);
|
||||||
} else {
|
|
||||||
handler.sendFailed(message, exception);
|
|
||||||
}
|
|
||||||
} else if (sendAckHandler != null) {
|
} else if (sendAckHandler != null) {
|
||||||
if (exception == null) {
|
|
||||||
sendAckHandler.sendAcknowledged(message);
|
sendAckHandler.sendAcknowledged(message);
|
||||||
} else {
|
|
||||||
handler.sendFailed(message, exception);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Failover utility methods
|
// Failover utility methods
|
||||||
|
@ -269,11 +239,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
|
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
|
||||||
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
sessionChannel.setCommandConfirmationHandler(confirmationHandler);
|
||||||
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
|
|
||||||
} else {
|
|
||||||
sessionChannel.setResponseHandler(responseHandler);
|
|
||||||
}
|
|
||||||
this.sendAckHandler = handler;
|
this.sendAckHandler = handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -488,21 +454,15 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
boolean sendBlocking,
|
boolean sendBlocking,
|
||||||
SendAcknowledgementHandler handler,
|
SendAcknowledgementHandler handler,
|
||||||
SimpleString defaultAddress) throws ActiveMQException {
|
SimpleString defaultAddress) throws ActiveMQException {
|
||||||
final SessionSendMessage packet;
|
SessionSendMessage packet;
|
||||||
final byte expectedPacket;
|
|
||||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||||
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
|
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
|
||||||
expectedPacket = PacketImpl.NULL_RESPONSE;
|
|
||||||
} else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
|
||||||
packet = new SessionSendMessage(msgI, sendBlocking, handler);
|
|
||||||
expectedPacket = PacketImpl.NULL_RESPONSE;
|
|
||||||
} else {
|
} else {
|
||||||
boolean responseRequired = confirmationWindow > 0 || sendBlocking;
|
packet = new SessionSendMessage(msgI, sendBlocking, handler);
|
||||||
packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
|
|
||||||
expectedPacket = PacketImpl.SUCCESS_RESPONSE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sendBlocking) {
|
if (sendBlocking) {
|
||||||
sessionChannel.sendBlocking(packet, expectedPacket);
|
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
|
||||||
} else {
|
} else {
|
||||||
sessionChannel.sendBatched(packet);
|
sessionChannel.sendBatched(packet);
|
||||||
}
|
}
|
||||||
|
@ -880,20 +840,15 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int sendSessionSendContinuationMessage(Channel channel,
|
private static int sendSessionSendContinuationMessage(Channel channel,
|
||||||
Message msgI,
|
Message msgI,
|
||||||
long messageBodySize,
|
long messageBodySize,
|
||||||
boolean sendBlocking,
|
boolean sendBlocking,
|
||||||
boolean lastChunk,
|
boolean lastChunk,
|
||||||
byte[] chunk,
|
byte[] chunk,
|
||||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||||
final boolean requiresResponse = lastChunk || confirmationWindow > 0;
|
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||||
final SessionSendContinuationMessage chunkPacket;
|
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||||
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
|
|
||||||
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
|
||||||
} else {
|
|
||||||
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
|
||||||
}
|
|
||||||
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
|
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
|
||||||
//perform a weak form of flow control to avoid OOM on tight loops
|
//perform a weak form of flow control to avoid OOM on tight loops
|
||||||
final CoreRemotingConnection connection = channel.getConnection();
|
final CoreRemotingConnection connection = channel.getConnection();
|
||||||
|
@ -910,11 +865,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
}
|
}
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
// When sending it blocking, only the last chunk will be blocking.
|
// When sending it blocking, only the last chunk will be blocking.
|
||||||
if (sendBlocking) {
|
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
|
||||||
channel.sendBlocking(chunkPacket, PacketImpl.SUCCESS_RESPONSE);
|
|
||||||
} else {
|
|
||||||
channel.send(chunkPacket);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
channel.send(chunkPacket);
|
channel.send(chunkPacket);
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,9 +36,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ExceptionResponseMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.utils.ConcurrentUtil;
|
import org.apache.activemq.artemis.utils.ConcurrentUtil;
|
||||||
|
@ -98,8 +96,6 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
private final java.util.Queue<Packet> resendCache;
|
private final java.util.Queue<Packet> resendCache;
|
||||||
|
|
||||||
private final ResponseCache responseAsyncCache;
|
|
||||||
|
|
||||||
private int firstStoredCommandID;
|
private int firstStoredCommandID;
|
||||||
|
|
||||||
private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
|
private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1);
|
||||||
|
@ -126,8 +122,6 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
private CommandConfirmationHandler commandConfirmationHandler;
|
private CommandConfirmationHandler commandConfirmationHandler;
|
||||||
|
|
||||||
private ResponseHandler responseHandler;
|
|
||||||
|
|
||||||
private volatile boolean transferring;
|
private volatile boolean transferring;
|
||||||
|
|
||||||
private final List<Interceptor> interceptors;
|
private final List<Interceptor> interceptors;
|
||||||
|
@ -144,10 +138,8 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
if (confWindowSize != -1) {
|
if (confWindowSize != -1) {
|
||||||
resendCache = new ConcurrentLinkedQueue<>();
|
resendCache = new ConcurrentLinkedQueue<>();
|
||||||
responseAsyncCache = new ResponseCache(confWindowSize);
|
|
||||||
} else {
|
} else {
|
||||||
resendCache = null;
|
resendCache = null;
|
||||||
responseAsyncCache = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this.interceptors = interceptors;
|
this.interceptors = interceptors;
|
||||||
|
@ -184,12 +176,6 @@ public final class ChannelImpl implements Channel {
|
||||||
return version >= 129;
|
return version >= 129;
|
||||||
case PacketImpl.SESS_BINDINGQUERY_RESP_V4:
|
case PacketImpl.SESS_BINDINGQUERY_RESP_V4:
|
||||||
return version >= 129;
|
return version >= 129;
|
||||||
case PacketImpl.SESS_SEND_V2:
|
|
||||||
return version >= 130;
|
|
||||||
case PacketImpl.EXCEPTION_RESPONSE:
|
|
||||||
return version >= 130;
|
|
||||||
case PacketImpl.SUCCESS_RESPONSE:
|
|
||||||
return version >= 130;
|
|
||||||
default:
|
default:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -284,10 +270,6 @@ public final class ChannelImpl implements Channel {
|
||||||
synchronized (sendLock) {
|
synchronized (sendLock) {
|
||||||
packet.setChannelID(id);
|
packet.setChannelID(id);
|
||||||
|
|
||||||
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
|
|
||||||
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
|
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
|
||||||
}
|
}
|
||||||
|
@ -309,10 +291,6 @@ public final class ChannelImpl implements Channel {
|
||||||
if (resendCache != null && packet.isRequiresConfirmations()) {
|
if (resendCache != null && packet.isRequiresConfirmations()) {
|
||||||
addResendPacket(packet);
|
addResendPacket(packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (responseAsyncCache != null && packet.isRequiresResponse()) {
|
|
||||||
addResponseExpectedPacket(packet);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -406,14 +384,14 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && response.getType() != PacketImpl.EXCEPTION_RESPONSE && response.getType() != expectedPacket)) && toWait > 0) {
|
while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket)) && toWait > 0) {
|
||||||
try {
|
try {
|
||||||
sendCondition.await(toWait, TimeUnit.MILLISECONDS);
|
sendCondition.await(toWait, TimeUnit.MILLISECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new ActiveMQInterruptedException(e);
|
throw new ActiveMQInterruptedException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != PacketImpl.EXCEPTION_RESPONSE && response.getType() != expectedPacket) {
|
if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) {
|
||||||
ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
|
ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,16 +424,6 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (response.getType() == PacketImpl.EXCEPTION_RESPONSE) {
|
|
||||||
final ExceptionResponseMessage mem = (ExceptionResponseMessage) response;
|
|
||||||
|
|
||||||
ActiveMQException e = mem.getException();
|
|
||||||
|
|
||||||
e.fillInStackTrace();
|
|
||||||
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -509,18 +477,6 @@ public final class ChannelImpl implements Channel {
|
||||||
commandConfirmationHandler = handler;
|
commandConfirmationHandler = handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setResponseHandler(final ResponseHandler responseHandler) {
|
|
||||||
if (confWindowSize < 0) {
|
|
||||||
final String msg = "You can't set responseHandler on a connection with confirmation-window-size < 0." + " Look at the documentation for more information.";
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " " + msg);
|
|
||||||
}
|
|
||||||
throw new IllegalStateException(msg);
|
|
||||||
}
|
|
||||||
responseAsyncCache.setResponseHandler(responseHandler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setHandler(final ChannelHandler handler) {
|
public void setHandler(final ChannelHandler handler) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -639,12 +595,6 @@ public final class ChannelImpl implements Channel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void handleResponse(Packet packet) {
|
|
||||||
if (responseAsyncCache != null && packet.isResponseAsync()) {
|
|
||||||
responseAsyncCache.handleResponse(packet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void confirm(final Packet packet) {
|
public void confirm(final Packet packet) {
|
||||||
if (resendCache != null && packet.isRequiresConfirmations()) {
|
if (resendCache != null && packet.isRequiresConfirmations()) {
|
||||||
|
@ -697,7 +647,6 @@ public final class ChannelImpl implements Channel {
|
||||||
if (packet.isResponse()) {
|
if (packet.isResponse()) {
|
||||||
confirm(packet);
|
confirm(packet);
|
||||||
|
|
||||||
handleResponse(packet);
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -727,15 +676,6 @@ public final class ChannelImpl implements Channel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addResponseExpectedPacket(Packet packet) {
|
|
||||||
if (packet.isResponseAsync()) {
|
|
||||||
responseAsyncCache.add(packet);
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::addResendPacket adding packet " + packet + " stored correlationID=" + packet.getCorrelationID());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void clearUpTo(final int lastReceivedCommandID) {
|
private void clearUpTo(final int lastReceivedCommandID) {
|
||||||
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
|
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ExceptionResponseMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
|
||||||
|
@ -72,7 +71,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
||||||
|
@ -90,7 +88,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SuccessResponseMessage;
|
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
|
||||||
|
@ -108,7 +105,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DIS
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_V2;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_V2;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION_RESPONSE;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.PING;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.PING;
|
||||||
|
@ -140,7 +136,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION_V2;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
|
||||||
|
@ -163,7 +158,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUCCESS_RESPONSE;
|
|
||||||
|
|
||||||
public abstract class PacketDecoder implements Serializable {
|
public abstract class PacketDecoder implements Serializable {
|
||||||
|
|
||||||
|
@ -193,22 +187,10 @@ public abstract class PacketDecoder implements Serializable {
|
||||||
packet = new ActiveMQExceptionMessage();
|
packet = new ActiveMQExceptionMessage();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case EXCEPTION_RESPONSE: {
|
|
||||||
packet = new ExceptionResponseMessage();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case PACKETS_CONFIRMED: {
|
case PACKETS_CONFIRMED: {
|
||||||
packet = new PacketsConfirmedMessage();
|
packet = new PacketsConfirmedMessage();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case NULL_RESPONSE: {
|
|
||||||
packet = new NullResponseMessage();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case SUCCESS_RESPONSE: {
|
|
||||||
packet = new SuccessResponseMessage();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CREATESESSION: {
|
case CREATESESSION: {
|
||||||
packet = new CreateSessionMessage();
|
packet = new CreateSessionMessage();
|
||||||
break;
|
break;
|
||||||
|
@ -401,6 +383,10 @@ public abstract class PacketDecoder implements Serializable {
|
||||||
packet = new SessionIndividualAcknowledgeMessage();
|
packet = new SessionIndividualAcknowledgeMessage();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case NULL_RESPONSE: {
|
||||||
|
packet = new NullResponseMessage();
|
||||||
|
break;
|
||||||
|
}
|
||||||
case SESS_RECEIVE_CONTINUATION: {
|
case SESS_RECEIVE_CONTINUATION: {
|
||||||
packet = new SessionReceiveContinuationMessage();
|
packet = new SessionReceiveContinuationMessage();
|
||||||
break;
|
break;
|
||||||
|
@ -409,10 +395,6 @@ public abstract class PacketDecoder implements Serializable {
|
||||||
packet = new SessionSendContinuationMessage();
|
packet = new SessionSendContinuationMessage();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SESS_SEND_CONTINUATION_V2: {
|
|
||||||
packet = new SessionSendContinuationMessage_V2();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case SESS_PRODUCER_REQUEST_CREDITS: {
|
case SESS_PRODUCER_REQUEST_CREDITS: {
|
||||||
packet = new SessionRequestProducerCreditsMessage();
|
packet = new SessionRequestProducerCreditsMessage();
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -29,13 +29,8 @@ public class PacketImpl implements Packet {
|
||||||
// Constants -------------------------------------------------------------------------
|
// Constants -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
// 2.0.0
|
|
||||||
public static final int ADDRESSING_CHANGE_VERSION = 129;
|
public static final int ADDRESSING_CHANGE_VERSION = 129;
|
||||||
|
|
||||||
// 2.5.0
|
|
||||||
public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
|
|
||||||
|
|
||||||
|
|
||||||
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
||||||
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
|
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
|
||||||
|
|
||||||
|
@ -68,10 +63,6 @@ public class PacketImpl implements Packet {
|
||||||
|
|
||||||
public static final byte PACKETS_CONFIRMED = 22;
|
public static final byte PACKETS_CONFIRMED = 22;
|
||||||
|
|
||||||
public static final byte SUCCESS_RESPONSE = 23;
|
|
||||||
|
|
||||||
public static final byte EXCEPTION_RESPONSE = 24;
|
|
||||||
|
|
||||||
// Server
|
// Server
|
||||||
public static final byte CREATESESSION = 30;
|
public static final byte CREATESESSION = 30;
|
||||||
|
|
||||||
|
@ -276,11 +267,6 @@ public class PacketImpl implements Packet {
|
||||||
|
|
||||||
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
|
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
|
||||||
|
|
||||||
public static final byte SESS_SEND_V2 = -16;
|
|
||||||
|
|
||||||
public static final byte SESS_SEND_CONTINUATION_V2 = -17;
|
|
||||||
|
|
||||||
|
|
||||||
// Static --------------------------------------------------------
|
// Static --------------------------------------------------------
|
||||||
|
|
||||||
public PacketImpl(final byte type) {
|
public PacketImpl(final byte type) {
|
||||||
|
@ -448,24 +434,5 @@ public class PacketImpl implements Packet {
|
||||||
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
|
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isResponseAsync() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getCorrelationID() {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setCorrelationID(long correlationID) {
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,61 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.protocol.core.impl;
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
|
|
||||||
|
|
||||||
public class ResponseCache {
|
|
||||||
|
|
||||||
private final AtomicLong sequence = new AtomicLong(0);
|
|
||||||
|
|
||||||
private final ConcurrentLongHashMap<Packet> store;
|
|
||||||
private final int size;
|
|
||||||
private ResponseHandler responseHandler;
|
|
||||||
|
|
||||||
public ResponseCache(int size) {
|
|
||||||
this.store = new ConcurrentLongHashMap<>(size);
|
|
||||||
this.size = size;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long nextCorrelationID() {
|
|
||||||
return sequence.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void add(Packet packet) {
|
|
||||||
if (store.size() + 1 > size) {
|
|
||||||
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketWhilstResponseCacheFull();
|
|
||||||
}
|
|
||||||
this.store.put(packet.getCorrelationID(), packet);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void handleResponse(Packet response) {
|
|
||||||
long correlationID = response.getCorrelationID();
|
|
||||||
Packet packet = store.remove(correlationID);
|
|
||||||
if (packet != null) {
|
|
||||||
responseHandler.responseHandler(packet, response);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setResponseHandler(ResponseHandler responseHandler) {
|
|
||||||
this.responseHandler = responseHandler;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -66,7 +66,6 @@ public class CreateAddressMessage extends PacketImpl {
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,6 @@ public class CreateQueueMessage extends PacketImpl {
|
||||||
return temporary;
|
return temporary;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,6 @@ public class CreateSharedQueueMessage extends PacketImpl {
|
||||||
return filterString;
|
return filterString;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,120 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
|
||||||
|
|
||||||
public class ExceptionResponseMessage extends PacketImpl {
|
|
||||||
|
|
||||||
private ActiveMQException exception;
|
|
||||||
|
|
||||||
private long correlationID;
|
|
||||||
|
|
||||||
// Static --------------------------------------------------------
|
|
||||||
|
|
||||||
// Constructors --------------------------------------------------
|
|
||||||
|
|
||||||
public ExceptionResponseMessage(final long correlationID, final ActiveMQException exception) {
|
|
||||||
super(EXCEPTION_RESPONSE);
|
|
||||||
|
|
||||||
this.correlationID = correlationID;
|
|
||||||
this.exception = exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ExceptionResponseMessage() {
|
|
||||||
super(EXCEPTION_RESPONSE);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isResponse() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ActiveMQException getException() {
|
|
||||||
return exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
|
||||||
buffer.writeLong(correlationID);
|
|
||||||
buffer.writeInt(exception.getType().getCode());
|
|
||||||
buffer.writeNullableString(exception.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
|
||||||
correlationID = buffer.readLong();
|
|
||||||
|
|
||||||
int code = buffer.readInt();
|
|
||||||
String msg = buffer.readNullableString();
|
|
||||||
|
|
||||||
exception = ActiveMQExceptionType.createException(code, msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public final boolean isResponseAsync() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getCorrelationID() {
|
|
||||||
return this.correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getParentString() + ", exception= " + exception + "]";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
final int prime = 31;
|
|
||||||
int result = super.hashCode();
|
|
||||||
result = prime * result + ((exception == null) ? 0 : exception.hashCode());
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (!super.equals(obj)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!(obj instanceof ExceptionResponseMessage)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
ExceptionResponseMessage other = (ExceptionResponseMessage) obj;
|
|
||||||
if (correlationID != other.correlationID) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (exception == null) {
|
|
||||||
if (other.exception != null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else if (!exception.equals(other.exception)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -51,7 +51,6 @@ public class SessionAcknowledgeMessage extends PacketImpl {
|
||||||
return messageID;
|
return messageID;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,6 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
|
||||||
return browseOnly;
|
return browseOnly;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,6 @@ public class SessionIndividualAcknowledgeMessage extends PacketImpl {
|
||||||
return messageID;
|
return messageID;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
*/
|
*/
|
||||||
public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
||||||
|
|
||||||
protected boolean requiresResponse;
|
private boolean requiresResponse;
|
||||||
|
|
||||||
// Used on confirmation handling
|
// Used on confirmation handling
|
||||||
protected Message message;
|
private Message message;
|
||||||
/**
|
/**
|
||||||
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
|
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
|
||||||
* <br>
|
* <br>
|
||||||
|
@ -43,7 +43,7 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
||||||
/**
|
/**
|
||||||
* to be sent on the last package
|
* to be sent on the last package
|
||||||
*/
|
*/
|
||||||
protected long messageBodySize = -1;
|
private long messageBodySize = -1;
|
||||||
|
|
||||||
// Static --------------------------------------------------------
|
// Static --------------------------------------------------------
|
||||||
|
|
||||||
|
@ -54,11 +54,6 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
||||||
handler = null;
|
handler = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SessionSendContinuationMessage(byte type) {
|
|
||||||
super(type);
|
|
||||||
handler = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param body
|
* @param body
|
||||||
* @param continues
|
* @param continues
|
||||||
|
@ -77,31 +72,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
||||||
this.messageBodySize = messageBodySize;
|
this.messageBodySize = messageBodySize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param body
|
|
||||||
* @param continues
|
|
||||||
* @param requiresResponse
|
|
||||||
*/
|
|
||||||
protected SessionSendContinuationMessage(final byte type,
|
|
||||||
final Message message,
|
|
||||||
final byte[] body,
|
|
||||||
final boolean continues,
|
|
||||||
final boolean requiresResponse,
|
|
||||||
final long messageBodySize,
|
|
||||||
SendAcknowledgementHandler handler) {
|
|
||||||
super(type, body, continues);
|
|
||||||
this.requiresResponse = requiresResponse;
|
|
||||||
this.message = message;
|
|
||||||
this.handler = handler;
|
|
||||||
this.messageBodySize = messageBodySize;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the requiresResponse
|
* @return the requiresResponse
|
||||||
*/
|
*/
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,120 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A SessionSendContinuationMessage<br>
|
|
||||||
*/
|
|
||||||
public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMessage {
|
|
||||||
|
|
||||||
private long correlationID;
|
|
||||||
|
|
||||||
// Static --------------------------------------------------------
|
|
||||||
|
|
||||||
// Constructors --------------------------------------------------
|
|
||||||
|
|
||||||
public SessionSendContinuationMessage_V2() {
|
|
||||||
super(SESS_SEND_CONTINUATION_V2);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param body
|
|
||||||
* @param continues
|
|
||||||
* @param requiresResponse
|
|
||||||
*/
|
|
||||||
public SessionSendContinuationMessage_V2(final Message message,
|
|
||||||
final byte[] body,
|
|
||||||
final boolean continues,
|
|
||||||
final boolean requiresResponse,
|
|
||||||
final long messageBodySize,
|
|
||||||
SendAcknowledgementHandler handler) {
|
|
||||||
super(SESS_SEND_CONTINUATION_V2, message, body, continues, requiresResponse, messageBodySize, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int expectedEncodeSize() {
|
|
||||||
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
|
||||||
super.encodeRest(buffer);
|
|
||||||
buffer.writeLong(correlationID);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
|
||||||
super.decodeRest(buffer);
|
|
||||||
correlationID = buffer.readLong();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getCorrelationID() {
|
|
||||||
return this.correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setCorrelationID(long correlationID) {
|
|
||||||
this.correlationID = correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isResponseAsync() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
final int prime = 31;
|
|
||||||
int result = super.hashCode();
|
|
||||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
StringBuffer buff = new StringBuffer(getParentString());
|
|
||||||
buff.append(", continues=" + continues);
|
|
||||||
buff.append(", message=" + message);
|
|
||||||
buff.append(", messageBodySize=" + messageBodySize);
|
|
||||||
buff.append(", requiresResponse=" + requiresResponse);
|
|
||||||
buff.append(", correlationID=" + correlationID);
|
|
||||||
buff.append("]");
|
|
||||||
return buff.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj)
|
|
||||||
return true;
|
|
||||||
if (!super.equals(obj))
|
|
||||||
return false;
|
|
||||||
if (!(obj instanceof SessionSendContinuationMessage_V2))
|
|
||||||
return false;
|
|
||||||
SessionSendContinuationMessage_V2 other = (SessionSendContinuationMessage_V2) obj;
|
|
||||||
if (correlationID != other.correlationID)
|
|
||||||
return false;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -21,7 +21,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
|
||||||
|
|
||||||
public class SessionSendMessage extends MessagePacket {
|
public class SessionSendMessage extends MessagePacket {
|
||||||
|
|
||||||
|
@ -37,22 +36,6 @@ public class SessionSendMessage extends MessagePacket {
|
||||||
*/
|
*/
|
||||||
private final transient SendAcknowledgementHandler handler;
|
private final transient SendAcknowledgementHandler handler;
|
||||||
|
|
||||||
/** This will be using the CoreMessage because it is meant for the core-protocol */
|
|
||||||
protected SessionSendMessage(final byte id,
|
|
||||||
final ICoreMessage message,
|
|
||||||
final boolean requiresResponse,
|
|
||||||
final SendAcknowledgementHandler handler) {
|
|
||||||
super(id, message);
|
|
||||||
this.handler = handler;
|
|
||||||
this.requiresResponse = requiresResponse;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected SessionSendMessage(final byte id,
|
|
||||||
final CoreMessage message) {
|
|
||||||
super(id, message);
|
|
||||||
this.handler = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** This will be using the CoreMessage because it is meant for the core-protocol */
|
/** This will be using the CoreMessage because it is meant for the core-protocol */
|
||||||
public SessionSendMessage(final ICoreMessage message,
|
public SessionSendMessage(final ICoreMessage message,
|
||||||
final boolean requiresResponse,
|
final boolean requiresResponse,
|
||||||
|
@ -69,7 +52,6 @@ public class SessionSendMessage extends MessagePacket {
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRequiresResponse() {
|
public boolean isRequiresResponse() {
|
||||||
return requiresResponse;
|
return requiresResponse;
|
||||||
}
|
}
|
||||||
|
@ -80,7 +62,7 @@ public class SessionSendMessage extends MessagePacket {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int expectedEncodeSize() {
|
public int expectedEncodeSize() {
|
||||||
return message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_BOOLEAN;
|
return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,113 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|
||||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
|
||||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
|
||||||
|
|
||||||
public class SessionSendMessage_V2 extends SessionSendMessage {
|
|
||||||
|
|
||||||
private long correlationID;
|
|
||||||
|
|
||||||
/** This will be using the CoreMessage because it is meant for the core-protocol */
|
|
||||||
public SessionSendMessage_V2(final ICoreMessage message,
|
|
||||||
final boolean requiresResponse,
|
|
||||||
final SendAcknowledgementHandler handler) {
|
|
||||||
super(SESS_SEND_V2, message, requiresResponse, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public SessionSendMessage_V2(final CoreMessage message) {
|
|
||||||
super(SESS_SEND_V2, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int expectedEncodeSize() {
|
|
||||||
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void encodeRest(ActiveMQBuffer buffer) {
|
|
||||||
message.sendBuffer(buffer.byteBuf(), 0);
|
|
||||||
buffer.writeLong(correlationID);
|
|
||||||
buffer.writeBoolean(requiresResponse);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
|
||||||
// Buffer comes in after having read standard headers and positioned at Beginning of body part
|
|
||||||
|
|
||||||
ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
|
|
||||||
message.receiveBuffer(messageBuffer);
|
|
||||||
|
|
||||||
buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_BOOLEAN);
|
|
||||||
|
|
||||||
correlationID = buffer.readLong();
|
|
||||||
requiresResponse = buffer.readBoolean();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getCorrelationID() {
|
|
||||||
return this.correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setCorrelationID(long correlationID) {
|
|
||||||
this.correlationID = correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isResponseAsync() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
final int prime = 31;
|
|
||||||
int result = super.hashCode();
|
|
||||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
StringBuffer buff = new StringBuffer(getParentString());
|
|
||||||
buff.append(", correlationID=" + correlationID);
|
|
||||||
buff.append(", requiresResponse=" + super.isRequiresResponse());
|
|
||||||
buff.append("]");
|
|
||||||
return buff.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj)
|
|
||||||
return true;
|
|
||||||
if (!super.equals(obj))
|
|
||||||
return false;
|
|
||||||
if (!(obj instanceof SessionSendMessage_V2))
|
|
||||||
return false;
|
|
||||||
SessionSendMessage_V2 other = (SessionSendMessage_V2) obj;
|
|
||||||
if (correlationID != other.correlationID)
|
|
||||||
return false;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,93 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
|
||||||
|
|
||||||
public class SuccessResponseMessage extends PacketImpl {
|
|
||||||
|
|
||||||
private long correlationID;
|
|
||||||
|
|
||||||
public SuccessResponseMessage(final long correlationID) {
|
|
||||||
super(SUCCESS_RESPONSE);
|
|
||||||
|
|
||||||
this.correlationID = correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SuccessResponseMessage() {
|
|
||||||
super(SUCCESS_RESPONSE);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getCorrelationID() {
|
|
||||||
return correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
|
||||||
buffer.writeLong(correlationID);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
|
||||||
correlationID = buffer.readLong();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public final boolean isResponse() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public final boolean isResponseAsync() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return getParentString() + ", correlationID=" + correlationID + "]";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
final int prime = 31;
|
|
||||||
int result = super.hashCode();
|
|
||||||
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (!super.equals(obj)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!(obj instanceof SuccessResponseMessage)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
SuccessResponseMessage other = (SuccessResponseMessage) obj;
|
|
||||||
if (correlationID != other.correlationID) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
|
||||||
activemq.version.microVersion=${activemq.version.microVersion}
|
activemq.version.microVersion=${activemq.version.microVersion}
|
||||||
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
||||||
activemq.version.versionTag=${activemq.version.versionTag}
|
activemq.version.versionTag=${activemq.version.versionTag}
|
||||||
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130
|
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129
|
||||||
|
|
|
@ -578,36 +578,6 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
|
|
||||||
if (jmsMessage instanceof StreamMessage) {
|
|
||||||
try {
|
|
||||||
((StreamMessage) jmsMessage).reset();
|
|
||||||
} catch (JMSException e) {
|
|
||||||
// HORNETQ-1209 XXX ignore?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (jmsMessage instanceof BytesMessage) {
|
|
||||||
try {
|
|
||||||
((BytesMessage) jmsMessage).reset();
|
|
||||||
} catch (JMSException e) {
|
|
||||||
// HORNETQ-1209 XXX ignore?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
producer.connection.getThreadAwareContext().setCurrentThread(true);
|
|
||||||
if (exception instanceof ActiveMQException) {
|
|
||||||
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception);
|
|
||||||
} else if (exception instanceof ActiveMQInterruptedException) {
|
|
||||||
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
|
|
||||||
}
|
|
||||||
completionListener.onException(jmsMessage, exception);
|
|
||||||
} finally {
|
|
||||||
producer.connection.getThreadAwareContext().clearCurrentThread(true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
|
return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
|
||||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
|
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
|
||||||
|
@ -81,7 +80,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_V2;
|
|
||||||
|
|
||||||
public class ServerPacketDecoder extends ClientPacketDecoder {
|
public class ServerPacketDecoder extends ClientPacketDecoder {
|
||||||
|
|
||||||
|
@ -101,12 +99,6 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
||||||
return sendMessage;
|
return sendMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SessionSendMessage decodeSessionSendMessageV2(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
|
||||||
final SessionSendMessage_V2 sendMessage = new SessionSendMessage_V2(new CoreMessage());
|
|
||||||
sendMessage.decode(in);
|
|
||||||
return sendMessage;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||||
final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage();
|
final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage();
|
||||||
acknowledgeMessage.decode(in);
|
acknowledgeMessage.decode(in);
|
||||||
|
@ -132,8 +124,6 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
||||||
switch (packetType) {
|
switch (packetType) {
|
||||||
case SESS_SEND:
|
case SESS_SEND:
|
||||||
return decodeSessionSendMessage(in, connection);
|
return decodeSessionSendMessage(in, connection);
|
||||||
case SESS_SEND_V2:
|
|
||||||
return decodeSessionSendMessageV2(in, connection);
|
|
||||||
case SESS_ACKNOWLEDGE:
|
case SESS_ACKNOWLEDGE:
|
||||||
return decodeSessionAcknowledgeMessage(in, connection);
|
return decodeSessionAcknowledgeMessage(in, connection);
|
||||||
case SESS_PRODUCER_REQUEST_CREDITS:
|
case SESS_PRODUCER_REQUEST_CREDITS:
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ExceptionResponseMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
|
||||||
|
@ -64,7 +63,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQue
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
||||||
|
@ -82,7 +80,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SuccessResponseMessage;
|
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||||
|
@ -122,9 +119,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION_V2;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_V2;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
|
||||||
|
@ -269,10 +264,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
onSessionSend(packet);
|
onSessionSend(packet);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SESS_SEND_V2: {
|
|
||||||
onSessionSend(packet);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case SESS_ACKNOWLEDGE: {
|
case SESS_ACKNOWLEDGE: {
|
||||||
onSessionAcknowledge(packet);
|
onSessionAcknowledge(packet);
|
||||||
break;
|
break;
|
||||||
|
@ -320,15 +311,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SESS_SEND_CONTINUATION_V2: {
|
|
||||||
SessionSendContinuationMessage_V2 message = (SessionSendContinuationMessage_V2) packet;
|
|
||||||
requiresResponse = message.isRequiresResponse();
|
|
||||||
sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
|
|
||||||
if (requiresResponse) {
|
|
||||||
response = new SuccessResponseMessage();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case SESS_CREATECONSUMER: {
|
case SESS_CREATECONSUMER: {
|
||||||
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
|
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
|
||||||
requiresResponse = request.isRequiresResponse();
|
requiresResponse = request.isRequiresResponse();
|
||||||
|
@ -623,15 +605,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ActiveMQIOErrorException e) {
|
} catch (ActiveMQIOErrorException e) {
|
||||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||||
} catch (ActiveMQXAException e) {
|
} catch (ActiveMQXAException e) {
|
||||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||||
}
|
}
|
||||||
sendResponse(packet, response, flush, closeChannel);
|
sendResponse(packet, response, flush, closeChannel);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -652,15 +634,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
response = new NullResponseMessage();
|
response = new NullResponseMessage();
|
||||||
}
|
}
|
||||||
} catch (ActiveMQIOErrorException e) {
|
} catch (ActiveMQIOErrorException e) {
|
||||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||||
} catch (ActiveMQXAException e) {
|
} catch (ActiveMQXAException e) {
|
||||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||||
}
|
}
|
||||||
sendResponse(packet, response, false, false);
|
sendResponse(packet, response, false, false);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -678,22 +660,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
requiresResponse = message.isRequiresResponse();
|
requiresResponse = message.isRequiresResponse();
|
||||||
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
|
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct);
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
if (packet.isResponseAsync()) {
|
|
||||||
response = new SuccessResponseMessage(packet.getCorrelationID());
|
|
||||||
} else {
|
|
||||||
response = new NullResponseMessage();
|
response = new NullResponseMessage();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} catch (ActiveMQIOErrorException e) {
|
} catch (ActiveMQIOErrorException e) {
|
||||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||||
} catch (ActiveMQXAException e) {
|
} catch (ActiveMQXAException e) {
|
||||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||||
}
|
}
|
||||||
sendResponse(packet, response, false, false);
|
sendResponse(packet, response, false, false);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -710,15 +688,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
|
SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
|
||||||
session.requestProducerCredits(message.getAddress(), message.getCredits());
|
session.requestProducerCredits(message.getAddress(), message.getCredits());
|
||||||
} catch (ActiveMQIOErrorException e) {
|
} catch (ActiveMQIOErrorException e) {
|
||||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||||
} catch (ActiveMQXAException e) {
|
} catch (ActiveMQXAException e) {
|
||||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||||
}
|
}
|
||||||
sendResponse(packet, response, false, false);
|
sendResponse(packet, response, false, false);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -735,15 +713,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
|
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
|
||||||
session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
|
session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
|
||||||
} catch (ActiveMQIOErrorException e) {
|
} catch (ActiveMQIOErrorException e) {
|
||||||
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
|
response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
|
||||||
} catch (ActiveMQXAException e) {
|
} catch (ActiveMQXAException e) {
|
||||||
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
|
||||||
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
|
response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
|
response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
|
||||||
}
|
}
|
||||||
sendResponse(packet, response, false, false);
|
sendResponse(packet, response, false, false);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -752,15 +730,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet,
|
private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e,
|
||||||
ActiveMQIOErrorException e,
|
|
||||||
boolean requiresResponse,
|
boolean requiresResponse,
|
||||||
Packet response,
|
Packet response,
|
||||||
ServerSession session) {
|
ServerSession session) {
|
||||||
session.markTXFailed(e);
|
session.markTXFailed(e);
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
logger.debug("Sending exception to client", e);
|
logger.debug("Sending exception to client", e);
|
||||||
response = convertToExceptionPacket(packet, e);
|
response = new ActiveMQExceptionMessage(e);
|
||||||
} else {
|
} else {
|
||||||
ActiveMQServerLogger.LOGGER.caughtException(e);
|
ActiveMQServerLogger.LOGGER.caughtException(e);
|
||||||
}
|
}
|
||||||
|
@ -779,36 +756,24 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet,
|
private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e,
|
||||||
ActiveMQQueueMaxConsumerLimitReached e,
|
|
||||||
boolean requiresResponse,
|
boolean requiresResponse,
|
||||||
Packet response) {
|
Packet response) {
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
logger.debug("Sending exception to client", e);
|
logger.debug("Sending exception to client", e);
|
||||||
response = convertToExceptionPacket(packet, e);
|
response = new ActiveMQExceptionMessage(e);
|
||||||
} else {
|
} else {
|
||||||
ActiveMQServerLogger.LOGGER.caughtException(e);
|
ActiveMQServerLogger.LOGGER.caughtException(e);
|
||||||
}
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) {
|
private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e,
|
||||||
Packet response;
|
|
||||||
if (packet.isResponseAsync()) {
|
|
||||||
response = new ExceptionResponseMessage(packet.getCorrelationID(), e);
|
|
||||||
} else {
|
|
||||||
response = new ActiveMQExceptionMessage(e);
|
|
||||||
}
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet,
|
|
||||||
ActiveMQException e,
|
|
||||||
boolean requiresResponse,
|
boolean requiresResponse,
|
||||||
Packet response) {
|
Packet response) {
|
||||||
if (requiresResponse) {
|
if (requiresResponse) {
|
||||||
logger.debug("Sending exception to client", e);
|
logger.debug("Sending exception to client", e);
|
||||||
response = convertToExceptionPacket(packet, e);
|
response = new ActiveMQExceptionMessage(e);
|
||||||
} else {
|
} else {
|
||||||
if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
|
if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
|
||||||
logger.debug("Caught exception", e);
|
logger.debug("Caught exception", e);
|
||||||
|
@ -819,8 +784,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Packet onCatchThrowableWhileHandlePacket(Packet packet,
|
private static Packet onCatchThrowableWhileHandlePacket(Throwable t,
|
||||||
Throwable t,
|
|
||||||
boolean requiresResponse,
|
boolean requiresResponse,
|
||||||
Packet response,
|
Packet response,
|
||||||
ServerSession session) {
|
ServerSession session) {
|
||||||
|
@ -829,7 +793,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
|
ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
|
||||||
ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
|
ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
|
||||||
activeMQInternalErrorException.initCause(t);
|
activeMQInternalErrorException.initCause(t);
|
||||||
response = convertToExceptionPacket(packet, activeMQInternalErrorException);
|
response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
|
||||||
} else {
|
} else {
|
||||||
ActiveMQServerLogger.LOGGER.caughtException(t);
|
ActiveMQServerLogger.LOGGER.caughtException(t);
|
||||||
}
|
}
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -121,7 +121,7 @@
|
||||||
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
||||||
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
||||||
<activemq.version.microVersion>0</activemq.version.microVersion>
|
<activemq.version.microVersion>0</activemq.version.microVersion>
|
||||||
<activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
|
<activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
|
||||||
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
||||||
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
|
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_247;
|
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR;
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
||||||
|
@ -62,7 +61,6 @@ public class SendAckTest extends VersionedBaseTest {
|
||||||
// not every combination on two four would make sense.. as there's a compatibility issue between 2.4 and 1.4 when crossing consumers and producers
|
// not every combination on two four would make sense.. as there's a compatibility issue between 2.4 and 1.4 when crossing consumers and producers
|
||||||
combinations.add(new Object[]{TWO_FOUR, SNAPSHOT, SNAPSHOT});
|
combinations.add(new Object[]{TWO_FOUR, SNAPSHOT, SNAPSHOT});
|
||||||
combinations.add(new Object[]{SNAPSHOT, TWO_FOUR, TWO_FOUR});
|
combinations.add(new Object[]{SNAPSHOT, TWO_FOUR, TWO_FOUR});
|
||||||
combinations.add(new Object[]{HORNETQ_247, SNAPSHOT, SNAPSHOT});
|
|
||||||
return combinations;
|
return combinations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
||||||
|
@ -315,11 +314,6 @@ public class BackupSyncDelay implements Interceptor {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setResponseHandler(ResponseHandler handler) {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flushConfirmations() {
|
public void flushConfirmations() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|
|
@ -16,23 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.jms.tests;
|
package org.apache.activemq.artemis.jms.tests;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import javax.jms.CompletionListener;
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.DeliveryMode;
|
|
||||||
import javax.jms.Destination;
|
|
||||||
import javax.jms.IllegalStateException;
|
import javax.jms.IllegalStateException;
|
||||||
import javax.jms.JMSSecurityException;
|
import javax.jms.JMSSecurityException;
|
||||||
import javax.jms.Message;
|
|
||||||
import javax.jms.MessageProducer;
|
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
|
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
|
||||||
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
|
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
|
||||||
|
@ -180,71 +169,6 @@ public class SecurityTest extends JMSTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Login with valid user and password
|
|
||||||
* But try send to address not authorised - Persistent
|
|
||||||
* Should not allow and should throw exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
|
|
||||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
|
||||||
Connection connection = connectionFactory.createConnection("guest", "guest");
|
|
||||||
Session session = connection.createSession();
|
|
||||||
Destination destination = session.createQueue("guest.cannot.send");
|
|
||||||
MessageProducer messageProducer = session.createProducer(destination);
|
|
||||||
try {
|
|
||||||
messageProducer.send(session.createTextMessage("hello"));
|
|
||||||
fail("JMSSecurityException expected as guest is not allowed to send");
|
|
||||||
} catch (JMSSecurityException activeMQSecurityException) {
|
|
||||||
//pass
|
|
||||||
}
|
|
||||||
connection.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Login with valid user and password
|
|
||||||
* But try send to address not authorised - Non Persistent.
|
|
||||||
* Should have same behaviour as Persistent with exception on send.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
|
|
||||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
|
||||||
connectionFactory.setConfirmationWindowSize(100);
|
|
||||||
connectionFactory.setBlockOnDurableSend(false);
|
|
||||||
connectionFactory.setBlockOnNonDurableSend(false);
|
|
||||||
Connection connection = connectionFactory.createConnection("guest", "guest");
|
|
||||||
Session session = connection.createSession();
|
|
||||||
Destination destination = session.createQueue("guest.cannot.send");
|
|
||||||
MessageProducer messageProducer = session.createProducer(destination);
|
|
||||||
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
|
||||||
try {
|
|
||||||
AtomicReference<Exception> e = new AtomicReference<>();
|
|
||||||
// messageProducer.send(session.createTextMessage("hello"));
|
|
||||||
|
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
|
||||||
messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
|
|
||||||
@Override
|
|
||||||
public void onCompletion(Message message) {
|
|
||||||
countDownLatch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onException(Message message, Exception exception) {
|
|
||||||
e.set(exception);
|
|
||||||
countDownLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
countDownLatch.await(10, TimeUnit.SECONDS);
|
|
||||||
if (e.get() != null) {
|
|
||||||
throw e.get();
|
|
||||||
}
|
|
||||||
fail("JMSSecurityException expected as guest is not allowed to send");
|
|
||||||
} catch (JMSSecurityException activeMQSecurityException) {
|
|
||||||
activeMQSecurityException.printStackTrace();
|
|
||||||
}
|
|
||||||
connection.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Now some client id tests */
|
/* Now some client id tests */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -52,16 +52,6 @@
|
||||||
<permission type="browse" roles="guest,def"/>
|
<permission type="browse" roles="guest,def"/>
|
||||||
<permission type="send" roles="guest,def"/>
|
<permission type="send" roles="guest,def"/>
|
||||||
</security-setting>
|
</security-setting>
|
||||||
|
|
||||||
<security-setting match="guest.cannot.send">
|
|
||||||
<permission type="createDurableQueue" roles="guest,def"/>
|
|
||||||
<permission type="deleteDurableQueue" roles="guest,def"/>
|
|
||||||
<permission type="createNonDurableQueue" roles="guest,def"/>
|
|
||||||
<permission type="deleteNonDurableQueue" roles="guest,def"/>
|
|
||||||
<permission type="consume" roles="guest,def"/>
|
|
||||||
<permission type="browse" roles="guest,def"/>
|
|
||||||
<permission type="send" roles="def"/>
|
|
||||||
</security-setting>
|
|
||||||
</security-settings>
|
</security-settings>
|
||||||
</core>
|
</core>
|
||||||
</configuration>
|
</configuration>
|
Loading…
Reference in New Issue