ARTEMIS-2314 Improving Test and compatibility check on FQQN
This commit is contained in:
parent
a2cb44400f
commit
33d6721ffc
|
@ -46,6 +46,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
||||||
return version >= PacketImpl.CONSUMER_PRIORITY_CHANGE_VERSION;
|
return version >= PacketImpl.CONSUMER_PRIORITY_CHANGE_VERSION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default boolean isVersionNewFQQN() {
|
||||||
|
int version = getChannelVersion();
|
||||||
|
return version >= PacketImpl.ARTEMIS_2_7_0_VERSION;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the client protocol used on the communication. This will determine if the client has
|
* Sets the client protocol used on the communication. This will determine if the client has
|
||||||
* support for certain packet types
|
* support for certain packet types
|
||||||
|
|
|
@ -36,6 +36,7 @@ public class PacketImpl implements Packet {
|
||||||
public static final int ARTEMIS_2_7_0_VERSION = 130;
|
public static final int ARTEMIS_2_7_0_VERSION = 130;
|
||||||
public static final int ASYNC_RESPONSE_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
public static final int ASYNC_RESPONSE_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
||||||
public static final int CONSUMER_PRIORITY_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
public static final int CONSUMER_PRIORITY_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
||||||
|
public static final int FQQN_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
||||||
|
|
||||||
|
|
||||||
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
||||||
|
|
|
@ -71,6 +71,7 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompati
|
||||||
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage;
|
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage;
|
||||||
import org.apache.activemq.artemis.selector.filter.FilterException;
|
import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||||
|
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||||
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -798,7 +799,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
* Therefore, we must check if the queue names list contains the exact name of the address to know whether or
|
* Therefore, we must check if the queue names list contains the exact name of the address to know whether or
|
||||||
* not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here.
|
* not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here.
|
||||||
*/
|
*/
|
||||||
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
|
if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) {
|
||||||
if (response.isAutoCreateQueues()) {
|
if (response.isAutoCreateQueues()) {
|
||||||
try {
|
try {
|
||||||
createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response);
|
createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response);
|
||||||
|
@ -906,6 +907,14 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private SimpleString getCoreQueueName(ActiveMQDestination dest) {
|
||||||
|
if (session.getVersion() < PacketImpl.FQQN_CHANGE_VERSION) {
|
||||||
|
return dest.getSimpleAddress();
|
||||||
|
} else {
|
||||||
|
return CompositeAddress.extractQueueName(dest.getSimpleAddress());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
|
private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
|
||||||
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
|
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
|
||||||
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
|
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
|
||||||
|
|
|
@ -500,7 +500,11 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||||
|
|
||||||
Queue locateQueue(SimpleString queueName);
|
Queue locateQueue(SimpleString queueName);
|
||||||
|
|
||||||
BindingQueryResult bindingQuery(SimpleString address) throws Exception;
|
default BindingQueryResult bindingQuery(SimpleString address) throws Exception {
|
||||||
|
return bindingQuery(address, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
BindingQueryResult bindingQuery(SimpleString address, boolean newFQQN) throws Exception;
|
||||||
|
|
||||||
QueueQueryResult queueQuery(SimpleString name) throws Exception;
|
QueueQueryResult queueQuery(SimpleString name) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -886,7 +886,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
|
public BindingQueryResult bindingQuery(SimpleString address, boolean newFQQN) throws Exception {
|
||||||
if (address == null) {
|
if (address == null) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
|
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
|
||||||
}
|
}
|
||||||
|
@ -920,7 +920,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
for (Binding binding : bindings.getBindings()) {
|
for (Binding binding : bindings.getBindings()) {
|
||||||
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
|
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
|
||||||
SimpleString name;
|
SimpleString name;
|
||||||
if (CompositeAddress.isFullyQualified(address.toString())) {
|
if (!newFQQN && CompositeAddress.isFullyQualified(address.toString())) {
|
||||||
// need to use the FQQN here for backwards compatibility with core JMS client
|
// need to use the FQQN here for backwards compatibility with core JMS client
|
||||||
name = CompositeAddress.toFullyQualified(realAddress, binding.getUniqueName());
|
name = CompositeAddress.toFullyQualified(realAddress, binding.getUniqueName());
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
|
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
|
@ -1052,7 +1053,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
|
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
|
||||||
return server.bindingQuery(removePrefix(address));
|
|
||||||
|
boolean newFQQN = true;
|
||||||
|
|
||||||
|
// remotingConnection could be null on UnitTests
|
||||||
|
// that's why I'm checking for null here, and it's best to do so
|
||||||
|
if (remotingConnection != null && remotingConnection instanceof CoreRemotingConnection) {
|
||||||
|
newFQQN = ((CoreRemotingConnection) remotingConnection).isVersionNewFQQN();
|
||||||
|
}
|
||||||
|
|
||||||
|
return server.bindingQuery(removePrefix(address), newFQQN);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -349,7 +349,28 @@
|
||||||
</libList>
|
</libList>
|
||||||
<variableName>ARTEMIS-263</variableName>
|
<variableName>ARTEMIS-263</variableName>
|
||||||
</configuration>
|
</configuration>
|
||||||
</execution>
|
</execution> <execution>
|
||||||
|
<phase>compile</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>dependency-scan</goal>
|
||||||
|
</goals>
|
||||||
|
<id>270-check</id>
|
||||||
|
<configuration>
|
||||||
|
<libListWithDeps>
|
||||||
|
<arg>org.apache.activemq:artemis-jms-server:2.7.0</arg>
|
||||||
|
<arg>org.apache.activemq:artemis-jms-client:2.7.0</arg>
|
||||||
|
<arg>org.apache.activemq:artemis-cli:2.7.0</arg>
|
||||||
|
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
|
||||||
|
<arg>org.apache.activemq:artemis-amqp-protocol:2.7.0</arg>
|
||||||
|
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
|
||||||
|
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||||
|
</libListWithDeps>
|
||||||
|
<libList>
|
||||||
|
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||||
|
</libList>
|
||||||
|
<variableName>ARTEMIS-270</variableName>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
<execution>
|
<execution>
|
||||||
<phase>compile</phase>
|
<phase>compile</phase>
|
||||||
<goals>
|
<goals>
|
||||||
|
|
|
@ -34,6 +34,7 @@ public class GroovyRun {
|
||||||
public static final String TWO_ONE = "ARTEMIS-210";
|
public static final String TWO_ONE = "ARTEMIS-210";
|
||||||
public static final String TWO_FOUR = "ARTEMIS-240";
|
public static final String TWO_FOUR = "ARTEMIS-240";
|
||||||
public static final String TWO_SIX_THREE = "ARTEMIS-263";
|
public static final String TWO_SIX_THREE = "ARTEMIS-263";
|
||||||
|
public static final String TWO_SEVEN_ZERO = "ARTEMIS-270";
|
||||||
public static final String HORNETQ_235 = "HORNETQ-235";
|
public static final String HORNETQ_235 = "HORNETQ-235";
|
||||||
public static final String HORNETQ_247 = "HORNETQ-247";
|
public static final String HORNETQ_247 = "HORNETQ-247";
|
||||||
|
|
||||||
|
|
|
@ -44,32 +44,18 @@ if (clientType.startsWith("ARTEMIS")) {
|
||||||
|
|
||||||
|
|
||||||
Connection connection = cf.createConnection();
|
Connection connection = cf.createConnection();
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
Queue queue = session.createQueue(queueName);
|
Queue queue = session.createQueue(queueName);
|
||||||
|
|
||||||
if (operation.equals("sendMessage")) {
|
if (operation.equals("sendMessage")) {
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(10);
|
|
||||||
|
|
||||||
CompletionListener completionListener = new CompletionListener() {
|
|
||||||
@Override
|
|
||||||
void onCompletion(Message message) {
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void onException(Message message, Exception exception) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
MessageProducer producer = session.createProducer(queue);
|
MessageProducer producer = session.createProducer(queue);
|
||||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
producer.send(session.createTextMessage(textBody + i), completionListener);
|
producer.send(session.createTextMessage(textBody + i));
|
||||||
}
|
}
|
||||||
|
|
||||||
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
session.commit();
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
} else if (operation.equals("receiveMessage")) {
|
} else if (operation.equals("receiveMessage")) {
|
||||||
|
|
|
@ -32,24 +32,21 @@ import org.junit.runners.Parameterized;
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ONE;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ONE;
|
||||||
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SEVEN_ZERO;
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SIX_THREE;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SIX_THREE;
|
||||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ZERO;
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ZERO;
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class FQQNConsumerTest extends ServerBase {
|
public class FQQNConsumerTest extends ServerBase {
|
||||||
|
|
||||||
// this will ensure that all tests in this class are run twice,
|
|
||||||
// once with "true" passed to the class' constructor and once with "false"
|
|
||||||
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||||
public static Collection getParameters() {
|
public static Collection getParameters() {
|
||||||
// FQQN support was added in 2.0 so testing several 2.x versions before 2.7
|
|
||||||
List<Object[]> combinations = new ArrayList<>();
|
List<Object[]> combinations = new ArrayList<>();
|
||||||
combinations.add(new Object[]{SNAPSHOT, TWO_SIX_THREE, TWO_SIX_THREE});
|
|
||||||
combinations.add(new Object[]{SNAPSHOT, TWO_ZERO, TWO_ZERO});
|
|
||||||
combinations.add(new Object[]{SNAPSHOT, TWO_ONE, TWO_ONE});
|
|
||||||
combinations.add(new Object[]{SNAPSHOT, TWO_FOUR, TWO_FOUR});
|
|
||||||
|
|
||||||
return combinations;
|
// FQQN was added into 2.7.0, hence we only test the server as SNAPSHOT or TWO_SEVEN_ZERO
|
||||||
|
List testsList = combinatory(new Object[]{SNAPSHOT}, new Object[]{SNAPSHOT, TWO_ZERO, TWO_FOUR, TWO_ONE, TWO_SIX_THREE, TWO_SEVEN_ZERO}, new Object[]{SNAPSHOT, TWO_ZERO, TWO_FOUR, TWO_ONE, TWO_SIX_THREE, TWO_SEVEN_ZERO});
|
||||||
|
addCombinations(testsList, null, new Object[] {TWO_SEVEN_ZERO}, new Object[]{SNAPSHOT, TWO_SEVEN_ZERO}, new Object[]{SNAPSHOT, TWO_SEVEN_ZERO});
|
||||||
|
return testsList;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FQQNConsumerTest(String server, String sender, String receiver) throws Exception {
|
public FQQNConsumerTest(String server, String sender, String receiver) throws Exception {
|
||||||
|
|
|
@ -63,6 +63,16 @@ public abstract class VersionedBase extends ClasspathBase {
|
||||||
Object[] sideRight) {
|
Object[] sideRight) {
|
||||||
LinkedList<Object[]> combinations = new LinkedList<>();
|
LinkedList<Object[]> combinations = new LinkedList<>();
|
||||||
|
|
||||||
|
addCombinations(combinations, required, rootSide, sideLeft, sideRight);
|
||||||
|
|
||||||
|
return combinations;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void addCombinations(List<Object[]> combinations,
|
||||||
|
Object required,
|
||||||
|
Object[] rootSide,
|
||||||
|
Object[] sideLeft,
|
||||||
|
Object[] sideRight) {
|
||||||
for (Object root : rootSide) {
|
for (Object root : rootSide) {
|
||||||
for (Object left : sideLeft) {
|
for (Object left : sideLeft) {
|
||||||
for (Object right : sideRight) {
|
for (Object right : sideRight) {
|
||||||
|
@ -72,8 +82,6 @@ public abstract class VersionedBase extends ClasspathBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return combinations;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void startServer(File folder, ClassLoader loader, String serverName) throws Throwable {
|
public void startServer(File folder, ClassLoader loader, String serverName) throws Throwable {
|
||||||
|
|
Loading…
Reference in New Issue