additional fix for https://issues.apache.org/activemq/browse/AMQ-1807 - fixing test case and making it work

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@804943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-08-17 11:43:34 +00:00
parent a15e6efabd
commit b0f24f34d0
5 changed files with 25 additions and 64 deletions

View File

@ -236,7 +236,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
dequeueCounter++;
dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
prefetchExtension--;
}
}

View File

@ -361,11 +361,7 @@ public class ProtocolConverter {
}
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
try {
sub.onStompAbort(activemqTx);
} catch (Exception e) {
throw new ProtocolException("Transaction abort failed", false, e);
}
}
TransactionInfo tx = new TransactionInfo();

View File

@ -188,15 +188,6 @@ public class StompConnection {
}
public void abort(String transaction) throws Exception {
// discard all content on the wire before
// aborting the transaction
try {
StompFrame discarded = this.receive(100);
while (discarded != null) {
discarded = this.receive(100);
}
} catch (Exception e) {
}
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("transaction", transaction);
StompFrame frame = new StompFrame("ABORT", headers);

View File

@ -99,44 +99,21 @@ public class StompSubscription {
protocolConverter.getTransportFilter().sendToStomp(command);
}
synchronized void onStompAbort(TransactionId transactionId) throws IOException, JMSException {
//ack all unacked messages
for (MessageDispatch md : dispatchedMessage.values()) {
if (!unconsumedMessage.contains(md)) {
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setLastMessageId(md.getMessage().getMessageId());
ack.setMessageCount(1);
ack.setTransactionId(transactionId);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
unconsumedMessage.add(md);
}
}
// redeliver all unconsumed messages
for (MessageDispatch md : unconsumedMessage) {
onMessageDispatch(md);
}
synchronized void onStompAbort(TransactionId transactionId) {
unconsumedMessage.clear();
}
synchronized void onStompCommit(TransactionId transactionId) {
// ack all messages
if (!unconsumedMessage.isEmpty()) {
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
ack.setMessageCount(unconsumedMessage.size());
ack.setTransactionId(transactionId);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
// clear lists
unconsumedMessage.clear();
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Entry)iter.next();
MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
if (unconsumedMessage.contains(msg)) {
iter.remove();
}
}
unconsumedMessage.clear();
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
@ -151,11 +128,7 @@ public class StompSubscription {
ack.setConsumerId(consumerInfo.getConsumerId());
if (ackMode == CLIENT_ACK) {
if (transactionId != null) {
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
} else {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
}
int count = 0;
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
@ -168,10 +141,12 @@ public class StompSubscription {
}
if (transactionId != null) {
if (!unconsumedMessage.contains(msg))
if (!unconsumedMessage.contains(msg)) {
unconsumedMessage.add(msg);
}
} else {
iter.remove();
}
count++;

View File

@ -977,24 +977,24 @@ public class StompTest extends CombinationTestSupport {
stompConnection.begin("tx2");
// Previously delivered message need to get re-acked...
stompConnection.ack(frame, "tx2");
stompConnection.ack(frame1, "tx2");
StompFrame frame3 = stompConnection.receive();
assertEquals(frame3.getBody(), "message 1");
assertEquals(frame3.getBody(), "message 3");
stompConnection.ack(frame3, "tx2");
StompFrame frame4 = stompConnection.receive();
assertEquals(frame4.getBody(), "message 2");
assertEquals(frame4.getBody(), "message 4");
stompConnection.ack(frame4, "tx2");
StompFrame frame5 = stompConnection.receive();
assertEquals(frame5.getBody(), "message 3");
stompConnection.ack(frame5, "tx2");
stompConnection.commit("tx2");
stompConnection.begin("tx3");
StompFrame frame6 = stompConnection.receive();
assertEquals(frame6.getBody(), "message 4");
stompConnection.ack(frame6, "tx3");
StompFrame frame5 = stompConnection.receive();
assertEquals(frame5.getBody(), "message 5");
stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3");
stompDisconnect();