diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index 43713e695e..faa6c1fbcf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -125,6 +125,10 @@ abstract public class AbstractMessageStore implements MessageStore { this.indexListener = indexListener; } + public IndexListener getIndexListener() { + return indexListener; + } + static { FUTURE = new InlineListenableFuture(); } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 1d368b6901..df4fcf354a 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -105,7 +105,7 @@ public interface JDBCAdapter { void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException; - void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException; + void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException; void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException; diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 75a68c7883..923f3f1d45 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -30,6 +30,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.MessageRecoveryListener; @@ -70,7 +71,6 @@ public class JDBCMessageStore extends AbstractMessageStore { protected final JDBCPersistenceAdapter persistenceAdapter; protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1); protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); - final Set recoveredAdditions = new LinkedHashSet(); protected ActiveMQMessageAudit audit; protected final LinkedList pendingAdditions = new LinkedList(); @@ -112,6 +112,9 @@ public class JDBCMessageStore extends AbstractMessageStore { return; } + // if xaXid present - this is a prepare - so we don't yet have an outcome + final XATransactionId xaXid = context != null ? context.getXid() : null; + // Serialize the Message.. byte data[]; try { @@ -127,38 +130,44 @@ public class JDBCMessageStore extends AbstractMessageStore { synchronized (pendingAdditions) { sequenceId = persistenceAdapter.getNextSequenceId(); final long sequence = sequenceId; - pendingAdditions.add(sequence); - c.onCompletion(new Runnable() { - public void run() { - // jdbc close or jms commit - while futureOrSequenceLong==null ordered - // work will remain pending on the Queue - message.getMessageId().setFutureOrSequenceLong(sequence); - message.getMessageId().setEntryLocator(sequence); - } - }); + message.getMessageId().setEntryLocator(sequence); - if (indexListener != null) { - indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { - @Override + if (xaXid == null) { + pendingAdditions.add(sequence); + + c.onCompletion(new Runnable() { public void run() { - // cursor add complete - synchronized (pendingAdditions) { pendingAdditions.remove(sequence); } + // jdbc close or jms commit - while futureOrSequenceLong==null ordered + // work will remain pending on the Queue + message.getMessageId().setFutureOrSequenceLong(sequence); } - })); - } else { - pendingAdditions.remove(sequence); + }); + + if (indexListener != null) { + indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { + @Override + public void run() { + // cursor add complete + synchronized (pendingAdditions) { pendingAdditions.remove(sequence); } + } + })); + } else { + pendingAdditions.remove(sequence); + } } } try { adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(), - this.isPrioritizedMessages() ? message.getPriority() : 0, context != null ? context.getXid() : null); + this.isPrioritizedMessages() ? message.getPriority() : 0, xaXid); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); } finally { c.close(); } - onAdd(message, sequenceId, message.getPriority()); + if (xaXid == null) { + onAdd(message, sequenceId, message.getPriority()); + } } // jdbc commit order is random with concurrent connections - limit scan to lowest pending @@ -186,12 +195,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } } - protected void onAdd(Message message, long sequenceId, byte priority) { - if (message.getTransactionId() != null && message.getTransactionId().isXATransaction() - && lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) { - recoveredAdditions.add(sequenceId); - } - } + protected void onAdd(Message message, long sequenceId, byte priority) {} public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { // Get a connection and insert the message into the DB. @@ -329,18 +333,6 @@ public class JDBCMessageStore extends AbstractMessageStore { public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { TransactionContext c = persistenceAdapter.getTransactionContext(); try { - if (!recoveredAdditions.isEmpty()) { - for (Iterator iterator = recoveredAdditions.iterator(); iterator.hasNext(); ) { - Long sequenceId = iterator.next(); - iterator.remove(); - maxReturned--; - if (sequenceId <= lastRecoveredSequenceId.get()) { - Message msg = (Message)wireFormat.unmarshal(new ByteSequence(adapter.doGetMessageById(c, sequenceId))); - LOG.trace("recovered add {} {}", this, msg.getMessageId()); - listener.recoverMessage(msg); - } - } - } if (LOG.isTraceEnabled()) { LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId()); } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index e335926cef..4236e9d105 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -766,11 +766,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements } } - public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException { + public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException { TransactionContext c = getTransactionContext(context); try { - long sequence = (Long)messageId.getFutureOrSequenceLong(); - getAdapter().doCommitAddOp(c, sequence); + long sequence = (Long)messageId.getEntryLocator(); + getAdapter().doCommitAddOp(c, preparedSequenceId, sequence); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e); diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 1841f11c1c..a0cb133cbd 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -316,6 +316,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName)); } + @Override protected void onAdd(Message message, long sequenceId, byte priority) { // update last recovered state for (LastRecovered last : subscriberLastRecoveredMap.values()) { @@ -329,7 +330,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess } } - public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { TransactionContext c = persistenceAdapter.getTransactionContext(); try { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index b2fedf734c..7f42c7fa78 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -27,6 +27,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.ProxyMessageStore; import org.apache.activemq.store.ProxyTopicMessageStore; @@ -90,34 +91,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { ArrayList updateFromPreparedStateCommands = new ArrayList(); for (Iterator iter = tx.messages.iterator(); iter.hasNext();) { final AddMessageCommand addMessageCommand = iter.next(); - updateFromPreparedStateCommands.add(new AddMessageCommand() { - @Override - public Message getMessage() { - return addMessageCommand.getMessage(); - } - - @Override - public MessageStore getMessageStore() { - return addMessageCommand.getMessageStore(); - } - - @Override - public void run(ConnectionContext context) throws IOException { - JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter; - Message message = addMessageCommand.getMessage(); - jdbcPersistenceAdapter.commitAdd(context, message.getMessageId()); - ((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd( - message, - (Long)message.getMessageId().getFutureOrSequenceLong(), - message.getPriority()); - - } - - @Override - public void setMessageStore(MessageStore messageStore) { - throw new RuntimeException("MessageStore already known"); - } - }); + updateFromPreparedStateCommands.add(new CommitAddOutcome(addMessageCommand)); } tx.messages = updateFromPreparedStateCommands; preparedTransactions.put(txid, tx); @@ -125,6 +99,60 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { } + class CommitAddOutcome implements AddMessageCommand { + final Message message; + JDBCMessageStore jdbcMessageStore; + + public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) { + this.jdbcMessageStore = jdbcMessageStore; + this.message = message; + } + + public CommitAddOutcome(AddMessageCommand addMessageCommand) { + this((JDBCMessageStore)addMessageCommand.getMessageStore(), addMessageCommand.getMessage()); + } + + @Override + public Message getMessage() { + return message; + } + + @Override + public MessageStore getMessageStore() { + return jdbcMessageStore; + } + + @Override + public void run(final ConnectionContext context) throws IOException { + JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter; + final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator(); + TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context); + + synchronized (jdbcMessageStore.pendingAdditions) { + message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId()); + + c.onCompletion(new Runnable() { + @Override + public void run() { + message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator()); + } + }); + + if (jdbcMessageStore.getIndexListener() != null) { + jdbcMessageStore.getIndexListener().onAdd(new IndexListener.MessageContext(context, message, null)); + } + } + + jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence); + jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority()); + } + + @Override + public void setMessageStore(MessageStore messageStore) { + jdbcMessageStore = (JDBCMessageStore) messageStore; + } + } + @Override public void rollback(TransactionId txid) throws IOException { @@ -148,9 +176,9 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { if (removeMessageCommand instanceof LastAckCommand ) { ((LastAckCommand)removeMessageCommand).rollback(ctx); } else { + MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId(); // need to unset the txid flag on the existing row - ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, - removeMessageCommand.getMessageAck().getLastMessageId()); + ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator()); } } } catch (IOException e) { @@ -171,36 +199,15 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { public void recoverAdd(long id, byte[] messageBytes) throws IOException { final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes)); message.getMessageId().setFutureOrSequenceLong(id); + message.getMessageId().setEntryLocator(id); Tx tx = getPreparedTx(message.getTransactionId()); - tx.add(new AddMessageCommand() { - MessageStore messageStore; - @Override - public Message getMessage() { - return message; - } - - @Override - public MessageStore getMessageStore() { - return messageStore; - } - - @Override - public void run(ConnectionContext context) throws IOException { - ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId()); - ((JDBCMessageStore)messageStore).onAdd(message, ((Long)message.getMessageId().getFutureOrSequenceLong()).longValue(), message.getPriority()); - } - - @Override - public void setMessageStore(MessageStore messageStore) { - this.messageStore = messageStore; - } - - }); + tx.add(new CommitAddOutcome(null, message)); } public void recoverAck(long id, byte[] xid, byte[] message) throws IOException { Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message)); msg.getMessageId().setFutureOrSequenceLong(id); + msg.getMessageId().setEntryLocator(id); Tx tx = getPreparedTx(new XATransactionId(xid)); final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1); tx.add(new RemoveMessageCommand() { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java index a595f33dde..8ee3123ed2 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -617,7 +617,7 @@ public class Statements { public String getClearXidFlagStatement() { if (clearXidFlagStatement == null) { clearXidFlagStatement = "UPDATE " + getFullMessageTableName() - + " SET XID = NULL WHERE ID = ?"; + + " SET XID = NULL, ID = ? WHERE ID = ?"; } return clearXidFlagStatement; } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 970e0f8474..a94abdf97b 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -1048,12 +1048,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } @Override - public void doCommitAddOp(TransactionContext c, long sequence) throws SQLException, IOException { + public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException { PreparedStatement s = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement()); s.setLong(1, sequence); + s.setLong(2, preparedSequence); if (s.executeUpdate() != 1) { throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence); } diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 94ebc9941c..c7757b86b7 100755 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -443,7 +443,7 @@ true - target + target/ true diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java index 38e8ee7195..fb0296c4ce 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.store; +import java.io.File; import java.util.ArrayList; import java.util.concurrent.TimeUnit; import junit.framework.Test; @@ -53,9 +54,14 @@ public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport { public void initCombosForTestRecovery() throws Exception { addCombinationValues("queuePendingPolicy", new PendingQueueMessageStoragePolicy[] {new FilePendingQueueMessageStoragePolicy(), new VMPendingQueueMessageStoragePolicy()}); - addCombinationValues("persistenceAdapter", new PersistenceAdapter[] {new KahaDBPersistenceAdapter(), - // need to supply the dataSource as it is used in parameter matching via the toString - new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())}); + PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[] { + new KahaDBPersistenceAdapter(), + new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat()) + }; + for (PersistenceAdapter adapter : persistenceAdapters) { + adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); + } + addCombinationValues("persistenceAdapter", persistenceAdapters); } public void testRecovery() throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java new file mode 100755 index 0000000000..a8739ae2da --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.concurrent.TimeUnit; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import junit.framework.Test; +import org.apache.activemq.broker.BrokerRestartTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ5567Test extends BrokerRestartTestSupport { + protected static final Logger LOG = LoggerFactory.getLogger(AMQ5567Test.class); + ActiveMQQueue destination = new ActiveMQQueue("Q"); + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + broker.setPersistenceAdapter(persistenceAdapter); + } + + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(60*1024); + return policy; + } + + public void initCombosForTestPreparedTransactionNotDispatched() throws Exception { + PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{ + new KahaDBPersistenceAdapter(), + new LevelDBPersistenceAdapter(), + new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat()) + }; + for (PersistenceAdapter adapter : persistenceAdapters) { + adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); + } + addCombinationValues("persistenceAdapter", persistenceAdapters); + } + + public void testPreparedTransactionNotDispatched() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("Q"); + + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + connection.send(createPrepareTransaction(connectionInfo, txid)); + + + // send another non tx, will poke dispatch + message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + + + // Since prepared but not committed.. only one should get delivered + StubConnection connectionC = createConnection(); + ConnectionInfo connectionInfoC = createConnectionInfo(); + SessionInfo sessionInfoC = createSessionInfo(connectionInfoC); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination); + connectionC.send(connectionInfoC); + connectionC.send(sessionInfoC); + connectionC.send(consumerInfo); + + Message m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got message", m); + assertNull("Got non tx message", m.getTransactionId()); + + // cannot get the prepared message till commit + assertNull(receiveMessage(connectionC)); + assertNoMessagesLeft(connectionC); + + LOG.info("commit: " + txid); + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + + m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got non null message", m); + + } + + public void initCombosForTestCursorStoreSync() throws Exception { + PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{ + new KahaDBPersistenceAdapter(), + new LevelDBPersistenceAdapter(), + new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat()) + }; + for (PersistenceAdapter adapter : persistenceAdapters) { + adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); + } + addCombinationValues("persistenceAdapter", persistenceAdapters); + } + + public void testCursorStoreSync() throws Exception { + + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.request(message); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + QueueViewMBean proxy = getProxyToQueueViewMBean(); + assertTrue("cache is enabled", proxy.isCacheEnabled()); + + // send another non tx, will fill cursor + String payload = new String(new byte[10*1024]); + for (int i=0; i<6; i++) { + message = createMessage(producerInfo, destination); + message.setPersistent(true); + ((TextMessage)message).setText(payload); + connection.request(message); + } + + assertTrue("cache is disabled", !proxy.isCacheEnabled()); + + StubConnection connectionC = createConnection(); + ConnectionInfo connectionInfoC = createConnectionInfo(); + SessionInfo sessionInfoC = createSessionInfo(connectionInfoC); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination); + connectionC.send(connectionInfoC); + connectionC.send(sessionInfoC); + connectionC.send(consumerInfo); + + Message m = null; + for (int i=0; i<3; i++) { + m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got message", m); + assertNull("Got non tx message", m.getTransactionId()); + connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + } + + LOG.info("commit: " + txid); + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + // consume the rest including the 2pc send in TX + + for (int i=0; i<4; i++) { + m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received[" + i + "] " + m); + assertNotNull("Got message", m); + if (i==3 ) { + assertNotNull("Got tx message", m.getTransactionId()); + } else { + assertNull("Got non tx message", m.getTransactionId()); + } + connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + } + } + + private QueueViewMBean getProxyToQueueViewMBean() + throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + + ":destinationType=Queue,destinationName=" + destination.getQueueName() + + ",type=Broker,brokerName=localhost"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, + QueueViewMBean.class, true); + return proxy; + } + + public static Test suite() { + return suite(AMQ5567Test.class); + } + +}