git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@885488 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-11-30 16:47:36 +00:00
parent adb7fccda0
commit 5f6f23e3c9
3 changed files with 72 additions and 3 deletions

View File

@ -359,10 +359,14 @@ public abstract class AbstractRegion implements Region {
if (sub == null) {
sub = subscriptions.get(ack.getConsumerId());
if (sub == null) {
if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
LOG.warn("Ack for non existent subscription, ack:" + ack);
throw new IllegalArgumentException(
"The subscription does not exist: "
+ ack.getConsumerId());
} else {
return;
}
}
consumerExchange.setSubscription(sub);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.async.Location;
@ -55,8 +56,15 @@ public class AMQTxMarshaller implements Marshaller<AMQTx> {
public void writePayload(AMQTx amqtx, DataOutput dataOut) throws IOException {
amqtx.getLocation().writeExternal(dataOut);
List<AMQTxOperation> list = amqtx.getOperations();
dataOut.writeInt(list.size());
List<AMQTxOperation> ops = new ArrayList<AMQTxOperation>();
for (AMQTxOperation op : list) {
if (op.getOperationType() == op.ADD_OPERATION_TYPE) {
ops.add(op);
}
}
dataOut.writeInt(ops.size());
for (AMQTxOperation op : ops) {
op.writeExternal(wireFormat, dataOut);
}
}

View File

@ -209,6 +209,63 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertNull(m);
}
public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
for (int i = 0; i < 4; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
assertNotNull(m);
MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
connection.request(createPrepareTransaction(connectionInfo, txid));
// restart the broker.
restartBroker();
// Setup the consumer and receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// All messages should be re-delivered.
for (int i = 0; i < 4; i++) {
Message m = receiveMessage(connection);
assertNotNull(m);
}
assertNoMessagesLeft(connection);
}
public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();