diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 99981528f0..f2df96eaf5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -310,11 +310,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
                             long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
                             if (Long.compare((Long)futureOrLong, next) == 0) {
                                 setLastCachedId(SYNC_ADD, candidate);
-                            } else {
-                                // out of sequence, revert to sync state
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
-                                }
                             }
                         }
                         it.remove();
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index a18071b91d..4438292c45 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -656,23 +656,18 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
     @Override
     public void setBatch(final MessageId identity) throws IOException {
-        final String key = identity.toProducerKey();
         indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 @Override
                 public void execute(Transaction tx) throws IOException {
                     StoredDestination sd = getStoredDestination(dest, tx);
-                    Long location = sd.messageIdIndex.get(tx, key);
-                    if (location != null) {
-                        Long pending = sd.orderIndex.minPendingAdd();
-                        if (pending != null) {
-                            location = Math.min(location, pending-1);
-                        }
-                        sd.orderIndex.setBatch(tx, location);
-                    } else {
-                        LOG.warn("{} {} setBatch failed, location for {} not found in messageId index for {}", this, dest.getName(), identity.getFutureOrSequenceLong(), identity);
+                    Long location = (Long) identity.getFutureOrSequenceLong();
+                    Long pending = sd.orderIndex.minPendingAdd();
+                    if (pending != null) {
+                        location = Math.min(location, pending-1);
                     }
+                    sd.orderIndex.setBatch(tx, location);
                 }
             });
         } finally {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
new file mode 100644
index 0000000000..afcf54fbdb
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
@@ -0,0 +1,617 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ Non transactional concurrent producer/consumer to single dest
+ */
+@RunWith(Parameterized.class)
+public class AMQ5266SingleDestTest {
+    static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class);
+    String activemqURL;
+    BrokerService brokerService;
+    private EmbeddedDataSource dataSource;
+
+    public int numDests = 1;
+    public int messageSize = 10*1000;
+
+    @Parameterized.Parameter(0)
+    public int publisherMessagesPerThread = 1000;
+
+    @Parameterized.Parameter(1)
+    public int publisherThreadCount = 20;
+
+    @Parameterized.Parameter(2)
+    public int consumerThreadsPerQueue = 5;
+
+    @Parameterized.Parameter(3)
+    public int destMemoryLimit = 50 * 1024;
+
+    @Parameterized.Parameter(4)
+    public boolean useCache = true;
+
+    @Parameterized.Parameter(5)
+    public boolean useDefaultStore = false;
+
+    @Parameterized.Parameter(6)
+    public boolean optimizeDispatch = false;
+
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
+    public static Iterable<Object[]> parameters() {
+        return Arrays.asList(new Object[][]{
+                {1000, 80, 80, 1024*1024*5, true, true, false},
+        });
+    }
+
+    public int consumerBatchSize = 25;
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+
+        dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("target/derbyDb");
+        dataSource.setCreateDatabase("create");
+
+        JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+        jdbcPersistenceAdapter.setDataSource(dataSource);
+        jdbcPersistenceAdapter.setUseLock(false);
+
+        if (!useDefaultStore) {
+            brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
+        } else {
+            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+            kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
+        }
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(false);
+
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
+        defaultEntry.setMaxProducersToAudit(publisherThreadCount);
+        defaultEntry.setEnableAudit(true);
+        defaultEntry.setUseCache(useCache);
+        defaultEntry.setMaxPageSize(1000);
+        defaultEntry.setOptimizedDispatch(optimizeDispatch);
+        defaultEntry.setMemoryLimit(destMemoryLimit);
+        defaultEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(policyMap);
+
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
+
+        TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+        activemqURL = transportConnector.getPublishableConnectString();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+        try {
+            dataSource.setShutdownDatabase("shutdown");
+            dataSource.getConnection();
+        } catch (Exception ignored) {}
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        String activemqQueues = "activemq";
+        for (int i=1;i<numDests;i++) {
+            activemqQueues +=",activemq"+i;
+        }
+
+        int totalMessageCount = publisherMessagesPerThread * publisherThreadCount;
+
+        ExportQueuePublisher publisher =
+                new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
+        ExportQueueConsumer consumer =
+                new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, totalMessageCount);
+
+        LOG.info("Starting consumer");
+        consumer.start();
+        LOG.info("Starting publisher");
+        publisher.start();
+
+        LOG.info("Waiting for publisher completion");
+        publisher.waitForCompletion();
+
+        int distinctPublishedCount = new TreeSet<String>(publisher.getIDs()).size();
+        LOG.info("Publisher done, distinct IDs published: " + distinctPublishedCount);
+
+        // give the consumers a bounded amount of time to drain what was published
+        long expiry = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10);
+        while (!consumer.completed() && System.currentTimeMillis() < expiry) {
+            TimeUnit.SECONDS.sleep(5);
+        }
+        consumer.shutdown();
+
+        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
+
+            List<String> idList = entry.getValue();
+
+            int distinctConsumed = new TreeSet<String>(idList).size();
+
+            StringBuilder sb = new StringBuilder();
+            sb.append(" Queue: " + entry.getKey() + " -> Total Messages Consumed: " + idList.size() + ", Distinct IDs Consumed: " + distinctConsumed);
+
+            int diff = distinctPublishedCount - distinctConsumed;
+            sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
+            LOG.info(sb.toString());
+
+            assertEquals("expect to get all messages!", 0, diff);
+
+        }
+    }
+
+    public class ExportQueuePublisher {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        // Collection of distinct IDs that the publisher has published.
+        // After a message is published, its UUID will be written to this list for tracking.
+        // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
+        //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
+        private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
+        private List<PublisherThread> threads;
+
+        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+
+            threads = new ArrayList<PublisherThread>();
+
+            // Build the threads and tell them how many messages to publish
+            for (int i = 0; i < threadCount; i++) {
+                PublisherThread pt = new PublisherThread(messagesPerThread);
+                threads.add(pt);
+            }
+        }
+
+        public List<String> getIDs() {
+            return ids;
+        }
+
+        // Kick off threads
+        public void start() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.start();
+            }
+        }
+
+        // Wait for threads to complete. They will complete once they've published all of their messages.
+        public void waitForCompletion() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.join();
+                pt.close();
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages will start dropping
+            // after the queue has had a certain number of failures (default is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        private class PublisherThread extends Thread {
+
+            private int count;
+            private QueueConnection qc;
+            private Session session;
+            private MessageProducer mp;
+
+            private PublisherThread(int count) throws Exception {
+
+                this.count = count;
+
+                // Each Thread has its own Connection and Session, so no sync worries
+                qc = newQueueConnection();
+                session = newSession(qc);
+
+                // In our code, when publishing to multiple queues,
+                // we're using composite destinations like below
+                Queue q = new ActiveMQQueue(activemqQueues);
+                mp = session.createProducer(q);
+            }
+
+            public void run() {
+
+                try {
+
+                    // Loop until we've published enough messages
+                    while (count-- > 0) {
+
+                        TextMessage tm = session.createTextMessage(getMessageText());
+                        String id = UUID.randomUUID().toString();
+                        tm.setStringProperty("KEY", id);
+                        ids.add(id);    // keep track of the key to compare against consumer
+
+                        mp.send(tm);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            // Called by waitForCompletion
+            public void close() {
+
+                try {
+                    mp.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+
+    }
+
+    String messageText;
+    private String getMessageText() {
+
+        if (messageText == null) {
+
+            synchronized (this) {
+
+                if (messageText == null) {
+
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < messageSize; i++) {
+                        sb.append("X");
+                    }
+                    messageText = sb.toString();
+                }
+            }
+        }
+
+        return messageText;
+    }
+
+
+    public class ExportQueueConsumer {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private final int totalToExpect;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        private String[] queues = null;
+        // Map of IDs that were consumed, keyed by queue name.
+        // We'll compare these against what was published to know if any got stuck or dropped.
+        private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
+        private Map<String, List<ConsumerThread>> threads;
+
+        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+            this.totalToExpect = totalToExpect;
+
+            queues = this.activemqQueues.split(",");
+
+            for (int i = 0; i < queues.length; i++) {
+                queues[i] = queues[i].trim();
+            }
+
+            threads = new HashMap<String, List<ConsumerThread>>();
+
+            // For each queue, create a list of threads and set up the list of ids
+            for (String q : queues) {
+
+                List<ConsumerThread> list = new ArrayList<ConsumerThread>();
+
+                idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
+
+                for (int i = 0; i < threadsPerQueue; i++) {
+                    list.add(new ConsumerThread(q, batchSize));
+                }
+
+                threads.put(q, list);
+            }
+        }
+
+        public Map<String, List<String>> getIDs() {
+            return idsByQueue;
+        }
+
+        // Start the threads
+        public void start() throws Exception {
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.start();
+                }
+            }
+        }
+
+        // Tell the threads to stop
+        // Then wait for them to stop
+        public void shutdown() throws Exception {
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.shutdown();
+                }
+            }
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.join();
+                }
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages will start dropping
+            // after the queue has had a certain number of failures (default is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        public boolean completed() {
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    if (ct.isAlive()) {
+                        LOG.info("thread for {} is still alive.", ct.qName);
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
+        private class ConsumerThread extends Thread {
+
+            private int batchSize;
+            private QueueConnection qc;
+            private Session session;
+            private MessageConsumer mc;
+            private List<String> idList;
+            private boolean shutdown = false;
+            private String qName;
+
+            private ConsumerThread(String queueName, int batchSize) throws Exception {
+
+                this.batchSize = batchSize;
+
+                // Each thread has its own connection and session
+                qName = queueName;
+                qc = newQueueConnection();
+                session = newSession(qc);
+                Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
+                mc = session.createConsumer(q);
+
+                idList = idsByQueue.get(queueName);
+            }
+
+            public void run() {
+
+                try {
+
+                    int count = 0;
+
+                    // Keep reading as long as it hasn't been told to shutdown
+                    while (!shutdown) {
+
+                        if (idList.size() >= totalToExpect) {
+                            LOG.info("Got {} for q: {}",
+                                    idList.size(), qName);
+                            break;
+                        }
+                        Message m = mc.receive(4000);
+
+                        if (m != null) {
+
+                            // We received a non-null message, add the ID to our list
+
+                            idList.add(m.getStringProperty("KEY"));
+
+                            count++;
+
+                            // If we've reached our batch size, commit the batch and reset the count
+
+                            if (count == batchSize) {
+                                count = 0;
+                            }
+                        } else {
+
+                            // We didn't receive anything this time, commit any current batch and reset the count
+
+                            count = 0;
+
+                            // Sleep a little before trying to read after not getting a message
+
+                            try {
+                                if (idList.size() < totalToExpect) {
+                                    LOG.info("did not receive on {}, current count: {}", qName, idList.size());
+                                }
+                                //sleep(3000);
+                            } catch (Exception e) {
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+
+                    // Once we exit, close everything
+                    close();
+                }
+            }
+
+            public void shutdown() {
+                shutdown = true;
+            }
+
+            public void close() {
+
+                try {
+                    mc.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+
+                }
+            }
+        }
+    }
+}