https://issues.apache.org/jira/browse/AMQ-3653 - tidy up stomptest and resolve content length for stomp+nio, resolve break of stompnio and stompniossl tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1242911 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-02-10 20:18:50 +00:00
parent 33f9f2968b
commit 93a379fe3c
5 changed files with 44 additions and 49 deletions

View File

@ -583,7 +583,7 @@ public class ProtocolConverter {
HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA))); HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS)); acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
if (acceptsVersions.isEmpty()) { if (acceptsVersions.isEmpty()) {
throw new ProtocolException("Invlid Protocol version, supported versions are: " + throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true); Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
} else { } else {
this.version = Collections.max(acceptsVersions); this.version = Collections.max(acceptsVersions);

View File

@ -58,7 +58,7 @@ public class StompCodec {
action = ((StompWireFormat)transport.getWireFormat()).parseAction(data); action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data); headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH); String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
if (contentLengthHeader != null) { if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader); contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
} else { } else {
contentLength = -1; contentLength = -1;

View File

@ -52,7 +52,6 @@ public class StompConnection {
byte[] bytes = data.getBytes("UTF-8"); byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream(); OutputStream outputStream = stompSocket.getOutputStream();
outputStream.write(bytes); outputStream.write(bytes);
outputStream.write(0);
outputStream.flush(); outputStream.flush();
} }
@ -61,7 +60,6 @@ public class StompConnection {
OutputStream outputStream = stompSocket.getOutputStream(); OutputStream outputStream = stompSocket.getOutputStream();
outputStream.write(bytes); outputStream.write(bytes);
outputStream.write(data); outputStream.write(data);
outputStream.write(0);
outputStream.flush(); outputStream.flush();
} }

View File

@ -205,6 +205,8 @@ public class StompFrame implements Command {
buffer.append(Arrays.toString(getContent())); buffer.append(Arrays.toString(getContent()));
} }
} }
// terminate the frame
buffer.append('\u0000');
return buffer.toString(); return buffer.toString();
} }
} }

View File

@ -24,6 +24,7 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -36,6 +37,7 @@ import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -47,10 +49,10 @@ public class StompTest extends CombinationTestSupport {
protected String jmsUri = "vm://localhost"; protected String jmsUri = "vm://localhost";
private BrokerService broker; private BrokerService broker;
private StompConnection stompConnection = new StompConnection(); protected StompConnection stompConnection = new StompConnection();
private Connection connection; protected Connection connection;
private Session session; protected Session session;
private ActiveMQQueue queue; protected ActiveMQQueue queue;
private final String xmlObject = "<pojo>\n" private final String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n" + " <name>Dejan</name>\n"
+ " <city>Belgrade</city>\n" + " <city>Belgrade</city>\n"
@ -115,7 +117,7 @@ public class StompTest extends CombinationTestSupport {
connection.start(); connection.start();
} }
private void stompConnect() throws IOException, URISyntaxException, UnknownHostException { protected void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
URI connectUri = new URI(bindAddress); URI connectUri = new URI(bindAddress);
stompConnection.open(createSocket(connectUri)); stompConnection.open(createSocket(connectUri));
} }
@ -146,7 +148,7 @@ public class StompTest extends CombinationTestSupport {
} }
} }
private void stompDisconnect() throws IOException { protected void stompDisconnect() throws IOException {
if (stompConnection != null) { if (stompConnection != null) {
stompConnection.close(); stompConnection.close();
stompConnection = null; stompConnection = null;
@ -351,8 +353,6 @@ public class StompTest extends CombinationTestSupport {
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
receiver.sendFrame(frame); receiver.sendFrame(frame);
waitForFrameToTakeEffect();
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL; frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL;
@ -362,7 +362,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(frame.startsWith("RECEIPT")); assertTrue(frame.startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
TextMessage message = (TextMessage)consumer.receive(2500); TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull(message); assertNotNull(message);
assertNull("JMS Message does not contain receipt request", message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED)); assertNull("JMS Message does not contain receipt request", message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED));
@ -388,7 +388,7 @@ public class StompTest extends CombinationTestSupport {
frame = sender.receiveFrame(); frame = sender.receiveFrame();
assertTrue(frame.startsWith("CONNECTED")); assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n\n" + "Hello World:" + (count++) + Stomp.NULL; frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n" + "Hello World:" + (count++) + "\n\n" + Stomp.NULL;
sender.sendFrame(frame); sender.sendFrame(frame);
frame = sender.receiveFrame(); frame = sender.receiveFrame();
assertTrue("" + frame, frame.startsWith("RECEIPT")); assertTrue("" + frame, frame.startsWith("RECEIPT"));
@ -586,7 +586,7 @@ public class StompTest extends CombinationTestSupport {
} }
// sleep a while before publishing another set of messages // sleep a while before publishing another set of messages
waitForFrameToTakeEffect(); TimeUnit.SECONDS.sleep(2);
for (int i = 0; i < ctr; ++i) { for (int i = 0; i < ctr; ++i) {
data[i] = getName() + ":second:" + i; data[i] = getName() + ":second:" + i;
@ -729,7 +729,7 @@ public class StompTest extends CombinationTestSupport {
assertTrue(message.getJMSRedelivered()); assertTrue(message.getJMSRedelivered());
} }
public void testSubscribeWithClientAckAndContentLength() throws Exception { public void testSubscribeWithClientAckedAndContentLength() throws Exception {
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -753,8 +753,14 @@ public class StompTest extends CombinationTestSupport {
StompFrame ack = new StompFrame("ACK", ackHeaders); StompFrame ack = new StompFrame("ACK", ackHeaders);
stompConnection.sendFrame(ack.format()); stompConnection.sendFrame(ack.format());
// Need some time for the Ack to get processed. final QueueViewMBean queueView = getProxyToQueue(getQueueName());
waitForFrameToTakeEffect(); assertTrue("dequeue complete", Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:" + queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount());
return queueView.getDequeueCount() == 1;
}
}));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
@ -785,10 +791,11 @@ public class StompTest extends CombinationTestSupport {
assertTrue(frame.startsWith("MESSAGE")); assertTrue(frame.startsWith("MESSAGE"));
// remove suscription // remove suscription
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL; frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt:1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
waitForFrameToTakeEffect(); frame = stompConnection.receiveFrame();
assertTrue("" + frame, frame.startsWith("RECEIPT"));
// send a message to our queue // send a message to our queue
sendMessage("second message"); sendMessage("second message");
@ -819,9 +826,7 @@ public class StompTest extends CombinationTestSupport {
frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
waitForFrameToTakeEffect(); TextMessage message = (TextMessage)consumer.receive(10000);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull("Should have received a message", message); assertNotNull("Should have received a message", message);
} }
@ -853,11 +858,8 @@ public class StompTest extends CombinationTestSupport {
frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
// This test case is currently failing
waitForFrameToTakeEffect();
// only second msg should be received since first msg was rolled back // only second msg should be received since first msg was rolled back
TextMessage message = (TextMessage)consumer.receive(2500); TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull(message); assertNotNull(message);
assertEquals("second message", message.getText().trim()); assertEquals("second message", message.getText().trim());
} }
@ -868,16 +870,11 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
// This test case is currently failing
waitForFrameToTakeEffect();
assertClients(2); assertClients(2);
// now lets kill the stomp connection // now lets kill the stomp connection
stompConnection.close(); stompConnection.close();
Thread.sleep(2000);
assertClients(1); assertClients(1);
} }
@ -1486,8 +1483,6 @@ public class StompTest extends CombinationTestSupport {
stompConnection.ack(frame5, "tx3"); stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3"); stompConnection.commit("tx3");
waitForFrameToTakeEffect();
stompDisconnect(); stompDisconnect();
} }
@ -1725,9 +1720,13 @@ public class StompTest extends CombinationTestSupport {
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
waitForFrameToTakeEffect(); final QueueViewMBean queueView = getProxyToQueue(getQueueName());
Wait.waitFor(new Wait.Condition(){
QueueViewMBean queueView = getProxyToQueue(getQueueName()); @Override
public boolean isSatisified() throws Exception {
return queueView.getDequeueCount() == 2;
}
});
assertEquals(2, queueView.getDispatchCount()); assertEquals(2, queueView.getDispatchCount());
assertEquals(2, queueView.getDequeueCount()); assertEquals(2, queueView.getDequeueCount());
assertEquals(0, queueView.getQueueSize()); assertEquals(0, queueView.getQueueSize());
@ -1896,7 +1895,14 @@ public class StompTest extends CombinationTestSupport {
return proxy; return proxy;
} }
protected void assertClients(int expected) throws Exception { protected void assertClients(final int expected) throws Exception {
Wait.waitFor(new Wait.Condition()
{
@Override
public boolean isSatisified() throws Exception {
return broker.getBroker().getClients().length == expected;
}
});
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length; int actual = clients.length;
@ -1936,8 +1942,6 @@ public class StompTest extends CombinationTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL; frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
waitForFrameToTakeEffect();
stompConnection.sendFrame(test); stompConnection.sendFrame(test);
// We only want one of them, to trigger the shutdown and potentially // We only want one of them, to trigger the shutdown and potentially
@ -1959,15 +1963,6 @@ public class StompTest extends CombinationTestSupport {
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
waitForFrameToTakeEffect();
stompConnection.close(); stompConnection.close();
} }
protected void waitForFrameToTakeEffect() throws InterruptedException {
// bit of a dirty hack :)
// another option would be to force some kind of receipt to be returned
// from the frame
Thread.sleep(2000);
}
} }