ARTEMIS-2314 broken compat with old core JMS FQQN consumer

When auto-creation is off then older clients consuming messages from an
FQQN won't work. This commit fixes that problem and adds a compatibility
test to verify.
This commit is contained in:
Justin Bertram 2019-04-18 15:55:31 -05:00
parent aa0bf60649
commit a2cb44400f
7 changed files with 253 additions and 3 deletions

View File

@ -71,7 +71,6 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompati
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SelectorTranslator;
/**
@ -799,7 +798,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
* Therefore, we must check if the queue names list contains the exact name of the address to know whether or
* not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here.
*/
if (!response.isExists() || !response.getQueueNames().contains(CompositeAddress.extractQueueName(dest.getSimpleAddress()))) {
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
if (response.isAutoCreateQueues()) {
try {
createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response);

View File

@ -919,7 +919,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : bindings.getBindings()) {
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
names.add(binding.getUniqueName());
SimpleString name;
if (CompositeAddress.isFullyQualified(address.toString())) {
// need to use the FQQN here for backwards compatibility with core JMS client
name = CompositeAddress.toFullyQualified(realAddress, binding.getUniqueName());
} else {
name = binding.getUniqueName();
}
names.add(name);
}
}

View File

@ -328,6 +328,28 @@
<variableName>ARTEMIS-SNAPSHOT</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>263-check</id>
<configuration>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.6.3</arg>
<arg>org.apache.activemq:artemis-jms-client:2.6.3</arg>
<arg>org.apache.activemq:artemis-cli:2.6.3</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.6.3</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.6.3</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.6.3</arg>
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<variableName>ARTEMIS-263</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>

View File

@ -33,6 +33,7 @@ public class GroovyRun {
public static final String TWO_ZERO = "ARTEMIS-200";
public static final String TWO_ONE = "ARTEMIS-210";
public static final String TWO_FOUR = "ARTEMIS-240";
public static final String TWO_SIX_THREE = "ARTEMIS-263";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fqqnconsumertest
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
AddressSettings addressSettings = new AddressSettings()
.setAutoCreateAddresses(false)
.setAutoCreateQueues(false);
configuration.addAddressesSetting("#", addressSettings);
configuration.addQueueConfiguration(new CoreQueueConfiguration().setAddress("address").setName("queue").setRoutingType(RoutingType.ANYCAST));
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();

View File

@ -0,0 +1,93 @@
package fqqnconsumertest
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
String serverType = arg[0];
String clientType = arg[1];
String operation = arg[2];
String queueName = "address::queue";
String textBody = "a rapadura e doce mas nao e mole nao";
if (clientType.startsWith("ARTEMIS")) {
// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
} else {
// Can't depend directly on hornetq, otherwise it wouldn't compile in artemis
GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg");
}
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
if (operation.equals("sendMessage")) {
CountDownLatch latch = new CountDownLatch(10);
CompletionListener completionListener = new CompletionListener() {
@Override
void onCompletion(Message message) {
latch.countDown();
}
@Override
void onException(Message message, Exception exception) {
}
}
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage(textBody + i), completionListener);
}
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
connection.close();
} else if (operation.equals("receiveMessage")) {
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 10; i++) {
TextMessage message = consumer.receive(1000);
GroovyRun.assertNotNull(message);
GroovyRun.assertEquals(textBody + i, message.getText());
}
GroovyRun.assertNull(consumer.receiveNoWait());
connection.close();
} else {
throw new RuntimeException("Invalid operation " + operation);
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.tests.compatibility.base.ServerBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ONE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SIX_THREE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ZERO;
@RunWith(Parameterized.class)
public class FQQNConsumerTest extends ServerBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
// FQQN support was added in 2.0 so testing several 2.x versions before 2.7
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, TWO_SIX_THREE, TWO_SIX_THREE});
combinations.add(new Object[]{SNAPSHOT, TWO_ZERO, TWO_ZERO});
combinations.add(new Object[]{SNAPSHOT, TWO_ONE, TWO_ONE});
combinations.add(new Object[]{SNAPSHOT, TWO_FOUR, TWO_FOUR});
return combinations;
}
public FQQNConsumerTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
@Override
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
evaluate(serverClassloader, "fqqnconsumertest/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath());
}
@After
@Override
public void tearDown() throws Throwable {
execute(serverClassloader, "server.stop();");
}
@Test
public void testSendReceive() throws Throwable {
evaluate(senderClassloader, "fqqnconsumertest/fqqnConsumerProducer.groovy", server, sender, "sendMessage");
evaluate(receiverClassloader, "fqqnconsumertest/fqqnConsumerProducer.groovy", server, receiver, "receiveMessage");
}
}