This closes #1713
This commit is contained in:
commit
de7251d546
|
@ -20,6 +20,7 @@ package org.apache.activemq.artemis.api.core;
|
|||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
|
||||
|
@ -35,25 +36,33 @@ public interface ICoreMessage extends Message {
|
|||
@Override
|
||||
InputStream getBodyInputStream();
|
||||
|
||||
/** Returns a new Buffer slicing the current Body. */
|
||||
/**
|
||||
* Returns a new Buffer slicing the current Body.
|
||||
*/
|
||||
ActiveMQBuffer getReadOnlyBodyBuffer();
|
||||
|
||||
/** Return the type of the message */
|
||||
/**
|
||||
* Return the type of the message
|
||||
*/
|
||||
@Override
|
||||
byte getType();
|
||||
|
||||
/** the type of the message */
|
||||
/**
|
||||
* the type of the message
|
||||
*/
|
||||
@Override
|
||||
CoreMessage setType(byte type);
|
||||
|
||||
/**
|
||||
* We are really interested if this is a LargeServerMessage.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
boolean isServerMessage();
|
||||
|
||||
/**
|
||||
* The body used for this message.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
|
@ -61,10 +70,18 @@ public interface ICoreMessage extends Message {
|
|||
|
||||
int getEndOfBodyPosition();
|
||||
|
||||
|
||||
/** Used on large messages treatment */
|
||||
/**
|
||||
* Used on large messages treatment
|
||||
*/
|
||||
void copyHeadersAndProperties(Message msg);
|
||||
|
||||
void sendBuffer_1X(ByteBuf sendBuffer);
|
||||
|
||||
/**
|
||||
* it will fix the body of incoming messages from 1.x and before versions
|
||||
*/
|
||||
void receiveBuffer_1X(ByteBuf buffer);
|
||||
|
||||
/**
|
||||
* @return Returns the message in Map form, useful when encoding to JSON
|
||||
*/
|
||||
|
|
|
@ -188,8 +188,16 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
public void receiveBuffer(ByteBuf buffer) {
|
||||
this.buffer = buffer;
|
||||
this.buffer.retain();
|
||||
decode();
|
||||
this.validBuffer = true;
|
||||
decode(false);
|
||||
}
|
||||
|
||||
/** This will fix the incoming body of 1.x messages */
|
||||
@Override
|
||||
public void receiveBuffer_1X(ByteBuf buffer) {
|
||||
this.buffer = buffer;
|
||||
this.buffer.retain();
|
||||
decode(true);
|
||||
validBuffer = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -205,7 +213,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param sendBuffer
|
||||
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
|
||||
*/
|
||||
|
@ -215,6 +222,21 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
|
||||
}
|
||||
|
||||
/**
|
||||
* Recast the message as an 1.4 message
|
||||
*/
|
||||
@Override
|
||||
public void sendBuffer_1X(ByteBuf sendBuffer) {
|
||||
checkEncode();
|
||||
ByteBuf tmpBuffer = buffer.duplicate();
|
||||
sendBuffer.writeInt(endOfBodyPosition + DataConstants.SIZE_INT);
|
||||
tmpBuffer.readerIndex(DataConstants.SIZE_INT);
|
||||
tmpBuffer.readBytes(sendBuffer, endOfBodyPosition - BUFFER_HEADER_SPACE);
|
||||
sendBuffer.writeInt(tmpBuffer.writerIndex() + DataConstants.SIZE_INT + BUFFER_HEADER_SPACE);
|
||||
tmpBuffer.readBytes(sendBuffer, tmpBuffer.readableBytes());
|
||||
sendBuffer.readerIndex(0);
|
||||
}
|
||||
|
||||
private synchronized void checkEncode() {
|
||||
if (!validBuffer) {
|
||||
encode();
|
||||
|
@ -280,12 +302,10 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return endOfBodyPosition;
|
||||
}
|
||||
|
||||
|
||||
public TypedProperties getTypedProperties() {
|
||||
return checkProperties();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void messageChanged() {
|
||||
validBuffer = false;
|
||||
|
@ -323,7 +343,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
public void copyHeadersAndProperties(final Message msg) {
|
||||
messageID = msg.getMessageID();
|
||||
address = msg.getAddressSimpleString();
|
||||
userID = (UUID)msg.getUserID();
|
||||
userID = (UUID) msg.getUserID();
|
||||
type = msg.toCore().getType();
|
||||
durable = msg.isDurable();
|
||||
expiration = msg.getExpiration();
|
||||
|
@ -331,11 +351,10 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
priority = msg.getPriority();
|
||||
|
||||
if (msg instanceof CoreMessage) {
|
||||
properties = ((CoreMessage)msg).getTypedProperties();
|
||||
properties = ((CoreMessage) msg).getTypedProperties();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Message copy() {
|
||||
checkEncode();
|
||||
|
@ -380,7 +399,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
|
||||
@Override
|
||||
public CoreMessage setUserID(Object uuid) {
|
||||
this.userID = (UUID)uuid;
|
||||
this.userID = (UUID) uuid;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -418,7 +437,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return address;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage setExpiration(long expiration) {
|
||||
this.expiration = expiration;
|
||||
|
@ -487,18 +505,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
private void decode() {
|
||||
private void decode(boolean beforeAddress) {
|
||||
endOfBodyPosition = buffer.readInt();
|
||||
|
||||
buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);
|
||||
|
||||
decodeHeadersAndProperties(buffer, true);
|
||||
buffer.readerIndex(0);
|
||||
validBuffer = true;
|
||||
|
||||
if (beforeAddress) {
|
||||
endOfBodyPosition = endOfBodyPosition - DataConstants.SIZE_INT;
|
||||
}
|
||||
|
||||
internalWritableBuffer();
|
||||
}
|
||||
|
||||
|
||||
public void decodeHeadersAndProperties(final ByteBuf buffer) {
|
||||
decodeHeadersAndProperties(buffer, false);
|
||||
}
|
||||
|
@ -529,7 +551,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public synchronized CoreMessage encode() {
|
||||
|
||||
checkProperties();
|
||||
|
@ -654,7 +675,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putBooleanProperty(final String key, final boolean value) {
|
||||
messageChanged();
|
||||
|
@ -683,7 +703,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return properties.getBooleanProperty(new SimpleString(key));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putByteProperty(final SimpleString key, final byte value) {
|
||||
messageChanged();
|
||||
|
@ -692,7 +711,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putByteProperty(final String key, final byte value) {
|
||||
messageChanged();
|
||||
|
@ -702,7 +720,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
checkProperties();
|
||||
|
@ -731,7 +748,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
checkProperties();
|
||||
|
@ -775,7 +791,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putIntProperty(final SimpleString key, final int value) {
|
||||
messageChanged();
|
||||
|
@ -803,7 +818,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return getIntProperty(SimpleString.toSimpleString(key));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putLongProperty(final SimpleString key, final long value) {
|
||||
messageChanged();
|
||||
|
@ -832,7 +846,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return getLongProperty(SimpleString.toSimpleString(key));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CoreMessage putFloatProperty(final SimpleString key, final float value) {
|
||||
messageChanged();
|
||||
|
@ -865,7 +878,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
messageChanged();
|
||||
|
@ -1071,7 +1083,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
int size = record.readInt();
|
||||
initBuffer(size);
|
||||
buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
|
||||
decode();
|
||||
decode(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,10 +19,12 @@ package org.apache.activemq.artemis.core.protocol;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
|
||||
|
@ -33,10 +35,10 @@ public class ClientPacketDecoder extends PacketDecoder {
|
|||
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
|
||||
|
||||
@Override
|
||||
public Packet decode(final ActiveMQBuffer in) {
|
||||
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final byte packetType = in.readByte();
|
||||
|
||||
Packet packet = decode(packetType);
|
||||
Packet packet = decode(packetType, connection);
|
||||
|
||||
packet.decode(in);
|
||||
|
||||
|
@ -44,12 +46,16 @@ public class ClientPacketDecoder extends PacketDecoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Packet decode(byte packetType) {
|
||||
public Packet decode(byte packetType, CoreRemotingConnection connection) {
|
||||
Packet packet;
|
||||
|
||||
switch (packetType) {
|
||||
case SESS_RECEIVE_MSG: {
|
||||
packet = new SessionReceiveMessage(new ClientMessageImpl());
|
||||
if (connection.isVersionBeforeAddressChange()) {
|
||||
packet = new SessionReceiveMessage_1X(new ClientMessageImpl());
|
||||
} else {
|
||||
packet = new SessionReceiveMessage(new ClientMessageImpl());
|
||||
}
|
||||
break;
|
||||
}
|
||||
case SESS_RECEIVE_LARGE_MSG: {
|
||||
|
@ -57,7 +63,7 @@ public class ClientPacketDecoder extends PacketDecoder {
|
|||
break;
|
||||
}
|
||||
default: {
|
||||
packet = super.decode(packetType);
|
||||
packet = super.decode(packetType, connection);
|
||||
}
|
||||
}
|
||||
return packet;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core;
|
||||
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
||||
|
@ -28,13 +29,18 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
|||
* The client protocol used on the communication.
|
||||
* This will determine if the client has support for certain packet types
|
||||
*/
|
||||
int getClientVersion();
|
||||
int getChannelVersion();
|
||||
|
||||
default boolean isVersionBeforeAddressChange() {
|
||||
int version = getChannelVersion();
|
||||
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the client protocol used on the communication. This will determine if the client has
|
||||
* support for certain packet types
|
||||
*/
|
||||
void setClientVersion(int clientVersion);
|
||||
void setChannelVersion(int clientVersion);
|
||||
|
||||
/**
|
||||
* Returns the channel with the channel id specified.
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.artemis.core.protocol.core;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
||||
/**
|
||||
* A Packet represents a packet of data transmitted over a connection.
|
||||
|
@ -71,7 +70,7 @@ public interface Packet {
|
|||
* @param connection the connection
|
||||
* @return the buffer to encode to
|
||||
*/
|
||||
ActiveMQBuffer encode(RemotingConnection connection);
|
||||
ActiveMQBuffer encode(CoreRemotingConnection connection);
|
||||
|
||||
/**
|
||||
* decodes the buffer into this packet
|
||||
|
|
|
@ -326,6 +326,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
|
|||
}
|
||||
}
|
||||
while (retry);
|
||||
sessionChannel.getConnection().setChannelVersion(response.getServerVersion());
|
||||
return newSessionContext(name, confirmationWindowSize, sessionChannel, response);
|
||||
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionInd
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||
|
@ -91,6 +91,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
|
||||
|
@ -266,8 +267,14 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
|
||||
@Override
|
||||
public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException {
|
||||
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
|
||||
SessionQueueQueryResponseMessage_V3 response = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
SessionQueueQueryResponseMessage response;
|
||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
|
||||
response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
|
||||
} else {
|
||||
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
|
||||
response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
}
|
||||
|
||||
return response.toQueueQuery();
|
||||
}
|
||||
|
@ -292,7 +299,13 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
|
||||
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
|
||||
|
||||
SessionQueueQueryResponseMessage_V3 queueInfo = (SessionQueueQueryResponseMessage_V3) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
SessionQueueQueryResponseMessage queueInfo;
|
||||
|
||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2);
|
||||
} else {
|
||||
queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V3);
|
||||
}
|
||||
|
||||
// The actual windows size that gets used is determined by the user since
|
||||
// could be overridden on the queue settings
|
||||
|
@ -441,7 +454,12 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean sendBlocking,
|
||||
SendAcknowledgementHandler handler,
|
||||
SimpleString defaultAddress) throws ActiveMQException {
|
||||
SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
|
||||
SessionSendMessage packet;
|
||||
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
|
||||
} else {
|
||||
packet = new SessionSendMessage(msgI, sendBlocking, handler);
|
||||
}
|
||||
|
||||
if (sendBlocking) {
|
||||
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
|
||||
|
@ -596,7 +614,9 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
Set<RoutingType> routingTypes,
|
||||
final boolean autoCreated) throws ActiveMQException {
|
||||
CreateAddressMessage request = new CreateAddressMessage(address, routingTypes, autoCreated, true);
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
@ -621,7 +641,9 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean purgeOnNoConsumers,
|
||||
boolean autoCreated) throws ActiveMQException {
|
||||
CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, routingType, filterString, durable, temp, maxConsumers, purgeOnNoConsumers, autoCreated, true);
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
if (!sessionChannel.getConnection().isVersionBeforeAddressChange()) {
|
||||
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -695,11 +717,13 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks,
|
||||
boolean preAcknowledge) {
|
||||
return new CreateSessionMessage(name, sessionChannel.getID(), serverVersion, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
|
||||
return new CreateSessionMessage(name, sessionChannel.getID(), getServerVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId, boolean isSessionStarted) throws ActiveMQException {
|
||||
public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal,
|
||||
long consumerId,
|
||||
boolean isSessionStarted) throws ActiveMQException {
|
||||
ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
|
||||
|
||||
// We try and recreate any non durable queues, since they probably won't be there unless
|
||||
|
@ -851,7 +875,6 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
class ClientSessionPacketHandler implements ChannelHandler {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -152,7 +152,7 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
@Override
|
||||
public boolean supports(final byte packetType) {
|
||||
return supports(packetType, connection.getClientVersion());
|
||||
return supports(packetType, connection.getChannelVersion());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.Serializable;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverMessage;
|
||||
|
@ -160,9 +161,9 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUB
|
|||
|
||||
public abstract class PacketDecoder implements Serializable {
|
||||
|
||||
public abstract Packet decode(ActiveMQBuffer in);
|
||||
public abstract Packet decode(ActiveMQBuffer in, CoreRemotingConnection connection);
|
||||
|
||||
public Packet decode(byte packetType) {
|
||||
public Packet decode(byte packetType, CoreRemotingConnection connection) {
|
||||
Packet packet;
|
||||
|
||||
switch (packetType) {
|
||||
|
|
|
@ -21,8 +21,8 @@ import io.netty.buffer.Unpooled;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class PacketImpl implements Packet {
|
||||
|
@ -305,28 +305,35 @@ public class PacketImpl implements Packet {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ActiveMQBuffer encode(final RemotingConnection connection) {
|
||||
public ActiveMQBuffer encode(final CoreRemotingConnection connection) {
|
||||
ActiveMQBuffer buffer = createPacket(connection);
|
||||
|
||||
// The standard header fields
|
||||
|
||||
buffer.writeInt(0); // The length gets filled in at the end
|
||||
buffer.writeByte(type);
|
||||
buffer.writeLong(channelID);
|
||||
encodeHeader(buffer);
|
||||
|
||||
encodeRest(buffer);
|
||||
|
||||
encodeSize(buffer);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
protected void encodeHeader(ActiveMQBuffer buffer) {
|
||||
// The standard header fields
|
||||
buffer.writeInt(0); // The length gets filled in at the end
|
||||
buffer.writeByte(type);
|
||||
buffer.writeLong(channelID);
|
||||
}
|
||||
|
||||
protected void encodeSize(ActiveMQBuffer buffer) {
|
||||
size = buffer.writerIndex();
|
||||
|
||||
// The length doesn't include the actual length byte
|
||||
int len = size - DataConstants.SIZE_INT;
|
||||
|
||||
buffer.setInt(0, len);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
protected ActiveMQBuffer createPacket(RemotingConnection connection) {
|
||||
protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
|
||||
|
||||
int size = expectedEncodeSize();
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
|
||||
private final boolean client;
|
||||
|
||||
private int clientVersion;
|
||||
private int channelVersion;
|
||||
|
||||
private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
|
||||
|
||||
|
@ -146,19 +146,19 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
}
|
||||
|
||||
/**
|
||||
* @return the clientVersion
|
||||
* @return the channelVersion
|
||||
*/
|
||||
@Override
|
||||
public int getClientVersion() {
|
||||
return clientVersion;
|
||||
public int getChannelVersion() {
|
||||
return channelVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param clientVersion the clientVersion to set
|
||||
* @param clientVersion the channelVersion to set
|
||||
*/
|
||||
@Override
|
||||
public void setClientVersion(int clientVersion) {
|
||||
this.clientVersion = clientVersion;
|
||||
public void setChannelVersion(int clientVersion) {
|
||||
this.channelVersion = clientVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -362,7 +362,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
@Override
|
||||
public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
|
||||
try {
|
||||
final Packet packet = packetDecoder.decode(buffer);
|
||||
final Packet packet = packetDecoder.decode(buffer, this);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet);
|
||||
|
@ -417,7 +417,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
|
||||
@Override
|
||||
public void killMessage(SimpleString nodeID) {
|
||||
if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
|
||||
if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
|
||||
return;
|
||||
}
|
||||
Channel clientChannel = getChannel(1, -1);
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
|
@ -26,9 +27,9 @@ public class SessionReceiveMessage extends MessagePacket {
|
|||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
private long consumerID;
|
||||
protected long consumerID;
|
||||
|
||||
private int deliveryCount;
|
||||
protected int deliveryCount;
|
||||
|
||||
public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) {
|
||||
super(SESS_RECEIVE_MSG, message);
|
||||
|
@ -69,13 +70,17 @@ public class SessionReceiveMessage extends MessagePacket {
|
|||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
// Buffer comes in after having read standard headers and positioned at Beginning of body part
|
||||
|
||||
message.receiveBuffer(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
|
||||
receiveMessage(copyMessageBuffer(buffer.byteBuf(), DataConstants.SIZE_LONG + DataConstants.SIZE_INT));
|
||||
|
||||
buffer.readerIndex(buffer.capacity() - DataConstants.SIZE_LONG - DataConstants.SIZE_INT);
|
||||
this.consumerID = buffer.readLong();
|
||||
this.deliveryCount = buffer.readInt();
|
||||
|
||||
}
|
||||
|
||||
protected void receiveMessage(ByteBuf buffer) {
|
||||
message.receiveBuffer(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
public class SessionReceiveMessage_1X extends SessionReceiveMessage {
|
||||
|
||||
public SessionReceiveMessage_1X(long consumerID, ICoreMessage message, int deliveryCount) {
|
||||
super(consumerID, message, deliveryCount);
|
||||
}
|
||||
|
||||
public SessionReceiveMessage_1X(CoreMessage message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(ActiveMQBuffer buffer) {
|
||||
message.sendBuffer_1X(buffer.byteBuf());
|
||||
buffer.writeLong(consumerID);
|
||||
buffer.writeInt(deliveryCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receiveMessage(ByteBuf buffer) {
|
||||
message.receiveBuffer_1X(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int expectedEncodeSize() {
|
||||
return super.expectedEncodeSize() + DataConstants.SIZE_INT;
|
||||
}
|
||||
|
||||
}
|
|
@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
|||
|
||||
public class SessionSendMessage extends MessagePacket {
|
||||
|
||||
private boolean requiresResponse;
|
||||
protected boolean requiresResponse;
|
||||
|
||||
/**
|
||||
* In case, we are using a different handler than the one set on the {@link org.apache.activemq.artemis.api.core.client.ClientSession}
|
||||
|
@ -76,7 +76,7 @@ public class SessionSendMessage extends MessagePacket {
|
|||
// Buffer comes in after having read standard headers and positioned at Beginning of body part
|
||||
|
||||
ByteBuf messageBuffer = copyMessageBuffer(buffer.byteBuf(), 1);
|
||||
message.receiveBuffer(messageBuffer);
|
||||
receiveMessage(messageBuffer);
|
||||
|
||||
buffer.readerIndex(buffer.capacity() - 1);
|
||||
|
||||
|
@ -84,6 +84,10 @@ public class SessionSendMessage extends MessagePacket {
|
|||
|
||||
}
|
||||
|
||||
protected void receiveMessage(ByteBuf messageBuffer) {
|
||||
message.receiveBuffer(messageBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
/**
|
||||
* SessionSend Message for the 1.x branch
|
||||
*/
|
||||
public class SessionSendMessage_1X extends SessionSendMessage {
|
||||
|
||||
public SessionSendMessage_1X(ICoreMessage message, boolean requiresResponse, SendAcknowledgementHandler handler) {
|
||||
super(message, requiresResponse, handler);
|
||||
}
|
||||
|
||||
public SessionSendMessage_1X(CoreMessage message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(ActiveMQBuffer buffer) {
|
||||
message.sendBuffer_1X(buffer.byteBuf());
|
||||
buffer.writeBoolean(requiresResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receiveMessage(ByteBuf messageBuffer) {
|
||||
message.receiveBuffer_1X(messageBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int expectedEncodeSize() {
|
||||
return super.expectedEncodeSize() + DataConstants.SIZE_INT;
|
||||
}
|
||||
|
||||
}
|
|
@ -64,6 +64,8 @@ import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
|||
*/
|
||||
public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Externalizable, Referenceable, ConnectionFactory, XAConnectionFactory, AutoCloseable {
|
||||
|
||||
private static final long serialVersionUID = -7554006056207377105L;
|
||||
|
||||
private ServerLocator serverLocator;
|
||||
|
||||
private String clientID;
|
||||
|
|
|
@ -384,7 +384,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
|
|||
public static boolean isQueue(TYPE type) {
|
||||
boolean result = false;
|
||||
|
||||
if (type.equals(QUEUE) || type.equals(TEMP_QUEUE)) {
|
||||
if (type != null && (type.equals(QUEUE) || type.equals(TEMP_QUEUE))) {
|
||||
result = true;
|
||||
}
|
||||
|
||||
|
@ -394,7 +394,7 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
|
|||
public static boolean isTemporary(TYPE type) {
|
||||
boolean result = false;
|
||||
|
||||
if (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE)) {
|
||||
if (type != null && (type.equals(TEMP_TOPIC) || type.equals(TEMP_QUEUE))) {
|
||||
result = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.activemq.artemis.maven;
|
|||
import java.io.File;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -29,6 +31,7 @@ import org.apache.maven.plugins.annotations.LifecyclePhase;
|
|||
import org.apache.maven.plugins.annotations.Mojo;
|
||||
import org.apache.maven.plugins.annotations.Parameter;
|
||||
import org.apache.maven.project.MavenProject;
|
||||
import org.eclipse.aether.repository.RemoteRepository;
|
||||
|
||||
@Mojo(name = "dependency-scan", defaultPhase = LifecyclePhase.VERIFY)
|
||||
public class ArtemisDependencyScanPlugin extends ArtemisAbstractPlugin {
|
||||
|
@ -47,6 +50,9 @@ public class ArtemisDependencyScanPlugin extends ArtemisAbstractPlugin {
|
|||
@Parameter
|
||||
private String[] libList;
|
||||
|
||||
@Parameter
|
||||
private String[] extraRepositories;
|
||||
|
||||
@Parameter
|
||||
private String variableName;
|
||||
|
||||
|
@ -64,6 +70,16 @@ public class ArtemisDependencyScanPlugin extends ArtemisAbstractPlugin {
|
|||
|
||||
@Override
|
||||
protected void doExecute() throws MojoExecutionException, MojoFailureException {
|
||||
|
||||
int repositories = 0;
|
||||
List<RemoteRepository> listRepo = new ArrayList<>();
|
||||
if (extraRepositories != null) {
|
||||
for (String strRepo: extraRepositories) {
|
||||
RemoteRepository repo = new RemoteRepository.Builder("repo" + (repositories++), "default", strRepo).build();
|
||||
listRepo.add(repo);
|
||||
remoteRepos.add(repo);
|
||||
}
|
||||
}
|
||||
getLog().info("Local " + localRepository);
|
||||
MavenProject project = (MavenProject) getPluginContext().get("project");
|
||||
|
||||
|
@ -102,7 +118,13 @@ public class ArtemisDependencyScanPlugin extends ArtemisAbstractPlugin {
|
|||
} catch (Throwable e) {
|
||||
getLog().error(e);
|
||||
throw new MojoFailureException(e.getMessage());
|
||||
} finally {
|
||||
for (RemoteRepository repository : listRepo) {
|
||||
remoteRepos.remove(repository);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCon
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
|
||||
|
@ -84,51 +86,58 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
private static final long serialVersionUID = 3348673114388400766L;
|
||||
public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
|
||||
|
||||
private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in) {
|
||||
final SessionSendMessage sendMessage = new SessionSendMessage(new CoreMessage());
|
||||
private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionSendMessage sendMessage;
|
||||
|
||||
if (connection.isVersionBeforeAddressChange()) {
|
||||
sendMessage = new SessionSendMessage_1X(new CoreMessage());
|
||||
} else {
|
||||
sendMessage = new SessionSendMessage(new CoreMessage());
|
||||
}
|
||||
|
||||
sendMessage.decode(in);
|
||||
return sendMessage;
|
||||
}
|
||||
|
||||
private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in) {
|
||||
private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage();
|
||||
acknowledgeMessage.decode(in);
|
||||
return acknowledgeMessage;
|
||||
}
|
||||
|
||||
private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in) {
|
||||
private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionRequestProducerCreditsMessage requestProducerCreditsMessage = new SessionRequestProducerCreditsMessage();
|
||||
requestProducerCreditsMessage.decode(in);
|
||||
return requestProducerCreditsMessage;
|
||||
}
|
||||
|
||||
private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in) {
|
||||
private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final SessionConsumerFlowCreditMessage sessionConsumerFlowCreditMessage = new SessionConsumerFlowCreditMessage();
|
||||
sessionConsumerFlowCreditMessage.decode(in);
|
||||
return sessionConsumerFlowCreditMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Packet decode(final ActiveMQBuffer in) {
|
||||
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
|
||||
final byte packetType = in.readByte();
|
||||
//optimized for the most common cases: hottest and commons methods will be inlined and this::decode too due to the byte code size
|
||||
switch (packetType) {
|
||||
case SESS_SEND:
|
||||
return decodeSessionSendMessage(in);
|
||||
return decodeSessionSendMessage(in, connection);
|
||||
case SESS_ACKNOWLEDGE:
|
||||
return decodeSessionAcknowledgeMessage(in);
|
||||
return decodeSessionAcknowledgeMessage(in, connection);
|
||||
case SESS_PRODUCER_REQUEST_CREDITS:
|
||||
return decodeRequestProducerCreditsMessage(in);
|
||||
return decodeRequestProducerCreditsMessage(in, connection);
|
||||
case SESS_FLOWTOKEN:
|
||||
return decodeSessionConsumerFlowCreditMessage(in);
|
||||
return decodeSessionConsumerFlowCreditMessage(in, connection);
|
||||
default:
|
||||
return slowPathDecode(in, packetType);
|
||||
return slowPathDecode(in, packetType, connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// separating for performance reasons
|
||||
private Packet slowPathDecode(ActiveMQBuffer in, byte packetType) {
|
||||
private Packet slowPathDecode(ActiveMQBuffer in, byte packetType, CoreRemotingConnection connection) {
|
||||
Packet packet;
|
||||
|
||||
switch (packetType) {
|
||||
|
@ -242,7 +251,7 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
break;
|
||||
}
|
||||
default: {
|
||||
packet = super.decode(packetType);
|
||||
packet = super.decode(packetType, connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -314,11 +314,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_CREATECONSUMER: {
|
||||
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
|
||||
requiresResponse = request.isRequiresResponse();
|
||||
session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getClientVersion()), request.getFilterString(), request.isBrowseOnly());
|
||||
session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getChannelVersion()), request.getFilterString(), request.isBrowseOnly());
|
||||
if (requiresResponse) {
|
||||
// We send back queue information on the queue as a response- this allows the queue to
|
||||
// be automatically recreated on failover
|
||||
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
|
||||
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
|
||||
|
||||
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
|
||||
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
|
||||
|
@ -387,9 +387,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_QUEUEQUERY: {
|
||||
requiresResponse = true;
|
||||
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
|
||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getChannelVersion()));
|
||||
|
||||
if (remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
if (remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
|
||||
}
|
||||
|
||||
|
@ -405,7 +405,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
case SESS_BINDINGQUERY: {
|
||||
requiresResponse = true;
|
||||
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
|
||||
final int clientVersion = remotingConnection.getClientVersion();
|
||||
final int clientVersion = remotingConnection.getChannelVersion();
|
||||
BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion));
|
||||
|
||||
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
|
||||
|
|
|
@ -145,10 +145,10 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
|||
"Server will not accept create session requests");
|
||||
}*/
|
||||
|
||||
if (connection.getClientVersion() == 0) {
|
||||
connection.setClientVersion(request.getVersion());
|
||||
} else if (connection.getClientVersion() != request.getVersion()) {
|
||||
ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getClientVersion());
|
||||
if (connection.getChannelVersion() == 0) {
|
||||
connection.setChannelVersion(request.getVersion());
|
||||
} else if (connection.getChannelVersion() != request.getVersion()) {
|
||||
ActiveMQServerLogger.LOGGER.incompatibleVersionAfterConnect(request.getVersion(), connection.getChannelVersion());
|
||||
}
|
||||
|
||||
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
|
||||
|
@ -163,7 +163,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
|||
|
||||
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
|
||||
|
||||
if (connection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
if (connection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
routingTypeMap = new HashMap<>();
|
||||
routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST);
|
||||
routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST);
|
||||
|
|
|
@ -253,14 +253,14 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
|
|||
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage) packet;
|
||||
|
||||
if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY_V2) {
|
||||
channel0.getConnection().setClientVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
|
||||
channel0.getConnection().setChannelVersion(((SubscribeClusterTopologyUpdatesMessageV2) msg).getClientVersion());
|
||||
}
|
||||
|
||||
final ClusterTopologyListener listener = new ClusterTopologyListener() {
|
||||
@Override
|
||||
public void nodeUP(final TopologyMember topologyMember, final boolean last) {
|
||||
try {
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getClientVersion(), topologyMember);
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorPair = BackwardsCompatibilityUtils.getTCPair(channel0.getConnection().getChannelVersion(), topologyMember);
|
||||
|
||||
final String nodeID = topologyMember.getNodeId();
|
||||
// Using an executor as most of the notifications on the Topology
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
|
@ -112,7 +113,12 @@ public final class CoreSessionCallback implements SessionCallback {
|
|||
@Override
|
||||
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
|
||||
|
||||
Packet packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
|
||||
Packet packet;
|
||||
if (channel.getConnection().isVersionBeforeAddressChange()) {
|
||||
packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount);
|
||||
} else {
|
||||
packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
|
||||
}
|
||||
|
||||
int size = 0;
|
||||
|
||||
|
|
|
@ -237,7 +237,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
if (session.getRemotingConnection() instanceof CoreRemotingConnection) {
|
||||
CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection();
|
||||
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||
requiresLegacyPrefix = true;
|
||||
if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) {
|
||||
anycast = true;
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq.examples.broker</groupId>
|
||||
<artifactId>jms-examples</artifactId>
|
||||
<version>2.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>send-acknowledgements-fail</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>ActiveMQ Artemis JMS Send Acknowledgements Example</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client-all</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-cli</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<instance>${basedir}/target/server0</instance>
|
||||
<args>
|
||||
<arg>--global-max-size</arg>
|
||||
<arg>10M</arg>
|
||||
</args>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>runClient</id>
|
||||
<goals>
|
||||
<goal>runClient</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<clientClass>org.apache.activemq.artemis.jms.example.SendAcknowledgementsExample</clientClass>
|
||||
<args>
|
||||
<param>${basedir}/target/server0</param>
|
||||
</args>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq.examples.broker</groupId>
|
||||
<artifactId>send-acknowledgements-fail</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,140 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<html>
|
||||
<head>
|
||||
<title>ActiveMQ Artemis Asynchronous Send Acknowledgements Example</title>
|
||||
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
|
||||
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
|
||||
<script type="text/javascript" src="../../../common/prettify.js"></script>
|
||||
</head>
|
||||
<body onload="prettyPrint()">
|
||||
<h1>Asynchronous Send Acknowledgements Example</h1>
|
||||
|
||||
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
|
||||
|
||||
|
||||
<p>Asynchronous Send Acknowledgements are an advanced feature of ActiveMQ Artemis which allow you to
|
||||
receive acknowledgements that messages were successfully received at the server in a separate thread to the sending thread<p/>
|
||||
<p>In this example we create a normal JMS session, then set a SendAcknowledgementHandler on the JMS
|
||||
session's underlying core session. We send many messages to the server without blocking and asynchronously
|
||||
receive send acknowledgements via the SendAcknowledgementHandler.
|
||||
|
||||
<p>For more information on Asynchronous Send Acknowledgements please see the user manual</p>
|
||||
<h2>Example step-by-step</h2>
|
||||
<p><i>To run the example, simply type <code>mvn verify -Pexample</code> from this directory</i></p>
|
||||
|
||||
<ol>
|
||||
<li>First we need to get an initial context so we can look-up the JMS connection factory and destination objects from JNDI. This initial context will get it's properties from the <code>client-jndi.properties</code> file in the directory <code>../common/config</code></li>
|
||||
<pre class="prettyprint">
|
||||
<code>InitialContext initialContext = getContext();</code>
|
||||
</pre>
|
||||
|
||||
<li>We look-up the JMS queue object from JNDI</li>
|
||||
<pre class="prettyprint">
|
||||
<code>Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");</code>
|
||||
</pre>
|
||||
|
||||
<li>We look-up the JMS connection factory object from JNDI</li>
|
||||
<pre class="prettyprint">
|
||||
<code>ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");</code>
|
||||
</pre>
|
||||
|
||||
<li>We create a JMS connection</li>
|
||||
<pre class="prettyprint">
|
||||
<code>connection = cf.createConnection();</code>
|
||||
</pre>
|
||||
|
||||
<li>Define a SendAcknowledgementHandler which will receive asynchronous acknowledgements</li>
|
||||
<pre class="prettyprint">
|
||||
<code>
|
||||
class MySendAcknowledgementsHandler implements SendAcknowledgementHandler
|
||||
{
|
||||
int count = 0;
|
||||
|
||||
public void sendAcknowledged(final Message message)
|
||||
{
|
||||
System.out.println("Received send acknowledgement for message " + count++);
|
||||
}
|
||||
}
|
||||
</code>
|
||||
</pre>
|
||||
|
||||
<li>Create a JMS session</li>
|
||||
<pre class="prettyprint">
|
||||
<code>Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);</code>
|
||||
</pre>
|
||||
|
||||
<li>Set the handler on the underlying core session</li>
|
||||
<pre class="prettyprint">
|
||||
<code>
|
||||
ClientSession coreSession = ((ActiveMQSession)session).getCoreSession();
|
||||
|
||||
coreSession.setSendAcknowledgementHandler(new MySendAcknowledgementsHandler());
|
||||
|
||||
</code>
|
||||
</pre>
|
||||
|
||||
<li>Create a JMS Message Producer</li>
|
||||
<pre class="prettyprint">
|
||||
<code>
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
</code>
|
||||
</pre>
|
||||
|
||||
<li>Send 5000 messages, the handler will get called asynchronously some time later after the messages are sent.</li>
|
||||
<pre class="prettyprint">
|
||||
<code>
|
||||
final int numMessages = 5000;
|
||||
|
||||
for (int i = 0; i < numMessages; i++)
|
||||
{
|
||||
javax.jms.Message jmsMessage = session.createMessage();
|
||||
|
||||
producer.send(jmsMessage);
|
||||
|
||||
System.out.println("Sent message " + i);
|
||||
}
|
||||
</code>
|
||||
</pre>
|
||||
|
||||
|
||||
<li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
|
||||
|
||||
<pre class="prettyprint">
|
||||
<code>finally
|
||||
{
|
||||
if (initialContext != null)
|
||||
{
|
||||
initialContext.close();
|
||||
}
|
||||
if (connection != null)
|
||||
{
|
||||
connection.close();
|
||||
}
|
||||
}</code>
|
||||
</pre>
|
||||
|
||||
|
||||
|
||||
</ol>
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.example;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
|
||||
/**
|
||||
* Asynchronous Send Acknowledgements are an advanced feature of ActiveMQ Artemis which allow you to
|
||||
* receive acknowledgements that messages were successfully received at the server in a separate stream
|
||||
* to the stream of messages being sent to the server.
|
||||
* For more information please see the readme.html file
|
||||
*/
|
||||
public class SendAcknowledgementsExample {
|
||||
|
||||
private static Process server0;
|
||||
private static final int numMessages = 30_000;
|
||||
private static final SimpleString queueName = SimpleString.toSimpleString("testQueue");
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
||||
for (int i = 0; i < 500; i++) {
|
||||
System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ Running test " + i);
|
||||
server0 = ServerUtil.startServer(args[0], SendAcknowledgementsExample.class.getSimpleName() + "0", 0, 10000);
|
||||
sendMessages();
|
||||
|
||||
server0 = ServerUtil.startServer(args[0], SendAcknowledgementsExample.class.getSimpleName() + "0", 0, 10000);
|
||||
consumeMessages();
|
||||
}
|
||||
}
|
||||
|
||||
private static void sendMessages() throws Exception {
|
||||
try {
|
||||
|
||||
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(false).setConfirmationWindowSize(1024 * 1024);
|
||||
|
||||
ClientSessionFactory factory = locator.createSessionFactory();
|
||||
|
||||
ClientSession session = factory.createSession(null, null, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
|
||||
|
||||
try {
|
||||
// Tried with and without the createAddress
|
||||
session.createAddress(queueName, RoutingType.MULTICAST, false);
|
||||
session.createQueue(queueName.toString(), RoutingType.MULTICAST, queueName.toString(), true);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
ClientProducer producer = session.createProducer(queueName);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(numMessages);
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
|
||||
if (i % 10000 == 0) {
|
||||
System.out.println("Send " + i);
|
||||
}
|
||||
ClientMessage message = session.createMessage(true);
|
||||
message.getBodyBuffer().writeBytes("hello world".getBytes());
|
||||
|
||||
// tried with producer.send(queueName, message, ...);; // didn't make a difference
|
||||
|
||||
producer.send(message, new SendAcknowledgementHandler() {
|
||||
@Override
|
||||
public void sendAcknowledged(Message message) {
|
||||
latch.countDown();
|
||||
if (latch.getCount() % 10_000 == 0) {
|
||||
System.out.println(latch.getCount() + " to go");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
latch.await(10, TimeUnit.MINUTES);
|
||||
} finally {
|
||||
server0.destroy();
|
||||
server0.waitFor();
|
||||
}
|
||||
}
|
||||
|
||||
private static void consumeMessages() throws Exception {
|
||||
try {
|
||||
|
||||
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(false).setConfirmationWindowSize(-1);
|
||||
|
||||
ClientSessionFactory factory = locator.createSessionFactory();
|
||||
|
||||
ClientSession session = factory.createSession(null, null, false, false, false, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(queueName);
|
||||
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
|
||||
if (i % 10000 == 0) {
|
||||
System.out.println("Received " + i);
|
||||
}
|
||||
|
||||
ClientMessage message = consumer.receive(5000);
|
||||
message.acknowledge();
|
||||
|
||||
if (message == null) {
|
||||
System.err.println("Expected message at " + i);
|
||||
System.exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
ClientMessage message = consumer.receiveImmediate();
|
||||
if (message != null) {
|
||||
System.err.println("Received too many messages");
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
session.close();
|
||||
locator.close();
|
||||
|
||||
} finally {
|
||||
server0.destroy();
|
||||
server0.waitFor();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
|
||||
connectionFactory.ConnectionFactory=tcp://localhost:61616?confirmationWindowSize=1048576
|
||||
queue.queue/exampleQueue=exampleQueue
|
6
pom.xml
6
pom.xml
|
@ -111,6 +111,9 @@
|
|||
<version.org.jacoco>0.7.9</version.org.jacoco>
|
||||
<version.org.jacoco.plugin>0.7.9</version.org.jacoco.plugin>
|
||||
|
||||
<!-- used on tests -->
|
||||
<groovy.version>2.4.3</groovy.version>
|
||||
|
||||
<owasp.version>1.4.3</owasp.version>
|
||||
<spring.version>5.0.1.RELEASE</spring.version>
|
||||
|
||||
|
@ -126,6 +129,7 @@
|
|||
<skipJmsTests>true</skipJmsTests>
|
||||
<skipExtraTests>true</skipExtraTests>
|
||||
<skipIntegrationTests>true</skipIntegrationTests>
|
||||
<skipCompatibilityTests>true</skipCompatibilityTests>
|
||||
<skipSmokeTests>true</skipSmokeTests>
|
||||
<skipJoramTests>true</skipJoramTests>
|
||||
<skipTimingTests>true</skipTimingTests>
|
||||
|
@ -959,6 +963,7 @@
|
|||
<skipJmsTests>false</skipJmsTests>
|
||||
<skipJoramTests>false</skipJoramTests>
|
||||
<skipIntegrationTests>false</skipIntegrationTests>
|
||||
<skipCompatibilityTests>false</skipCompatibilityTests>
|
||||
<skipSmokeTests>false</skipSmokeTests>
|
||||
<skipTimingTests>true</skipTimingTests>
|
||||
<skipConcurrentTests>false</skipConcurrentTests>
|
||||
|
@ -1005,6 +1010,7 @@
|
|||
<skipExtraTests>true</skipExtraTests>
|
||||
<skipStyleCheck>false</skipStyleCheck>
|
||||
<skipLicenseCheck>false</skipLicenseCheck>
|
||||
<skipCompatibilityTests>false</skipCompatibilityTests>
|
||||
</properties>
|
||||
</profile>
|
||||
<profile>
|
||||
|
|
|
@ -0,0 +1,475 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq.tests</groupId>
|
||||
<artifactId>artemis-tests-pom</artifactId>
|
||||
<version>2.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>compatibility-tests</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>ActiveMQ Artemis Compatibility Tests</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-server</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-commons</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq.tests</groupId>
|
||||
<artifactId>unit-tests</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-core-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-server</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-ra</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-cli</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-commons</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-spring-integration</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-journal</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jdbc-store</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-amqp-protocol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-stomp-protocol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-openwire-protocol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jms_1.1_spec</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-hornetq-protocol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-core-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-server</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-native</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-service-extensions</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq.tests</groupId>
|
||||
<artifactId>artemis-test-support</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-features</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>pom</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq.rest</groupId>
|
||||
<artifactId>artemis-rest</artifactId>
|
||||
<version>2.5.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.groovy</groupId>
|
||||
<artifactId>groovy-all</artifactId>
|
||||
<version>${groovy.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-junit</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-client</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jms_1.1_spec</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-mqtt-protocol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.fusesource.mqtt-client</groupId>
|
||||
<artifactId>mqtt-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-json_1.0_spec</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jta_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jms_2.0_spec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.johnzon</groupId>
|
||||
<artifactId>johnzon-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-buffer</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec-http</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec-mqtt</artifactId>
|
||||
<version>${netty.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-handler</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging-processor</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging-annotations</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logmanager</groupId>
|
||||
<artifactId>jboss-logmanager</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>qpid-jms-client</artifactId>
|
||||
<version>${qpid.jms.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>proton-j</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.5</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<testResources>
|
||||
<testResource>
|
||||
<directory>src/test/resources</directory>
|
||||
<filtering>true</filtering>
|
||||
</testResource>
|
||||
</testResources>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>test</phase>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<!-- The executions of dependency-scan will calculate dependencies for each specific version used here on this testsuite. -->
|
||||
<execution>
|
||||
<id>snapshot-check</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>dependency-scan</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<libListWithDeps>
|
||||
<arg>org.apache.activemq:artemis-jms-server:${project.version}</arg>
|
||||
<arg>org.apache.activemq:artemis-jms-client:${project.version}</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:${project.version}</arg>
|
||||
<arg>org.apache.activemq:artemis-amqp-protocol:${project.version}</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:${project.version}</arg>
|
||||
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||
</libListWithDeps>
|
||||
<libList>
|
||||
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||
</libList>
|
||||
<variableName>ARTEMIS-SNAPSHOT</variableName>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>dependency-scan</goal>
|
||||
</goals>
|
||||
<id>240-check</id>
|
||||
<configuration>
|
||||
<libListWithDeps>
|
||||
<arg>org.apache.activemq:artemis-jms-server:2.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-jms-client:2.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:2.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-amqp-protocol:2.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:2.4.0</arg>
|
||||
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||
</libListWithDeps>
|
||||
<libList>
|
||||
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||
</libList>
|
||||
<variableName>ARTEMIS-240</variableName>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>140-check</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>dependency-scan</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<libListWithDeps>
|
||||
<arg>org.apache.activemq:artemis-jms-server:1.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-jms-client:1.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:1.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-amqp-protocol:1.4.0</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:1.4.0</arg>
|
||||
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||
</libListWithDeps>
|
||||
<libList>
|
||||
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||
</libList>
|
||||
<variableName>ARTEMIS-140</variableName>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>hornetq-235</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>dependency-scan</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<extraRepositories>
|
||||
<!-- some of the dependencies are not on maven central,
|
||||
and the artemis maven plugin has the capability of using this extra repo -->
|
||||
<arg>https://repository.jboss.org/nexus/content/groups/public</arg>
|
||||
</extraRepositories>
|
||||
<libListWithDeps>
|
||||
<arg>org.hornetq:hornetq-jms-server:2.4.7.Final</arg>
|
||||
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||
</libListWithDeps>
|
||||
<libList>
|
||||
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||
</libList>
|
||||
<variableName>HORNETQ-235</variableName>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>hornetq-247</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>dependency-scan</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<extraRepositories>
|
||||
<!-- some of the dependencies are not on maven central,
|
||||
and the artemis maven plugin has the capability of using this extra repo -->
|
||||
<arg>https://repository.jboss.org/nexus/content/groups/public</arg>
|
||||
</extraRepositories>
|
||||
<libListWithDeps>
|
||||
<arg>org.hornetq:hornetq-jms-server:2.4.7.Final</arg>
|
||||
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||
</libListWithDeps>
|
||||
<libList>
|
||||
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||
</libList>
|
||||
<variableName>HORNETQ-247</variableName>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<systemProperties>
|
||||
<property>
|
||||
<name>ARTEMIS-SNAPSHOT</name>
|
||||
<value>${ARTEMIS-SNAPSHOT}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>ARTEMIS-240</name>
|
||||
<value>${ARTEMIS-240}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>ARTEMIS-140</name>
|
||||
<value>${ARTEMIS-140}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>HORNETQ-235</name>
|
||||
<value>${HORNETQ-235}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>HORNETQ-247</name>
|
||||
<value>${HORNETQ-247}</value>
|
||||
</property>
|
||||
</systemProperties>
|
||||
<skipTests>${skipCompatibilityTests}</skipTests>
|
||||
<argLine>-Djgroups.bind_addr=::1 ${activemq-surefire-argline}</argLine>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.servicemix.tooling</groupId>
|
||||
<artifactId>depends-maven-plugin</artifactId>
|
||||
<version>1.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>generate-depends-file</id>
|
||||
<goals>
|
||||
<goal>generate-depends-file</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<distributionManagement>
|
||||
<repository>
|
||||
<id>jboss-releases-repository</id>
|
||||
<name>JBoss Releases Repository</name>
|
||||
<url>https://repository.jboss.org/nexus/service/local/staging/deploy/maven2/</url>
|
||||
</repository>
|
||||
<snapshotRepository>
|
||||
<id>jboss-snapshots-repository</id>
|
||||
<name>JBoss Snapshots Repository</name>
|
||||
<url>https://repository.jboss.org/nexus/content/repositories/snapshots/</url>
|
||||
</snapshotRepository>
|
||||
</distributionManagement>
|
||||
|
||||
|
||||
</project>
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
|
||||
import groovy.lang.Binding;
|
||||
import groovy.lang.GroovyShell;
|
||||
|
||||
public class GroovyRun {
|
||||
|
||||
public static final String SNAPSHOT = "ARTEMIS-SNAPSHOT";
|
||||
public static final String ONE_FOUR = "ARTEMIS-140";
|
||||
public static final String TWO_FOUR = "ARTEMIS-240";
|
||||
public static final String HORNETQ_235 = "HORNETQ-235";
|
||||
public static final String HORNETQ_247 = "HORNETQ-247";
|
||||
|
||||
public static final String WORD_START = "**SERVER STARTED**";
|
||||
|
||||
public static Binding binding = new Binding();
|
||||
public static GroovyShell shell = new GroovyShell(binding);
|
||||
|
||||
// Called with reflection
|
||||
public static void doMain(String script, String... arg) throws Throwable {
|
||||
int i = 0;
|
||||
for (String a : arg) {
|
||||
System.out.println("[" + (i++) + "]=" + a);
|
||||
}
|
||||
System.out.println();
|
||||
|
||||
evaluate(script, "arg", arg);
|
||||
|
||||
System.out.println(WORD_START);
|
||||
}
|
||||
|
||||
/**
|
||||
* This can be called from the scripts as well.
|
||||
* The scripts will use this method instead of its own groovy method.
|
||||
* As a classloader operation needs to be done here.
|
||||
*/
|
||||
public static void evaluate(String script,
|
||||
String argVariableName,
|
||||
String[] arg) throws URISyntaxException, IOException {
|
||||
URL scriptURL = GroovyRun.class.getClassLoader().getResource(script);
|
||||
if (scriptURL == null) {
|
||||
throw new RuntimeException("cannot find " + script);
|
||||
}
|
||||
URI scriptURI = scriptURL.toURI();
|
||||
|
||||
binding.setVariable(argVariableName, arg);
|
||||
|
||||
shell.evaluate(scriptURI);
|
||||
}
|
||||
|
||||
// Called with reflection
|
||||
public static void execute(String script) throws Throwable {
|
||||
shell.evaluate(script);
|
||||
}
|
||||
|
||||
public static void assertNotNull(Object value) {
|
||||
if (value == null) {
|
||||
throw new RuntimeException("Null value");
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertNull(Object value) {
|
||||
if (value != null) {
|
||||
throw new RuntimeException("Expected Null value");
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertTrue(boolean value) {
|
||||
if (!value) {
|
||||
throw new RuntimeException("Expected true");
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertEquals(Object value1, Object value2) {
|
||||
if (!value1.equals(value2)) {
|
||||
throw new RuntimeException(value1 + "!=" + value2);
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertEquals(int value1, int value2) {
|
||||
if (value1 != value2) {
|
||||
throw new RuntimeException(value1 + "!=" + value2);
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertEquals(byte[] value1, byte[] value2) {
|
||||
|
||||
assertEquals(value1.length, value2.length);
|
||||
|
||||
for (int i = 0; i < value1.length; i++) {
|
||||
assertEquals(value1[i], value2[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static byte getSamplebyte(final long position) {
|
||||
return (byte) ('a' + position % ('z' - 'a' + 1));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
package clients
|
||||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// Create a client connection factory
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
|
||||
|
||||
println("serverType " + serverArg[0]);
|
||||
|
||||
if (serverArg[0].startsWith("HORNETQ")) {
|
||||
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false");
|
||||
} else {
|
||||
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false");
|
||||
}
|
||||
|
||||
|
||||
GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend());
|
||||
GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize());
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
package clients
|
||||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// This script is called by sendMessages.groovy
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.hornetq.api.core.TransportConfiguration;
|
||||
import org.hornetq.api.jms.HornetQJMSConstants;
|
||||
import org.hornetq.core.message.impl.MessageImpl;
|
||||
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
|
||||
import org.hornetq.core.remoting.impl.netty.TransportConstants;
|
||||
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
|
||||
|
||||
|
||||
properties = new HashMap();
|
||||
properties.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
||||
properties.put(TransportConstants.PORT_PROP_NAME, "61616");
|
||||
configuration = new TransportConfiguration(NettyConnectorFactory.class.getName(), properties);
|
||||
cf = new HornetQJMSConnectionFactory(false, configuration);
|
|
@ -0,0 +1,205 @@
|
|||
package meshTest
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
import javax.jms.*
|
||||
|
||||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// starts an artemis server
|
||||
String serverType = arg[0];
|
||||
String clientType = arg[1];
|
||||
String operation = arg[2];
|
||||
|
||||
|
||||
String queueName = "queue";
|
||||
|
||||
int LARGE_MESSAGE_SIZE = 10 * 1024;
|
||||
|
||||
String propertyLargeMessage = "JMS_AMQ_InputStream";
|
||||
HDR_DUPLICATE_DETECTION_ID = "_AMQ_DUPL_ID";
|
||||
|
||||
if (clientType.startsWith("HORNETQ")) {
|
||||
HDR_DUPLICATE_DETECTION_ID = "_HQ_DUPL_ID";
|
||||
propertyLargeMessage = "JMS_HQ_InputStream"
|
||||
}
|
||||
|
||||
BYTES_BODY = new byte[3];
|
||||
BYTES_BODY[0] = (byte) 0x77;
|
||||
BYTES_BODY[1] = (byte) 0x77;
|
||||
BYTES_BODY[2] = (byte) 0x77;
|
||||
|
||||
String textBody = "a rapadura e doce mas nao e mole nao";
|
||||
|
||||
|
||||
println("serverType " + serverType);
|
||||
|
||||
if (clientType.startsWith("ARTEMIS")) {
|
||||
// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
|
||||
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
|
||||
} else {
|
||||
// Can't depend directly on hornetq, otherwise it wouldn't compile in artemis
|
||||
GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg");
|
||||
}
|
||||
|
||||
|
||||
Connection connection = cf.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
|
||||
if (operation.equals("sendAckMessages")) {
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
System.out.println("Sending messages");
|
||||
|
||||
TextMessage message = session.createTextMessage(textBody);
|
||||
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID, "some-duplicate");
|
||||
message.setStringProperty("prop", "test");
|
||||
producer.send(message);
|
||||
|
||||
BytesMessage bytesMessage = session.createBytesMessage();
|
||||
bytesMessage.writeBytes(BYTES_BODY);
|
||||
producer.send(bytesMessage);
|
||||
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
BytesMessage m = session.createBytesMessage();
|
||||
m.setIntProperty("count", i);
|
||||
|
||||
m.setObjectProperty(propertyLargeMessage, createFakeLargeStream(LARGE_MESSAGE_SIZE));
|
||||
|
||||
producer.send(m);
|
||||
}
|
||||
|
||||
producer.send(session.createObjectMessage("rapadura"));
|
||||
|
||||
MapMessage mapMessage = session.createMapMessage();
|
||||
mapMessage.setString("prop", "rapadura")
|
||||
producer.send(mapMessage);
|
||||
|
||||
StreamMessage streamMessage = session.createStreamMessage();
|
||||
streamMessage.writeString("rapadura");
|
||||
streamMessage.writeString("doce");
|
||||
streamMessage.writeInt(33);
|
||||
producer.send(streamMessage);
|
||||
|
||||
Message plain = session.createMessage();
|
||||
plain.setStringProperty("plain", "doce");
|
||||
producer.send(plain);
|
||||
|
||||
session.commit();
|
||||
|
||||
connection.close();
|
||||
System.out.println("Message sent");
|
||||
} else if (operation.equals("receiveMessages")) {
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
System.out.println("Receiving messages");
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||
GroovyRun.assertNotNull(message);
|
||||
GroovyRun.assertEquals(textBody, message.getText());
|
||||
GroovyRun.assertEquals("test", message.getStringProperty("prop"));
|
||||
GroovyRun.assertEquals("some-duplicate", message.getStringProperty(HDR_DUPLICATE_DETECTION_ID));
|
||||
|
||||
BytesMessage bm = (BytesMessage) consumer.receive(5000);
|
||||
GroovyRun.assertNotNull(bm);
|
||||
|
||||
GroovyRun.assertEquals(3L, bm.getBodyLength());
|
||||
|
||||
byte[] body = new byte[3];
|
||||
bm.readBytes(body);
|
||||
|
||||
GroovyRun.assertEquals(BYTES_BODY, body);
|
||||
|
||||
for (int m = 0; m < 10; m++) {
|
||||
BytesMessage rm = (BytesMessage) consumer.receive(10000);
|
||||
GroovyRun.assertNotNull(rm);
|
||||
GroovyRun.assertEquals(m, rm.getIntProperty("count"));
|
||||
|
||||
byte[] data = new byte[1024];
|
||||
|
||||
System.out.println("Message = " + rm);
|
||||
|
||||
for (int i = 0; i < LARGE_MESSAGE_SIZE; i += 1024) {
|
||||
int numberOfBytes = rm.readBytes(data);
|
||||
GroovyRun.assertEquals(1024, numberOfBytes);
|
||||
for (int j = 0; j < 1024; j++) {
|
||||
GroovyRun.assertEquals(GroovyRun.getSamplebyte(i + j), data[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ObjectMessage obj = consumer.receive(5000);
|
||||
GroovyRun.assertNotNull(obj);
|
||||
GroovyRun.assertEquals("rapadura", obj.getObject().toString());
|
||||
|
||||
MapMessage mapMessage = consumer.receive(5000);
|
||||
GroovyRun.assertNotNull(mapMessage);
|
||||
GroovyRun.assertEquals("rapadura", mapMessage.getString("prop"));
|
||||
|
||||
StreamMessage streamMessage = consumer.receive(5000);
|
||||
GroovyRun.assertNotNull(streamMessage);
|
||||
GroovyRun.assertEquals("rapadura", streamMessage.readString());
|
||||
GroovyRun.assertEquals("doce", streamMessage.readString());
|
||||
GroovyRun.assertTrue(streamMessage.readInt() == 33);
|
||||
|
||||
Message plain = consumer.receive(5000);
|
||||
GroovyRun.assertNotNull(plain);
|
||||
GroovyRun.assertEquals("doce", plain.getStringProperty("plain"));
|
||||
|
||||
session.commit();
|
||||
connection.close();
|
||||
System.out.println("Message received");
|
||||
} else {
|
||||
throw new RuntimeException("Invalid operation " + operation);
|
||||
}
|
||||
|
||||
|
||||
// Creates a Fake LargeStream without using a real file
|
||||
InputStream createFakeLargeStream(final long size) throws Exception {
|
||||
return new InputStream() {
|
||||
private long count;
|
||||
|
||||
private boolean closed = false;
|
||||
|
||||
@Override
|
||||
void close() throws IOException {
|
||||
super.close();
|
||||
closed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
int read() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("Stream was closed");
|
||||
}
|
||||
if (count++ < size) {
|
||||
return GroovyRun.getSamplebyte(count - 1);
|
||||
}
|
||||
else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,97 @@
|
|||
package meshTest
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
import javax.jms.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// starts an artemis server
|
||||
String serverType = arg[0];
|
||||
String clientType = arg[1];
|
||||
String operation = arg[2];
|
||||
|
||||
|
||||
String queueName = "queue";
|
||||
|
||||
|
||||
String textBody = "a rapadura e doce mas nao e mole nao";
|
||||
|
||||
println("serverType " + serverType);
|
||||
|
||||
if (clientType.startsWith("ARTEMIS")) {
|
||||
// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
|
||||
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
|
||||
} else {
|
||||
// Can't depend directly on hornetq, otherwise it wouldn't compile in artemis
|
||||
GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg");
|
||||
}
|
||||
|
||||
|
||||
Connection connection = cf.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
|
||||
if (operation.equals("sendAckMessages")) {
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(10);
|
||||
|
||||
CompletionListener completionListener = new CompletionListener() {
|
||||
@Override
|
||||
void onCompletion(Message message) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
void onException(Message message, Exception exception) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
producer.send(session.createTextMessage(textBody + i), completionListener);
|
||||
}
|
||||
|
||||
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
System.out.println("Sending messages");
|
||||
connection.close();
|
||||
System.out.println("Message sent");
|
||||
} else if (operation.equals("receiveMessages")) {
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
System.out.println("Receiving messages");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
TextMessage message = consumer.receive(1000);
|
||||
GroovyRun.assertNotNull(message);
|
||||
GroovyRun.assertEquals(textBody + i, message.getText());
|
||||
}
|
||||
|
||||
GroovyRun.assertNull(consumer.receiveNoWait());
|
||||
connection.close();
|
||||
System.out.println("Message received");
|
||||
} else {
|
||||
throw new RuntimeException("Invalid operation " + operation);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
package clients
|
||||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// Create a client connection factory
|
||||
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
|
||||
import javax.jms.*;
|
||||
import org.apache.activemq.artemis.jms.client.*
|
||||
|
||||
file = arg[0]
|
||||
method = arg[1]
|
||||
System.out.println("File::" + file);
|
||||
|
||||
|
||||
if (method.equals("write")) {
|
||||
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false");
|
||||
queue = new ActiveMQQueue("queue");
|
||||
topic = new ActiveMQTopic("topic")
|
||||
|
||||
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(file));
|
||||
objectOutputStream.writeObject(cf);
|
||||
objectOutputStream.writeObject(queue)
|
||||
objectOutputStream.writeObject(topic)
|
||||
objectOutputStream.close();
|
||||
} else {
|
||||
ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream(file))
|
||||
|
||||
cf = inputStream.readObject();
|
||||
queue = inputStream.readObject()
|
||||
topic = inputStream.readObject()
|
||||
inputStream.close();
|
||||
}
|
||||
|
||||
GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend());
|
||||
GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize());
|
||||
|
||||
Connection connection = cf.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
MessageProducer topicProducer = session.createProducer(topic)
|
||||
connection.close();
|
||||
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
package servers
|
||||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// starts an artemis server
|
||||
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
|
||||
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
|
||||
|
||||
|
||||
String folder = arg[0];
|
||||
String id = arg[1];
|
||||
String type = arg[2];
|
||||
String producer = arg[3];
|
||||
String consumer = arg[4];
|
||||
|
||||
println("type = " + type);
|
||||
|
||||
configuration = new ConfigurationImpl();
|
||||
configuration.setJournalType(JournalType.NIO);
|
||||
System.out.println("folder:: " + folder);
|
||||
configuration.setBrokerInstance(new File(folder + "/" + id));
|
||||
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
|
||||
configuration.setSecurityEnabled(false);
|
||||
configuration.setPersistenceEnabled(false);
|
||||
try {
|
||||
if (!type.equals("ARTEMIS-140")) {
|
||||
configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateAddresses(true));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// need to ignore this for 1.4
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
jmsConfiguration = new JMSConfigurationImpl();
|
||||
|
||||
server = new EmbeddedJMS();
|
||||
server.setConfiguration(configuration);
|
||||
server.setJmsConfiguration(jmsConfiguration);
|
||||
server.start();
|
||||
|
||||
// uncomment this next statements to validate https://issues.apache.org/jira/browse/ARTEMIS-1561
|
||||
if (producer.toString().equals("ARTEMIS-140") && type.equals("ARTEMIS-SNAPSHOT") ||
|
||||
producer.toString().startsWith("HORNETQ")) {
|
||||
server.getJMSServerManager().createQueue(true, "queue", null, true);
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
package servers
|
||||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
// starts a hornetq server
|
||||
|
||||
import org.hornetq.api.core.TransportConfiguration
|
||||
import org.hornetq.core.config.impl.ConfigurationImpl
|
||||
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.hornetq.core.remoting.impl.netty.TransportConstants
|
||||
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl
|
||||
import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl
|
||||
import org.hornetq.jms.server.embedded.EmbeddedJMS
|
||||
|
||||
String folder = arg[0];
|
||||
String id = arg[1];
|
||||
String type = arg[2];
|
||||
String producer = arg[3];
|
||||
String consumer = arg[4];
|
||||
|
||||
configuration = new ConfigurationImpl();
|
||||
configuration.setSecurityEnabled(false);
|
||||
configuration.setJournalDirectory(folder + "/" + id + "/journal");
|
||||
configuration.setBindingsDirectory(folder + "/" + id + "/binding");
|
||||
configuration.setPagingDirectory(folder + "/" + id + "/paging");
|
||||
configuration.setLargeMessagesDirectory(folder + "/" + id + "/largemessage");
|
||||
configuration.setJournalType(org.hornetq.core.server.JournalType.NIO);
|
||||
configuration.setPersistenceEnabled(false);
|
||||
|
||||
HashMap map = new HashMap();
|
||||
map.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
||||
map.put(TransportConstants.PORT_PROP_NAME, "61616");
|
||||
TransportConfiguration tpc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), map);
|
||||
configuration.getAcceptorConfigurations().add(tpc);
|
||||
|
||||
jmsConfiguration = new JMSConfigurationImpl();
|
||||
|
||||
JMSQueueConfigurationImpl queueConfiguration = new JMSQueueConfigurationImpl("queue", null, true);
|
||||
jmsConfiguration.getQueueConfigurations().add(queueConfiguration);
|
||||
server = new EmbeddedJMS();
|
||||
server.setConfiguration(configuration);
|
||||
server.setJmsConfiguration(jmsConfiguration);
|
||||
server.start();
|
||||
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_247;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
||||
|
||||
/**
|
||||
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
|
||||
*
|
||||
* cd /compatibility-tests
|
||||
* mvn install -Ptests | tee output.log
|
||||
*
|
||||
* on the output.log you will see the output generated by {@link #getClasspathProperty(String)}
|
||||
*
|
||||
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
|
||||
* On Idea you would do the following:
|
||||
*
|
||||
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class MeshTest extends VersionedBaseTest {
|
||||
|
||||
// this will ensure that all tests in this class are run twice,
|
||||
// once with "true" passed to the class' constructor and once with "false"
|
||||
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||
public static Collection getParameters() {
|
||||
// we don't need every single version ever released..
|
||||
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
|
||||
List<Object[]> combinations = new ArrayList<>();
|
||||
|
||||
/*
|
||||
// during development sometimes is useful to comment out the combinations
|
||||
// and add the ones you are interested.. example:
|
||||
*/
|
||||
// combinations.add(new Object[]{SNAPSHOT, ONE_FOUR, ONE_FOUR});
|
||||
// combinations.add(new Object[]{ONE_FOUR, ONE_FOUR, ONE_FOUR});
|
||||
|
||||
combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FOUR, TWO_FOUR, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FOUR, TWO_FOUR, SNAPSHOT, HORNETQ_235}));
|
||||
combinations.addAll(combinatory(new Object[]{ONE_FOUR}, new Object[]{ONE_FOUR, SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT}));
|
||||
combinations.addAll(combinatory(new Object[]{HORNETQ_235}, new Object[]{ONE_FOUR, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FOUR, SNAPSHOT, HORNETQ_235}));
|
||||
combinations.addAll(combinatory(new Object[]{HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247}));
|
||||
return combinations;
|
||||
}
|
||||
|
||||
public MeshTest(String server, String sender, String receiver) throws Exception {
|
||||
super(server, sender, receiver);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendReceive() throws Throwable {
|
||||
callMain(senderClassloader, GroovyRun.class.getName(), "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
|
||||
callMain(receiverClassloader, GroovyRun.class.getName(), "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
||||
|
||||
/**
|
||||
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
|
||||
*
|
||||
* cd /compatibility-tests
|
||||
* mvn install -Ptests | tee output.log
|
||||
*
|
||||
* on the output.log you will see the output generated by {@link #getClasspathProperty(String)}
|
||||
*
|
||||
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
|
||||
* On Idea you would do the following:
|
||||
*
|
||||
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class SendAckTest extends VersionedBaseTest {
|
||||
|
||||
// this will ensure that all tests in this class are run twice,
|
||||
// once with "true" passed to the class' constructor and once with "false"
|
||||
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||
public static Collection getParameters() {
|
||||
// we don't need every single version ever released..
|
||||
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
|
||||
List<Object[]> combinations = new ArrayList<>();
|
||||
|
||||
/*
|
||||
// during development sometimes is useful to comment out the combinations
|
||||
// and add the ones you are interested.. example:
|
||||
*/
|
||||
// combinations.add(new Object[]{SNAPSHOT, ONE_FOUR, ONE_FOUR});
|
||||
// combinations.add(new Object[]{ONE_FOUR, ONE_FOUR, ONE_FOUR});
|
||||
|
||||
combinations.addAll(combinatory(new Object[]{SNAPSHOT, ONE_FOUR}, new Object[]{ONE_FOUR, SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT}));
|
||||
|
||||
// not every combination on two four would make sense.. as there's a compatibility issue between 2.4 and 1.4 when crossing consumers and producers
|
||||
combinations.add(new Object[]{TWO_FOUR, SNAPSHOT, SNAPSHOT});
|
||||
combinations.add(new Object[]{SNAPSHOT, TWO_FOUR, TWO_FOUR});
|
||||
return combinations;
|
||||
}
|
||||
|
||||
public SendAckTest(String server, String sender, String receiver) throws Exception {
|
||||
super(server, sender, receiver);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendReceive() throws Throwable {
|
||||
callMain(senderClassloader, GroovyRun.class.getName(), "sendAckTest/sendAckMessages.groovy", server, sender, "sendAckMessages");
|
||||
callMain(receiverClassloader, GroovyRun.class.getName(), "sendAckTest/sendAckMessages.groovy", server, receiver, "receiveMessages");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FOUR;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||
|
||||
/**
|
||||
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
|
||||
*
|
||||
* cd /compatibility-tests
|
||||
* mvn install -Ptests | tee output.log
|
||||
*
|
||||
* on the output.log you will see the output generated by {@link #getClasspathProperty(String)}
|
||||
*
|
||||
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
|
||||
* On Idea you would do the following:
|
||||
*
|
||||
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class SerializationTest extends VersionedBaseTest {
|
||||
|
||||
// this will ensure that all tests in this class are run twice,
|
||||
// once with "true" passed to the class' constructor and once with "false"
|
||||
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||
public static Collection getParameters() {
|
||||
// we don't need every single version ever released..
|
||||
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
|
||||
List<Object[]> combinations = new ArrayList<>();
|
||||
|
||||
/*
|
||||
// during development sometimes is useful to comment out the combinations
|
||||
// and add the ones you are interested.. example:
|
||||
*/
|
||||
// combinations.add(new Object[]{SNAPSHOT, ONE_FOUR, ONE_FOUR});
|
||||
// combinations.add(new Object[]{ONE_FOUR, ONE_FOUR, ONE_FOUR});
|
||||
|
||||
combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT}, new Object[]{ONE_FOUR, SNAPSHOT}));
|
||||
return combinations;
|
||||
}
|
||||
|
||||
public SerializationTest(String server, String sender, String receiver) throws Exception {
|
||||
super(server, sender, receiver);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerializeFactory() throws Throwable {
|
||||
File file = serverFolder.newFile("objects.ser");
|
||||
file.mkdirs();
|
||||
callMain(senderClassloader, GroovyRun.class.getName(), "serial/serial.groovy", file.getAbsolutePath(), "write");
|
||||
callMain(receiverClassloader, GroovyRun.class.getName(), "serial/serial.groovy", file.getAbsolutePath(), "read");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,179 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.utils.FileUtil;
|
||||
import org.apache.activemq.artemis.utils.RunnableEx;
|
||||
import org.junit.After;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||
|
||||
public abstract class VersionedBaseTest {
|
||||
|
||||
protected final String server;
|
||||
protected final String sender;
|
||||
protected final String receiver;
|
||||
|
||||
protected ClassLoader serverClassloader;
|
||||
protected ClassLoader senderClassloader;
|
||||
protected ClassLoader receiverClassloader;
|
||||
|
||||
public VersionedBaseTest(String server, String sender, String receiver) throws Exception {
|
||||
this.server = server;
|
||||
this.sender = sender;
|
||||
this.receiver = receiver;
|
||||
this.serverClassloader = getClasspathProperty(server);
|
||||
this.senderClassloader = getClasspathProperty(sender);
|
||||
this.receiverClassloader = getClasspathProperty(receiver);
|
||||
}
|
||||
|
||||
// This is a test optimization..
|
||||
// if false it will span a new VM for each classLoader used.
|
||||
// this can be a bit faster
|
||||
public static final boolean USE_CLASSLOADER = true;
|
||||
|
||||
private static HashSet<String> printed = new HashSet<>();
|
||||
|
||||
@ClassRule
|
||||
public static TemporaryFolder serverFolder;
|
||||
|
||||
static {
|
||||
File parent = new File("./target/tmp");
|
||||
parent.mkdirs();
|
||||
serverFolder = new TemporaryFolder(parent);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void startServer() throws Throwable {
|
||||
FileUtil.deleteDirectory(serverFolder.getRoot());
|
||||
|
||||
serverFolder.getRoot().mkdirs();
|
||||
|
||||
System.out.println("Folder::" + serverFolder.getRoot());
|
||||
|
||||
String scriptToUse;
|
||||
if (server.startsWith("ARTEMIS")) {
|
||||
scriptToUse = "servers/artemisServer.groovy";
|
||||
} else {
|
||||
scriptToUse = "servers/hornetqServer.groovy";
|
||||
}
|
||||
|
||||
callMain(serverClassloader, GroovyRun.class.getName(), scriptToUse, serverFolder.getRoot().getAbsolutePath(), "1", server, sender, receiver);
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopServer() throws Throwable {
|
||||
callExecute(serverClassloader, GroovyRun.class.getName(), "server.stop()");
|
||||
|
||||
// GC help!!!
|
||||
serverClassloader = null;
|
||||
senderClassloader = null;
|
||||
receiverClassloader = null;
|
||||
}
|
||||
|
||||
protected static void callMain(ClassLoader loader,
|
||||
String className,
|
||||
String script,
|
||||
String... arguments) throws Exception {
|
||||
tclCall(loader, () -> {
|
||||
Class clazz = loader.loadClass(className);
|
||||
Method method = clazz.getMethod("doMain", String.class, String[].class);
|
||||
method.invoke(null, script, arguments);
|
||||
});
|
||||
}
|
||||
|
||||
protected static void callExecute(ClassLoader loader, String className, String script) throws Exception {
|
||||
tclCall(loader, () -> {
|
||||
Class clazz = loader.loadClass(className);
|
||||
Method method = clazz.getMethod("execute", String.class);
|
||||
method.invoke(null, script);
|
||||
});
|
||||
}
|
||||
|
||||
protected static void tclCall(ClassLoader loader, RunnableEx run) throws Exception {
|
||||
|
||||
ClassLoader original = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(loader);
|
||||
try {
|
||||
run.run();
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(original);
|
||||
}
|
||||
}
|
||||
|
||||
protected static ClassLoader defineClassLoader(String classPath) throws MalformedURLException {
|
||||
String[] classPathArray = classPath.split(File.pathSeparator);
|
||||
URL[] elements = new URL[classPathArray.length];
|
||||
for (int i = 0; i < classPathArray.length; i++) {
|
||||
elements[i] = new File(classPathArray[i]).toPath().toUri().toURL();
|
||||
}
|
||||
|
||||
return new URLClassLoader(elements, null);
|
||||
}
|
||||
|
||||
protected static ClassLoader getClasspathProperty(String name) throws Exception {
|
||||
|
||||
if (name.equals(SNAPSHOT)) {
|
||||
return VersionedBaseTest.class.getClassLoader();
|
||||
}
|
||||
String value = System.getProperty(name);
|
||||
|
||||
if (!printed.contains(name)) {
|
||||
boolean ok = value != null && !value.trim().isEmpty();
|
||||
if (!ok) {
|
||||
System.out.println("Add \"-D" + name + "=\'CLASSPATH\'\" into your VM settings");
|
||||
} else {
|
||||
printed.add(name);
|
||||
System.out.println("****************************************************************************");
|
||||
System.out.println("* If you want to debug this test, add this parameter to your IDE run settings...");
|
||||
System.out.println("****************************************************************************");
|
||||
System.out.println("-D" + name + "=\"" + value + "\"");
|
||||
System.out.println("****************************************************************************");
|
||||
|
||||
}
|
||||
|
||||
Assume.assumeTrue("Cannot run these tests, no classpath found", ok);
|
||||
}
|
||||
|
||||
return defineClassLoader(value);
|
||||
}
|
||||
|
||||
protected static List<Object[]> combinatory(Object[] rootSide, Object[] sideLeft, Object[] sideRight) {
|
||||
LinkedList<Object[]> combinations = new LinkedList<>();
|
||||
|
||||
for (Object root : rootSide) {
|
||||
for (Object left : sideLeft) {
|
||||
for (Object right : sideRight) {
|
||||
combinations.add(new Object[]{root, left, right});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return combinations;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* This package will contain tests that will validate Artemis within itself in different versions
|
||||
* and within some specific hornetQ versions.
|
||||
*
|
||||
* The integration tests pom will use the artemis-maven-plugin/dependency-scan to lookup for specific verions.
|
||||
* The tests will only work if the plugin was executed and the system property was filled,
|
||||
* or if it was manually added within the IDE running for debug purposes.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
|
@ -126,6 +126,7 @@
|
|||
<module>timing-tests</module>
|
||||
<module>jms-tests</module>
|
||||
<module>integration-tests</module>
|
||||
<module>compatibility-tests</module>
|
||||
<module>soak-tests</module>
|
||||
<module>stress-tests</module>
|
||||
<module>performance-tests</module>
|
||||
|
|
|
@ -90,10 +90,68 @@ public final class SpawnedVMSupport {
|
|||
final boolean logErrorOutput,
|
||||
final boolean useLogging,
|
||||
final String... args) throws Exception {
|
||||
ProcessBuilder builder = new ProcessBuilder();
|
||||
return spawnVM(System.getProperty("java.class.path"), wordMatch, wordRunning, className, memoryArg1, memoryArg2, vmargs, logOutput, logErrorOutput, useLogging, args);
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static Process spawnVM(String classPath,
|
||||
String wordMatch,
|
||||
Runnable wordRunning,
|
||||
String className,
|
||||
String memoryArg1,
|
||||
String memoryArg2,
|
||||
String[] vmargs,
|
||||
boolean logOutput,
|
||||
boolean logErrorOutput,
|
||||
boolean useLogging,
|
||||
String... args) throws IOException, ClassNotFoundException {
|
||||
return spawnVM(classPath, wordMatch, wordRunning, className, memoryArg1,memoryArg2, vmargs, logOutput, logErrorOutput, useLogging, -1, args);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param classPath
|
||||
* @param wordMatch
|
||||
* @param wordRunning
|
||||
* @param className
|
||||
* @param memoryArg1
|
||||
* @param memoryArg2
|
||||
* @param vmargs
|
||||
* @param logOutput
|
||||
* @param logErrorOutput
|
||||
* @param useLogging
|
||||
* @param debugPort if <=0 it means no debug
|
||||
* @param args
|
||||
* @return
|
||||
* @throws IOException
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
public static Process spawnVM(String classPath,
|
||||
String wordMatch,
|
||||
Runnable wordRunning,
|
||||
String className,
|
||||
String memoryArg1,
|
||||
String memoryArg2,
|
||||
String[] vmargs,
|
||||
boolean logOutput,
|
||||
boolean logErrorOutput,
|
||||
boolean useLogging,
|
||||
long debugPort,
|
||||
String... args) throws IOException, ClassNotFoundException {
|
||||
final String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java").toAbsolutePath().toString();
|
||||
ProcessBuilder builder = new ProcessBuilder();
|
||||
if (memoryArg1 == null) {
|
||||
memoryArg1 = "-Xms128m";
|
||||
}
|
||||
if (memoryArg2 == null) {
|
||||
memoryArg2 = "-Xmx128m";
|
||||
}
|
||||
builder.command(javaPath, memoryArg1, memoryArg2);
|
||||
builder.environment().put("CLASSPATH", System.getProperty("java.class.path"));
|
||||
if (debugPort > 0) {
|
||||
builder.command().add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=" + debugPort);
|
||||
}
|
||||
builder.environment().put("CLASSPATH", classPath);
|
||||
|
||||
List<String> commandList = builder.command();
|
||||
|
||||
|
@ -143,7 +201,6 @@ public final class SpawnedVMSupport {
|
|||
errorLogger.start();
|
||||
|
||||
return process;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue