diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java index 7e82764c0a..bc4829e4c1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java @@ -43,9 +43,6 @@ public class BasicOpenWireTest extends OpenWireTestBase { @Rule public TestName name = new TestName(); - - protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; - protected static final String urlStringLoose = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.tightEncodingEnabled=false"; protected ActiveMQConnectionFactory factory; protected ActiveMQConnectionFactory looseFactory; protected ActiveMQXAConnectionFactory xaFactory; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java index 44be8054d8..91f4e24105 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java @@ -53,9 +53,6 @@ import java.util.Collection; @RunWith(Parameterized.class) public class FQQNOpenWireTest extends OpenWireTestBase { - protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; - - @Parameterized.Parameters(name = "{0}") public static Collection params() { return Arrays.asList(new Object[][]{{"OpenWire"}, {"Artemis"}}); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java new file mode 100644 index 0000000000..f8a03fa0d7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.junit.Assert; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase { + + @Override + protected boolean isExclusive() { + return true; + } + + @Test + public void testSingleExclusiveDivert() throws Exception { + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession coreSession = sf.createSession(false, true, true); + + final SimpleString queueName1 = new SimpleString("queue1"); + + final SimpleString queueName2 = new SimpleString("queue2"); + + final SimpleString queueName3 = new SimpleString("queue3"); + + final SimpleString queueName4 = new SimpleString("queue4"); + + coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.MULTICAST, queueName1, null, false); + coreSession.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false); + coreSession.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName3, null, false); + coreSession.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName4, null, false); + + ClientProducer producer = coreSession.createProducer(new SimpleString(testAddress)); + + final int numMessages = 10; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = coreSession.createMessage(false); + message.putIntProperty(propKey, i); + producer.send(message); + } + coreSession.close(); + factory = new ActiveMQConnectionFactory(urlString); + Connection openwireConnection = factory.createConnection(); + + try { + Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + openwireConnection.start(); + + Queue q1 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue1")); + Queue q2 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue2")); + Queue q3 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue3")); + Queue q4 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue4")); + + MessageConsumer consumer1 = session.createConsumer(q1); + MessageConsumer consumer2 = session.createConsumer(q2); + MessageConsumer consumer3 = session.createConsumer(q3); + MessageConsumer consumer4 = session.createConsumer(q4); + + System.out.println("receiving ..."); + for (int i = 0; i < numMessages; i++) { + Message message = consumer1.receive(TIMEOUT); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey.toString())); + + message.acknowledge(); + } + Assert.assertNull(consumer1.receive(50)); + + Assert.assertNull(consumer2.receive(50)); + Assert.assertNull(consumer3.receive(50)); + Assert.assertNull(consumer4.receive(50)); + } finally { + if (openwireConnection != null) { + openwireConnection.close(); + } + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java new file mode 100644 index 0000000000..64e8e4f38c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.junit.Assert; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +public class OpenWireDivertNonExclusiveTest extends OpenWireDivertTestBase { + + @Override + protected boolean isExclusive() { + return false; + } + + @Test + //core sending, openwire receiving + public void testSingleNonExclusiveDivert() throws Exception { + ServerLocator locator = createInVMNonHALocator(); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession coreSession = sf.createSession(false, true, true); + + final SimpleString queueName1 = new SimpleString("queue1"); + + final SimpleString queueName2 = new SimpleString("queue2"); + + coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.MULTICAST, queueName1, null, false); + + coreSession.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false); + + ClientProducer producer = coreSession.createProducer(new SimpleString(testAddress)); + final int numMessages = 1; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = coreSession.createMessage(false); + + message.putIntProperty(propKey, i); + + producer.send(message); + } + + coreSession.close(); + + //use openwire to receive + factory = new ActiveMQConnectionFactory(urlString); + Connection openwireConnection = factory.createConnection(); + + try { + Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + openwireConnection.start(); + + Queue q1 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue1")); + Queue q2 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue2")); + + MessageConsumer consumer1 = session.createConsumer(q1); + MessageConsumer consumer2 = session.createConsumer(q2); + + System.out.println("receiving ..."); + for (int i = 0; i < numMessages; i++) { + Message message = consumer1.receive(TIMEOUT); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey.toString())); + + message.acknowledge(); + } + + Assert.assertNull(consumer1.receive(50)); + + for (int i = 0; i < numMessages; i++) { + Message message = consumer2.receive(TIMEOUT); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey.toString())); + + message.acknowledge(); + } + + Assert.assertNull(consumer2.receive(50)); + } finally { + if (openwireConnection != null) { + openwireConnection.close(); + } + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertTestBase.java new file mode 100644 index 0000000000..5aee84b500 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertTestBase.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; + +import javax.jms.ConnectionFactory; + +public abstract class OpenWireDivertTestBase extends OpenWireTestBase { + + protected static final int TIMEOUT = 3000; + protected ConnectionFactory factory; + protected final String testAddress = "testAddress"; + protected final String forwardAddress = "forwardAddress"; + protected abstract boolean isExclusive(); + + @Override + protected void extraServerConfig(Configuration serverConfig) { + DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(isExclusive()); + serverConfig.addDivertConfiguration(divertConf); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireTestBase.java index b779a5fc55..e61a0f41c7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireTestBase.java @@ -45,6 +45,9 @@ public class OpenWireTestBase extends ActiveMQTestBase { public static final String OWHOST = "localhost"; public static final int OWPORT = 61616; + protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; + protected static final String urlStringLoose = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.tightEncodingEnabled=false"; + protected ActiveMQServer server; protected JMSServerManagerImpl jmsServer;