diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index c9c6928d5f..71642f81a1 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; @@ -83,6 +84,7 @@ import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.transaction.Transaction; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; import org.apache.activemq.util.BrokerSupport; @@ -734,6 +736,120 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } + final ConcurrentHashMap sendSyncs = new ConcurrentHashMap(); + private volatile LinkedList orderIndexUpdates = new LinkedList(); + + // roll up all message sends + class SendSync extends Synchronization { + + class MessageContext { + public Message message; + public ConnectionContext context; + + public MessageContext(ConnectionContext context, Message message) { + this.context = context; + this.message = message; + } + } + + final Transaction transaction; + List additions = new ArrayList(); + + public SendSync(Transaction transaction) { + this.transaction = transaction; + } + + public void add(ConnectionContext context, Message message) { + additions.add(new MessageContext(context, message)); + } + + @Override + public void beforeCommit() throws Exception { + synchronized (sendLock) { + orderIndexUpdates.addLast(transaction); + } + } + + @Override + public void afterCommit() throws Exception { + LinkedList orderedWork = null; + // use existing object to sync orderIndexUpdates that can be reassigned + synchronized (sendLock) { + if (transaction == orderIndexUpdates.peek()) { + orderedWork = orderIndexUpdates; + orderIndexUpdates = new LinkedList(); + + // talking all the ordered work means that earlier + // and later threads do nothing. + // this avoids contention/race on the sendLock that + // guards the actual work. + } + } + // do the ordered work + if (orderedWork != null) { + sendLock.lockInterruptibly(); + try { + for (Transaction tx : orderedWork) { + sendSyncs.get(tx).processSend(); + } + } finally { + sendLock.unlock(); + } + for (Transaction tx : orderedWork) { + sendSyncs.get(tx).processSent(); + } + sendSyncs.remove(transaction); + } + } + + // called with sendLock + private void processSend() throws Exception { + + for (Iterator iterator = additions.iterator(); iterator.hasNext(); ) { + MessageContext messageContext = iterator.next(); + // It could take while before we receive the commit + // op, by that time the message could have expired.. + if (broker.isExpired(messageContext.message)) { + broker.messageExpired(messageContext.context, messageContext.message, null); + destinationStatistics.getExpired().increment(); + iterator.remove(); + continue; + } + sendMessage(messageContext.message); + messageContext.message.decrementReferenceCount(); + } + } + + private void processSent() throws Exception { + for (MessageContext messageContext : additions) { + messageSent(messageContext.context, messageContext.message); + } + } + + @Override + public void afterRollback() throws Exception { + try { + for (MessageContext messageContext : additions) { + messageContext.message.decrementReferenceCount(); + } + } finally { + sendSyncs.remove(transaction); + } + } + } + + // called while holding the sendLock + private void registerSendSync(Message message, ConnectionContext context) { + final Transaction transaction = context.getTransaction(); + Queue.SendSync currentSync = sendSyncs.get(transaction); + if (currentSync == null) { + currentSync = new Queue.SendSync(transaction); + transaction.addSynchronization(currentSync); + sendSyncs.put(transaction, currentSync); + } + currentSync.add(context, message); + } + void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); @@ -759,30 +875,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // our memory. This increment is decremented once the tx finishes.. message.incrementReferenceCount(); - context.getTransaction().addSynchronization(new Synchronization() { - @Override - public void afterCommit() throws Exception { - sendLock.lockInterruptibly(); - try { - // It could take while before we receive the commit - // op, by that time the message could have expired.. - if (broker.isExpired(message)) { - broker.messageExpired(context, message, null); - destinationStatistics.getExpired().increment(); - return; - } - sendMessage(message); - } finally { - sendLock.unlock(); - message.decrementReferenceCount(); - } - messageSent(context, message); - } - @Override - public void afterRollback() throws Exception { - message.decrementReferenceCount(); - } - }); + registerSendSync(message, context); } else { // Add to the pending list, this takes care of incrementing the // usage manager. diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java index 60b74f82aa..dce90e9954 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java @@ -128,7 +128,7 @@ public abstract class Transaction { @Override public String toString() { - return super.toString() + "[synchronizations=" + synchronizations + "]"; + return "Local-" + getTransactionId() + "[synchronizations=" + synchronizations + "]"; } public abstract void commit(boolean onePhase) throws XAException, IOException; diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 8fd78feda9..b75aaade79 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -43,7 +43,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; -import java.util.Stack; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; @@ -833,7 +832,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe lastRecoveryPosition = nextRecoveryPosition; metadata.lastUpdate = lastRecoveryPosition; JournalCommand message = load(lastRecoveryPosition); - process(message, lastRecoveryPosition, (Runnable)null); + process(message, lastRecoveryPosition, (Runnable)null, (Runnable)null); nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); } } finally { @@ -913,10 +912,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * the JournalMessage is used to update the index just like it would be done * during a recovery process. */ - public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException { - if (before != null) { - before.run(); - } + public Location store(JournalCommand data, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException { try { ByteSequence sequence = toByteSequence(data); @@ -927,7 +923,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe long start = System.currentTimeMillis(); location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; long start2 = System.currentTimeMillis(); - process(data, location, after); + process(data, location, before, after); long end = System.currentTimeMillis(); if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { @@ -940,18 +936,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe checkpointLock.readLock().unlock(); } if (after != null) { - Runnable afterCompletion = null; - synchronized (orderedTransactionAfters) { - if (!orderedTransactionAfters.empty()) { - afterCompletion = orderedTransactionAfters.pop(); - } - } - if (afterCompletion != null) { - afterCompletion.run(); - } else { - // non persistent message case - after.run(); - } + after.run(); } if (checkpointThread != null && !checkpointThread.isAlive()) { @@ -1004,7 +989,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe */ void process(JournalCommand data, final Location location, final Location inDoubtlocation) throws IOException { if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { - process(data, location, (Runnable) null); + process(data, location, (Runnable) null, (Runnable) null); } else { // just recover producer audit data.visit(new Visitor() { @@ -1022,7 +1007,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // from the recovery method too so they need to be idempotent // ///////////////////////////////////////////////////////////////// - void process(JournalCommand data, final Location location, final Runnable after) throws IOException { + void process(JournalCommand data, final Location location, final Runnable before, final Runnable after) throws IOException { data.visit(new Visitor() { @Override public void visit(KahaAddMessageCommand command) throws IOException { @@ -1041,7 +1026,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @Override public void visit(KahaCommitCommand command) throws IOException { - process(command, location, after); + process(command, location, before, after); } @Override @@ -1153,17 +1138,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - private final Stack orderedTransactionAfters = new Stack(); - private void push(Runnable after) { - if (after != null) { - synchronized (orderedTransactionAfters) { - orderedTransactionAfters.push(after); - } - } - } - @SuppressWarnings("rawtypes") - protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException { + protected void process(KahaCommitCommand command, Location location, final Runnable before, final Runnable after) throws IOException { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); List inflightTx; synchronized (inflightTransactions) { @@ -1173,9 +1149,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } if (inflightTx == null) { - if (after != null) { - // since we don't push this after and we may find another, lets run it now - after.run(); + // only non persistent messages in this tx + if (before != null) { + before.run(); } return; } @@ -1183,6 +1159,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe final List messagingTx = inflightTx; this.indexLock.writeLock().lock(); try { + // run before with the index lock so that queue can order cursor updates with index updates + if (before != null) { + before.run(); + } pageFile.tx().execute(new Transaction.Closure() { @Override public void execute(Transaction tx) throws IOException { @@ -1192,7 +1172,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } }); metadata.lastUpdate = location; - push(after); } finally { this.indexLock.writeLock().unlock(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java index 8015f3889e..5c7f29d257 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java @@ -145,14 +145,8 @@ public class MessageExpirationTest extends BrokerTestSupport { connection.send(closeConnectionInfo(connectionInfo2)); } - /** - * Small regression. Looks like persistent messages to a queue are not being - * timedout when in a long transaction. See: - * http://issues.apache.org/activemq/browse/AMQ-1269 Commenting out the - * DeliveryMode.PERSISTENT test combination for now. - */ public void initCombosForTestMessagesInLongTransactionExpire() { - addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)}); + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.PERSISTENT), Integer.valueOf(DeliveryMode.NON_PERSISTENT)}); addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java index ccb1bf39c1..1401b35efb 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java @@ -88,7 +88,8 @@ public class NegativeQueueTest extends AutoFailTestSupport { private static final long MEMORY_USAGE = 400000000; private static final long TEMP_USAGE = 200000000; private static final long STORE_USAGE = 1000000000; - private static final int MESSAGE_COUNT = 1100; + // ensure we exceed the cache 70% + private static final int MESSAGE_COUNT = 2100; protected static final boolean TRANSACTED = true; protected static final boolean DEBUG = true; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java new file mode 100644 index 0000000000..1126d310e3 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.HashSet; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4485Test extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4485Test.class); + BrokerService broker; + ActiveMQConnectionFactory factory; + final int messageCount = 20; + int memoryLimit = 40 * 1024; + final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + this.getClass().getName()); + final Vector exceptions = new Vector(); + final CountDownLatch slowSendResume = new CountDownLatch(1); + + + protected void configureBroker(long memoryLimit) throws Exception { + broker.setDeleteAllMessagesOnStartup(true); + broker.setAdvisorySupport(false); + + PolicyEntry policy = new PolicyEntry(); + policy.setExpireMessagesPeriod(0); + policy.setMemoryLimit(memoryLimit); + policy.setProducerFlowControl(false); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + + broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() { + @Override + public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception { + if (messageSend.isInTransaction() && messageSend.getProperty("NUM") != null) { + final Integer num = (Integer) messageSend.getProperty("NUM"); + if (true) { + TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class); + transactionBroker.getTransaction(producerExchange.getConnectionContext(), messageSend.getTransactionId(), false).addSynchronization( + new Synchronization() { + public void afterCommit() throws Exception { + LOG.error("AfterCommit, NUM:" + num + ", " + messageSend.getMessageId() + ", tx: " + messageSend.getTransactionId()); + if (num == 5) { + // we want to add to cursor after usage is exhausted by message 20 and when + // all other messages have been processed + LOG.error("Pausing on latch in afterCommit for: " + num + ", " + messageSend.getMessageId()); + slowSendResume.await(20, TimeUnit.SECONDS); + LOG.error("resuming on latch afterCommit for: " + num + ", " + messageSend.getMessageId()); + } else if (messageCount + 1 == num) { + LOG.error("releasing latch. " + num + ", " + messageSend.getMessageId()); + slowSendResume.countDown(); + // for message X, we need to delay so message 5 can setBatch + TimeUnit.SECONDS.sleep(5); + LOG.error("resuming afterCommit for: " + num + ", " + messageSend.getMessageId()); + } + } + }); + } + } + super.send(producerExchange, messageSend); + } + } + }); + + } + + + public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws Exception { + + Set expected = new HashSet(); + final Vector sessionVector = new Vector(); + ExecutorService executorService = Executors.newCachedThreadPool(); + for (int i = 1; i <= messageCount; i++) { + sessionVector.add(send(i, 1, true)); + expected.add(i); + } + + // get parallel commit so that the sync writes are batched + for (int i = 0; i < messageCount; i++) { + final int id = i; + executorService.submit(new Runnable() { + @Override + public void run() { + try { + sessionVector.get(id).commit(); + } catch (Exception fail) { + exceptions.add(fail); + } + } + }); + } + + final DestinationViewMBean queueViewMBean = (DestinationViewMBean) + broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], DestinationViewMBean.class, false); + + // not sure how many messages will get enqueued + TimeUnit.SECONDS.sleep(3); + if (false) + assertTrue("all " + messageCount + " on the q", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount()); + return messageCount == queueViewMBean.getEnqueueCount(); + } + })); + + LOG.info("Big send to blow available destination usage before slow send resumes"); + send(messageCount + 1, 35*1024, true).commit(); + + + // consume and verify all received + Connection cosumerConnection = factory.createConnection(); + cosumerConnection.start(); + MessageConsumer consumer = cosumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination); + for (int i = 1; i <= messageCount + 1; i++) { + BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000); + assertNotNull("Got message: " + i + ", " + expected, bytesMessage); + MessageId mqMessageId = ((ActiveMQBytesMessage) bytesMessage).getMessageId(); + LOG.info("got: " + expected + ", " + mqMessageId + ", NUM=" + ((ActiveMQBytesMessage) bytesMessage).getProperty("NUM")); + expected.remove(((ActiveMQBytesMessage) bytesMessage).getProperty("NUM")); + } + } + + private Session send(int id, int messageSize, boolean transacted) throws Exception { + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(new byte[messageSize]); + bytesMessage.setIntProperty("NUM", id); + producer.send(bytesMessage); + LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " + ((ActiveMQBytesMessage) bytesMessage).getTransactionId()); + return session; + } + + protected void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + broker.setBrokerName("thisOne"); + configureBroker(memoryLimit); + broker.start(); + factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true"); + factory.setWatchTopicAdvisories(false); + + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + broker = null; + } + } + +} \ No newline at end of file