diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java index b3847a436c..4aaa146717 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java @@ -77,11 +77,11 @@ public class StompSubscriptionRemoveTest extends TestCase { stompSocket = new Socket("localhost", 61613); inputBuffer = new ByteArrayOutputStream(); - String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL; + String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n"; sendFrame(connect_frame); String f = receiveFrame(100000); - String frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n" + Stomp.NULL; + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n"; sendFrame(frame); int messagesCount = 0; int count = 0; @@ -107,7 +107,7 @@ public class StompSubscriptionRemoveTest extends TestCase { } String messageId = line.substring(line.indexOf(':') + 1); messageId = messageId.trim(); - String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n" + Stomp.NULL; + String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n"; sendFrame(ackmessage); log.debug(receiveFrame); //Thread.sleep(1000); @@ -115,25 +115,20 @@ public class StompSubscriptionRemoveTest extends TestCase { ++count; } stompSocket.close(); - Thread.sleep(10000); - - // for (int idx = 0; idx < 500; ++idx) { - // producer.send(message); - // log.debug("Sending: " +idx); - // } stompSocket = new Socket("localhost", 61613); inputBuffer = new ByteArrayOutputStream(); - connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL; + connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n"; sendFrame(connect_frame); - f = receiveFrame(100000); - frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n" + Stomp.NULL; + f = receiveFrame(5000); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n"; sendFrame(frame); try { while (count != 2000) { - String receiveFrame = receiveFrame(10000); + String receiveFrame = receiveFrame(5000); DataInput input = new DataInputStream(new ByteArrayInputStream(receiveFrame.getBytes())); String line; while (true) { @@ -148,13 +143,14 @@ public class StompSubscriptionRemoveTest extends TestCase { } } } + line = input.readLine(); if (line == null) { throw new IOException("connection was closed"); } String messageId = line.substring(line.indexOf(':') + 1); messageId = messageId.trim(); - String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n" + Stomp.NULL; + String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n"; sendFrame(ackmessage); log.debug("Received: " + receiveFrame); //Thread.sleep(1000); @@ -164,10 +160,12 @@ public class StompSubscriptionRemoveTest extends TestCase { } catch (IOException ex) { - // timeout + ex.printStackTrace(); } + stompSocket.close(); broker.stop(); + log.info("Total messages received: " + messagesCount); assertTrue("Messages received after connection loss: " + messagesCount, messagesCount >= 2000); @@ -181,9 +179,8 @@ public class StompSubscriptionRemoveTest extends TestCase { public void sendFrame(String data) throws Exception { byte[] bytes = data.getBytes("UTF-8"); OutputStream outputStream = stompSocket.getOutputStream(); - for (int i = 0; i < bytes.length; i++) { - outputStream.write(bytes[i]); - } + outputStream.write(bytes); + outputStream.write(0); outputStream.flush(); }