From e801df79747bfe8c79a8e6642712822603a76190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20T=C3=B3th?= Date: Wed, 21 Jun 2017 11:37:45 +0200 Subject: [PATCH 1/2] ARTEMIS-607 New tests AMQP,MQTT,Stomp interceptor properties tests Added tests for AMQP, MQTT and Stomp protocol for interceptors. Tests are checking intercepted message properties. Linked issue ARTEMIS-607 --- .../amqp/AmqpSendReceiveInterceptorTest.java | 102 +++++++++++++ .../MQTTInterceptorPropertiesTest.java | 136 ++++++++++++++++++ .../stomp/StompTestPropertiesInterceptor.java | 98 +++++++++++++ 3 files changed, 336 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestPropertiesInterceptor.java diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java index beb6a882b3..20be85d236 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java @@ -26,8 +26,12 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.Properties; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -71,6 +75,104 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport { AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); assertEquals(latch2.getCount(), 0); + sender.close(); + receiver.close(); + connection.close(); + } + + private static final String ADDRESS = "address"; + private static final String MESSAGE_ID = "messageId"; + private static final String CORRELATION_ID = "correlationId"; + private static final String MESSAGE_TEXT = "messageText"; + private static final String DURABLE = "durable"; + private static final String PRIORITY = "priority"; + private static final String REPLY_TO = "replyTo"; + private static final String TIME_TO_LIVE = "timeToLive"; + + + private boolean checkMessageProperties(AMQPMessage message, Map expectedProperties) { + assertNotNull(message); + assertNotNull(server.getNodeID()); + + assertNotNull(message.getConnectionID()); + assertEquals(message.getAddress(), expectedProperties.get(ADDRESS)); + assertEquals(message.isDurable(), expectedProperties.get(DURABLE)); + + Properties props = message.getProperties(); + assertEquals(props.getCorrelationId(), expectedProperties.get(CORRELATION_ID)); + assertEquals(props.getReplyTo(), expectedProperties.get(REPLY_TO)); + assertEquals(props.getMessageId(), expectedProperties.get(MESSAGE_ID)); + + Header header = message.getHeader(); + assertEquals(header.getDurable(), expectedProperties.get(DURABLE)); + assertEquals(header.getTtl().toString(), expectedProperties.get(TIME_TO_LIVE).toString()); + assertEquals(header.getPriority().toString(), expectedProperties.get(PRIORITY).toString()); + return true; + } + + @Test(timeout = 60000) + public void testCheckInterceptedMessageProperties() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + final String addressQueue = getTestName(); + final String messageId = "lala200"; + final String correlationId = "lala-corrId"; + final String msgText = "Test intercepted message"; + final boolean durableMsg = false; + final short priority = 8; + final long timeToLive = 10000; + final String replyTo = "reply-to-myQueue"; + + Map expectedProperties = new HashMap<>(); + expectedProperties.put(ADDRESS, addressQueue); + expectedProperties.put(MESSAGE_ID, messageId); + expectedProperties.put(CORRELATION_ID, correlationId); + expectedProperties.put(MESSAGE_TEXT, msgText); + expectedProperties.put(DURABLE, durableMsg); + expectedProperties.put(PRIORITY, priority); + expectedProperties.put(REPLY_TO, replyTo); + expectedProperties.put(TIME_TO_LIVE, timeToLive); + + server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() { + @Override + public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException { + latch.countDown(); + return checkMessageProperties(message, expectedProperties); + } + }); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + + message.setMessageId(messageId); + message.setCorrelationId(correlationId); + message.setText(msgText); + message.setDurable(durableMsg); + message.setPriority(priority); + message.setReplyToAddress(replyTo); + message.setTimeToLive(timeToLive); + + sender.send(message); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + final CountDownLatch latch2 = new CountDownLatch(1); + server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() { + @Override + public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException { + latch2.countDown(); + return checkMessageProperties(packet, expectedProperties); + } + }); + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(latch2.getCount(), 0); + sender.close(); receiver.close(); connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java new file mode 100644 index 0000000000..375e2f2986 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.apache.felix.resolver.util.ArrayMap; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ErrorCollector; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { + + @Override + @Before + public void setUp() throws Exception { + Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); + sessions.setAccessible(true); + sessions.set(null, new ConcurrentHashMap<>()); + + Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); + connectedClients.setAccessible(true); + connectedClients.set(null, new ConcurrentHashSet<>()); + super.setUp(); + } + + private static final String ADDRESS = "address"; + private static final String MESSAGE_TEXT = "messageText"; + private static final String RETAINED = "retained"; + + + private boolean checkMessageProperties(MqttMessage message, Map expectedProperties) { + System.out.println("Checking properties in interceptor"); + try { + assertNotNull(message); + assertNotNull(server.getNodeID()); + MqttFixedHeader header = message.fixedHeader(); + assertNotNull(header.messageType()); + assertEquals(header.qosLevel().value(), AT_MOST_ONCE); + assertEquals(header.isRetain(), expectedProperties.get(RETAINED)); + } catch (Throwable t) { + collector.addError(t); + } + return true; + } + + @Rule + public ErrorCollector collector = new ErrorCollector(); + + @Test(timeout = 60000) + public void testCheckInterceptedMQTTMessageProperties() throws Exception { + final String addressQueue = name.getMethodName(); + final String msgText = "Test intercepted message"; + final boolean retained = true; + + Map expectedProperties = new ArrayMap<>(); + expectedProperties.put(ADDRESS, addressQueue); + expectedProperties.put(MESSAGE_TEXT, msgText); + expectedProperties.put(RETAINED, retained); + + + final MQTTClientProvider subscribeProvider = getMQTTClientProvider(); + initializeConnection(subscribeProvider); + + subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE); + + final CountDownLatch latch = new CountDownLatch(1); + MQTTInterceptor incomingInterceptor = new MQTTInterceptor() { + @Override + public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { + System.out.println("incoming"); + return checkMessageProperties(packet, expectedProperties); + } + }; + + MQTTInterceptor outgoingInterceptor = new MQTTInterceptor() { + @Override + public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { + System.out.println("outgoing"); + return checkMessageProperties(packet, expectedProperties); + } + }; + server.getRemotingService().addIncomingInterceptor(incomingInterceptor); + server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor); + + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + byte[] payload = subscribeProvider.receive(10000); + assertNotNull("Should get a message", payload); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + thread.start(); + + final MQTTClientProvider publishProvider = getMQTTClientProvider(); + initializeConnection(publishProvider); + publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, retained); + + latch.await(10, TimeUnit.SECONDS); + subscribeProvider.disconnect(); + publishProvider.disconnect(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestPropertiesInterceptor.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestPropertiesInterceptor.java new file mode 100644 index 0000000000..d380911f43 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestPropertiesInterceptor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.stomp; + +import org.apache.activemq.artemis.core.protocol.stomp.StompFrame; +import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.felix.resolver.util.ArrayMap; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class StompTestPropertiesInterceptor extends StompTestBase { + + @Override + public List getIncomingInterceptors() { + List stompIncomingInterceptor = new ArrayList<>(); + stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestPropertiesInterceptor$StompFramePropertiesInterceptor"); + return stompIncomingInterceptor; + } + + @Override + public List getOutgoingInterceptors() { + List stompOutgoingInterceptor = new ArrayList<>(); + stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestPropertiesInterceptor$StompFramePropertiesInterceptor"); + + return stompOutgoingInterceptor; + } + + public static class StompFramePropertiesInterceptor implements StompFrameInterceptor { + + @Override + public boolean intercept(StompFrame stompFrame, RemotingConnection connection) { + if (stompFrame.getCommand().equals("CONNECT") || stompFrame.getCommand().equals("CONNECTED")) { + return true; + } + System.out.println("Checking properties in interceptor"); + assertNotNull(stompFrame); + assertEquals(stompFrame.getHeader(MY_HEADER), expectedProperties.get(MY_HEADER)); + assertEquals(stompFrame.getBody(), expectedProperties.get(MESSAGE_TEXT)); + return true; + } + } + + + private static final String MESSAGE_TEXT = "messageText"; + private static final String MY_HEADER = "my-header"; + private static Map expectedProperties = new ArrayMap<>(); + + @Test(timeout = 60000) + public void testCheckInterceptedStompMessageProperties() throws Exception { + final String msgText = "Test intercepted message"; + final String myHeader = "TestInterceptedHeader"; + expectedProperties.put(MESSAGE_TEXT, msgText); + expectedProperties.put(MY_HEADER, myHeader); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); + + ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); + + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", name.getMethodName()); + subFrame.addHeader("ack", "auto"); + subFrame.addHeader(MY_HEADER, myHeader); + subFrame.setBody(msgText); + + conn.sendFrame(subFrame); + + ClientStompFrame frame = conn.createFrame("SEND"); + frame.addHeader("destination", name.getMethodName()); + frame.addHeader("ack", "auto"); + frame.addHeader(MY_HEADER, myHeader); + conn.sendFrame(frame); + + conn.disconnect(); + + } +} From 2b804261532ae10db073cdccbfdf04ace999dc95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20T=C3=B3th?= Date: Wed, 21 Jun 2017 13:17:54 +0200 Subject: [PATCH 2/2] ARTEMIS-1244 Retain flag code todo comment Identified possible bug in MQTT receive of missing retained flag. --- .../integration/mqtt/imported/MQTTInterceptorPropertiesTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java index 375e2f2986..0b6219456e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java @@ -64,6 +64,7 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { MqttFixedHeader header = message.fixedHeader(); assertNotNull(header.messageType()); assertEquals(header.qosLevel().value(), AT_MOST_ONCE); + // TODO resolve the following line based on result of ARTEMIS-1244, currently fails (2.1.0) assertEquals(header.isRetain(), expectedProperties.get(RETAINED)); } catch (Throwable t) { collector.addError(t);