fix for prefetch size issue reported in AMQ-1807

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@727017 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2008-12-16 11:02:28 +00:00
parent 739e1378a6
commit d8bdf5b3e2
3 changed files with 38 additions and 9 deletions

View File

@ -243,11 +243,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// consumer
if (getPrefetchSize() != 0) {
prefetchExtension = Math.max(
prefetchExtension, index + 1);
prefetchExtension, index );
}
} else {
prefetchExtension = Math.max(0,
prefetchExtension - (index + 1));
prefetchExtension - index);
}
destination = node.getRegionDestination();
callDispatchMatched = true;

View File

@ -112,6 +112,11 @@ public class StompConnection {
headers.put("passcode", password);
StompFrame frame = new StompFrame("CONNECT", headers);
sendFrame(frame.toString());
StompFrame connect = receive();
if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
throw new Exception ("Not connected: " + connect.getBody());
}
}
public void disconnect() throws Exception {

View File

@ -884,7 +884,7 @@ public class StompTest extends CombinationTestSupport {
stompConnection.sendFrame(frame);
// wait a bit for MBean to get refreshed
try {
Thread.sleep(100);
Thread.sleep(200);
} catch (InterruptedException e){}
assertEquals(view.getDurableTopicSubscribers().length, 1);
@ -892,7 +892,7 @@ public class StompTest extends CombinationTestSupport {
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
Thread.sleep(100);
Thread.sleep(200);
} catch (InterruptedException e){}
//reconnect
@ -920,17 +920,41 @@ public class StompTest extends CombinationTestSupport {
stompConnection.begin("tx1");
stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null);
stompConnection.commit("tx1");
StompFrame connect = stompConnection.receive();
if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
throw new Exception ("Not connected");
}
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
assertNull(stompMessage.getHeaders().get("transaction"));
}
public void testPrefetchSize() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
stompConnection.subscribe("/queue/" + getQueueName(), "client", headers);
// send messages using JMS
sendMessage("message 1");
sendMessage("message 2");
sendMessage("message 3");
StompFrame frame = stompConnection.receive();
stompConnection.begin("tx1");
stompConnection.ack(frame, "tx1");
StompFrame frame1 = stompConnection.receive();
try {
StompFrame frame2 = stompConnection.receive(500);
if (frame2 != null) {
fail("Should not have received the second message");
}
} catch (SocketTimeoutException soe) {}
stompDisconnect();
}
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;