This closes #1958
This commit is contained in:
commit
8b8eff053f
|
@ -389,7 +389,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
||||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
|
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
|
||||||
|
|
||||||
if (remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
if (result.isExists() && remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||||
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
|
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -412,7 +412,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
* names otherwise the older client won't realize the queue exists and will try to create it and receive
|
* names otherwise the older client won't realize the queue exists and will try to create it and receive
|
||||||
* an error
|
* an error
|
||||||
*/
|
*/
|
||||||
if (clientVersion < PacketImpl.ADDRESSING_CHANGE_VERSION && session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) {
|
if (result.isExists() && clientVersion < PacketImpl.ADDRESSING_CHANGE_VERSION && session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) {
|
||||||
final List<SimpleString> queueNames = result.getQueueNames();
|
final List<SimpleString> queueNames = result.getQueueNames();
|
||||||
if (!queueNames.isEmpty()) {
|
if (!queueNames.isEmpty()) {
|
||||||
final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
|
final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
package servers
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// starts an artemis server
|
||||||
|
import org.apache.activemq.artemis.core.server.JournalType
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||||
|
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
|
||||||
|
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
|
||||||
|
|
||||||
|
String folder = arg[0];
|
||||||
|
String type = arg[1]
|
||||||
|
String id = "server";
|
||||||
|
|
||||||
|
String queueName = "myQueue";
|
||||||
|
String queueAddress = "jms.queue.myQueue";
|
||||||
|
String topicAddress = "jms.topic.myTopic";
|
||||||
|
|
||||||
|
configuration = new ConfigurationImpl();
|
||||||
|
configuration.setJournalType(JournalType.NIO);
|
||||||
|
System.out.println("folder:: " + folder);
|
||||||
|
configuration.setBrokerInstance(new File(folder + "/" + id));
|
||||||
|
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
|
||||||
|
configuration.setSecurityEnabled(false);
|
||||||
|
configuration.setPersistenceEnabled(false);
|
||||||
|
|
||||||
|
AddressSettings addressSettings = new AddressSettings();
|
||||||
|
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)
|
||||||
|
.setMaxSizeBytes(10 * 1024)
|
||||||
|
.setPageSizeBytes(1024)
|
||||||
|
.setDeadLetterAddress(SimpleString.toSimpleString("DLA"))
|
||||||
|
.setExpiryAddress(SimpleString.toSimpleString("Expiry"));
|
||||||
|
|
||||||
|
if (!(type.startsWith("ARTEMIS-1") || type.startsWith("HORNETQ"))) {
|
||||||
|
addressSettings.setAutoCreateAddresses(false);
|
||||||
|
addressSettings.setAutoCreateQueues(false);
|
||||||
|
}
|
||||||
|
configuration.addAddressesSetting("#", addressSettings);
|
||||||
|
|
||||||
|
addressSettings = new AddressSettings();
|
||||||
|
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
|
||||||
|
.setMaxSizeBytes(1024 * 1024 * 1024)
|
||||||
|
.setPageSizeBytes(1024)
|
||||||
|
.setDeadLetterAddress(SimpleString.toSimpleString("DLA"))
|
||||||
|
.setExpiryAddress(SimpleString.toSimpleString("Expiry"));
|
||||||
|
|
||||||
|
if (!(type.startsWith("ARTEMIS-1") || type.startsWith("HORNETQ"))) {
|
||||||
|
addressSettings.setAutoCreateAddresses(false);
|
||||||
|
addressSettings.setAutoCreateQueues(false);
|
||||||
|
}
|
||||||
|
configuration.addAddressesSetting("jms.#", addressSettings);
|
||||||
|
|
||||||
|
// if the client is using the wrong address, it will wrongly block
|
||||||
|
|
||||||
|
jmsConfiguration = new JMSConfigurationImpl();
|
||||||
|
|
||||||
|
server = new EmbeddedJMS();
|
||||||
|
server.setConfiguration(configuration);
|
||||||
|
server.setJmsConfiguration(jmsConfiguration);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
if (type.startsWith("ARTEMIS-1") || type.startsWith("HORNETQ")) {
|
||||||
|
server.getJMSServerManager().createQueue(true, queueName, null, true, null);
|
||||||
|
} else {
|
||||||
|
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
|
||||||
|
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
|
||||||
|
|
||||||
|
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topicAddress), RoutingType.MULTICAST));
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
package meshTest
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
|
||||||
|
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
|
||||||
|
|
||||||
|
import javax.jms.*
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ConnectionFactory cf = new ActiveMQConnectionFactory();
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
connection.setClientID("myClientId");
|
||||||
|
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
|
||||||
|
String clientType = arg[0];
|
||||||
|
|
||||||
|
Queue queue;
|
||||||
|
Topic topic;
|
||||||
|
|
||||||
|
if (clientType.startsWith("ARTEMIS-1") || clientType.startsWith("HORNETQ")) {
|
||||||
|
queue = session.createQueue("myQueue");
|
||||||
|
topic = session.createTopic("myTopic");
|
||||||
|
} else {
|
||||||
|
queue = session.createQueue("jms.queue.myQueue");
|
||||||
|
topic = session.createTopic("jms.topic.myTopic");
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("Receiving...");
|
||||||
|
|
||||||
|
MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "myDurableSub")
|
||||||
|
MessageConsumer queueConsumer = session.createConsumer(queue)
|
||||||
|
|
||||||
|
connection.start()
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
BytesMessage bytesMessage = (BytesMessage) queueConsumer.receive(5000);
|
||||||
|
GroovyRun.assertNotNull(bytesMessage)
|
||||||
|
if (i % 100) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
System.out.println("Consumed all messages from Queue");
|
||||||
|
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
BytesMessage bytesMessage = (BytesMessage) topicConsumer.receive(5000);
|
||||||
|
GroovyRun.assertNotNull(bytesMessage)
|
||||||
|
if (i % 100) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
System.out.println("Consumed all messages from Topic");
|
||||||
|
|
||||||
|
// Defined on AddressConfigTest.java at the test with setVariable
|
||||||
|
|
||||||
|
latch.countDown();
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
package meshTest
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
|
||||||
|
|
||||||
|
import javax.jms.*
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ConnectionFactory cf = new ActiveMQConnectionFactory();
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
|
||||||
|
String clientType = arg[0];
|
||||||
|
|
||||||
|
Queue queue;
|
||||||
|
Topic topic;
|
||||||
|
|
||||||
|
if (clientType.startsWith("ARTEMIS-1") || clientType.startsWith("HORNETQ")) {
|
||||||
|
queue = session.createQueue("myQueue");
|
||||||
|
topic = session.createTopic("myTopic");
|
||||||
|
} else {
|
||||||
|
queue = session.createQueue("jms.queue.myQueue");
|
||||||
|
topic = session.createTopic("jms.topic.myTopic");
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("Receiving ");
|
||||||
|
MessageProducer queueProducer = session.createProducer(queue)
|
||||||
|
MessageProducer topicProducer = session.createProducer(topic);
|
||||||
|
|
||||||
|
println("sending...")
|
||||||
|
|
||||||
|
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
BytesMessage bytesMessage = session.createBytesMessage();
|
||||||
|
bytesMessage.writeBytes(new byte[512]);
|
||||||
|
queueProducer.send(bytesMessage);
|
||||||
|
// we send a big batch as that should be enough to cause blocking on the address
|
||||||
|
// if the wrong address is being used
|
||||||
|
if (i % 100 == 0) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
println("Sent Queue Messages.")
|
||||||
|
|
||||||
|
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
for (int i = 0; i < 500; i++) {
|
||||||
|
BytesMessage bytesMessage = session.createBytesMessage();
|
||||||
|
bytesMessage.writeBytes(new byte[512]);
|
||||||
|
topicProducer.send(bytesMessage);
|
||||||
|
// we send a big batch as that should be enough to cause blocking on the address
|
||||||
|
// if the wrong address is being used
|
||||||
|
if (i % 100 == 0) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
println("Sent Topic Messages.")
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
System.out.println("All Messages sent");
|
||||||
|
senderLatch.countDown();
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.compatibility;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.FileUtil;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
|
||||||
|
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class OldAddressSpaceTest extends VersionedBaseTest {
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
|
||||||
|
public static Collection getParameters() {
|
||||||
|
List<Object[]> combinations = new ArrayList<>();
|
||||||
|
combinations.addAll(combinatory(new Object[]{SNAPSHOT}, new Object[]{ONE_FIVE, SNAPSHOT}, new Object[]{ONE_FIVE, SNAPSHOT}));
|
||||||
|
return combinations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public OldAddressSpaceTest(String server, String sender, String receiver) throws Exception {
|
||||||
|
super(server, sender, receiver);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Throwable {
|
||||||
|
FileUtil.deleteDirectory(serverFolder.getRoot());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopTest() throws Exception {
|
||||||
|
execute(serverClassloader, "server.stop()");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientSenderServerAddressSettings() throws Throwable {
|
||||||
|
evaluate(serverClassloader, "oldAddressSpace/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
|
||||||
|
|
||||||
|
CountDownLatch receiverLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch senderLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
setVariable(receiverClassloader, "latch", receiverLatch);
|
||||||
|
|
||||||
|
AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
Thread t1 = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
evaluate(receiverClassloader, "oldAddressSpace/receiveMessages.groovy", receiver);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
errors.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t1.start();
|
||||||
|
|
||||||
|
setVariable(senderClassloader, "senderLatch", senderLatch);
|
||||||
|
Thread t2 = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
evaluate(senderClassloader, "oldAddressSpace/sendMessagesAddress.groovy", sender);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
errors.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t2.start();
|
||||||
|
|
||||||
|
|
||||||
|
try {
|
||||||
|
Assert.assertTrue("Sender is blocking by mistake", senderLatch.await(100, TimeUnit.SECONDS));
|
||||||
|
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(100, TimeUnit.SECONDS));
|
||||||
|
} finally {
|
||||||
|
|
||||||
|
t1.join(TimeUnit.SECONDS.toMillis(1));
|
||||||
|
t2.join(TimeUnit.SECONDS.toMillis(1));
|
||||||
|
|
||||||
|
if (t1.isAlive()) {
|
||||||
|
t1.interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (t2.isAlive()) {
|
||||||
|
t2.interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue