From 57264bf8dc9970d7d808cf24216b6185ceb644e2 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 9 Mar 2016 22:13:37 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6206 - ensure properties are marshalled before dispatch to broker so that their values are reflected in the memory usage --- .../transport/stomp/ProtocolConverter.java | 1 + .../activemq/transport/stomp/Stomp12Test.java | 57 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 8539b59ee0..b25860bf68 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -341,6 +341,7 @@ public class ProtocolConverter { } message.onSend(); + message.beforeMarshall(null); sendToActiveMQ(message, createResponseHandler(command)); } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java index 6e0ad0abdc..b7560c7234 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.Socket; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -543,4 +544,60 @@ public class Stomp12Test extends StompTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("ERROR")); } + + @Test(timeout = 60000) + public void testSizeAndBrokerUsage() throws Exception { + final int MSG_COUNT = 10; + final int numK = 4; + + final byte[] bigPropContent = new byte[numK*1024]; + // fill so we don't fall foul to trimming in v + Arrays.fill(bigPropContent, Byte.MAX_VALUE); + final String bigProp = new String(bigPropContent); + + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.2\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + + for(int i = 0; i < MSG_COUNT; ++i) { + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + + "receipt:0\n" + + "myXkProp:" + bigProp + "\n"+ + "\n" + "Hello World {" + i + "}" + Stomp.NULL; + stompConnection.sendFrame(message); + StompFrame repsonse = stompConnection.receive(); + LOG.info("response:" + repsonse); + assertEquals("0", repsonse.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID)); + } + + // verify usage accounts for our numK + long usageEnd = brokerService.getSystemUsage().getMemoryUsage().getUsage(); + + long usageDiff = usageEnd - usageStart; + LOG.info("usageDiff:" + usageDiff); + assertTrue(usageDiff > MSG_COUNT * numK * 1024); + + String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "browser:true\n\n" + Stomp.NULL; + stompConnection.sendFrame(subscribe); + + for(int i = 0; i < MSG_COUNT; ++i) { + StompFrame message = stompConnection.receive(); + assertEquals(Stomp.Responses.MESSAGE, message.getAction()); + assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION)); + } + + } }