mirror of https://github.com/apache/activemq.git
added test case to show the configuration of prefetch sizes in Stomp using header "activemq.prefetchSize: 1"
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360062 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f1dc4779c5
commit
20b343110c
|
@ -43,7 +43,7 @@ class Subscribe implements StompCommand {
|
|||
ci.setPrefetchSize(1000);
|
||||
ci.setDispatchAsync(true);
|
||||
|
||||
IntrospectionSupport.setProperties(ci, headers, "activemq:");
|
||||
IntrospectionSupport.setProperties(ci, headers, "activemq.");
|
||||
|
||||
ci.setDestination(DestinationNamer.convert(destination));
|
||||
|
||||
|
|
|
@ -17,16 +17,18 @@
|
|||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.Response;
|
||||
import org.apache.activemq.command.SessionInfo;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.transport.stomp.Stomp;
|
||||
import org.apache.activemq.transport.stomp.StompWireFormat;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
|
@ -37,33 +39,45 @@ public class StompWireFormatTest extends TestCase {
|
|||
wire = new StompWireFormat();
|
||||
}
|
||||
|
||||
public void testDummy() throws Exception {
|
||||
}
|
||||
|
||||
public void TODO_testValidConnectHandshake() throws Exception {
|
||||
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL;
|
||||
DataInputStream din = new DataInputStream(new ByteArrayInputStream(connect_frame.getBytes()));
|
||||
public void testValidConnectHandshake() throws Exception {
|
||||
ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
||||
DataOutputStream dout = new DataOutputStream(bout);
|
||||
|
||||
ConnectionInfo ci = (ConnectionInfo) wire.readCommand(din);
|
||||
ConnectionInfo ci = (ConnectionInfo) parseCommand("CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL);
|
||||
assertNotNull(ci);
|
||||
assertTrue(ci.isResponseRequired());
|
||||
|
||||
Response cr = new Response();
|
||||
cr.setCorrelationId(ci.getCommandId());
|
||||
wire.writeCommand(cr, dout);
|
||||
|
||||
String response = writeCommand(cr);
|
||||
System.out.println("Received: " + response);
|
||||
|
||||
SessionInfo si = (SessionInfo) wire.readCommand(null);
|
||||
assertNotNull(si);
|
||||
assertTrue(si.isResponseRequired());
|
||||
assertTrue(!si.isResponseRequired());
|
||||
|
||||
ProducerInfo pi = (ProducerInfo) wire.readCommand(null);
|
||||
assertNotNull(pi);
|
||||
assertTrue(pi.isResponseRequired());
|
||||
|
||||
Response sr = new Response();
|
||||
sr.setCorrelationId(si.getCommandId());
|
||||
wire.writeCommand(sr, dout);
|
||||
sr.setCorrelationId(pi.getCommandId());
|
||||
response = writeCommand(sr);
|
||||
System.out.println("Received: " + response);
|
||||
assertTrue("Response should start with CONNECTED: " + response, response.startsWith("CONNECTED"));
|
||||
|
||||
String response = new String(bout.toByteArray());
|
||||
assertTrue(response.startsWith("CONNECTED"));
|
||||
// now lets test subscribe
|
||||
ConsumerInfo consumerInfo = (ConsumerInfo) parseCommand("SUBSCRIBE\n" + "destination: /queue/foo\n" + "ack: client\n" + "activemq.prefetchSize: 1\n"
|
||||
+ "\n" + Stomp.NULL);
|
||||
assertNotNull(consumerInfo);
|
||||
// assertTrue(consumerInfo.isResponseRequired());
|
||||
assertEquals("prefetch size", 1, consumerInfo.getPrefetchSize());
|
||||
|
||||
cr = new Response();
|
||||
cr.setCorrelationId(consumerInfo.getCommandId());
|
||||
response = writeCommand(cr);
|
||||
System.out.println("Received: " + response);
|
||||
}
|
||||
|
||||
public void _testFakeServer() throws Exception {
|
||||
|
@ -85,4 +99,18 @@ public class StompWireFormatTest extends TestCase {
|
|||
|
||||
System.err.println(System.in.read());
|
||||
}
|
||||
|
||||
protected Command parseCommand(String connect_frame) throws IOException, JMSException {
|
||||
DataInputStream din = new DataInputStream(new ByteArrayInputStream(connect_frame.getBytes()));
|
||||
|
||||
return wire.readCommand(din);
|
||||
}
|
||||
|
||||
protected String writeCommand(Command command) throws IOException, JMSException {
|
||||
ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
||||
DataOutputStream dout = new DataOutputStream(bout);
|
||||
wire.writeCommand(command, dout);
|
||||
return new String(bout.toByteArray());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue