ARTEMIS-1546 Fixing body compatibility issue by recast body towards 1.4
https://issues.apache.org/jira/browse/ARTEMIS-1546 Recasting the body as 1.x format when there's a 1.x in use at the other size of the wire
This commit is contained in:
parent
9ef90f8def
commit
dbe575a0c1
|
@ -20,6 +20,7 @@ package org.apache.activemq.artemis.api.core;
|
|||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
|
||||
|
@ -35,25 +36,33 @@ public interface ICoreMessage extends Message {
|
|||
@Override
|
||||
InputStream getBodyInputStream();
|
||||
|
||||
/** Returns a new Buffer slicing the current Body. */
|
||||
/**
|
||||
* Returns a new Buffer slicing the current Body.
|
||||
*/
|
||||
ActiveMQBuffer getReadOnlyBodyBuffer();
|
||||
|
||||
/** Return the type of the message */
|
||||
/**
|
||||
* Return the type of the message
|
||||
*/
|
||||
@Override
|
||||
byte getType();
|
||||
|
||||
/** the type of the message */
|
||||
/**
|
||||
* the type of the message
|
||||
*/
|
||||
@Override
|
||||
CoreMessage setType(byte type);
|
||||
|
||||
/**
|
||||
* We are really interested if this is a LargeServerMessage.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
boolean isServerMessage();
|
||||
|
||||
/**
|
||||
* The body used for this message.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
|
@ -61,10 +70,18 @@ public interface ICoreMessage extends Message {
|
|||
|
||||
int getEndOfBodyPosition();
|
||||
|
||||
|
||||
/** Used on large messages treatment */
|
||||
/**
|
||||
* Used on large messages treatment
|
||||
*/
|
||||
void copyHeadersAndProperties(Message msg);
|
||||
|
||||
void sendBuffer_1X(ByteBuf sendBuffer);
|
||||
|
||||
/**
|
||||
* it will fix the body of incoming messages from 1.x and before versions
|
||||
*/
|
||||
void receiveBuffer_1X(ByteBuf buffer);
|
||||
|
||||
/**
|
||||
* @return Returns the message in Map form, useful when encoding to JSON
|
||||
*/
|
||||
|
|
|
@ -188,8 +188,16 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
public void receiveBuffer(ByteBuf buffer) {
|
||||
this.buffer = buffer;
|
||||
this.buffer.retain();
|
||||
decode();
|
||||
this.validBuffer = true;
|
||||
decode(false);
|
||||
}
|
||||
|
||||
/** This will fix the incoming body of 1.x messages */
|
||||
@Override
|
||||
public void receiveBuffer_1X(ByteBuf buffer) {
|
||||
this.buffer = buffer;
|
||||
this.buffer.retain();
|
||||
decode(true);
|
||||
validBuffer = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -205,7 +213,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param sendBuffer
|
||||
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
|
||||
*/
|
||||
|
@ -215,6 +222,21 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
|
||||
}
|
||||
|
||||
/**
|
||||
* Recast the message as an 1.4 message
|
||||
*/
|
||||
@Override
|
||||
public void sendBuffer_1X(ByteBuf sendBuffer) {
|
||||
checkEncode();
|
||||
ByteBuf tmpBuffer = buffer.duplicate();
|
||||
sendBuffer.writeInt(endOfBodyPosition + DataConstants.SIZE_INT);
|
||||
tmpBuffer.readerIndex(DataConstants.SIZE_INT);
|
||||
tmpBuffer.readBytes(sendBuffer, endOfBodyPosition - BUFFER_HEADER_SPACE);
|
||||
sendBuffer.writeInt(tmpBuffer.writerIndex() + DataConstants.SIZE_INT + BUFFER_HEADER_SPACE);
|
||||
tmpBuffer.readBytes(sendBuffer, tmpBuffer.readableBytes());
|
||||
sendBuffer.readerIndex(0);
|
||||
}
|
||||
|
||||
private synchronized void checkEncode() {
|
||||
if (!validBuffer) {
|
||||
encode();
|
||||
|
@ -280,12 +302,10 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return endOfBodyPosition;
|
||||
}
|
||||
|
||||
|
||||
public TypedProperties getTypedProperties() {
|
||||
return checkProperties();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void messageChanged() {
|
||||
validBuffer = false;
|
||||
|
@ -323,7 +343,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
public void copyHeadersAndProperties(final Message msg) {
|
||||
messageID = msg.getMessageID();
|
||||
address = msg.getAddressSimpleString();
|
||||
userID = (UUID)msg.getUserID();
|
||||
userID = (UUID) msg.getUserID();
|
||||
type = msg.toCore().getType();
|
||||
durable = msg.isDurable();
|
||||
expiration = msg.getExpiration();
|
||||
|
@ -331,11 +351,10 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
priority = msg.getPriority();
|
||||
|
||||
if (msg instanceof CoreMessage) {
|
||||
properties = ((CoreMessage)msg).getTypedProperties();
|
||||
properties = ((CoreMessage) msg).getTypedProperties();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Message copy() {
|
||||
checkEncode();
|
||||
|
@ -380,7 +399,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
|
||||
@Override
|
||||
public CoreMessage setUserID(Object uuid) {
|
||||
this.userID = (UUID)uuid;
|
||||
this.userID = (UUID) uuid;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -418,7 +437,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return address;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage setExpiration(long expiration) {
|
||||
this.expiration = expiration;
|
||||
|
@ -487,18 +505,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
private void decode() {
|
||||
private void decode(boolean beforeAddress) {
|
||||
endOfBodyPosition = buffer.readInt();
|
||||
|
||||
buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
|
||||
|
||||
decodeHeadersAndProperties(buffer, true);
|
||||
buffer.readerIndex(0);
|
||||
validBuffer = true;
|
||||
|
||||
if (beforeAddress) {
|
||||
endOfBodyPosition = endOfBodyPosition - DataConstants.SIZE_INT;
|
||||
}
|
||||
|
||||
internalWritableBuffer();
|
||||
}
|
||||
|
||||
|
||||
public void decodeHeadersAndProperties(final ByteBuf buffer) {
|
||||
decodeHeadersAndProperties(buffer, false);
|
||||
}
|
||||
|
@ -529,7 +551,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public synchronized CoreMessage encode() {
|
||||
|
||||
checkProperties();
|
||||
|
@ -654,7 +675,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putBooleanProperty(final String key, final boolean value) {
|
||||
messageChanged();
|
||||
|
@ -683,7 +703,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return properties.getBooleanProperty(new SimpleString(key));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putByteProperty(final SimpleString key, final byte value) {
|
||||
messageChanged();
|
||||
|
@ -692,7 +711,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putByteProperty(final String key, final byte value) {
|
||||
messageChanged();
|
||||
|
@ -702,7 +720,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
checkProperties();
|
||||
|
@ -731,7 +748,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
checkProperties();
|
||||
|
@ -775,7 +791,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putIntProperty(final SimpleString key, final int value) {
|
||||
messageChanged();
|
||||
|
@ -803,7 +818,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return getIntProperty(SimpleString.toSimpleString(key));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putLongProperty(final SimpleString key, final long value) {
|
||||
messageChanged();
|
||||
|
@ -832,7 +846,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return getLongProperty(SimpleString.toSimpleString(key));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putFloatProperty(final SimpleString key, final float value) {
|
||||
messageChanged();
|
||||
|
@ -865,7 +878,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
messageChanged();
|
||||
|
@ -1071,7 +1083,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
int size = record.readInt();
|
||||
initBuffer(size);
|
||||
buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
|
||||
decode();
|
||||
decode(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,10 +19,12 @@ package org.apache.activemq.artemis.core.protocol;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
|
||||
|
@ -33,10 +35,10 @@ public class ClientPacketDecoder extends PacketDecoder {
|
|||
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
|
||||
|
||||
@Override
|
||||
public Packet decode(final ActiveMQBuffer in) {
|
||||
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final byte packetType = in.readByte();
|
||||
|
||||
Packet packet = decode(packetType);
|
||||
Packet packet = decode(packetType, connection);
|
||||
|
||||
packet.decode(in);
|
||||
|
||||
|
@ -44,12 +46,16 @@ public class ClientPacketDecoder extends PacketDecoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Packet decode(byte packetType) {
|
||||
public Packet decode(byte packetType, CoreRemotingConnection connection) {
|
||||
Packet packet;
|
||||
|
||||
switch (packetType) {
|
||||
case SESS_RECEIVE_MSG: {
|
||||
packet = new SessionReceiveMessage(new ClientMessageImpl());
|
||||
if (connection.isVersionBeforeAddressChange()) {
|
||||
packet = new SessionReceiveMessage_1X(new ClientMessageImpl());
|
||||
} else {
|
||||
packet = new SessionReceiveMessage(new ClientMessageImpl());
|
||||
}
|
||||
break;
|
||||
}
|
||||
case SESS_RECEIVE_LARGE_MSG: {
|
||||
|
@ -57,7 +63,7 @@ public class ClientPacketDecoder extends PacketDecoder {
|
|||
break;
|
||||
}
|
||||
default: {
|
||||
packet = super.decode(packetType);
|
||||
packet = super.decode(packetType, connection);
|
||||
}
|
||||
}
|
||||
return packet;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core;
|
||||
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
||||
|
@ -28,13 +29,18 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
|||
* The client protocol used on the communication.
|
||||
* This will determine if the client has support for certain packet types
|
||||
*/
|
||||
int getClientVersion();
|
||||
int getChannelVersion();
|
||||
|
||||
default boolean isVersionBeforeAddressChange() {
|
||||
int version = getChannelVersion();
|
||||
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the client protocol used on the communication. This will determine if the client has
|
||||
* support for certain packet types
|
||||
*/
|
||||
void setClientVersion(int clientVersion);
|
||||
void setChannelVersion(int clientVersion);
|
||||
|
||||
/**
|
||||
* Returns the channel with the channel id specified.
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.artemis.core.protocol.core;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
||||
/**
|
||||
* A Packet represents a packet of data transmitted over a connection.
|
||||
|
@ -71,7 +70,7 @@ public interface Packet {
|
|||
* @param connection the connection
|
||||
* @return the buffer to encode to
|
||||
*/
|
||||
ActiveMQBuffer encode(RemotingConnection connection);
|
||||
ActiveMQBuffer encode(CoreRemotingConnection connection);
|
||||
|
||||
/**
|
||||
* decodes the buffer into this packet
|
||||
|
|
|
@ -326,6 +326,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
|
|||
}
|
||||
}
|
||||
while (retry);
|
||||
sessionChannel.getConnection().setChannelVersion(response.getServerVersion());
|
||||
return newSessionContext(name, confirmationWindowSize, sessionChannel, response);
|
||||
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||
|
@ -91,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
||||
|
@ -266,8 +267,14 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
|
||||
@Override
|
||||
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
|
||||
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
|
||||
SessionQueueQueryResponseMessage_V3 response = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
SessionQueueQueryResponseMessage response;
|
||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
|
||||
response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
|
||||
} else {
|
||||
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
|
||||
response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
}
|
||||
|
||||
return response.toQueueQuery();
|
||||
}
|
||||
|
@ -292,7 +299,13 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
|
||||
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
|
||||
|
||||
SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
SessionQueueQueryResponseMessage queueInfo;
|
||||
|
||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
|
||||
} else {
|
||||
queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
}
|
||||
|
||||
// The actual windows size that gets used is determined by the user since
|
||||
// could be overridden on the queue settings
|
||||
|
@ -441,7 +454,12 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean sendBlocking,
|
||||
SendAcknowledgementHandler handler,
|
||||
SimpleString defaultAddress) throws ActiveMQException {
|
||||
SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
|
||||
SessionSendMessage packet;
|
||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
|
||||
} else {
|
||||
packet = new SessionSendMessage(msgI, sendBlocking, handler);
|
||||
}
|
||||
|
||||
if (sendBlocking) {
|
||||
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
|
||||
|
@ -596,7 +614,9 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
Set<RoutingType> routingTypes,
|
||||
final boolean autoCreated) throws ActiveMQException {
|
||||
CreateAddressMessage request = new CreateAddressMessage(address, routingTypes, autoCreated, true);
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
@ -621,7 +641,9 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean purgeOnNoConsumers,
|
||||
boolean autoCreated) throws ActiveMQException {
|
||||
CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, routingType, filterString, durable, temp, maxConsumers, purgeOnNoConsumers, autoCreated, true);
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -695,11 +717,13 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks,
|
||||
boolean preAcknowledge) {
|
||||
return new CreateSessionMessage(name, sessionChannel.getID(), serverVersion, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
|
||||
return new CreateSessionMessage(name, sessionChannel.getID(), getServerVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException {
|
||||
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal,
|
||||
long consumerId,
|
||||
boolean isSessionStarted) throws ActiveMQException {
|
||||
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
|
||||
|
||||
// We try and recreate any non durable queues, since they probably won't be there unless
|
||||
|
@ -851,7 +875,6 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
class ClientSessionPacketHandler implements ChannelHandler {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -152,7 +152,7 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
@Override
|
||||
public boolean supports(final byte packetType) {
|
||||
return supports(packetType, connection.getClientVersion());
|
||||
return supports(packetType, connection.getChannelVersion());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.Serializable;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverMessage;
|
||||
|
@ -160,9 +161,9 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUB
|
|||
|
||||
public abstract class PacketDecoder implements Serializable {
|
||||
|
||||
public abstract Packet decode(ActiveMQBuffer in);
|
||||
public abstract Packet decode(ActiveMQBuffer in, CoreRemotingConnection connection);
|
||||
|
||||
public Packet decode(byte packetType) {
|
||||
public Packet decode(byte packetType, CoreRemotingConnection connection) {
|
||||
Packet packet;
|
||||
|
||||
switch (packetType) {
|
||||
|
|
|
@ -21,8 +21,8 @@ import io.netty.buffer.Unpooled;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class PacketImpl implements Packet {
|
||||
|
@ -305,28 +305,35 @@ public class PacketImpl implements Packet {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ActiveMQBuffer encode(final RemotingConnection connection) {
|
||||
public ActiveMQBuffer encode(final CoreRemotingConnection connection) {
|
||||
ActiveMQBuffer buffer = createPacket(connection);
|
||||
|
||||
// The standard header fields
|
||||
|
||||
buffer.writeInt(0); // The length gets filled in at the end
|
||||
buffer.writeByte(type);
|
||||
buffer.writeLong(channelID);
|
||||
encodeHeader(buffer);
|
||||
|
||||
encodeRest(buffer);
|
||||
|
||||
encodeSize(buffer);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
protected void encodeHeader(ActiveMQBuffer buffer) {
|
||||
// The standard header fields
|
||||
buffer.writeInt(0); // The length gets filled in at the end
|
||||
buffer.writeByte(type);
|
||||
buffer.writeLong(channelID);
|
||||
}
|
||||
|
||||
protected void encodeSize(ActiveMQBuffer buffer) {
|
||||
size = buffer.writerIndex();
|
||||
|
||||
// The length doesn't include the actual length byte
|
||||
int len = size - DataConstants.SIZE_INT;
|
||||
|
||||
buffer.setInt(0, len);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
protected ActiveMQBuffer createPacket(RemotingConnection connection) {
|
||||
protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
|
||||
|
||||
int size = expectedEncodeSize();
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
|
||||
private final boolean client;
|
||||
|
||||
private int clientVersion;
|
||||
private int channelVersion;
|
||||
|
||||
private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
|
||||
|
||||
|
@ -146,19 +146,19 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
}
|
||||
|
||||
/**
|
||||
* @return the clientVersion
|
||||
* @return the channelVersion
|
||||
*/
|
||||
@Override
|
||||
public int getClientVersion() {
|
||||
return clientVersion;
|
||||
public int getChannelVersion() {
|
||||
return channelVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param clientVersion the clientVersion to set
|
||||
* @param clientVersion the channelVersion to set
|
||||
*/
|
||||
@Override
|
||||
public void setClientVersion(int clientVersion) {
|
||||
this.clientVersion = clientVersion;
|
||||
public void setChannelVersion(int clientVersion) {
|
||||
this.channelVersion = clientVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -362,7 +362,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
@Override
|
||||
public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
|
||||
try {
|
||||
final Packet packet = packetDecoder.decode(buffer);
|
||||
final Packet packet = packetDecoder.decode(buffer, this);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet);
|
||||
|
@ -417,7 +417,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
|
||||
@Override
|
||||
public void killMessage(SimpleString nodeID) {
|
||||
if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
|
||||
if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
|
||||
return;
|
||||
}
|
||||
Channel clientChannel = getChannel(1, -1);
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
|
@ -26,9 +27,9 @@ public class SessionReceiveMessage extends MessagePacket {
|
|||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
private long consumerID;
|
||||
protected long consumerID;
|
||||
|
||||
private int deliveryCount;
|
||||
protected int deliveryCount;
|
||||
|
||||
public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) {
|
||||
super(SESS_RECEIVE_MSG, message);
|
||||
|
@ -69,13 +70,17 @@ public class SessionReceiveMessage extends MessagePacket {
|
|||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
// Buffer comes in after having read standard headers and positioned at Beginning of body part
|
||||
|
||||
message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
|
||||
receiveMessage(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
|
||||
|
||||
buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT);
|
||||
this.consumerID = buffer.readLong();
|
||||
this.deliveryCount = buffer.readInt();
|
||||
|
||||
}
|
||||
|
||||
protected void receiveMessage(ByteBuf buffer) {
|
||||
message.receiveBuffer(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class SessionReceiveMessage_1X extends SessionReceiveMessage {
|
||||
|
||||
public SessionReceiveMessage_1X(long consumerID, ICoreMessage message, int deliveryCount) {
|
||||
super(consumerID, message, deliveryCount);
|
||||
}
|
||||
|
||||
public SessionReceiveMessage_1X(CoreMessage message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(ActiveMQBuffer buffer) {
|
||||
message.sendBuffer_1X(buffer.byteBuf());
|
||||
buffer.writeLong(consumerID);
|
||||
buffer.writeInt(deliveryCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receiveMessage(ByteBuf buffer) {
|
||||
message.receiveBuffer_1X(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int expectedEncodeSize() {
|
||||
return super.expectedEncodeSize() + DataConstants.SIZE_INT;
|
||||
}
|
||||
|
||||
}
|
|
@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
|||
|
||||
public class SessionSendMessage extends MessagePacket {
|
||||
|
||||
private boolean requiresResponse;
|
||||
protected boolean requiresResponse;
|
||||
|
||||
/**
|
||||
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
|
||||
|
@ -76,7 +76,7 @@ public class SessionSendMessage extends MessagePacket {
|
|||
// Buffer comes in after having read standard headers and positioned at Beginning of body part
|
||||
|
||||
ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
|
||||
message.receiveBuffer(messageBuffer);
|
||||
receiveMessage(messageBuffer);
|
||||
|
||||
buffer.readerIndex(buffer.capacity() - 1);
|
||||
|
||||
|
@ -84,6 +84,10 @@ public class SessionSendMessage extends MessagePacket {
|
|||
|
||||
}
|
||||
|
||||
protected void receiveMessage(ByteBuf messageBuffer) {
|
||||
message.receiveBuffer(messageBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
/**
|
||||
* SessionSend Message for the 1.x branch
|
||||
*/
|
||||
public class SessionSendMessage_1X extends SessionSendMessage {
|
||||
|
||||
public SessionSendMessage_1X(ICoreMessage message, boolean requiresResponse, SendAcknowledgementHandler handler) {
|
||||
super(message, requiresResponse, handler);
|
||||
}
|
||||
|
||||
public SessionSendMessage_1X(CoreMessage message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(ActiveMQBuffer buffer) {
|
||||
message.sendBuffer_1X(buffer.byteBuf());
|
||||
buffer.writeBoolean(requiresResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receiveMessage(ByteBuf messageBuffer) {
|
||||
message.receiveBuffer_1X(messageBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int expectedEncodeSize() {
|
||||
return super.expectedEncodeSize() + DataConstants.SIZE_INT;
|
||||
}
|
||||
|
||||
}
|
|
@ -64,6 +64,8 @@ import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
|||
*/
|
||||
public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Externalizable, Referenceable, ConnectionFactory, XAConnectionFactory, AutoCloseable {
|
||||
|
||||
private static final long serialVersionUID = -7554006056207377105L;
|
||||
|
||||
private ServerLocator serverLocator;
|
||||
|
||||
private String clientID;
|
||||
|
|
|
@ -384,7 +384,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
|
|||
public static boolean isQueue(TYPE type) {
|
||||
boolean result = false;
|
||||
|
||||
if (type.equals(QUEUE) || type.equals(TEMP_QUEUE)) {
|
||||
if (type != null && (type.equals(QUEUE) || type.equals(TEMP_QUEUE))) {
|
||||
result = true;
|
||||
}
|
||||
|
||||
|
@ -394,7 +394,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
|
|||
public static boolean isTemporary(TYPE type) {
|
||||
boolean result = false;
|
||||
|
||||
if (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE)) {
|
||||
if (type != null && (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE))) {
|
||||
result = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCon
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
|
||||
|
@ -84,51 +86,58 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
private static final long serialVersionUID = 3348673114388400766L;
|
||||
public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
|
||||
|
||||
private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in) {
|
||||
final SessionSendMessage sendMessage = new SessionSendMessage(new CoreMessage());
|
||||
private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionSendMessage sendMessage;
|
||||
|
||||
if (connection.isVersionBeforeAddressChange()) {
|
||||
sendMessage = new SessionSendMessage_1X(new CoreMessage());
|
||||
} else {
|
||||
sendMessage = new SessionSendMessage(new CoreMessage());
|
||||
}
|
||||
|
||||
sendMessage.decode(in);
|
||||
return sendMessage;
|
||||
}
|
||||
|
||||
private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in) {
|
||||
private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage();
|
||||
acknowledgeMessage.decode(in);
|
||||
return acknowledgeMessage;
|
||||
}
|
||||
|
||||
private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in) {
|
||||
private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionRequestProducerCreditsMessage requestProducerCreditsMessage = new SessionRequestProducerCreditsMessage();
|
||||
requestProducerCreditsMessage.decode(in);
|
||||
return requestProducerCreditsMessage;
|
||||
}
|
||||
|
||||
private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in) {
|
||||
private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionConsumerFlowCreditMessage sessionConsumerFlowCreditMessage = new SessionConsumerFlowCreditMessage();
|
||||
sessionConsumerFlowCreditMessage.decode(in);
|
||||
return sessionConsumerFlowCreditMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Packet decode(final ActiveMQBuffer in) {
|
||||
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final byte packetType = in.readByte();
|
||||
//optimized for the most common cases: hottest and commons methods will be inlined and this::decode too due to the byte code size
|
||||
switch (packetType) {
|
||||
case SESS_SEND:
|
||||
return decodeSessionSendMessage(in);
|
||||
return decodeSessionSendMessage(in, connection);
|
||||
case SESS_ACKNOWLEDGE:
|
||||
return decodeSessionAcknowledgeMessage(in);
|
||||
return decodeSessionAcknowledgeMessage(in, connection);
|
||||
case SESS_PRODUCER_REQUEST_CREDITS:
|
||||
return decodeRequestProducerCreditsMessage(in);
|
||||
return decodeRequestProducerCreditsMessage(in, connection);
|
||||
case SESS_FLOWTOKEN:
|
||||
return decodeSessionConsumerFlowCreditMessage(in);
|
||||
return decodeSessionConsumerFlowCreditMessage(in, connection);
|
||||
default:
|
||||
return slowPathDecode(in, packetType);
|
||||
return slowPathDecode(in, packetType, connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// separating for performance reasons
|
||||
private Packet slowPathDecode(ActiveMQBuffer in, byte packetType) {
|
||||
private Packet slowPathDecode(ActiveMQBuffer in, byte packetType, CoreRemotingConnection connection) {
|
||||
Packet packet;
|
||||
|
||||
switch (packetType) {
|
||||
|
@ -242,7 +251,7 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
break;
|
||||
}
|
||||
default: {
|
||||
packet = super.decode(packetType);
|
||||
packet = super.decode(packetType, connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -314,11 +314,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_CREATECONSUMER: {
|
||||
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
|
||||
requiresResponse = request.isRequiresResponse();
|
||||
session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getClientVersion()), request.getFilterString(), request.isBrowseOnly());
|
||||
session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getChannelVersion()), request.getFilterString(), request.isBrowseOnly());
|
||||
if (requiresResponse) {
|
||||
// We send back queue information on the queue as a response- this allows the queue to
|
||||
// be automatically recreated on failover
|
||||
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
|
||||
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
|
||||
|
||||
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
|
||||
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
|
||||
|
@ -387,9 +387,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_QUEUEQUERY: {
|
||||
requiresResponse = true;
|
||||
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
|
||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
|
||||
|
||||
if (remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
if (remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
|
||||
}
|
||||
|
||||
|
@ -405,7 +405,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_BINDINGQUERY: {
|
||||
requiresResponse = true;
|
||||
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
|
||||
final int clientVersion = remotingConnection.getClientVersion();
|
||||
final int clientVersion = remotingConnection.getChannelVersion();
|
||||
BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion));
|
||||
|
||||
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
|
||||
|
|
|
@ -145,10 +145,10 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
|||
"Server will not accept create session requests");
|
||||
}*/
|
||||
|
||||
if (connection.getClientVersion() == 0) {
|
||||
connection.setClientVersion(request.getVersion());
|
||||
} else if (connection.getClientVersion() != request.getVersion()) {
|
||||
ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getClientVersion());
|
||||
if (connection.getChannelVersion() == 0) {
|
||||
connection.setChannelVersion(request.getVersion());
|
||||
} else if (connection.getChannelVersion() != request.getVersion()) {
|
||||
ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getChannelVersion());
|
||||
}
|
||||
|
||||
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
|
||||
|
@ -163,7 +163,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
|||
|
||||
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
|
||||
|
||||
if (connection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
if (connection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
routingTypeMap = new HashMap<>();
|
||||
routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST);
|
||||
routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
|
||||
|
|
|
@ -253,14 +253,14 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
|
|||
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage) packet;
|
||||
|
||||
if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
|
||||
channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
|
||||
channel0.getConnection().setChannelVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
|
||||
}
|
||||
|
||||
final ClusterTopologyListener listener = new ClusterTopologyListener() {
|
||||
@Override
|
||||
public void nodeUP(final TopologyMember topologyMember, final boolean last) {
|
||||
try {
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getClientVersion(), topologyMember);
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getChannelVersion(), topologyMember);
|
||||
|
||||
final String nodeID = topologyMember.getNodeId();
|
||||
// Using an executor as most of the notifications on the Topology
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
|
@ -112,7 +113,12 @@ public final class CoreSessionCallback implements SessionCallback {
|
|||
@Override
|
||||
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
|
||||
|
||||
Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
|
||||
Packet packet;
|
||||
if (channel.getConnection().isVersionBeforeAddressChange()) {
|
||||
packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount);
|
||||
} else {
|
||||
packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
|
||||
}
|
||||
|
||||
int size = 0;
|
||||
|
||||
|
|
|
@ -237,7 +237,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
if (session.getRemotingConnection() instanceof CoreRemotingConnection) {
|
||||
CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection();
|
||||
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
requiresLegacyPrefix = true;
|
||||
if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) {
|
||||
anycast = true;
|
||||
|
|
|
@ -53,9 +53,8 @@ server.setConfiguration(configuration);
|
|||
server.setJmsConfiguration(jmsConfiguration);
|
||||
server.start();
|
||||
|
||||
// uncomment this line to validate https://issues.apache.org/jira/browse/ARTEMIS-1561
|
||||
// this api exists on both 1.4 and 2.x... so, this one was preferred for this
|
||||
if (producer.toString().startsWith("HORNETQ")) {
|
||||
// hornetq servers won't auto-create
|
||||
// uncomment this next statements to validate https://issues.apache.org/jira/browse/ARTEMIS-1561
|
||||
if (producer.toString().equals("ARTEMIS-140") && type.equals("ARTEMIS-SNAPSHOT") ||
|
||||
producer.toString().startsWith("HORNETQ")) {
|
||||
server.getJMSServerManager().createQueue(true, "queue", null, true);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue