ACTIVEMQ6-64 Messages duplicated during ScaleDown or queue.totalIterator

https://issues.apache.org/jira/browse/ACTIVEMQ6-64

The redelivery list was not isolated on the PageIterator. This is moving the
redelivery list to the Iterator so we would have proper isolation of the functionality.

The previous version was assuming a single instance of PageIterator, QueueImpl and PageSubscription.
When we started using more than one instance of the Iterator we created this bug.
This commit is contained in:
Clebert Suconic 2015-01-05 16:39:42 -05:00
parent 887743f09d
commit e0b0b6bf89
6 changed files with 252 additions and 28 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.core.paging;
import org.apache.activemq.core.journal.EncodingSupport;
import org.apache.activemq.core.paging.cursor.PageIterator;
import org.apache.activemq.core.paging.cursor.PagePosition;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.persistence.StorageManager;
@ -61,10 +62,8 @@ public interface PageTransactionInfo extends EncodingSupport
/**
* This method will hold the position to be delivered later in case this transaction is pending.
* If the tx is not pending, it will return false, so the caller can deliver it right away
* @param cursor
* @param cursorPos
* @return true if the message will be delivered later, false if it should be delivered right away
*/
boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription cursor, PagePosition cursorPos);
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.paging.cursor;
import org.apache.activemq.utils.LinkedListIterator;
/**
* @author clebertsuconic
*/
public interface PageIterator extends LinkedListIterator<PagedReference>
{
void redeliver(PagePosition reference);
}

View File

@ -125,12 +125,12 @@ public interface PageSubscription
* To be used on redeliveries
* @param position
*/
void redeliver(PagePosition position);
void redeliver(PageIterator iterator, PagePosition position);
void printDebug();
/**
* @param minPage
* @param page
* @return
*/
boolean isComplete(long page);

View File

@ -39,6 +39,7 @@ import org.apache.activemq.core.paging.PagedMessage;
import org.apache.activemq.core.paging.PagingStore;
import org.apache.activemq.core.paging.cursor.PageCache;
import org.apache.activemq.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.core.paging.cursor.PageIterator;
import org.apache.activemq.core.paging.cursor.PagePosition;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.paging.cursor.PageSubscriptionCounter;
@ -55,7 +56,6 @@ import org.apache.activemq.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.core.transaction.impl.TransactionImpl;
import org.apache.activemq.utils.ConcurrentHashSet;
import org.apache.activemq.utils.FutureLatch;
import org.apache.activemq.utils.LinkedListIterator;
/**
* A PageCursorImpl
@ -99,9 +99,6 @@ final class PageSubscriptionImpl implements PageSubscription
private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final java.util.Queue<PagePosition> redeliveries = new LinkedList<PagePosition>();
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
@ -378,7 +375,7 @@ final class PageSubscriptionImpl implements PageSubscription
}
@Override
public LinkedListIterator<PagedReference> iterator()
public PageIterator iterator()
{
return new CursorIterator();
}
@ -583,12 +580,9 @@ final class PageSubscriptionImpl implements PageSubscription
}
@Override
public void redeliver(final PagePosition position)
public void redeliver(final PageIterator iterator, final PagePosition position)
{
synchronized (redeliveries)
{
redeliveries.add(position);
}
iterator.redeliver(position);
synchronized (consumedPages)
{
@ -1245,7 +1239,7 @@ final class PageSubscriptionImpl implements PageSubscription
}
private class CursorIterator implements LinkedListIterator<PagedReference>
private class CursorIterator implements PageIterator
{
private PagePosition position = null;
@ -1255,6 +1249,9 @@ final class PageSubscriptionImpl implements PageSubscription
private volatile PagedReference lastRedelivery = null;
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final java.util.Queue<PagePosition> redeliveries = new LinkedList<PagePosition>();
/**
* next element taken on hasNext test.
* it has to be delivered on next next operation
@ -1265,6 +1262,14 @@ final class PageSubscriptionImpl implements PageSubscription
{
}
public void redeliver(PagePosition reference)
{
synchronized (redeliveries)
{
redeliveries.add(reference);
}
}
public void repeat()
{
if (isredelivery)
@ -1390,7 +1395,7 @@ final class PageSubscriptionImpl implements PageSubscription
}
else
{
if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
if (tx.deliverAfterCommit(CursorIterator.this, PageSubscriptionImpl.this, message.getPosition()))
{
valid = false;
ignored = false;

View File

@ -23,9 +23,9 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.Pair;
import org.apache.activemq.core.paging.PageTransactionInfo;
import org.apache.activemq.core.paging.PagingManager;
import org.apache.activemq.core.paging.cursor.PageIterator;
import org.apache.activemq.core.paging.cursor.PagePosition;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.persistence.StorageManager;
@ -58,7 +58,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
private List<LateDelivery> lateDeliveries;
// Static --------------------------------------------------------
@ -146,9 +146,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
if (lateDeliveries != null)
{
// This is to make sure deliveries that were touched before the commit arrived will be delivered
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
for (LateDelivery pos : lateDeliveries)
{
pos.getA().redeliver(pos.getB());
pos.getSubscription().redeliver(pos.getIterator(), pos.getPagePosition());
}
lateDeliveries.clear();
}
@ -225,9 +225,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
if (lateDeliveries != null)
{
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
for (LateDelivery pos : lateDeliveries)
{
pos.getA().lateDeliveryRollback(pos.getB());
pos.getSubscription().lateDeliveryRollback(pos.getPagePosition());
}
lateDeliveries = null;
}
@ -245,12 +245,12 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
}
@Override
public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos)
public synchronized boolean deliverAfterCommit(PageIterator iterator, PageSubscription cursor, PagePosition cursorPos)
{
if (committed && useRedelivery)
{
cursor.addPendingDelivery(cursorPos);
cursor.redeliver(cursorPos);
cursor.redeliver(iterator, cursorPos);
return true;
}
else if (committed)
@ -267,10 +267,10 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
useRedelivery = true;
if (lateDeliveries == null)
{
lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();
lateDeliveries = new LinkedList<>();
}
cursor.addPendingDelivery(cursorPos);
lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor, cursorPos));
lateDeliveries.add(new LateDelivery(cursor, cursorPos, iterator));
return true;
}
}
@ -283,6 +283,41 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
// Inner classes -------------------------------------------------
/** a Message shouldn't be delivered until it's committed
* For that reason the page-refernce will be written right away
* But in certain cases we can only deliver after the commit
* For that reason we will perform a late delivery
* through the method redeliver.
*/
private static class LateDelivery
{
final PageSubscription subscription;
final PagePosition pagePosition;
final PageIterator iterator;
public LateDelivery(PageSubscription subscription, PagePosition pagePosition, PageIterator iterator)
{
this.subscription = subscription;
this.pagePosition = pagePosition;
this.iterator = iterator;
}
public PageSubscription getSubscription()
{
return subscription;
}
public PagePosition getPagePosition()
{
return pagePosition;
}
public PageIterator getIterator()
{
return iterator;
}
}
private static class UpdatePageTXOperation extends TransactionOperationAbstract
{

View File

@ -16,10 +16,17 @@
*/
package org.apache.activemq.tests.integration.paging;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.Message;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
@ -27,9 +34,14 @@ import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.impl.ConfigurationImpl;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.tests.util.ServiceTestBase;
import org.apache.activemq.utils.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -57,7 +69,9 @@ public class PagingSendTest extends ServiceTestBase
public void setUp() throws Exception
{
super.setUp();
Configuration config = new ConfigurationImpl();
server = newActiveMQServer();
server.start();
waitForServer(server);
locator = createFactory(isNetty());
@ -224,4 +238,146 @@ public class PagingSendTest extends ServiceTestBase
assertEquals(0, errors.get());
}
@Test
public void testPagingDoesNotDuplicateBatchMessages() throws Exception
{
int batchSize = 20;
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false);
// Create a queue
SimpleString queueAddr = new SimpleString("testQueue");
session.createQueue(queueAddr, queueAddr, null, true);
// Set up paging on the queue address
AddressSettings addressSettings = new AddressSettings();
addressSettings.setPageSizeBytes(10 * 1024);
/** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent.
Presumably due to additional meta-data, message headers etc... **/
addressSettings.setMaxSizeBytes(16 * 1024);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
sendMessageBatch(batchSize, session, queueAddr);
Queue queue = server.locateQueue(queueAddr);
checkBatchMessagesAreNotPagedTwice(queue);
for (int i = 0; i < 10; i++)
{
// execute the same count a couple times. This is to make sure the iterators have no impact regardless
// the number of times they are called
assertEquals(batchSize, processCountThroughIterator(queue));
}
}
@Test
public void testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws Exception
{
int batchSize = 20;
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false);
// Create a queue
SimpleString queueAddr = new SimpleString("testQueue");
session.createQueue(queueAddr, queueAddr, null, true);
// Set up paging on the queue address
AddressSettings addressSettings = new AddressSettings();
addressSettings.setPageSizeBytes(10 * 1024);
/** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent.
Presumably due to additional meta-data, message headers etc... **/
addressSettings.setMaxSizeBytes(16 * 1024);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
int numberOfMessages = 0;
// ensure the server is paging
while (!server.getPagingManager().getPageStore(queueAddr).isPaging())
{
sendMessageBatch(batchSize, session, queueAddr);
numberOfMessages += batchSize;
}
sendMessageBatch(batchSize, session, queueAddr);
numberOfMessages += batchSize;
Queue queue = server.locateQueue(queueAddr);
checkBatchMessagesAreNotPagedTwice(queue);
for (int i = 0; i < 10; i++)
{
// execute the same count a couple times. This is to make sure the iterators have no impact regardless
// the number of times they are called
assertEquals(numberOfMessages, processCountThroughIterator(queue));
}
}
public List<String> sendMessageBatch(int batchSize, ClientSession session, SimpleString queueAddr) throws ActiveMQException
{
List<String> messageIds = new ArrayList<String>();
ClientProducer producer = session.createProducer(queueAddr);
for (int i = 0; i < batchSize; i++)
{
Message message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
String id = UUID.randomUUID().toString();
message.putStringProperty("id", id);
message.putIntProperty("seq", i); // this is to make the print-data easier to debug
messageIds.add(id);
producer.send(message);
}
session.commit();
return messageIds;
}
/**
* checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the address has started paging (ignoring any
* duplicates that may have happened before this point).
*/
public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception
{
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
Set<String> messageOrderSet = new HashSet<String>();
int duplicates = 0;
while (pageIterator.hasNext())
{
MessageReference reference = pageIterator.next();
String id = reference.getMessage().getStringProperty("id");
// If add(id) returns true it means that this id was already added to this set. Hence a duplicate is found.
if (!messageOrderSet.add(id))
{
duplicates++;
}
}
assertTrue(duplicates == 0);
}
/**
* checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the address has started paging (ignoring any
* duplicates that may have happened before this point).
*/
protected int processCountThroughIterator(Queue queue) throws Exception
{
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
int count = 0;
while (pageIterator.hasNext())
{
MessageReference reference = pageIterator.next();
count++;
}
return count;
}
}