From e0b0b6bf8930cf06c017ab0125b3acb33d586eea Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 5 Jan 2015 16:39:42 -0500 Subject: [PATCH] ACTIVEMQ6-64 Messages duplicated during ScaleDown or queue.totalIterator https://issues.apache.org/jira/browse/ACTIVEMQ6-64 The redelivery list was not isolated on the PageIterator. This is moving the redelivery list to the Iterator so we would have proper isolation of the functionality. The previous version was assuming a single instance of PageIterator, QueueImpl and PageSubscription. When we started using more than one instance of the Iterator we created this bug. --- .../core/paging/PageTransactionInfo.java | 5 +- .../core/paging/cursor/PageIterator.java | 29 ++++ .../core/paging/cursor/PageSubscription.java | 4 +- .../cursor/impl/PageSubscriptionImpl.java | 29 ++-- .../paging/impl/PageTransactionInfoImpl.java | 55 ++++-- .../integration/paging/PagingSendTest.java | 158 +++++++++++++++++- 6 files changed, 252 insertions(+), 28 deletions(-) create mode 100644 activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java index 02b6d9b832..39833c6207 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java @@ -17,6 +17,7 @@ package org.apache.activemq.core.paging; import org.apache.activemq.core.journal.EncodingSupport; +import org.apache.activemq.core.paging.cursor.PageIterator; import org.apache.activemq.core.paging.cursor.PagePosition; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.persistence.StorageManager; @@ -61,10 +62,8 @@ public interface PageTransactionInfo extends EncodingSupport /** * This method will hold the position to be delivered later in case this transaction is pending. * If the tx is not pending, it will return false, so the caller can deliver it right away - * @param cursor - * @param cursorPos * @return true if the message will be delivered later, false if it should be delivered right away */ - boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos); + boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription cursor, PagePosition cursorPos); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java new file mode 100644 index 0000000000..a5a64c9c42 --- /dev/null +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.core.paging.cursor; + +import org.apache.activemq.utils.LinkedListIterator; + +/** + * @author clebertsuconic + */ + +public interface PageIterator extends LinkedListIterator +{ + void redeliver(PagePosition reference); +} diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java index af6a67d91a..5423658376 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java @@ -125,12 +125,12 @@ public interface PageSubscription * To be used on redeliveries * @param position */ - void redeliver(PagePosition position); + void redeliver(PageIterator iterator, PagePosition position); void printDebug(); /** - * @param minPage + * @param page * @return */ boolean isComplete(long page); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java index 39263a77ef..6482795668 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -39,6 +39,7 @@ import org.apache.activemq.core.paging.PagedMessage; import org.apache.activemq.core.paging.PagingStore; import org.apache.activemq.core.paging.cursor.PageCache; import org.apache.activemq.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.core.paging.cursor.PageIterator; import org.apache.activemq.core.paging.cursor.PagePosition; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.paging.cursor.PageSubscriptionCounter; @@ -55,7 +56,6 @@ import org.apache.activemq.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.core.transaction.impl.TransactionImpl; import org.apache.activemq.utils.ConcurrentHashSet; import org.apache.activemq.utils.FutureLatch; -import org.apache.activemq.utils.LinkedListIterator; /** * A PageCursorImpl @@ -99,9 +99,6 @@ final class PageSubscriptionImpl implements PageSubscription private final AtomicLong deliveredCount = new AtomicLong(0); - // We only store the position for redeliveries. They will be read from the SoftCache again during delivery. - private final java.util.Queue redeliveries = new LinkedList(); - PageSubscriptionImpl(final PageCursorProvider cursorProvider, final PagingStore pageStore, final StorageManager store, @@ -378,7 +375,7 @@ final class PageSubscriptionImpl implements PageSubscription } @Override - public LinkedListIterator iterator() + public PageIterator iterator() { return new CursorIterator(); } @@ -583,12 +580,9 @@ final class PageSubscriptionImpl implements PageSubscription } @Override - public void redeliver(final PagePosition position) + public void redeliver(final PageIterator iterator, final PagePosition position) { - synchronized (redeliveries) - { - redeliveries.add(position); - } + iterator.redeliver(position); synchronized (consumedPages) { @@ -1245,7 +1239,7 @@ final class PageSubscriptionImpl implements PageSubscription } - private class CursorIterator implements LinkedListIterator + private class CursorIterator implements PageIterator { private PagePosition position = null; @@ -1255,6 +1249,9 @@ final class PageSubscriptionImpl implements PageSubscription private volatile PagedReference lastRedelivery = null; + // We only store the position for redeliveries. They will be read from the SoftCache again during delivery. + private final java.util.Queue redeliveries = new LinkedList(); + /** * next element taken on hasNext test. * it has to be delivered on next next operation @@ -1265,6 +1262,14 @@ final class PageSubscriptionImpl implements PageSubscription { } + public void redeliver(PagePosition reference) + { + synchronized (redeliveries) + { + redeliveries.add(reference); + } + } + public void repeat() { if (isredelivery) @@ -1390,7 +1395,7 @@ final class PageSubscriptionImpl implements PageSubscription } else { - if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition())) + if (tx.deliverAfterCommit(CursorIterator.this, PageSubscriptionImpl.this, message.getPosition())) { valid = false; ignored = false; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java index 0a9f81551a..80bdd86ffe 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java @@ -23,9 +23,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.Pair; import org.apache.activemq.core.paging.PageTransactionInfo; import org.apache.activemq.core.paging.PagingManager; +import org.apache.activemq.core.paging.cursor.PageIterator; import org.apache.activemq.core.paging.cursor.PagePosition; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.persistence.StorageManager; @@ -58,7 +58,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0); - private List> lateDeliveries; + private List lateDeliveries; // Static -------------------------------------------------------- @@ -146,9 +146,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo if (lateDeliveries != null) { // This is to make sure deliveries that were touched before the commit arrived will be delivered - for (Pair pos : lateDeliveries) + for (LateDelivery pos : lateDeliveries) { - pos.getA().redeliver(pos.getB()); + pos.getSubscription().redeliver(pos.getIterator(), pos.getPagePosition()); } lateDeliveries.clear(); } @@ -225,9 +225,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo if (lateDeliveries != null) { - for (Pair pos : lateDeliveries) + for (LateDelivery pos : lateDeliveries) { - pos.getA().lateDeliveryRollback(pos.getB()); + pos.getSubscription().lateDeliveryRollback(pos.getPagePosition()); } lateDeliveries = null; } @@ -245,12 +245,12 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo } @Override - public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos) + public synchronized boolean deliverAfterCommit(PageIterator iterator, PageSubscription cursor, PagePosition cursorPos) { if (committed && useRedelivery) { cursor.addPendingDelivery(cursorPos); - cursor.redeliver(cursorPos); + cursor.redeliver(iterator, cursorPos); return true; } else if (committed) @@ -267,10 +267,10 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo useRedelivery = true; if (lateDeliveries == null) { - lateDeliveries = new LinkedList>(); + lateDeliveries = new LinkedList<>(); } cursor.addPendingDelivery(cursorPos); - lateDeliveries.add(new Pair(cursor, cursorPos)); + lateDeliveries.add(new LateDelivery(cursor, cursorPos, iterator)); return true; } } @@ -283,6 +283,41 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo // Inner classes ------------------------------------------------- + /** a Message shouldn't be delivered until it's committed + * For that reason the page-refernce will be written right away + * But in certain cases we can only deliver after the commit + * For that reason we will perform a late delivery + * through the method redeliver. + */ + private static class LateDelivery + { + final PageSubscription subscription; + final PagePosition pagePosition; + final PageIterator iterator; + + public LateDelivery(PageSubscription subscription, PagePosition pagePosition, PageIterator iterator) + { + this.subscription = subscription; + this.pagePosition = pagePosition; + this.iterator = iterator; + } + + public PageSubscription getSubscription() + { + return subscription; + } + + public PagePosition getPagePosition() + { + return pagePosition; + } + + public PageIterator getIterator() + { + return iterator; + } + } + private static class UpdatePageTXOperation extends TransactionOperationAbstract { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java index b35c5ea055..5e6d9eb479 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java @@ -16,10 +16,17 @@ */ package org.apache.activemq.tests.integration.paging; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.Message; import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.client.ClientConsumer; import org.apache.activemq.api.core.client.ClientMessage; @@ -27,9 +34,14 @@ import org.apache.activemq.api.core.client.ClientProducer; import org.apache.activemq.api.core.client.ClientSession; import org.apache.activemq.api.core.client.ClientSessionFactory; import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.config.impl.ConfigurationImpl; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.MessageReference; +import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.tests.util.ServiceTestBase; +import org.apache.activemq.utils.LinkedListIterator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -57,7 +69,9 @@ public class PagingSendTest extends ServiceTestBase public void setUp() throws Exception { super.setUp(); + Configuration config = new ConfigurationImpl(); server = newActiveMQServer(); + server.start(); waitForServer(server); locator = createFactory(isNetty()); @@ -224,4 +238,146 @@ public class PagingSendTest extends ServiceTestBase assertEquals(0, errors.get()); } -} \ No newline at end of file + + @Test + public void testPagingDoesNotDuplicateBatchMessages() throws Exception + { + int batchSize = 20; + + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, false); + + // Create a queue + SimpleString queueAddr = new SimpleString("testQueue"); + session.createQueue(queueAddr, queueAddr, null, true); + + // Set up paging on the queue address + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setPageSizeBytes(10 * 1024); + /** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent. + Presumably due to additional meta-data, message headers etc... **/ + addressSettings.setMaxSizeBytes(16 * 1024); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + + sendMessageBatch(batchSize, session, queueAddr); + + Queue queue = server.locateQueue(queueAddr); + + checkBatchMessagesAreNotPagedTwice(queue); + + for (int i = 0; i < 10; i++) + { + // execute the same count a couple times. This is to make sure the iterators have no impact regardless + // the number of times they are called + assertEquals(batchSize, processCountThroughIterator(queue)); + } + + } + + @Test + public void testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws Exception + { + int batchSize = 20; + + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, false); + + // Create a queue + SimpleString queueAddr = new SimpleString("testQueue"); + session.createQueue(queueAddr, queueAddr, null, true); + + // Set up paging on the queue address + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setPageSizeBytes(10 * 1024); + /** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent. + Presumably due to additional meta-data, message headers etc... **/ + addressSettings.setMaxSizeBytes(16 * 1024); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + + int numberOfMessages = 0; + // ensure the server is paging + while (!server.getPagingManager().getPageStore(queueAddr).isPaging()) + { + sendMessageBatch(batchSize, session, queueAddr); + numberOfMessages += batchSize; + + } + + sendMessageBatch(batchSize, session, queueAddr); + numberOfMessages += batchSize; + + Queue queue = server.locateQueue(queueAddr); + checkBatchMessagesAreNotPagedTwice(queue); + + for (int i = 0; i < 10; i++) + { + // execute the same count a couple times. This is to make sure the iterators have no impact regardless + // the number of times they are called + assertEquals(numberOfMessages, processCountThroughIterator(queue)); + } + } + + public List sendMessageBatch(int batchSize, ClientSession session, SimpleString queueAddr) throws ActiveMQException + { + List messageIds = new ArrayList(); + ClientProducer producer = session.createProducer(queueAddr); + for (int i = 0; i < batchSize; i++) + { + Message message = session.createMessage(true); + message.getBodyBuffer().writeBytes(new byte[1024]); + String id = UUID.randomUUID().toString(); + message.putStringProperty("id", id); + message.putIntProperty("seq", i); // this is to make the print-data easier to debug + messageIds.add(id); + producer.send(message); + } + session.commit(); + + return messageIds; + } + + /** + * checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested + * this allows us to test only those messages that have been sent after the address has started paging (ignoring any + * duplicates that may have happened before this point). + */ + public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception + { + LinkedListIterator pageIterator = queue.totalIterator(); + + Set messageOrderSet = new HashSet(); + + int duplicates = 0; + while (pageIterator.hasNext()) + { + MessageReference reference = pageIterator.next(); + + String id = reference.getMessage().getStringProperty("id"); + + // If add(id) returns true it means that this id was already added to this set. Hence a duplicate is found. + if (!messageOrderSet.add(id)) + { + duplicates++; + } + } + assertTrue(duplicates == 0); + } + + /** + * checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested + * this allows us to test only those messages that have been sent after the address has started paging (ignoring any + * duplicates that may have happened before this point). + */ + protected int processCountThroughIterator(Queue queue) throws Exception + { + LinkedListIterator pageIterator = queue.totalIterator(); + + int count = 0; + while (pageIterator.hasNext()) + { + MessageReference reference = pageIterator.next(); + count++; + } + return count; + } +}