mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-06 10:09:01 +00:00
This closes #2632
This commit is contained in:
commit
6da2d5e2cd
@ -46,6 +46,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
||||
return version >= PacketImpl.CONSUMER_PRIORITY_CHANGE_VERSION;
|
||||
}
|
||||
|
||||
default boolean isVersionNewFQQN() {
|
||||
int version = getChannelVersion();
|
||||
return version >= PacketImpl.ARTEMIS_2_7_0_VERSION;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the client protocol used on the communication. This will determine if the client has
|
||||
* support for certain packet types
|
||||
|
@ -36,6 +36,7 @@ public class PacketImpl implements Packet {
|
||||
public static final int ARTEMIS_2_7_0_VERSION = 130;
|
||||
public static final int ASYNC_RESPONSE_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
||||
public static final int CONSUMER_PRIORITY_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
||||
public static final int FQQN_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
|
||||
|
||||
|
||||
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
||||
|
@ -799,7 +799,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||
* Therefore, we must check if the queue names list contains the exact name of the address to know whether or
|
||||
* not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here.
|
||||
*/
|
||||
if (!response.isExists() || !response.getQueueNames().contains(CompositeAddress.extractQueueName(dest.getSimpleAddress()))) {
|
||||
if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) {
|
||||
if (response.isAutoCreateQueues()) {
|
||||
try {
|
||||
createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response);
|
||||
@ -907,6 +907,14 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||
}
|
||||
}
|
||||
|
||||
private SimpleString getCoreQueueName(ActiveMQDestination dest) {
|
||||
if (session.getVersion() < PacketImpl.FQQN_CHANGE_VERSION) {
|
||||
return dest.getSimpleAddress();
|
||||
} else {
|
||||
return CompositeAddress.extractQueueName(dest.getSimpleAddress());
|
||||
}
|
||||
}
|
||||
|
||||
private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
|
||||
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
|
||||
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
|
||||
|
@ -500,7 +500,11 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||
|
||||
Queue locateQueue(SimpleString queueName);
|
||||
|
||||
BindingQueryResult bindingQuery(SimpleString address) throws Exception;
|
||||
default BindingQueryResult bindingQuery(SimpleString address) throws Exception {
|
||||
return bindingQuery(address, true);
|
||||
}
|
||||
|
||||
BindingQueryResult bindingQuery(SimpleString address, boolean newFQQN) throws Exception;
|
||||
|
||||
QueueQueryResult queueQuery(SimpleString name) throws Exception;
|
||||
|
||||
|
@ -886,7 +886,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||
}
|
||||
|
||||
@Override
|
||||
public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
|
||||
public BindingQueryResult bindingQuery(SimpleString address, boolean newFQQN) throws Exception {
|
||||
if (address == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
|
||||
}
|
||||
@ -919,7 +919,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||
|
||||
for (Binding binding : bindings.getBindings()) {
|
||||
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
|
||||
names.add(binding.getUniqueName());
|
||||
SimpleString name;
|
||||
if (!newFQQN && CompositeAddress.isFullyQualified(address.toString())) {
|
||||
// need to use the FQQN here for backwards compatibility with core JMS client
|
||||
name = CompositeAddress.toFullyQualified(realAddress, binding.getUniqueName());
|
||||
} else {
|
||||
name = binding.getUniqueName();
|
||||
}
|
||||
names.add(name);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
|
||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
@ -1052,7 +1053,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||
|
||||
@Override
|
||||
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
|
||||
return server.bindingQuery(removePrefix(address));
|
||||
|
||||
boolean newFQQN = true;
|
||||
|
||||
// remotingConnection could be null on UnitTests
|
||||
// that's why I'm checking for null here, and it's best to do so
|
||||
if (remotingConnection != null && remotingConnection instanceof CoreRemotingConnection) {
|
||||
newFQQN = ((CoreRemotingConnection) remotingConnection).isVersionNewFQQN();
|
||||
}
|
||||
|
||||
return server.bindingQuery(removePrefix(address), newFQQN);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -328,6 +328,49 @@
|
||||
<variableName>ARTEMIS-SNAPSHOT</variableName>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>dependency-scan</goal>
|
||||
</goals>
|
||||
<id>263-check</id>
|
||||
<configuration>
|
||||
<libListWithDeps>
|
||||
<arg>org.apache.activemq:artemis-jms-server:2.6.3</arg>
|
||||
<arg>org.apache.activemq:artemis-jms-client:2.6.3</arg>
|
||||
<arg>org.apache.activemq:artemis-cli:2.6.3</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:2.6.3</arg>
|
||||
<arg>org.apache.activemq:artemis-amqp-protocol:2.6.3</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:2.6.3</arg>
|
||||
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||
</libListWithDeps>
|
||||
<libList>
|
||||
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||
</libList>
|
||||
<variableName>ARTEMIS-263</variableName>
|
||||
</configuration>
|
||||
</execution> <execution>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>dependency-scan</goal>
|
||||
</goals>
|
||||
<id>270-check</id>
|
||||
<configuration>
|
||||
<libListWithDeps>
|
||||
<arg>org.apache.activemq:artemis-jms-server:2.7.0</arg>
|
||||
<arg>org.apache.activemq:artemis-jms-client:2.7.0</arg>
|
||||
<arg>org.apache.activemq:artemis-cli:2.7.0</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
|
||||
<arg>org.apache.activemq:artemis-amqp-protocol:2.7.0</arg>
|
||||
<arg>org.apache.activemq:artemis-hornetq-protocol:2.7.0</arg>
|
||||
<arg>org.codehaus.groovy:groovy-all:${groovy.version}</arg>
|
||||
</libListWithDeps>
|
||||
<libList>
|
||||
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
|
||||
</libList>
|
||||
<variableName>ARTEMIS-270</variableName>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
|
@ -33,6 +33,8 @@ public class GroovyRun {
|
||||
public static final String TWO_ZERO = "ARTEMIS-200";
|
||||
public static final String TWO_ONE = "ARTEMIS-210";
|
||||
public static final String TWO_FOUR = "ARTEMIS-240";
|
||||
public static final String TWO_SIX_THREE = "ARTEMIS-263";
|
||||
public static final String TWO_SEVEN_ZERO = "ARTEMIS-270";
|
||||
public static final String HORNETQ_235 = "HORNETQ-235";
|
||||
public static final String HORNETQ_247 = "HORNETQ-247";
|
||||
|
||||
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package fqqnconsumertest
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
import org.apache.activemq.artemis.core.server.JournalType
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
|
||||
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
|
||||
|
||||
String folder = arg[0];
|
||||
|
||||
configuration = new ConfigurationImpl();
|
||||
configuration.setJournalType(JournalType.NIO);
|
||||
configuration.setBrokerInstance(new File(folder));
|
||||
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
|
||||
configuration.setSecurityEnabled(false);
|
||||
configuration.setPersistenceEnabled(false);
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings()
|
||||
.setAutoCreateAddresses(false)
|
||||
.setAutoCreateQueues(false);
|
||||
|
||||
configuration.addAddressesSetting("#", addressSettings);
|
||||
|
||||
configuration.addQueueConfiguration(new CoreQueueConfiguration().setAddress("address").setName("queue").setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
jmsConfiguration = new JMSConfigurationImpl();
|
||||
|
||||
server = new EmbeddedJMS();
|
||||
server.setConfiguration(configuration);
|
||||
server.setJmsConfiguration(jmsConfiguration);
|
||||
server.start();
|
@ -0,0 +1,79 @@
|
||||
package fqqnconsumertest
|
||||
|
||||
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||
|
||||
import javax.jms.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// starts an artemis server
|
||||
String serverType = arg[0];
|
||||
String clientType = arg[1];
|
||||
String operation = arg[2];
|
||||
|
||||
|
||||
String queueName = "address::queue";
|
||||
|
||||
|
||||
String textBody = "a rapadura e doce mas nao e mole nao";
|
||||
|
||||
if (clientType.startsWith("ARTEMIS")) {
|
||||
// Can't depend directly on artemis, otherwise it wouldn't compile in hornetq
|
||||
GroovyRun.evaluate("clients/artemisClient.groovy", "serverArg", serverType);
|
||||
} else {
|
||||
// Can't depend directly on hornetq, otherwise it wouldn't compile in artemis
|
||||
GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg");
|
||||
}
|
||||
|
||||
|
||||
Connection connection = cf.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
|
||||
if (operation.equals("sendMessage")) {
|
||||
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
producer.send(session.createTextMessage(textBody + i));
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
connection.close();
|
||||
} else if (operation.equals("receiveMessage")) {
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
TextMessage message = consumer.receive(1000);
|
||||
GroovyRun.assertNotNull(message);
|
||||
GroovyRun.assertEquals(textBody + i, message.getText());
|
||||
}
|
||||
|
||||
GroovyRun.assertNull(consumer.receiveNoWait());
|
||||
connection.close();
|
||||
} else {
|
||||
throw new RuntimeException("Invalid operation " + operation);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.compatibility;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.tests.compatibility.base.ServerBase;
|
||||
import org.apache.activemq.artemis.utils.FileUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ONE;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SEVEN_ZERO;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SIX_THREE;
|
||||
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_ZERO;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class FQQNConsumerTest extends ServerBase {
|
||||
|
||||
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||
public static Collection getParameters() {
|
||||
List<Object[]> combinations = new ArrayList<>();
|
||||
|
||||
// FQQN was added into 2.7.0, hence we only test the server as SNAPSHOT or TWO_SEVEN_ZERO
|
||||
List testsList = combinatory(new Object[]{SNAPSHOT}, new Object[]{SNAPSHOT, TWO_ZERO, TWO_FOUR, TWO_ONE, TWO_SIX_THREE, TWO_SEVEN_ZERO}, new Object[]{SNAPSHOT, TWO_ZERO, TWO_FOUR, TWO_ONE, TWO_SIX_THREE, TWO_SEVEN_ZERO});
|
||||
addCombinations(testsList, null, new Object[] {TWO_SEVEN_ZERO}, new Object[]{SNAPSHOT, TWO_SEVEN_ZERO}, new Object[]{SNAPSHOT, TWO_SEVEN_ZERO});
|
||||
return testsList;
|
||||
}
|
||||
|
||||
public FQQNConsumerTest(String server, String sender, String receiver) throws Exception {
|
||||
super(server, sender, receiver);
|
||||
}
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Throwable {
|
||||
FileUtil.deleteDirectory(serverFolder.getRoot());
|
||||
evaluate(serverClassloader, "fqqnconsumertest/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath());
|
||||
}
|
||||
|
||||
@After
|
||||
@Override
|
||||
public void tearDown() throws Throwable {
|
||||
execute(serverClassloader, "server.stop();");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendReceive() throws Throwable {
|
||||
evaluate(senderClassloader, "fqqnconsumertest/fqqnConsumerProducer.groovy", server, sender, "sendMessage");
|
||||
evaluate(receiverClassloader, "fqqnconsumertest/fqqnConsumerProducer.groovy", server, receiver, "receiveMessage");
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -63,6 +63,16 @@ public abstract class VersionedBase extends ClasspathBase {
|
||||
Object[] sideRight) {
|
||||
LinkedList<Object[]> combinations = new LinkedList<>();
|
||||
|
||||
addCombinations(combinations, required, rootSide, sideLeft, sideRight);
|
||||
|
||||
return combinations;
|
||||
}
|
||||
|
||||
protected static void addCombinations(List<Object[]> combinations,
|
||||
Object required,
|
||||
Object[] rootSide,
|
||||
Object[] sideLeft,
|
||||
Object[] sideRight) {
|
||||
for (Object root : rootSide) {
|
||||
for (Object left : sideLeft) {
|
||||
for (Object right : sideRight) {
|
||||
@ -72,8 +82,6 @@ public abstract class VersionedBase extends ClasspathBase {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return combinations;
|
||||
}
|
||||
|
||||
public void startServer(File folder, ClassLoader loader, String serverName) throws Throwable {
|
||||
|
Loading…
x
Reference in New Issue
Block a user