From 1c45d1758db0fa3543d6ee07359a6fdd1fbb7ded Mon Sep 17 00:00:00 2001 From: Tomas Hofman Date: Wed, 3 Jul 2019 12:30:44 +0200 Subject: [PATCH 1/2] ARTEMIS-2409 Convert HornetQ field names in consumer/queue selector strings --- .../artemis/utils/SelectorTranslator.java | 15 +++ .../HornetQProtocolManagerFactory.java | 1 + .../HQFilterConversionInterceptor.java | 62 ++++++++++++ .../hornetq/HornetQProtocolTest.java | 94 +++++++++++++++++++ .../jms/client/SelectorTranslatorTest.java | 31 ++++++ 5 files changed, 203 insertions(+) create mode 100644 artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQFilterConversionInterceptor.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java index dd391a9e1d..d006783483 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java @@ -54,6 +54,21 @@ public class SelectorTranslator { } + public static String convertHQToActiveMQFilterString(final String hqFilterString) { + if (hqFilterString == null) { + return null; + } + + String filterString = SelectorTranslator.parse(hqFilterString, "HQDurable", "AMQDurable"); + filterString = SelectorTranslator.parse(filterString, "HQPriority", "AMQPriority"); + filterString = SelectorTranslator.parse(filterString, "HQTimestamp", "AMQTimestamp"); + filterString = SelectorTranslator.parse(filterString, "HQUserID", "AMQUserID"); + filterString = SelectorTranslator.parse(filterString, "HQExpiration", "AMQExpiration"); + + return filterString; + + } + private static String parse(final String input, final String match, final String replace) { final char quote = '\''; diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java index de378c5901..cb8cc167c0 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java @@ -47,6 +47,7 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory { List hqOutgoing = filterInterceptors(outgoingInterceptors); hqIncoming.add(new HQPropertiesConversionInterceptor(true)); + hqIncoming.add(new HQFilterConversionInterceptor()); hqOutgoing.add(new HQPropertiesConversionInterceptor(false)); stripPasswordParameters(parameters); diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQFilterConversionInterceptor.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQFilterConversionInterceptor.java new file mode 100644 index 0000000000..e1a5257d13 --- /dev/null +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQFilterConversionInterceptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.hornetq; + +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.SelectorTranslator; + +public class HQFilterConversionInterceptor implements Interceptor { + @Override + public boolean intercept(Packet packet, RemotingConnection connection) { + if (packet.getType() == PacketImpl.SESS_CREATECONSUMER) { + handleMessage((SessionCreateConsumerMessage) packet); + } else if (packet.getType() == PacketImpl.CREATE_QUEUE || packet.getType() == PacketImpl.CREATE_QUEUE_V2) { + handleMessage((CreateQueueMessage) packet); + } else if (packet.getType() == PacketImpl.CREATE_SHARED_QUEUE || packet.getType() == PacketImpl.CREATE_SHARED_QUEUE_V2) { + handleMessage((CreateSharedQueueMessage) packet); + } + return true; + } + + private void handleMessage(SessionCreateConsumerMessage message) { + message.setFilterString(replaceFilterString(message.getFilterString())); + } + + private void handleMessage(CreateQueueMessage message) { + message.setFilterString(replaceFilterString(message.getFilterString())); + } + + private void handleMessage(CreateSharedQueueMessage message) { + message.setFilterString(replaceFilterString(message.getFilterString())); + } + + private SimpleString replaceFilterString(SimpleString filterString) { + if (filterString == null) { + return null; + } + return SimpleString.toSimpleString( + SelectorTranslator.convertHQToActiveMQFilterString(filterString.toString())); + } +} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java index 920b77a87a..b498d34fb5 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java @@ -32,7 +32,9 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.utils.UUIDGenerator; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -104,6 +106,45 @@ public class HornetQProtocolTest extends ActiveMQTestBase { hqSession.close(); } + + @Test + public void testCreateConsumerHQSelectorIsTransformed() throws Exception { + try (org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession()) { + + // Create Queue + String queueName = "test.hq.queue"; + hqSession.createQueue(queueName, queueName, true); + + hqSession.start(); + + // Send message with UserID set + org.hornetq.utils.UUID userID = UUIDGenerator.getInstance().generateUUID(); + try (org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName)) { + org.hornetq.api.core.client.ClientMessage message = createHQTestMessage(hqSession); + message.setUserID(userID); + hqProducer.send(message); + } + + // Verify that selector using AMQ field works + verifyConsumerWithSelector(hqSession, queueName, + String.format("AMQUserID = 'ID:%s'", userID.toString()), userID); + + // Verify that selector using HornetQ field works + verifyConsumerWithSelector(hqSession, queueName, + String.format("HQUserID = 'ID:%s'", userID.toString()), userID); + } + } + + @Test + public void testCreateQueueHQFilterIsTransformed() throws Exception { + testQueueWithHQFilter(false); + } + + @Test + public void testCreateTemporaryQueueHQFilterIsTransformed() throws Exception { + testQueueWithHQFilter(true); + } + @Test public void testLargeMessagesOverHornetQClients() throws Exception { org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession(); @@ -236,4 +277,57 @@ public class HornetQProtocolTest extends ActiveMQTestBase { return sf.createSession(); } + + private static void verifyConsumerWithSelector(org.hornetq.api.core.client.ClientSession hqSession, String queueName, + String selector, org.hornetq.utils.UUID expectedUserID) + throws HornetQException { + try (org.hornetq.api.core.client.ClientConsumer hqConsumer = + hqSession.createConsumer(queueName, selector, true)) { + org.hornetq.api.core.client.ClientMessage message = hqConsumer.receive(1000); + + Assert.assertNotNull(message); + Assert.assertEquals(expectedUserID, message.getUserID()); + } + } + + private void testQueueWithHQFilter(boolean temporary) throws Exception { + try (org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession()) { + + org.hornetq.utils.UUID userID = UUIDGenerator.getInstance().generateUUID(); + + // Create queue with filter + String queueName = "test.hq.queue"; + String filter = String.format("HQUserID = 'ID:%s'", userID.toString()); + if (temporary) { + hqSession.createTemporaryQueue(queueName, queueName, filter); + } else { + hqSession.createQueue(queueName, queueName, filter, true); + } + + hqSession.start(); + + // Send two messages with different UserIDs + try (org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName)) { + org.hornetq.api.core.client.ClientMessage message = createHQTestMessage(hqSession); + message.setUserID(userID); + hqProducer.send(message); + + message = createHQTestMessage(hqSession); + message.setUserID(UUIDGenerator.getInstance().generateUUID()); + hqProducer.send(message); + } + + // Only the message matching the queue filter should be present + try (org.hornetq.api.core.client.ClientConsumer hqConsumer = + hqSession.createConsumer(queueName, true)) { + org.hornetq.api.core.client.ClientMessage message = hqConsumer.receiveImmediate(); + + Assert.assertNotNull(message); + Assert.assertEquals(userID, message.getUserID()); + + message = hqConsumer.receiveImmediate(); + Assert.assertNull(message); + } + } + } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/jms/client/SelectorTranslatorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/jms/client/SelectorTranslatorTest.java index c2b14eba37..cc8667b597 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/jms/client/SelectorTranslatorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/jms/client/SelectorTranslatorTest.java @@ -204,6 +204,37 @@ public class SelectorTranslatorTest extends ActiveMQTestBase { checkNoSubstitute("JMSType"); } + @Test + public void testConvertHQFilterString() { + String selector = "HQUserID = 'ID:AMQ-12435678'"; + + Assert.assertEquals("AMQUserID = 'ID:AMQ-12435678'", SelectorTranslator.convertHQToActiveMQFilterString(selector)); + + selector = "HQUserID = 'HQUserID'"; + + Assert.assertEquals("AMQUserID = 'HQUserID'", SelectorTranslator.convertHQToActiveMQFilterString(selector)); + + selector = "HQUserID = 'ID:AMQ-12435678'"; + + Assert.assertEquals("AMQUserID = 'ID:AMQ-12435678'", SelectorTranslator.convertHQToActiveMQFilterString(selector)); + + selector = "HQDurable='NON_DURABLE'"; + + Assert.assertEquals("AMQDurable='NON_DURABLE'", SelectorTranslator.convertHQToActiveMQFilterString(selector)); + + selector = "HQPriority=5"; + + Assert.assertEquals("AMQPriority=5", SelectorTranslator.convertHQToActiveMQFilterString(selector)); + + selector = "HQTimestamp=12345678"; + + Assert.assertEquals("AMQTimestamp=12345678", SelectorTranslator.convertHQToActiveMQFilterString(selector)); + + selector = "HQExpiration=12345678"; + + Assert.assertEquals("AMQExpiration=12345678", SelectorTranslator.convertHQToActiveMQFilterString(selector)); + } + // Private ------------------------------------------------------------------------------------- private void checkNoSubstitute(final String fieldName) { From 0585db74214ccbc4b838f8498ab55b2ba2a4f5b1 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 17 Jul 2019 11:32:10 -0400 Subject: [PATCH 2/2] ARTEMIS-2409 Adding Compatibility test for hornetQ selector client --- .../resources/hqselector/sendMessages.groovy | 64 +++++++++++++ .../tests/compatibility/HQSelectorTest.java | 89 +++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 tests/compatibility-tests/src/main/resources/hqselector/sendMessages.groovy create mode 100644 tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQSelectorTest.java diff --git a/tests/compatibility-tests/src/main/resources/hqselector/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/hqselector/sendMessages.groovy new file mode 100644 index 0000000000..ba06f45fa8 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/hqselector/sendMessages.groovy @@ -0,0 +1,64 @@ +package hqselector + +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +try { + legacyOption = legacy; +} catch (Throwable e) { + legacyOption = false; +} + + +if (legacyOption) { + queueName = "jms.queue.queue" + topicName = "jms.topic.topic" +} else { + queueName = "queue"; + topicName = "topic"; +} + +// Can't depend directly on hornetq, otherwise it wouldn't compile in artemis +GroovyRun.evaluate("clients/hornetqClient.groovy", "serverArg"); + +Connection connection = cf.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +Queue queue = session.createQueue(queueName) +MessageProducer messageProducer = session.createProducer(queue); +Message message = session.createMessage(); +messageProducer.setPriority(5); +messageProducer.send(message); +message = session.createMessage(); +messageProducer.setPriority(1) +messageProducer.send(message); + +connection.start(); + +MessageConsumer consumer = session.createConsumer(queue, "HQPriority>=5"); + +message = consumer.receive(5000); +GroovyRun.assertNotNull(message); +GroovyRun.assertEquals(5, message.getJMSPriority()); + +message = consumer.receiveNoWait(); +GroovyRun.assertNull(message); + +connection.close(); diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQSelectorTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQSelectorTest.java new file mode 100644 index 0000000000..c12b04c298 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/HQSelectorTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_235; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.HORNETQ_247; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +/** + * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: + * + * cd /compatibility-tests + * mvn install -Ptests | tee output.log + * + * on the output.log you will see the output generated by {@link #getClasspath(String)} + * + * On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test. + * On Idea you would do the following: + * + * Run->Edit Configuration->Add ArtemisMeshTest and add your properties. + */ +@RunWith(Parameterized.class) +public class HQSelectorTest extends VersionedBase { + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + // we don't need every single version ever released.. + // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time + List combinations = new ArrayList<>(); + + /* + // during development sometimes is useful to comment out the combinations + // and add the ones you are interested.. example: + */ + // combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE}); + // combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE}); + + combinations.add(new Object[]{SNAPSHOT, HORNETQ_247, HORNETQ_247}); + combinations.add(new Object[]{SNAPSHOT, HORNETQ_235, HORNETQ_235}); + return combinations; + } + + public HQSelectorTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Test + public void testSendReceive() throws Throwable { + + FileUtil.deleteDirectory(serverFolder.getRoot()); + setVariable(serverClassloader, "persistent", Boolean.TRUE); + startServer(serverFolder.getRoot(), serverClassloader, "live"); + + try { + evaluate(senderClassloader, "hqselector/sendMessages.groovy"); + } finally { + stopServer(serverClassloader); + } + } + +} +