From 26752a7aafa5651e41abf23ac550c6c09bb08287 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Fri, 11 Aug 2017 21:27:49 +0100 Subject: [PATCH] ARTEMIS-1348 Support LVQ for AMQP Add support for LVQ, using the same property key as core "_AMQ_LVQ_NAME" Add test case for AMQP LVQ. --- .../protocol/amqp/broker/AMQPMessage.java | 6 + .../amqp/JMSClientTestSupport.java | 53 ++++++ .../tests/integration/amqp/JMSLVQTest.java | 152 ++++++++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index db39c063f0..b4752082d7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -62,6 +62,7 @@ import io.netty.buffer.Unpooled; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { + public static final String HDR_LAST_VALUE_NAME = org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(); public static final int DEFAULT_MESSAGE_PRIORITY = 4; public static final int MAX_MESSAGE_PRIORITY = 9; @@ -1000,6 +1001,11 @@ public class AMQPMessage extends RefCountMessage { } } + @Override + public SimpleString getLastValueProperty() { + return getSimpleStringProperty(HDR_LAST_VALUE_NAME); + } + @Override public SimpleString getReplyTo() { if (getProperties() != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index 3f96711976..190dd786d4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -23,6 +23,7 @@ import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory; import org.jboss.logging.Logger; @@ -206,4 +207,56 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { return connection; } + + protected String getBrokerOpenWireJMSConnectionString() { + + try { + int port = AMQP_PORT; + + String uri = null; + + if (isUseSSL()) { + uri = "tcp://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } + + if (!getJmsConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getJmsConnectionURIOptions(); + } else { + uri = uri + "?wireFormat.cacheEnabled=true"; + } + + return uri; + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected Connection createOpenWireConnection() throws JMSException { + return createCoreConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); + } + + private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString); + + Connection connection = trackJMSConnection(factory.createConnection(username, password)); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + + if (start) { + connection.start(); + } + + return connection; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java new file mode 100644 index 0000000000..b634a3954a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.junit.Test; + +public class JMSLVQTest extends JMSClientTestSupport { + + private static final String LVQ_QUEUE_NAME = "LVQ"; + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + @Override + protected void addConfiguration(ActiveMQServer server) { + server.getAddressSettingsRepository().addMatch(LVQ_QUEUE_NAME, new AddressSettings().setLastValueQueue(true)); + } + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + super.createAddressAndQueues(server); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(LVQ_QUEUE_NAME), RoutingType.ANYCAST)); + server.createQueue(SimpleString.toSimpleString(LVQ_QUEUE_NAME), RoutingType.ANYCAST, SimpleString.toSimpleString("LVQ"), null, true, false, -1, false, true); + } + + + @Test + public void testLVQAMQPProducerAMQPConsumer() throws Exception { + Connection producerConnection = createConnection(); + Connection consumerConnection = createConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQCoreProducerCoreConsumer() throws Exception { + Connection producerConnection = createCoreConnection(); + Connection consumerConnection = createCoreConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQCoreProducerAMQPConsumer() throws Exception { + Connection producerConnection = createCoreConnection(); + Connection consumerConnection = createConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQAMQPProducerCoreConsumer() throws Exception { + Connection producerConnection = createConnection(); + Connection consumerConnection = createCoreConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQOpenWireProducerOpenWireConsumer() throws Exception { + Connection producerConnection = createOpenWireConnection(); + Connection consumerConnection = createOpenWireConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQCoreProducerOpenWireConsumer() throws Exception { + Connection producerConnection = createCoreConnection(); + Connection consumerConnection = createOpenWireConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQOpenWireProducerCoreConsumer() throws Exception { + Connection producerConnection = createOpenWireConnection(); + Connection consumerConnection = createCoreConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQAMQPProducerOpenWireConsumer() throws Exception { + Connection producerConnection = createConnection(); + Connection consumerConnection = createOpenWireConnection(); + testLVQ(producerConnection, consumerConnection); + } + + @Test + public void testLVQOpenWireProducerAMQPConsumer() throws Exception { + Connection producerConnection = createOpenWireConnection(); + Connection consumerConnection = createConnection(); + testLVQ(producerConnection, consumerConnection); + } + + public void testLVQ(Connection producerConnection, Connection consumerConnection) throws Exception { + + try { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue1 = producerSession.createQueue(LVQ_QUEUE_NAME); + MessageProducer p = producerSession.createProducer(null); + + TextMessage message1 = producerSession.createTextMessage(); + message1.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); + message1.setText("hello"); + p.send(queue1, message1); + + TextMessage message2 = producerSession.createTextMessage(); + message2.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); + message2.setText("how are you"); + p.send(queue1, message2); + + + Session consumerSession = consumerConnection.createSession(); + Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME); + MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME)); + assertTrue(msg instanceof TextMessage); + assertEquals("how are you", ((TextMessage)msg).getText()); + consumer.close(); + + } finally { + producerConnection.close(); + consumerConnection.close(); + } + } +} \ No newline at end of file