From 0d47f62710ecd7e7513987dea289d362d3dc229f Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 14 Mar 2018 17:53:29 -0400 Subject: [PATCH 1/2] ARTEMIS-1286 Adding test replicating MQTT direct buffer leak (Test developed as an interaction between Justin Bertram, Philip Jenkins and Clebert Suconic through ARTEMIS-1286) --- tests/smoke-tests/pom.xml | 25 +++ .../main/resources/servers/mqtt/broker.xml | 199 ++++++++++++++++++ .../tests/smoke/mqtt/MQTTLeakTest.java | 193 +++++++++++++++++ 3 files changed, 417 insertions(+) create mode 100644 tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml create mode 100644 tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 90fdbcbff4..9d5dff0b1b 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -103,6 +103,15 @@ org.jboss.logmanager jboss-logmanager + + org.fusesource.mqtt-client + mqtt-client + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + RELEASE + @@ -153,6 +162,22 @@ ${basedir}/target/expire + + test-compile + create-mqtt + + create + + + + ${basedir}/target/classes/servers/mqtt + true + admin + admin + ${basedir}/target/mqtt + ${basedir}/target/classes/servers/mqtt + + diff --git a/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml b/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml new file mode 100644 index 0000000000..c318037b7a --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml @@ -0,0 +1,199 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 10485760 + + + + + + + + + + + + + + + + + + + + + 6488000 + + + + 5000 + + + 90 + + + 100Mb + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + + 1M + 50000 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + + 50000 + 10 + PAGE + true + true + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ +
+
\ No newline at end of file diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java new file mode 100644 index 0000000000..4ac374a159 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.smoke.mqtt; + +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.Semaphore; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.util.ServerUtil; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class MQTTLeakTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "mqtt"; + + private static Process server0; + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + disableCheckThread(); + } + + @After + @Override + public void after() throws Exception { + super.after(); + cleanupData(SERVER_NAME_0); + } + + @Test + public void testMQTTLeak() throws Throwable { + + try { + server0 = startServer(SERVER_NAME_0, 0, 30000); + MQTTRunner.run(); + } finally { + + ServerUtil.killServer(server0); + } + } + + + + private static class MQTTRunner implements MqttCallback { + + private MqttAsyncClient mqttClient; + private MqttConnectOptions connOpts; + protected static MQTTRunner publisherClient; + protected static MQTTRunner consumerClient; + + private static String topicPaho1 = "State/PRN/"; + private static String topicPaho2 = "Soap/PRN/"; + public String name; + + private static final Semaphore semaphore = new Semaphore(2); + + public static void run() throws Exception { + publisherClient = new MQTTRunner(); + publisherClient.connect(); + publisherClient.name = "Pub"; + consumerClient = new MQTTRunner(); + consumerClient.connect(); + consumerClient.name = "Consumer"; + byte[] content = buildContent(); + + for (int idx = 0; idx < 500; idx++) { + if (idx % 100 == 0) { + System.out.println("Sent " + idx + " messages"); + } + MqttMessage msg = new MqttMessage(content); + semaphore.acquire(2); + publisherClient.mqttClient.publish(topicPaho1, msg); + } + } + + public void connect() { + // create a new Paho MqttClient + MemoryPersistence persistence = new MemoryPersistence(); + // establish the client ID for the life of this DPI publisherClient + String clientId = UUID.randomUUID().toString(); + try { + mqttClient = new MqttAsyncClient("tcp://localhost:1883", clientId, persistence); + // Create a set of connection options + connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + mqttClient.connect(connOpts); + } catch (MqttException e) { + e.printStackTrace(); + } + // pause a moment to get connected (prevents the race condition) + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // subscribe + try { + String[] topicsPaho = new String[]{topicPaho1, topicPaho2}; + int[] qos = new int[]{0, 0}; + mqttClient.subscribe(topicsPaho, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + + mqttClient.setCallback(this); + } + + @Override + public void connectionLost(Throwable throwable) { + } + + int count = 0; + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + + count++; + + if (count % 100 == 0) { + System.out.println("Received " + count); + } + + semaphore.release(); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + public static byte[] buildContent() { + + ArrayList stringval2 = buildContentArray(); + int size = 0; + for (String value : stringval2) { + size += value.length(); + } + System.out.println(); + StringBuilder builder = new StringBuilder(size); + for (String value : stringval2) { + builder.append(value); + } + String msgContent = builder.toString(); + + return msgContent.getBytes(); + } + + public static ArrayList buildContentArray() { + ArrayList val = new ArrayList<>(); + String msgHdr = ""; + String msgChunk = "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiIHN0YW5kYWxvbmU9InllcyI/Pgo8bnMyOlRyYW5zZmVyIHhtbG5zOm5zMj0idXJuOmRwY2w6dXBkYXRlOjIwMTEtMTAtMTkiPgogICAgPGltYWdlU2VnbWVudD4KICAgICAgICA8Ym9hcmQ+MjU5PC9ib2FyZD4KICAgICAgICA8Y2F0ZWdvcnk+MjwvY2F0ZWdvcnk+CiAgICAgICAgPHZlcnNpb24+Mjg1NDA5Mjg1PC92ZXJzaW9uPgogICAgICAgIDxpZD4yNjwvaWQ+CiAgICAgICAgPHNpemU+MjA5NzE1Mjwvc2l6ZT4KICAgICAgICA8Y2hlY2tzdW0+NTE0ODI3MGJmZTM2ZmYzNmIyZTNmMjc0NWJlNmYyMGY8L2NoZWNrc3VtPgogICAgICAgIDxkYXRhPm5OQUJ1WHQvWG0xYlhGeC9aallZbEJ1K2NrWU1ncHBTMnZpTVZoOUxjTENjTFlTL1Z6YUxlSWNnWmtlMjI5Z1dlS1p6czlSclBrdVlsSHYvaWNlSldJeTUxaGFpVUx3NTY0NWtTTUlhMEhjNnZoYTB5UC91OEVNUEcvck9LL1JhVXpuS0tRdXF5WVNDVlZ3TWROS25IWjZ5Sm91TkdMcVJ3a0MvVDZUdStrTWxKak9TcjV6MUNYWDdtZWdvSGpLdkFuU1AyOFJWY0F3MWVXTUtIY0pQU0Z0bFZXSkFYVXErZjFzbE9HWXlNSGhiN2haV0VnMWc4TlRlVUJ2NHJGL0RtUitKRjRmbjlWdkRJSkJYanJpeE5CNWFyc1RKOTR3dEF2YWxVM28vVzVnODltbURNNHp0VlVuaHZvSlRTSlZ6bXlqTGpJMWQ5OExVVTVWU3dqWE5KMjZ2d0F4R1ptVmwrVGlMU0JaeWNYak45NlYxVUZ6eldOMStPN2h5SHRMZnMvOE9kRjVMK1ArbjZXOXNqNVA3aDdGZUU4UFVHbGpLcXhxWmFGbFZ4aXJPRjYrUExGTHFFMzAzUzVodzJPeDFBQjA5Sjl4VThjVXNtUVI0dlJBS3B0Y3ZpbXkzb1VncmxWQTBwNG83cFdlYkduak1kT1N6ZGR2M01uMi9rMldlOVRHNzI3OEhkdTdLQlNtVW95VTJSM0l6TitITXhXeGQ4"; + + val.add(msgHdr); + for (int idx = 0; idx < 300; idx++) { + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + } + return val; + } + } +} From a7333bcf9d671457d23b6b93ba3284f211f52dd2 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 14 Mar 2018 16:36:38 -0400 Subject: [PATCH 2/2] ARTEMIS-1286 Fixing MQTT Bytes message encode --- .../artemis/core/protocol/mqtt/MQTTPublishManager.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index ae0c0edb4d..febc364ae1 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -278,8 +278,9 @@ public class MQTTPublishManager { log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } default: - ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer(); - payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf(); + ActiveMQBuffer bodyBuffer = message.getReadOnlyBodyBuffer(); + payload = ByteBufAllocator.DEFAULT.buffer(bodyBuffer.writerIndex()); + payload.writeBytes(bodyBuffer.byteBuf()); break; } session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount);