diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 6492e29b8a..6447a8b023 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -171,7 +171,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i if (cacheEnabled) { cacheEnabled=false; if (LOG.isDebugEnabled()) { - LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size); + LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size + + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId()) + + " current node seqId: " + node.getMessageId().getBrokerSequenceId()); } // sync with store on disabling the cache if (lastCachedId != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 54ab84d863..cfbea6cbd4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -34,11 +34,11 @@ public interface JDBCAdapter { void doDropTables(TransactionContext c) throws SQLException, IOException; - void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException; + void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException; - void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException; + void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException; - byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException; + byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException; String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException; @@ -58,7 +58,7 @@ public interface JDBCAdapter { SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException; - long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException; + long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException; void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index da7e7176f9..7c38e773a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -44,9 +44,10 @@ public class JDBCMessageStore extends AbstractMessageStore { protected final WireFormat wireFormat; protected final JDBCAdapter adapter; protected final JDBCPersistenceAdapter persistenceAdapter; - protected AtomicLong lastMessageId = new AtomicLong(-1); - protected ActiveMQMessageAudit audit; + protected AtomicLong lastStoreSequenceId = new AtomicLong(-1); + protected ActiveMQMessageAudit audit; + public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) { super(destination); this.persistenceAdapter = persistenceAdapter; @@ -67,6 +68,8 @@ public class JDBCMessageStore extends AbstractMessageStore { return; } + long sequenceId = persistenceAdapter.getNextSequenceId(); + // Serialize the Message.. byte data[]; try { @@ -78,8 +81,8 @@ public class JDBCMessageStore extends AbstractMessageStore { // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); - try { - adapter.doAddMessage(c, messageId, destination, data, message.getExpiration()); + try { + adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration()); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); @@ -92,7 +95,7 @@ public class JDBCMessageStore extends AbstractMessageStore { // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); try { - adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef); + adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); @@ -102,13 +105,10 @@ public class JDBCMessageStore extends AbstractMessageStore { } public Message getMessage(MessageId messageId) throws IOException { - - long id = messageId.getBrokerSequenceId(); - // Get a connection and pull the message out of the DB TransactionContext c = persistenceAdapter.getTransactionContext(); try { - byte data[] = adapter.doGetMessage(c, id); + byte data[] = adapter.doGetMessage(c, messageId); if (data == null) { return null; } @@ -143,7 +143,8 @@ public class JDBCMessageStore extends AbstractMessageStore { } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - long seq = ack.getLastMessageId().getBrokerSequenceId(); + + long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId()); // Get a connection and remove the message from the DB TransactionContext c = persistenceAdapter.getTransactionContext(context); @@ -225,14 +226,14 @@ public class JDBCMessageStore extends AbstractMessageStore { TransactionContext c = persistenceAdapter.getTransactionContext(); try { - adapter.doRecoverNextMessages(c, destination, lastMessageId.get(), maxReturned, new JDBCMessageRecoveryListener() { + adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { if (listener.hasSpace()) { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); listener.recoverMessage(msg); - lastMessageId.set(sequenceId); + lastStoreSequenceId.set(sequenceId); return true; } return false; @@ -259,13 +260,38 @@ public class JDBCMessageStore extends AbstractMessageStore { * @see org.apache.activemq.store.MessageStore#resetBatching() */ public void resetBatching() { - lastMessageId.set(-1); + if (LOG.isDebugEnabled()) { + LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get()); + } + lastStoreSequenceId.set(-1); } @Override public void setBatch(MessageId messageId) { - lastMessageId.set(messageId.getBrokerSequenceId()); + long storeSequenceId = -1; + try { + storeSequenceId = getStoreSequenceIdForMessageId(messageId); + } catch (IOException ignoredAsAlreadyLogged) { + // reset batch in effect with default -1 value + } + if (LOG.isDebugEnabled()) { + LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get()); + } + lastStoreSequenceId.set(storeSequenceId); } + private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException { + long result = -1; + TransactionContext c = persistenceAdapter.getTransactionContext(); + try { + result = adapter.getStoreSequenceId(c, messageId); + } catch (SQLException e) { + JDBCPersistenceAdapter.log("JDBC Failure: ", e); + throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); + } finally { + c.close(); + } + return result; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 5e342cade4..54ef632228 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -46,6 +46,7 @@ import org.apache.activemq.store.memory.MemoryTransactionStore; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.wireformat.WireFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -93,6 +94,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist protected boolean enableAudit=true; protected int auditRecoveryDepth = 1024; protected ActiveMQMessageAudit audit; + + protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); public JDBCPersistenceAdapter() { } @@ -152,6 +155,28 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist } } } + + public void initSequenceIdGenerator() { + TransactionContext c = null; + try { + c = getTransactionContext(); + getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { + public void messageId(MessageId id) { + audit.isDuplicate(id); + } + }); + } catch (Exception e) { + LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); + } finally { + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + } + } + } + + } public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); @@ -655,5 +680,11 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist public void setAuditRecoveryDepth(int auditRecoveryDepth) { this.auditRecoveryDepth = auditRecoveryDepth; } + + public long getNextSequenceId() { + synchronized(sequenceGenerator) { + return sequenceGenerator.getNextSequenceId(); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 74b7079785..e6f6d70215 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -46,10 +46,10 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess } public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { - long seq = messageId.getBrokerSequenceId(); // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); try { + long seq = adapter.getStoreSequenceId(c, messageId); adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index cbbe2acc8b..6cba7f5a7f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -134,7 +134,7 @@ public class Statements { public String getFindMessageStatement() { if (findMessageStatement == null) { - findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?"; + findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=?"; } return findMessageStatement; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 31904c836f..e9561bcd56 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -23,11 +23,8 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; -import java.util.List; import java.util.Set; -import java.util.TreeSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; @@ -57,7 +54,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter { private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class); protected Statements statements; protected boolean batchStatments = true; - private Set lastRecoveredMessagesIds = new TreeSet(); protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { s.setBytes(index, data); @@ -167,7 +163,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } - public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, + public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); try { @@ -177,7 +173,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { c.setAddMessageStatement(s); } } - s.setLong(1, messageID.getBrokerSequenceId()); + s.setLong(1, sequence); s.setString(2, messageID.getProducerId().toString()); s.setLong(3, messageID.getProducerSequenceId()); s.setString(4, destination.getQualifiedName()); @@ -197,7 +193,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } - public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination destination, + public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); try { @@ -225,7 +221,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } - public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException { + public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; try { @@ -243,12 +239,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } - public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException { + public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; try { s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); - s.setLong(1, seq); + s.setString(1, id.getProducerId().toString()); + s.setLong(2, id.getProducerSequenceId()); rs = s.executeQuery(); if (!rs.next()) { return null; diff --git a/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java b/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java index ec5b57daa5..d1bf61e9e7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java +++ b/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java @@ -67,18 +67,20 @@ public class LocalTransaction extends Transaction { setState(Transaction.FINISHED_STATE); context.getTransactions().remove(xid); - transactionStore.commit(getTransactionId(), false); + synchronized (transactionStore) { + transactionStore.commit(getTransactionId(), false); - try { - fireAfterCommit(); - } catch (Throwable e) { - // I guess this could happen. Post commit task failed - // to execute properly. - LOG.warn("POST COMMIT FAILED: ", e); - XAException xae = new XAException("POST COMMIT FAILED"); - xae.errorCode = XAException.XAER_RMERR; - xae.initCause(e); - throw xae; + try { + fireAfterCommit(); + } catch (Throwable e) { + // I guess this could happen. Post commit task failed + // to execute properly. + LOG.warn("POST COMMIT FAILED: ", e); + XAException xae = new XAException("POST COMMIT FAILED"); + xae.errorCode = XAException.XAER_RMERR; + xae.initCause(e); + throw xae; + } } } @@ -90,16 +92,18 @@ public class LocalTransaction extends Transaction { } setState(Transaction.FINISHED_STATE); context.getTransactions().remove(xid); - transactionStore.rollback(getTransactionId()); + synchronized (transactionStore) { + transactionStore.rollback(getTransactionId()); - try { - fireAfterRollback(); - } catch (Throwable e) { - LOG.warn("POST ROLLBACK FAILED: ", e); - XAException xae = new XAException("POST ROLLBACK FAILED"); - xae.errorCode = XAException.XAER_RMERR; - xae.initCause(e); - throw xae; + try { + fireAfterRollback(); + } catch (Throwable e) { + LOG.warn("POST ROLLBACK FAILED: ", e); + XAException xae = new XAException("POST ROLLBACK FAILED"); + xae.errorCode = XAException.XAER_RMERR; + xae.initCause(e); + throw xae; + } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java index d602e8c055..e91a1ee05f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java @@ -135,6 +135,7 @@ public class NegativeQueueTest extends TestCase { public void testWithNoPrefetch() throws Exception{ PREFETCH_SIZE = 1; + NUM_CONSUMERS = 20; blastAndConsume(); } @@ -192,7 +193,7 @@ public class NegativeQueueTest extends TestCase { consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1)); } - latch1.await(300000, TimeUnit.MILLISECONDS); + latch1.await(200000, TimeUnit.MILLISECONDS); if(DEBUG){ System.out.println(""); System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize()); @@ -295,6 +296,11 @@ public class NegativeQueueTest extends TestCase { PolicyEntry policy = new PolicyEntry(); policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy()); + + // disable the cache to be sure setBatch is the problem + // will get lots of duplicates + // policy.setUseCache(false); + PolicyMap pMap = new PolicyMap(); pMap.setDefaultEntry(policy); answer.setDestinationPolicy(pMap); diff --git a/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java index 608695b0bd..b6e37ec9be 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; @@ -33,6 +34,7 @@ import org.apache.activemq.command.MessageId; abstract public class PersistenceAdapterTestSupport extends TestCase { protected PersistenceAdapter pa; + protected BrokerService brokerService = new BrokerService(); abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception; diff --git a/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java b/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java new file mode 100644 index 0000000000..778ccc6f83 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +// https://issues.apache.org/activemq/browse/AMQ-2594 +public abstract class StoreOrderTest { + + private static final Log LOG = LogFactory.getLog(StoreOrderTest.class); + + protected BrokerService broker; + private ActiveMQConnection connection; + public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0"); + + protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception; + protected void dumpMessages() throws Exception {} + + public class TransactedSend implements Runnable { + + private CountDownLatch readyForCommit; + private CountDownLatch firstDone; + private boolean first; + private Session session; + private MessageProducer producer; + + public TransactedSend(CountDownLatch readyForCommit, + CountDownLatch firstDone, boolean b) throws Exception { + this.readyForCommit = readyForCommit; + this.firstDone = firstDone; + this.first = b; + session = connection.createSession(true, Session.SESSION_TRANSACTED); + producer = session.createProducer(destination); + } + + public void run() { + try { + if (!first) { + firstDone.await(30, TimeUnit.SECONDS); + } + producer.send(session.createTextMessage(first ? "first" : "second")); + if (first) { + firstDone.countDown(); + } + readyForCommit.countDown(); + + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected ex on run " + e); + } + } + + public void commit() throws Exception { + session.commit(); + session.close(); + } + } + + @Before + public void setup() throws Exception { + broker = createBroker(); + initConnection(); + } + + public void initConnection() throws Exception { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setWatchTopicAdvisories(false); + connection.start(); + } + + @After + public void stopBroker() throws Exception { + if (connection != null) { + connection.close(); + } + if (broker != null) { + broker.stop(); + } + } + + @Test + public void validateUnorderedTxCommit() throws Exception { + + Executor executor = Executors.newCachedThreadPool(); + CountDownLatch readyForCommit = new CountDownLatch(2); + CountDownLatch firstDone = new CountDownLatch(1); + + TransactedSend first = new TransactedSend(readyForCommit, firstDone, true); + TransactedSend second = new TransactedSend(readyForCommit, firstDone, false); + executor.execute(first); + executor.execute(second); + + assertTrue("both started", readyForCommit.await(20, TimeUnit.SECONDS)); + + LOG.info("commit out of order"); + // send interleaved so sequence id at time of commit could be reversed + second.commit(); + + // force usage over the limit before second commit to flush cache + enqueueOneMessage(); + + // can get lost in the cursor as it is behind the last sequenceId that was cached + first.commit(); + + LOG.info("send/commit done.."); + + dumpMessages(); + + String received1, received2, received3 = null; + if (true) { + LOG.info("receive and rollback..."); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + received1 = receive(session); + received2 = receive(session); + received3 = receive(session); + + assertEquals("second", received1); + assertEquals("middle", received2); + assertEquals("first", received3); + + session.rollback(); + session.close(); + } + + + LOG.info("restart broker"); + stopBroker(); + broker = createRestartedBroker(); + initConnection(); + + if (true) { + LOG.info("receive and rollback after restart..."); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + received1 = receive(session); + received2 = receive(session); + received3 = receive(session); + assertEquals("second", received1); + assertEquals("middle", received2); + assertEquals("first", received3); + session.rollback(); + session.close(); + } + + LOG.info("receive and ack each message"); + received1 = receiveOne(); + received2 = receiveOne(); + received3 = receiveOne(); + + assertEquals("second", received1); + assertEquals("middle", received2); + assertEquals("first", received3); + } + + private void enqueueOneMessage() throws Exception { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(destination); + producer.send(session.createTextMessage("middle")); + session.commit(); + session.close(); + } + + + private String receiveOne() throws Exception { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + String received = receive(session); + session.commit(); + session.close(); + return received; + } + + private String receive(Session session) throws Exception { + MessageConsumer consumer = session.createConsumer(destination); + String result = null; + TextMessage message = (TextMessage) consumer.receive(5000); + if (message != null) { + LOG.info("got message: " + message); + result = message.getText(); + } + consumer.close(); + return result; + } + + protected BrokerService createBroker() throws Exception { + boolean deleteMessagesOnStartup = true; + return startBroker(deleteMessagesOnStartup); + } + + protected BrokerService createRestartedBroker() throws Exception { + boolean deleteMessagesOnStartup = false; + return startBroker(deleteMessagesOnStartup); + } + + protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception { + BrokerService newBroker = new BrokerService(); + configureBroker(newBroker); + newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup); + newBroker.start(); + return newBroker; + } + + protected void configureBroker(BrokerService brokerService) throws Exception { + setPersistentAdapter(brokerService); + brokerService.setAdvisorySupport(false); + + PolicyMap map = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setMemoryLimit(1024*3); + defaultEntry.setCursorMemoryHighWaterMark(68); + map.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(map); + } + +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java index c7d66c90f6..1df3e0cb24 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java @@ -28,6 +28,8 @@ public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport { protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException { JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + brokerService.setPersistenceAdapter(jdbc); + jdbc.setBrokerService(brokerService); EmbeddedDataSource dataSource = new EmbeddedDataSource(); dataSource.setDatabaseName("derbyDb"); dataSource.setCreateDatabase("create"); diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java new file mode 100644 index 0000000000..062c6cfacf --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jdbc; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.Message; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.StoreOrderTest; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.derby.jdbc.EmbeddedDataSource; + +// https://issues.apache.org/activemq/browse/AMQ-2594 +public class JDBCStoreOrderTest extends StoreOrderTest { + + private static final Log LOG = LogFactory.getLog(JDBCStoreOrderTest.class); + + @Override + protected void dumpMessages() throws Exception { + WireFormat wireFormat = new OpenWireFormat(); + java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection(); + PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS"); + ResultSet result = statement.executeQuery(); + while(result.next()) { + long id = result.getLong(1); + Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2))); + LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message); + } + statement.close(); + conn.close(); + } + + @Override + protected void setPersistentAdapter(BrokerService brokerService) + throws Exception { + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + EmbeddedDataSource dataSource = new EmbeddedDataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + jdbc.setDataSource(dataSource); + brokerService.setPersistenceAdapter(jdbc); + } + +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java new file mode 100644 index 0000000000..8cace613df --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.File; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.StoreOrderTest; + +// https://issues.apache.org/activemq/browse/AMQ-2594 +public class KahaDBStoreOrderTest extends StoreOrderTest { + + @Override + protected void setPersistentAdapter(BrokerService brokerService) + throws Exception { + KahaDBStore kaha = new KahaDBStore(); + File directory = new File("target/activemq-data/kahadb/storeOrder"); + kaha.setDirectory(directory); + brokerService.setPersistenceAdapter(kaha); + } +} \ No newline at end of file diff --git a/activemq-core/src/test/resources/log4j.properties b/activemq-core/src/test/resources/log4j.properties index bb20bc5e65..03001a55bf 100755 --- a/activemq-core/src/test/resources/log4j.properties +++ b/activemq-core/src/test/resources/log4j.properties @@ -21,6 +21,9 @@ log4j.rootLogger=INFO, out, stdout #log4j.logger.org.apache.activemq=DEBUG +#log4j.logger.org.apache.activemq.store.jdbc=DEBUG +log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG +log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG # CONSOLE appender not used by default log4j.appender.stdout=org.apache.log4j.ConsoleAppender