This commit is contained in:
Clebert Suconic 2018-08-30 15:04:14 -04:00
commit f80e87768b
51 changed files with 935 additions and 110 deletions

View File

@ -38,7 +38,7 @@ import static org.apache.activemq.artemis.reader.MapMessageUtil.writeBodyMap;
/**
* ActiveMQ Artemis implementation of a JMS MapMessage.
*/
public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
// Constants -----------------------------------------------------
public static final byte TYPE = Message.MAP_TYPE;

View File

@ -197,7 +197,7 @@ public class ActiveMQMessage implements javax.jms.Message {
private String msgID;
// Cache it
private Destination replyTo;
protected Destination replyTo;
// Cache it
private String jmsCorrelationID;
@ -209,8 +209,6 @@ public class ActiveMQMessage implements javax.jms.Message {
private boolean clientAck;
private boolean enable1xPrefixes;
private long jmsDeliveryTime;
// Constructors --------------------------------------------------
@ -366,23 +364,11 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
SimpleString address = MessageUtil.getJMSReplyTo(message);
if (address != null) {
String name = address.toString();
// swap the old prefixes for the new ones so the proper destination type gets created
if (enable1xPrefixes) {
if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_QUEUE_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_QUEUE_QUALIFED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TOPIC_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_TOPIC_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_TOPIC_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_TOPIC_QUALIFED_PREFIX.length(), address.length()).toString();
}
}
replyTo = ActiveMQDestination.fromPrefixedName(address.toString(), name);
SimpleString repl = MessageUtil.getJMSReplyTo(message);
if (repl != null) {
replyTo = ActiveMQDestination.fromPrefixedName(repl.toString());
}
}
return replyTo;
@ -417,23 +403,20 @@ public class ActiveMQMessage implements javax.jms.Message {
}
}
protected SimpleString checkPrefix(SimpleString address) {
return address;
}
protected SimpleString checkPrefixStr(SimpleString address) {
return address;
}
@Override
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString address = message.getAddressSimpleString();
SimpleString name = address;
if (address != null & enable1xPrefixes) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TOPIC_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length(), address.length());
}
}
SimpleString changedAddress = checkPrefix(address);
if (address == null) {
dest = null;
@ -445,8 +428,8 @@ public class ActiveMQMessage implements javax.jms.Message {
dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString());
}
if (name != null) {
((ActiveMQDestination) dest).setName(name.toString());
if (changedAddress != null) {
((ActiveMQDestination) dest).setName(changedAddress.toString());
}
}
@ -903,10 +886,6 @@ public class ActiveMQMessage implements javax.jms.Message {
}
}
public void setEnable1xPrefixes(boolean enable1xPrefixes) {
this.enable1xPrefixes = enable1xPrefixes;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer("ActiveMQMessage[");

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
/**
* ActiveMQ Artemis implementation of a JMS MessageConsumer.
@ -218,10 +219,11 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE ||
ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||
coreMessage.getType() == ActiveMQObjectMessage.TYPE;
jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
if (session.isEnable1xPrefixes()) {
jmsMsg.setEnable1xPrefixes(true);
jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
} else {
jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
}
try {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
import org.apache.activemq.artemis.utils.SelectorTranslator;
/**
@ -141,10 +142,10 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
if (hasMoreElements()) {
ClientMessage next = current;
current = null;
msg = ActiveMQMessage.createMessage(next, session, options);
if (enable1xPrefixes) {
msg.setEnable1xPrefixes(true);
msg = ActiveMQCompatibleMessage.createMessage(next, session, options);
} else {
msg = ActiveMQMessage.createMessage(next, session, options);
}
try {

View File

@ -62,6 +62,12 @@ import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery;
import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQBytesCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQMapCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQObjectCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.SelectorTranslator;
@ -144,8 +150,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public BytesMessage createBytesMessage() throws JMSException {
checkClosed();
ActiveMQBytesMessage message = new ActiveMQBytesMessage(session);
message.setEnable1xPrefixes(enable1xPrefixes);
ActiveMQBytesMessage message;
if (enable1xPrefixes) {
message = new ActiveMQBytesCompatibleMessage(session);
} else {
message = new ActiveMQBytesMessage(session);
}
return message;
}
@ -153,8 +163,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public MapMessage createMapMessage() throws JMSException {
checkClosed();
ActiveMQMapMessage message = new ActiveMQMapMessage(session);
message.setEnable1xPrefixes(enable1xPrefixes);
ActiveMQMapMessage message;
if (enable1xPrefixes) {
message = new ActiveMQMapCompatibleMessage(session);
} else {
message = new ActiveMQMapMessage(session);
}
return message;
}
@ -162,8 +176,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public Message createMessage() throws JMSException {
checkClosed();
ActiveMQMessage message = new ActiveMQMessage(session);
message.setEnable1xPrefixes(enable1xPrefixes);
ActiveMQMessage message;
if (enable1xPrefixes) {
message = new ActiveMQCompatibleMessage(session);
} else {
message = new ActiveMQMessage(session);
}
return message;
}
@ -171,8 +189,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public ObjectMessage createObjectMessage() throws JMSException {
checkClosed();
ActiveMQObjectMessage message = new ActiveMQObjectMessage(session, options);
message.setEnable1xPrefixes(enable1xPrefixes);
ActiveMQObjectMessage message;
if (enable1xPrefixes) {
message = new ActiveMQObjectCompatibleMessage(session, options);
} else {
message = new ActiveMQObjectMessage(session, options);
}
return message;
}
@ -180,9 +202,13 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public ObjectMessage createObjectMessage(final Serializable object) throws JMSException {
checkClosed();
ActiveMQObjectMessage msg = new ActiveMQObjectMessage(session, options);
ActiveMQObjectMessage msg;
if (enable1xPrefixes) {
msg = new ActiveMQObjectCompatibleMessage(session, options);
} else {
msg = new ActiveMQObjectMessage(session, options);
}
msg.setObject(object);
msg.setEnable1xPrefixes(enable1xPrefixes);
return msg;
}
@ -191,8 +217,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public StreamMessage createStreamMessage() throws JMSException {
checkClosed();
ActiveMQStreamMessage message = new ActiveMQStreamMessage(session);
message.setEnable1xPrefixes(enable1xPrefixes);
ActiveMQStreamMessage message;
if (enable1xPrefixes) {
message = new ActiveMQStreamMessage(session);
} else {
message = new ActiveMQStreamCompatibleMessage(session);
}
return message;
}
@ -200,9 +230,13 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public TextMessage createTextMessage() throws JMSException {
checkClosed();
ActiveMQTextMessage msg = new ActiveMQTextMessage(session);
ActiveMQTextMessage msg;
if (enable1xPrefixes) {
msg = new ActiveMQTextCompabileMessage(session);
} else {
msg = new ActiveMQTextMessage(session);
}
msg.setText(null);
msg.setEnable1xPrefixes(enable1xPrefixes);
return msg;
}
@ -211,9 +245,13 @@ public class ActiveMQSession implements QueueSession, TopicSession {
public TextMessage createTextMessage(final String text) throws JMSException {
checkClosed();
ActiveMQTextMessage msg = new ActiveMQTextMessage(session);
ActiveMQTextMessage msg;
if (enable1xPrefixes) {
msg = new ActiveMQTextCompabileMessage(session);
} else {
msg = new ActiveMQTextMessage(session);
}
msg.setText(text);
msg.setEnable1xPrefixes(enable1xPrefixes);
return msg;
}

View File

@ -44,7 +44,7 @@ import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadStr
/**
* ActiveMQ Artemis implementation of a JMS StreamMessage.
*/
public final class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
public static final byte TYPE = Message.STREAM_TYPE;

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
public class JMSMessageListenerWrapper implements MessageHandler {
@ -72,7 +73,13 @@ public class JMSMessageListenerWrapper implements MessageHandler {
*/
@Override
public void onMessage(final ClientMessage message) {
ActiveMQMessage msg = ActiveMQMessage.createMessage(message, session.getCoreSession(), options);
ActiveMQMessage msg;
if (session.isEnable1xPrefixes()) {
msg = ActiveMQCompatibleMessage.createMessage(message, session.getCoreSession(), options);
} else {
msg = ActiveMQMessage.createMessage(message, session.getCoreSession(), options);
}
if (individualACK) {
msg.setIndividualAcknowledge();
@ -82,10 +89,6 @@ public class JMSMessageListenerWrapper implements MessageHandler {
msg.setClientAcknowledge();
}
if (session.isEnable1xPrefixes()) {
msg.setEnable1xPrefixes(true);
}
try {
msg.doBeforeReceive();
} catch (Exception e) {

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client.compatible1X;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
public class ActiveMQBytesCompatibleMessage extends ActiveMQBytesMessage {
@Override
protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
replyTo = ActiveMQCompatibleMessage.findCompatibleReplyTo(message);
}
return replyTo;
}
public ActiveMQBytesCompatibleMessage(ClientSession session) {
super(session);
}
protected ActiveMQBytesCompatibleMessage(ClientMessage message, ClientSession session) {
super(message, session);
}
public ActiveMQBytesCompatibleMessage(BytesMessage foreign, ClientSession session) throws JMSException {
super(foreign, session);
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client.compatible1X;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMapMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQObjectMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQStreamMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.reader.MessageUtil;
public class ActiveMQCompatibleMessage extends ActiveMQMessage {
public ActiveMQCompatibleMessage(byte type, ClientSession session) {
super(type, session);
}
public ActiveMQCompatibleMessage(ClientSession session) {
super(session);
}
public ActiveMQCompatibleMessage(ClientMessage message, ClientSession session) {
super(message, session);
}
public ActiveMQCompatibleMessage(Message foreign, ClientSession session) throws JMSException {
super(foreign, session);
}
public ActiveMQCompatibleMessage() {
}
public ActiveMQCompatibleMessage(Message foreign, byte type, ClientSession session) throws JMSException {
super(foreign, type, session);
}
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
replyTo = findCompatibleReplyTo(message);
}
return replyTo;
}
public static Destination findCompatibleReplyTo(ClientMessage message) {
SimpleString address = MessageUtil.getJMSReplyTo(message);
if (address != null) {
String name = address.toString();
// swap the old prefixes for the new ones so the proper destination type gets created
if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_QUEUE_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_QUEUE_QUALIFED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TOPIC_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_TOPIC_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_TOPIC_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_TOPIC_QUALIFED_PREFIX.length(), address.length()).toString();
}
return ActiveMQDestination.fromPrefixedName(address.toString(), name);
}
return null;
}
@Override
public SimpleString checkPrefix(SimpleString address) {
return checkPrefix1X(address);
}
protected static SimpleString checkPrefix1X(SimpleString address) {
if (address != null) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) {
return address.subSeq(PacketImpl.OLD_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
return address.subSeq(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX)) {
return address.subSeq(PacketImpl.OLD_TOPIC_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX)) {
return address.subSeq(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length(), address.length());
}
}
return null;
}
public static ActiveMQMessage createMessage(final ClientMessage message,
final ClientSession session,
final ConnectionFactoryOptions options) {
int type = message.getType();
ActiveMQMessage msg;
switch (type) {
case ActiveMQMessage.TYPE: // 0
{
msg = new ActiveMQCompatibleMessage(message, session);
break;
}
case ActiveMQBytesMessage.TYPE: // 4
{
msg = new ActiveMQBytesCompatibleMessage(message, session);
break;
}
case ActiveMQMapMessage.TYPE: // 5
{
msg = new ActiveMQMapCompatibleMessage(message, session);
break;
}
case ActiveMQObjectMessage.TYPE: {
msg = new ActiveMQObjectCompatibleMessage(message, session, options);
break;
}
case ActiveMQStreamMessage.TYPE: // 6
{
msg = new ActiveMQStreamCompatibleMessage(message, session);
break;
}
case ActiveMQTextMessage.TYPE: // 3
{
msg = new ActiveMQTextCompabileMessage(message, session);
break;
}
default: {
throw new JMSRuntimeException("Invalid message type " + type);
}
}
return msg;
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client.compatible1X;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQMapMessage;
public class ActiveMQMapCompatibleMessage extends ActiveMQMapMessage {
@Override
protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
replyTo = ActiveMQCompatibleMessage.findCompatibleReplyTo(message);
}
return replyTo;
}
public ActiveMQMapCompatibleMessage(ClientSession session) {
super(session);
}
public ActiveMQMapCompatibleMessage(ClientMessage message, ClientSession session) {
super(message, session);
}
public ActiveMQMapCompatibleMessage() {
}
public ActiveMQMapCompatibleMessage(MapMessage foreign, ClientSession session) throws JMSException {
super(foreign, session);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client.compatible1X;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQObjectMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
public class ActiveMQObjectCompatibleMessage extends ActiveMQObjectMessage {
@Override
protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
replyTo = ActiveMQCompatibleMessage.findCompatibleReplyTo(message);
}
return replyTo;
}
public ActiveMQObjectCompatibleMessage(ClientSession session, ConnectionFactoryOptions options) {
super(session, options);
}
public ActiveMQObjectCompatibleMessage(ClientMessage message,
ClientSession session,
ConnectionFactoryOptions options) {
super(message, session, options);
}
public ActiveMQObjectCompatibleMessage(ObjectMessage foreign,
ClientSession session,
ConnectionFactoryOptions options) throws JMSException {
super(foreign, session, options);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client.compatible1X;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.StreamMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQStreamMessage;
public class ActiveMQStreamCompatibleMessage extends ActiveMQStreamMessage {
@Override
protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
replyTo = ActiveMQCompatibleMessage.findCompatibleReplyTo(message);
}
return replyTo;
}
public ActiveMQStreamCompatibleMessage(ClientSession session) {
super(session);
}
public ActiveMQStreamCompatibleMessage(ClientMessage message, ClientSession session) {
super(message, session);
}
public ActiveMQStreamCompatibleMessage(StreamMessage foreign, ClientSession session) throws JMSException {
super(foreign, session);
}
public ActiveMQStreamCompatibleMessage() {
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client.compatible1X;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
replyTo = ActiveMQCompatibleMessage.findCompatibleReplyTo(message);
}
return replyTo;
}
public ActiveMQTextCompabileMessage(ClientSession session) {
super(session);
}
public ActiveMQTextCompabileMessage(ClientMessage message, ClientSession session) {
super(message, session);
}
public ActiveMQTextCompabileMessage(TextMessage foreign, ClientSession session) throws JMSException {
super(foreign, session);
}
}

View File

@ -384,6 +384,11 @@ public class ActiveMQActivation {
for (ActiveMQMessageHandler handler : handlersCopy) {
Thread interruptThread = handler.getCurrentThread();
if (interruptThread != null) {
try {
logger.tracef("Interrupting thread %s", interruptThread.getName());
} catch (Throwable justLog) {
logger.warn(justLog);
}
try {
interruptThread.interrupt();
} catch (Throwable e) {

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
@ -86,6 +87,8 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
private volatile boolean connected;
private boolean enable1XPrefix;
public ActiveMQMessageHandler(final ConnectionFactoryOptions options,
final ActiveMQActivation activation,
final TransactionManager tm,
@ -105,6 +108,8 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
logger.trace("setup()");
}
this.enable1XPrefix = activation.getConnectionFactory().isEnable1xPrefixes();
ActiveMQActivationSpec spec = activation.getActivationSpec();
String selector = spec.getMessageSelector();
@ -281,8 +286,12 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
logger.trace("onMessage(" + message + ")");
}
ActiveMQMessage msg = ActiveMQMessage.createMessage(message, session, options);
msg.setEnable1xPrefixes(activation.getConnectionFactory().isEnable1xPrefixes());
ActiveMQMessage msg;
if (enable1XPrefix) {
msg = ActiveMQCompatibleMessage.createMessage(message, session, options);
} else {
msg = ActiveMQMessage.createMessage(message, session, options);
}
boolean beforeDelivery = false;

View File

@ -39,6 +39,11 @@ public class GroovyRun {
public static Binding binding = new Binding();
public static GroovyShell shell = new GroovyShell(binding);
public static void clear() {
binding = new Binding();
shell = new GroovyShell(binding);
}
/**
* This can be called from the scripts as well.
* The scripts will use this method instead of its own groovy method.
@ -68,6 +73,7 @@ public class GroovyRun {
return shell.evaluate(scriptURI);
}
public static void setVariable(String name, Object arg) {
binding.setVariable(name, arg);
}

View File

@ -1,3 +1,5 @@
package ActiveMQJMSClientCompatibilityTest
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient
import org.apache.activemq.artemis.jms.client.ActiveMQQueue
import org.apache.activemq.artemis.jms.client.ActiveMQTopic

View File

@ -0,0 +1,87 @@
package ReplyToTest
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.jms.client.ActiveMQQueue
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("queue");
QueueBrowser browser = session.createBrowser(queue);
Enumeration<Message> messageEnumeration = browser.getEnumeration();
ArrayList<Message> messages = new ArrayList<>();
while (messageEnumeration.hasMoreElements()) {
messages.add(messageEnumeration.nextElement());
}
check(messages);
MessageConsumer consumer = session.createConsumer(queue);
messages.clear();
while(true) {
Message message = consumer.receiveNoWait();
if (message == null) {
break;
}
messages.add(message);
}
check(messages);
connection.close();
void check(List<Message> messages) {
Iterator<Message> iterator = messages.iterator();
Message bareMessage = iterator.next();
checkMessage(bareMessage);
BytesMessage bytesMessage = iterator.next();
checkMessage(bytesMessage);
MapMessage mapMessage = iterator.next();
checkMessage(mapMessage);
ObjectMessage objectMessage = iterator.next();
checkMessage(objectMessage);
StreamMessage streamMessage = iterator.next();
checkMessage(streamMessage);
TextMessage textMessage = iterator.next();
checkMessage(objectMessage);
}
void checkMessage(Message message) {
ActiveMQQueue queue = message.getJMSReplyTo();
GroovyRun.assertEquals("jms.queue.t1", queue.getAddress());
GroovyRun.assertEquals("t1", queue.getName());
}

View File

@ -0,0 +1,70 @@
package ReplyToTest
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.jms.client.ActiveMQQueue
import org.apache.activemq.artemis.jms.client.ActiveMQTopic
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQQueue queue = (ActiveMQQueue) ActiveMQJMSClient.createQueue("q1");
GroovyRun.assertEquals("jms.queue.q1", queue.getAddress());
GroovyRun.assertEquals("q1", queue.getQueueName());
ActiveMQTopic topic = (ActiveMQTopic) ActiveMQJMSClient.createTopic("t1");
GroovyRun.assertEquals("jms.topic.t1", topic.getAddress());
GroovyRun.assertEquals("t1", topic.getTopicName());
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
queue = session.createQueue("queue");
replyToQueue = ActiveMQJMSClient.createQueue("t1");
producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message bareMessage = session.createMessage();
send(bareMessage);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("hello".getBytes());
send(bytesMessage);
MapMessage mapMessage = session.createMapMessage();
send(mapMessage);
ObjectMessage objectMessage = session.createObjectMessage("hello");
send(objectMessage);
send(session.createStreamMessage());
TextMessage textMessage = session.createTextMessage("May the force be with you");
send(textMessage);
session.commit();
void send(Message message) {
message.setJMSReplyTo(replyToQueue);
producer.send(message);
}

View File

@ -1,4 +1,4 @@
package servers
package addressConfig
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -1,4 +1,4 @@
package meshTest
package addressConfig
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun

View File

@ -1,4 +1,4 @@
package meshTest
package addressConfig
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory

View File

@ -1,4 +1,4 @@
package servers
package exportimport
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -1,3 +1,4 @@
package exportimport
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -1,3 +1,4 @@
package exportimport
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -1,3 +1,4 @@
package exportimport
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -1,3 +1,5 @@
package journalcompatibility
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.server.Queue
import org.apache.activemq.artemis.tests.compatibility.GroovyRun

View File

@ -1,3 +1,5 @@
package journalcompatibility
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.server.Queue
import org.apache.activemq.artemis.tests.compatibility.GroovyRun

View File

@ -65,6 +65,8 @@ for (int i = 0; i < 500; i++) {
}
session.commit();
connection.close();
// Defined on AddressConfigTest.java at the test with setVariable
latch.countDown();

View File

@ -1,4 +1,4 @@
package servers
package prefixSendAckTest
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -1,4 +1,4 @@
package meshTest
package prefixSendAckTest
import org.apache.activemq.artemis.tests.compatibility.GroovyRun

View File

@ -1,4 +1,4 @@
package meshTest
package sendAckTest
import org.apache.activemq.artemis.tests.compatibility.GroovyRun

View File

@ -1,4 +1,4 @@
package clients
package serial
import io.netty.buffer.Unpooled
import org.apache.activemq.artemis.api.core.ActiveMQBuffer

View File

@ -1,4 +1,4 @@
package clients
package serial
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -1,4 +1,4 @@
package clients
package serial
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with

View File

@ -23,13 +23,14 @@ import java.io.PrintStream;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
public class ActiveMQJMSClientCompatibilityTest extends ClasspathBaseTest {
public class ActiveMQJMSClientCompatibilityTest extends ClasspathBase {
@Test
public void testActiveMQJMSCompatibility_1XPrefix_SNAPSHOT() throws Exception {

View File

@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
@ -36,7 +37,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class AddressConfigTest extends VersionedBaseTest {
public class AddressConfigTest extends VersionedBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
@ -47,7 +48,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class ConnectionFactoryConfigurationSerializationTest extends VersionedBaseTest {
public class ConnectionFactoryConfigurationSerializationTest extends VersionedBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
@ -45,7 +46,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class ExportImportTest extends VersionedBaseTest {
public class ExportImportTest extends VersionedBase {
private String serverScriptToUse;
// this will ensure that all tests in this class are run twice,

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.compatibility;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
@ -37,7 +38,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT
* correct connector parameters (keys must be dash-delimited instead of camelCase).
*/
@RunWith(Parameterized.class)
public class HQClientTopologyTest extends VersionedBaseTest {
public class HQClientTopologyTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
@ -46,7 +47,7 @@ import static org.junit.Assert.assertTrue;
* and it will make sure that failover happens without any problems.
*/
@RunWith(Parameterized.class)
public class HQFailoverTest extends VersionedBaseTest {
public class HQFailoverTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
@ -47,7 +48,7 @@ import org.junit.runners.Parameterized;
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class JournalCompatibilityTest extends VersionedBaseTest {
public class JournalCompatibilityTest extends VersionedBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"

View File

@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.ServerBase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -50,7 +51,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class MeshTest extends ServerBaseTest {
public class MeshTest extends ServerBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@ -67,10 +68,10 @@ public class MeshTest extends ServerBaseTest {
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT, HORNETQ_235}));
combinations.addAll(combinatory(new Object[]{ONE_FIVE}, new Object[]{ONE_FIVE, SNAPSHOT}, new Object[]{ONE_FIVE, SNAPSHOT}));
combinations.addAll(combinatory(new Object[]{HORNETQ_235}, new Object[]{ONE_FIVE, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FIVE, SNAPSHOT, HORNETQ_235}));
combinations.addAll(combinatory(new Object[]{HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247}));
combinations.addAll(combinatory(SNAPSHOT, new Object[]{SNAPSHOT}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT, HORNETQ_235}));
combinations.addAll(combinatory(SNAPSHOT, new Object[]{ONE_FIVE}, new Object[]{ONE_FIVE, SNAPSHOT}, new Object[]{ONE_FIVE, SNAPSHOT}));
combinations.addAll(combinatory(SNAPSHOT, new Object[]{HORNETQ_235}, new Object[]{ONE_FIVE, SNAPSHOT, HORNETQ_235}, new Object[]{ONE_FIVE, SNAPSHOT, HORNETQ_235}));
combinations.addAll(combinatory(SNAPSHOT, new Object[]{HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247}, new Object[]{SNAPSHOT, HORNETQ_247}));
combinations.add(new Object[]{SNAPSHOT, ONE_FOUR, ONE_FOUR});
return combinations;
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
@ -35,7 +36,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class OldAddressSpaceTest extends VersionedBaseTest {
public class OldAddressSpaceTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.ServerBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -31,7 +32,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
@RunWith(Parameterized.class)
public class PrefixSendAckTest extends ServerBaseTest {
public class PrefixSendAckTest extends ServerBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.tests.compatibility.base.ServerBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
*
* cd /compatibility-tests
* mvn install -Ptests | tee output.log
*
* on the output.log you will see the output generated by {@link #getClasspath(String)}
*
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
* On Idea you would do the following:
*
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class ReplyToTest extends ServerBase {
@Before
@Override
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
serverFolder.getRoot().mkdirs();
File file = serverFolder.newFile(ActiveMQJMSClient.class.getName() + ".properties");
FileOutputStream fileOutputStream = new FileOutputStream(file);
PrintStream stream = new PrintStream(fileOutputStream);
stream.println("enable1xPrefixes=true");
stream.close();
setVariable(serverClassloader, "persistent", Boolean.FALSE);
startServer(serverFolder.getRoot(), serverClassloader, "live");
}
@After
@Override
public void tearDown() throws Throwable {
super.tearDown();
}
@Override
public ClassLoader getClasspath(String name) throws Exception {
if (name.equals(SNAPSHOT)) {
String snapshotPath = System.getProperty(SNAPSHOT);
Assume.assumeNotNull(snapshotPath);
String path = serverFolder.getRoot().getAbsolutePath() + File.pathSeparator + snapshotPath;
ClassLoader loader = defineClassLoader(path);
clearGroovy(loader);
return loader;
} else {
return super.getClasspath(name);
}
}
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
// we don't need every single version ever released..
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
List<Object[]> combinations = new ArrayList<>();
/*
// during development sometimes is useful to comment out the combinations
// and add the ones you are interested.. example:
*/
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{ONE_FIVE, SNAPSHOT, SNAPSHOT});
// TODO: It's not currently possible to mix reply to between 1.x and SNAPSHOT. Both sides need to be on the same version!
// combinations.addAll(combinatory(SNAPSHOT, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE}));
return combinations;
}
public ReplyToTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Test
public void testSendReceive() throws Throwable {
setVariable(receiverClassloader, "latch", null);
evaluate(senderClassloader, "ReplyToTest/replyToSend.groovy");
evaluate(receiverClassloader, "ReplyToTest/replyToReceive.groovy");
}
}

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.ServerBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -44,7 +45,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class SendAckTest extends ServerBaseTest {
public class SendAckTest extends ServerBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
@ -46,7 +47,7 @@ import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class SerializationTest extends VersionedBaseTest {
public class SerializationTest extends VersionedBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
package org.apache.activemq.artemis.tests.compatibility.base;
import java.io.File;
import java.lang.reflect.Method;
@ -26,13 +26,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
public class ClasspathBaseTest {
public class ClasspathBase {
@ClassRule
@ -48,7 +49,7 @@ public class ClasspathBaseTest {
private static HashSet<String> printed = new HashSet<>();
protected static ClassLoader defineClassLoader(String classPath) throws MalformedURLException {
protected ClassLoader defineClassLoader(String classPath) throws MalformedURLException {
String[] classPathArray = classPath.split(File.pathSeparator);
URL[] elements = new URL[classPathArray.length];
for (int i = 0; i < classPathArray.length; i++) {
@ -58,19 +59,21 @@ public class ClasspathBaseTest {
return new URLClassLoader(elements, null);
}
public static ClassLoader getClasspath(String name) throws Exception {
protected ClassLoader getClasspath(String name) throws Exception {
return getClasspath(name, false);
}
public static ClassLoader getClasspath(String name, boolean forceNew) throws Exception {
protected ClassLoader getClasspath(String name, boolean forceNew) throws Exception {
if (!forceNew) {
if (name.equals(SNAPSHOT)) {
return VersionedBaseTest.class.getClassLoader();
GroovyRun.clear();
return VersionedBase.class.getClassLoader();
}
ClassLoader loader = loaderMap.get(name);
if (loader != null && !forceNew) {
clearGroovy(loader);
return loader;
}
}
@ -117,6 +120,15 @@ public class ClasspathBaseTest {
});
}
protected static void clearGroovy(ClassLoader loader) throws Exception {
tclCall(loader, () -> {
Class clazz = loader.loadClass(GroovyRun.class.getName());
Method method = clazz.getMethod("clear");
method.invoke(null);
return null;
});
}
protected static Object setVariable(ClassLoader loader, String name) throws Exception {
return tclCall(loader, () -> {
Class clazz = loader.loadClass(GroovyRun.class.getName());

View File

@ -15,15 +15,15 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
package org.apache.activemq.artemis.tests.compatibility.base;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
public class ServerBaseTest extends VersionedBaseTest {
public class ServerBase extends VersionedBase {
public ServerBaseTest(String server, String sender, String receiver) throws Exception {
public ServerBase(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
package org.apache.activemq.artemis.tests.compatibility.base;
import java.io.File;
import java.util.LinkedList;
@ -23,17 +23,17 @@ import java.util.List;
import org.junit.AfterClass;
public abstract class VersionedBaseTest extends ClasspathBaseTest {
public abstract class VersionedBase extends ClasspathBase {
protected final String server;
protected final String sender;
protected final String receiver;
protected ClassLoader serverClassloader;
protected ClassLoader senderClassloader;
protected ClassLoader receiverClassloader;
protected final ClassLoader serverClassloader;
protected final ClassLoader senderClassloader;
protected final ClassLoader receiverClassloader;
public VersionedBaseTest(String server, String sender, String receiver) throws Exception {
public VersionedBase(String server, String sender, String receiver) throws Exception {
if (server == null) {
server = sender;
}
@ -43,6 +43,9 @@ public abstract class VersionedBaseTest extends ClasspathBaseTest {
this.serverClassloader = getClasspath(server);
this.senderClassloader = getClasspath(sender);
this.receiverClassloader = getClasspath(receiver);
clearGroovy(senderClassloader);
clearGroovy(receiverClassloader);
clearGroovy(serverClassloader);
}
@AfterClass
@ -51,15 +54,24 @@ public abstract class VersionedBaseTest extends ClasspathBaseTest {
}
protected static List<Object[]> combinatory(Object[] rootSide, Object[] sideLeft, Object[] sideRight) {
return combinatory(null, rootSide, sideLeft, sideRight);
}
protected static List<Object[]> combinatory(Object required,
Object[] rootSide,
Object[] sideLeft,
Object[] sideRight) {
LinkedList<Object[]> combinations = new LinkedList<>();
for (Object root : rootSide) {
for (Object left : sideLeft) {
for (Object right : sideRight) {
if (required == null || root.equals(required) || left.equals(required) || right.equals(required)) {
combinations.add(new Object[]{root, left, right});
}
}
}
}
return combinations;
}