mirror of https://github.com/apache/activemq.git
Missing synchronization would cause acks to not be delivered to the broker. After enough acks were missed,
the consumer would stop receiving messages due to the broker thinking the consumers prefetch is full. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@394710 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c46562ba7c
commit
178f34bd9b
|
@ -93,7 +93,7 @@ public class Subscription {
|
|||
out.write(builder.toFrame());
|
||||
}
|
||||
|
||||
private void addMessageDispatch(MessageDispatch md) {
|
||||
synchronized private void addMessageDispatch(MessageDispatch md) {
|
||||
dispatchedMessages.addLast(md);
|
||||
}
|
||||
|
||||
|
@ -117,7 +117,7 @@ public class Subscription {
|
|||
return subscriptionId;
|
||||
}
|
||||
|
||||
public MessageAck createMessageAck(String message_id) {
|
||||
synchronized public MessageAck createMessageAck(String message_id) {
|
||||
MessageAck ack = new MessageAck();
|
||||
ack.setDestination(consumerInfo.getDestination());
|
||||
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
||||
|
@ -136,6 +136,7 @@ public class Subscription {
|
|||
count++;
|
||||
if( id.equals(message_id) ) {
|
||||
ack.setLastMessageId(md.getMessage().getMessageId());
|
||||
break;
|
||||
}
|
||||
}
|
||||
ack.setMessageCount(count);
|
||||
|
|
|
@ -96,6 +96,8 @@ public class StompTest extends CombinationTestSupport {
|
|||
if( c < 0 ) {
|
||||
throw new IOException("socket closed.");
|
||||
} else if( c == 0 ) {
|
||||
c = is.read();
|
||||
assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
|
||||
byte[] ba = inputBuffer.toByteArray();
|
||||
inputBuffer.reset();
|
||||
return new String(ba, "UTF-8");
|
||||
|
|
Loading…
Reference in New Issue