git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1352933 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-06-22 15:40:03 +00:00
parent ac8c8d1bc9
commit bd31548920
2 changed files with 78 additions and 14 deletions

View File

@ -44,6 +44,7 @@ import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
@ -485,13 +486,13 @@ public class ProtocolConverter {
throw new ProtocolException("SUBSCRIBE received without a subscription id!");
}
ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
if (actualDest == null) {
throw new ProtocolException("Invalid Destination.");
}
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(1000);
consumerInfo.setDispatchAsync(true);
@ -540,9 +541,45 @@ public class ProtocolConverter {
subscriptions.put(subscriptionId, stompSubscription);
}
// dispatch can beat the receipt so send it early
sendReceipt(command);
sendToActiveMQ(consumerInfo, null);
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
final StompFrame cmd = command;
final int prefetch = consumerInfo.getPrefetchSize();
// Since dispatch could beat the receipt we set prefetch to zero to start and then
// once we've sent our Receipt we are safe to turn on dispatch if the response isn't
// an error message.
consumerInfo.setPrefetchSize(0);
final ResponseHandler handler = new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
// We report back the failure but we don't close the connection.
Throwable exception = ((ExceptionResponse)response).getException();
handleException(exception, cmd);
} else {
StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.RECEIPT);
sc.setHeaders(new HashMap<String, String>(1));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
stompTransport.sendToStomp(sc);
ConsumerControl control = new ConsumerControl();
control.setPrefetch(prefetch);
control.setDestination(actualDest);
control.setConsumerId(id);
sendToActiveMQ(control, null);
}
}
};
sendToActiveMQ(consumerInfo, handler);
} else {
sendToActiveMQ(consumerInfo, createResponseHandler(command));
}
}
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {

View File

@ -194,7 +194,6 @@ public class StompTest extends CombinationTestSupport {
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("response-id:1") >= 0);
}
public void testSendMessage() throws Exception {
@ -383,7 +382,6 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
}
public void testSubscriptionReceipts() throws Exception {
final int done = 500;
int count = 0;
@ -392,7 +390,6 @@ public class StompTest extends CombinationTestSupport {
URI connectUri = new URI(bindAddress);
do {
StompConnection sender = new StompConnection();
sender.open(createSocket(connectUri));
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@ -434,7 +431,6 @@ public class StompTest extends CombinationTestSupport {
receiver.disconnect();
} while (count < done);
}
public void testSubscribeWithAutoAck() throws Exception {
@ -570,7 +566,6 @@ public class StompTest extends CombinationTestSupport {
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@ -756,7 +751,6 @@ public class StompTest extends CombinationTestSupport {
sendMessage(getName());
StompFrame msg = stompConnection.receive();
assertTrue(msg.getAction().equals("MESSAGE"));
HashMap<String, String> ackHeaders = new HashMap<String, String>();
@ -938,9 +932,43 @@ public class StompTest extends CombinationTestSupport {
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
}
public void testSubscribeWithReceiptNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" +
"ack:auto\n" + "receipt:1\n" + "\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
}
public void testSubscribeWithInvalidSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector:foo.bar = 1\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testTransformationUnknownTranslator() throws Exception {
@ -1991,7 +2019,6 @@ public class StompTest extends CombinationTestSupport {
assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
// The subscription should receive a response with the ReplyTo property set.
StompFrame received = responder.receive();
assertNotNull(received);