diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java index 02b6d9b832..39833c6207 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/PageTransactionInfo.java @@ -17,6 +17,7 @@ package org.apache.activemq.core.paging; import org.apache.activemq.core.journal.EncodingSupport; +import org.apache.activemq.core.paging.cursor.PageIterator; import org.apache.activemq.core.paging.cursor.PagePosition; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.persistence.StorageManager; @@ -61,10 +62,8 @@ public interface PageTransactionInfo extends EncodingSupport /** * This method will hold the position to be delivered later in case this transaction is pending. * If the tx is not pending, it will return false, so the caller can deliver it right away - * @param cursor - * @param cursorPos * @return true if the message will be delivered later, false if it should be delivered right away */ - boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos); + boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription cursor, PagePosition cursorPos); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java new file mode 100644 index 0000000000..a5a64c9c42 --- /dev/null +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageIterator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.core.paging.cursor; + +import org.apache.activemq.utils.LinkedListIterator; + +/** + * @author clebertsuconic + */ + +public interface PageIterator extends LinkedListIterator +{ + void redeliver(PagePosition reference); +} diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java index af6a67d91a..5423658376 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/PageSubscription.java @@ -125,12 +125,12 @@ public interface PageSubscription * To be used on redeliveries * @param position */ - void redeliver(PagePosition position); + void redeliver(PageIterator iterator, PagePosition position); void printDebug(); /** - * @param minPage + * @param page * @return */ boolean isComplete(long page); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java index 39263a77ef..6482795668 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -39,6 +39,7 @@ import org.apache.activemq.core.paging.PagedMessage; import org.apache.activemq.core.paging.PagingStore; import org.apache.activemq.core.paging.cursor.PageCache; import org.apache.activemq.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.core.paging.cursor.PageIterator; import org.apache.activemq.core.paging.cursor.PagePosition; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.paging.cursor.PageSubscriptionCounter; @@ -55,7 +56,6 @@ import org.apache.activemq.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.core.transaction.impl.TransactionImpl; import org.apache.activemq.utils.ConcurrentHashSet; import org.apache.activemq.utils.FutureLatch; -import org.apache.activemq.utils.LinkedListIterator; /** * A PageCursorImpl @@ -99,9 +99,6 @@ final class PageSubscriptionImpl implements PageSubscription private final AtomicLong deliveredCount = new AtomicLong(0); - // We only store the position for redeliveries. They will be read from the SoftCache again during delivery. - private final java.util.Queue redeliveries = new LinkedList(); - PageSubscriptionImpl(final PageCursorProvider cursorProvider, final PagingStore pageStore, final StorageManager store, @@ -378,7 +375,7 @@ final class PageSubscriptionImpl implements PageSubscription } @Override - public LinkedListIterator iterator() + public PageIterator iterator() { return new CursorIterator(); } @@ -583,12 +580,9 @@ final class PageSubscriptionImpl implements PageSubscription } @Override - public void redeliver(final PagePosition position) + public void redeliver(final PageIterator iterator, final PagePosition position) { - synchronized (redeliveries) - { - redeliveries.add(position); - } + iterator.redeliver(position); synchronized (consumedPages) { @@ -1245,7 +1239,7 @@ final class PageSubscriptionImpl implements PageSubscription } - private class CursorIterator implements LinkedListIterator + private class CursorIterator implements PageIterator { private PagePosition position = null; @@ -1255,6 +1249,9 @@ final class PageSubscriptionImpl implements PageSubscription private volatile PagedReference lastRedelivery = null; + // We only store the position for redeliveries. They will be read from the SoftCache again during delivery. + private final java.util.Queue redeliveries = new LinkedList(); + /** * next element taken on hasNext test. * it has to be delivered on next next operation @@ -1265,6 +1262,14 @@ final class PageSubscriptionImpl implements PageSubscription { } + public void redeliver(PagePosition reference) + { + synchronized (redeliveries) + { + redeliveries.add(reference); + } + } + public void repeat() { if (isredelivery) @@ -1390,7 +1395,7 @@ final class PageSubscriptionImpl implements PageSubscription } else { - if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition())) + if (tx.deliverAfterCommit(CursorIterator.this, PageSubscriptionImpl.this, message.getPosition())) { valid = false; ignored = false; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java index 0a9f81551a..80bdd86ffe 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/impl/PageTransactionInfoImpl.java @@ -23,9 +23,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.Pair; import org.apache.activemq.core.paging.PageTransactionInfo; import org.apache.activemq.core.paging.PagingManager; +import org.apache.activemq.core.paging.cursor.PageIterator; import org.apache.activemq.core.paging.cursor.PagePosition; import org.apache.activemq.core.paging.cursor.PageSubscription; import org.apache.activemq.core.persistence.StorageManager; @@ -58,7 +58,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0); - private List> lateDeliveries; + private List lateDeliveries; // Static -------------------------------------------------------- @@ -146,9 +146,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo if (lateDeliveries != null) { // This is to make sure deliveries that were touched before the commit arrived will be delivered - for (Pair pos : lateDeliveries) + for (LateDelivery pos : lateDeliveries) { - pos.getA().redeliver(pos.getB()); + pos.getSubscription().redeliver(pos.getIterator(), pos.getPagePosition()); } lateDeliveries.clear(); } @@ -225,9 +225,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo if (lateDeliveries != null) { - for (Pair pos : lateDeliveries) + for (LateDelivery pos : lateDeliveries) { - pos.getA().lateDeliveryRollback(pos.getB()); + pos.getSubscription().lateDeliveryRollback(pos.getPagePosition()); } lateDeliveries = null; } @@ -245,12 +245,12 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo } @Override - public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos) + public synchronized boolean deliverAfterCommit(PageIterator iterator, PageSubscription cursor, PagePosition cursorPos) { if (committed && useRedelivery) { cursor.addPendingDelivery(cursorPos); - cursor.redeliver(cursorPos); + cursor.redeliver(iterator, cursorPos); return true; } else if (committed) @@ -267,10 +267,10 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo useRedelivery = true; if (lateDeliveries == null) { - lateDeliveries = new LinkedList>(); + lateDeliveries = new LinkedList<>(); } cursor.addPendingDelivery(cursorPos); - lateDeliveries.add(new Pair(cursor, cursorPos)); + lateDeliveries.add(new LateDelivery(cursor, cursorPos, iterator)); return true; } } @@ -283,6 +283,41 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo // Inner classes ------------------------------------------------- + /** a Message shouldn't be delivered until it's committed + * For that reason the page-refernce will be written right away + * But in certain cases we can only deliver after the commit + * For that reason we will perform a late delivery + * through the method redeliver. + */ + private static class LateDelivery + { + final PageSubscription subscription; + final PagePosition pagePosition; + final PageIterator iterator; + + public LateDelivery(PageSubscription subscription, PagePosition pagePosition, PageIterator iterator) + { + this.subscription = subscription; + this.pagePosition = pagePosition; + this.iterator = iterator; + } + + public PageSubscription getSubscription() + { + return subscription; + } + + public PagePosition getPagePosition() + { + return pagePosition; + } + + public PageIterator getIterator() + { + return iterator; + } + } + private static class UpdatePageTXOperation extends TransactionOperationAbstract { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java index b35c5ea055..5e6d9eb479 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/paging/PagingSendTest.java @@ -16,10 +16,17 @@ */ package org.apache.activemq.tests.integration.paging; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.Message; import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.client.ClientConsumer; import org.apache.activemq.api.core.client.ClientMessage; @@ -27,9 +34,14 @@ import org.apache.activemq.api.core.client.ClientProducer; import org.apache.activemq.api.core.client.ClientSession; import org.apache.activemq.api.core.client.ClientSessionFactory; import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.config.impl.ConfigurationImpl; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.MessageReference; +import org.apache.activemq.core.server.Queue; import org.apache.activemq.core.settings.impl.AddressSettings; import org.apache.activemq.tests.util.ServiceTestBase; +import org.apache.activemq.utils.LinkedListIterator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -57,7 +69,9 @@ public class PagingSendTest extends ServiceTestBase public void setUp() throws Exception { super.setUp(); + Configuration config = new ConfigurationImpl(); server = newActiveMQServer(); + server.start(); waitForServer(server); locator = createFactory(isNetty()); @@ -224,4 +238,146 @@ public class PagingSendTest extends ServiceTestBase assertEquals(0, errors.get()); } -} \ No newline at end of file + + @Test + public void testPagingDoesNotDuplicateBatchMessages() throws Exception + { + int batchSize = 20; + + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, false); + + // Create a queue + SimpleString queueAddr = new SimpleString("testQueue"); + session.createQueue(queueAddr, queueAddr, null, true); + + // Set up paging on the queue address + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setPageSizeBytes(10 * 1024); + /** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent. + Presumably due to additional meta-data, message headers etc... **/ + addressSettings.setMaxSizeBytes(16 * 1024); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + + sendMessageBatch(batchSize, session, queueAddr); + + Queue queue = server.locateQueue(queueAddr); + + checkBatchMessagesAreNotPagedTwice(queue); + + for (int i = 0; i < 10; i++) + { + // execute the same count a couple times. This is to make sure the iterators have no impact regardless + // the number of times they are called + assertEquals(batchSize, processCountThroughIterator(queue)); + } + + } + + @Test + public void testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws Exception + { + int batchSize = 20; + + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, false); + + // Create a queue + SimpleString queueAddr = new SimpleString("testQueue"); + session.createQueue(queueAddr, queueAddr, null, true); + + // Set up paging on the queue address + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setPageSizeBytes(10 * 1024); + /** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent. + Presumably due to additional meta-data, message headers etc... **/ + addressSettings.setMaxSizeBytes(16 * 1024); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + + int numberOfMessages = 0; + // ensure the server is paging + while (!server.getPagingManager().getPageStore(queueAddr).isPaging()) + { + sendMessageBatch(batchSize, session, queueAddr); + numberOfMessages += batchSize; + + } + + sendMessageBatch(batchSize, session, queueAddr); + numberOfMessages += batchSize; + + Queue queue = server.locateQueue(queueAddr); + checkBatchMessagesAreNotPagedTwice(queue); + + for (int i = 0; i < 10; i++) + { + // execute the same count a couple times. This is to make sure the iterators have no impact regardless + // the number of times they are called + assertEquals(numberOfMessages, processCountThroughIterator(queue)); + } + } + + public List sendMessageBatch(int batchSize, ClientSession session, SimpleString queueAddr) throws ActiveMQException + { + List messageIds = new ArrayList(); + ClientProducer producer = session.createProducer(queueAddr); + for (int i = 0; i < batchSize; i++) + { + Message message = session.createMessage(true); + message.getBodyBuffer().writeBytes(new byte[1024]); + String id = UUID.randomUUID().toString(); + message.putStringProperty("id", id); + message.putIntProperty("seq", i); // this is to make the print-data easier to debug + messageIds.add(id); + producer.send(message); + } + session.commit(); + + return messageIds; + } + + /** + * checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested + * this allows us to test only those messages that have been sent after the address has started paging (ignoring any + * duplicates that may have happened before this point). + */ + public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception + { + LinkedListIterator pageIterator = queue.totalIterator(); + + Set messageOrderSet = new HashSet(); + + int duplicates = 0; + while (pageIterator.hasNext()) + { + MessageReference reference = pageIterator.next(); + + String id = reference.getMessage().getStringProperty("id"); + + // If add(id) returns true it means that this id was already added to this set. Hence a duplicate is found. + if (!messageOrderSet.add(id)) + { + duplicates++; + } + } + assertTrue(duplicates == 0); + } + + /** + * checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested + * this allows us to test only those messages that have been sent after the address has started paging (ignoring any + * duplicates that may have happened before this point). + */ + protected int processCountThroughIterator(Queue queue) throws Exception + { + LinkedListIterator pageIterator = queue.totalIterator(); + + int count = 0; + while (pageIterator.hasNext()) + { + MessageReference reference = pageIterator.next(); + count++; + } + return count; + } +}