mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6206 - ensure properties are marshalled before dispatch to broker so that their values are reflected in the memory usage
This commit is contained in:
parent
521c4fd8c3
commit
57264bf8dc
|
@ -341,6 +341,7 @@ public class ProtocolConverter {
|
||||||
}
|
}
|
||||||
|
|
||||||
message.onSend();
|
message.onSend();
|
||||||
|
message.beforeMarshall(null);
|
||||||
sendToActiveMQ(message, createResponseHandler(command));
|
sendToActiveMQ(message, createResponseHandler(command));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
@ -543,4 +544,60 @@ public class Stomp12Test extends StompTestSupport {
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("ERROR"));
|
assertTrue(frame.startsWith("ERROR"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSizeAndBrokerUsage() throws Exception {
|
||||||
|
final int MSG_COUNT = 10;
|
||||||
|
final int numK = 4;
|
||||||
|
|
||||||
|
final byte[] bigPropContent = new byte[numK*1024];
|
||||||
|
// fill so we don't fall foul to trimming in v<earlier than 1.2>
|
||||||
|
Arrays.fill(bigPropContent, Byte.MAX_VALUE);
|
||||||
|
final String bigProp = new String(bigPropContent);
|
||||||
|
|
||||||
|
String connectFrame = "STOMP\n" +
|
||||||
|
"login:system\n" +
|
||||||
|
"passcode:manager\n" +
|
||||||
|
"accept-version:1.2\n" +
|
||||||
|
"host:localhost\n" +
|
||||||
|
"\n" + Stomp.NULL;
|
||||||
|
|
||||||
|
stompConnection.sendFrame(connectFrame);
|
||||||
|
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
LOG.debug("Broker sent: " + f);
|
||||||
|
|
||||||
|
assertTrue(f.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
|
||||||
|
for(int i = 0; i < MSG_COUNT; ++i) {
|
||||||
|
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"receipt:0\n" +
|
||||||
|
"myXkProp:" + bigProp + "\n"+
|
||||||
|
"\n" + "Hello World {" + i + "}" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(message);
|
||||||
|
StompFrame repsonse = stompConnection.receive();
|
||||||
|
LOG.info("response:" + repsonse);
|
||||||
|
assertEquals("0", repsonse.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify usage accounts for our numK
|
||||||
|
long usageEnd = brokerService.getSystemUsage().getMemoryUsage().getUsage();
|
||||||
|
|
||||||
|
long usageDiff = usageEnd - usageStart;
|
||||||
|
LOG.info("usageDiff:" + usageDiff);
|
||||||
|
assertTrue(usageDiff > MSG_COUNT * numK * 1024);
|
||||||
|
|
||||||
|
String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"id:12345\n" + "browser:true\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(subscribe);
|
||||||
|
|
||||||
|
for(int i = 0; i < MSG_COUNT; ++i) {
|
||||||
|
StompFrame message = stompConnection.receive();
|
||||||
|
assertEquals(Stomp.Responses.MESSAGE, message.getAction());
|
||||||
|
assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue