Merge branch #53

This commit is contained in:
Martyn Taylor 2015-01-06 16:57:18 +00:00
commit c9de5c763f
6 changed files with 252 additions and 28 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.core.paging;
import org.apache.activemq.core.journal.EncodingSupport;
import org.apache.activemq.core.paging.cursor.PageIterator;
import org.apache.activemq.core.paging.cursor.PagePosition;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.persistence.StorageManager;
@ -61,10 +62,8 @@ public interface PageTransactionInfo extends EncodingSupport
/**
* This method will hold the position to be delivered later in case this transaction is pending.
* If the tx is not pending, it will return false, so the caller can deliver it right away
* @param cursor
* @param cursorPos
* @return true if the message will be delivered later, false if it should be delivered right away
*/
boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription cursor, PagePosition cursorPos);
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.paging.cursor;
import org.apache.activemq.utils.LinkedListIterator;
/**
* @author clebertsuconic
*/
public interface PageIterator extends LinkedListIterator<PagedReference>
{
void redeliver(PagePosition reference);
}

View File

@ -125,12 +125,12 @@ public interface PageSubscription
* To be used on redeliveries
* @param position
*/
void redeliver(PagePosition position);
void redeliver(PageIterator iterator, PagePosition position);
void printDebug();
/**
* @param minPage
* @param page
* @return
*/
boolean isComplete(long page);

View File

@ -39,6 +39,7 @@ import org.apache.activemq.core.paging.PagedMessage;
import org.apache.activemq.core.paging.PagingStore;
import org.apache.activemq.core.paging.cursor.PageCache;
import org.apache.activemq.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.core.paging.cursor.PageIterator;
import org.apache.activemq.core.paging.cursor.PagePosition;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.paging.cursor.PageSubscriptionCounter;
@ -55,7 +56,6 @@ import org.apache.activemq.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.core.transaction.impl.TransactionImpl;
import org.apache.activemq.utils.ConcurrentHashSet;
import org.apache.activemq.utils.FutureLatch;
import org.apache.activemq.utils.LinkedListIterator;
/**
* A PageCursorImpl
@ -99,9 +99,6 @@ final class PageSubscriptionImpl implements PageSubscription
private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final java.util.Queue<PagePosition> redeliveries = new LinkedList<PagePosition>();
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
@ -378,7 +375,7 @@ final class PageSubscriptionImpl implements PageSubscription
}
@Override
public LinkedListIterator<PagedReference> iterator()
public PageIterator iterator()
{
return new CursorIterator();
}
@ -583,12 +580,9 @@ final class PageSubscriptionImpl implements PageSubscription
}
@Override
public void redeliver(final PagePosition position)
public void redeliver(final PageIterator iterator, final PagePosition position)
{
synchronized (redeliveries)
{
redeliveries.add(position);
}
iterator.redeliver(position);
synchronized (consumedPages)
{
@ -1245,7 +1239,7 @@ final class PageSubscriptionImpl implements PageSubscription
}
private class CursorIterator implements LinkedListIterator<PagedReference>
private class CursorIterator implements PageIterator
{
private PagePosition position = null;
@ -1255,6 +1249,9 @@ final class PageSubscriptionImpl implements PageSubscription
private volatile PagedReference lastRedelivery = null;
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final java.util.Queue<PagePosition> redeliveries = new LinkedList<PagePosition>();
/**
* next element taken on hasNext test.
* it has to be delivered on next next operation
@ -1265,6 +1262,14 @@ final class PageSubscriptionImpl implements PageSubscription
{
}
public void redeliver(PagePosition reference)
{
synchronized (redeliveries)
{
redeliveries.add(reference);
}
}
public void repeat()
{
if (isredelivery)
@ -1390,7 +1395,7 @@ final class PageSubscriptionImpl implements PageSubscription
}
else
{
if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
if (tx.deliverAfterCommit(CursorIterator.this, PageSubscriptionImpl.this, message.getPosition()))
{
valid = false;
ignored = false;

View File

@ -23,9 +23,9 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.Pair;
import org.apache.activemq.core.paging.PageTransactionInfo;
import org.apache.activemq.core.paging.PagingManager;
import org.apache.activemq.core.paging.cursor.PageIterator;
import org.apache.activemq.core.paging.cursor.PagePosition;
import org.apache.activemq.core.paging.cursor.PageSubscription;
import org.apache.activemq.core.persistence.StorageManager;
@ -58,7 +58,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
private List<LateDelivery> lateDeliveries;
// Static --------------------------------------------------------
@ -146,9 +146,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
if (lateDeliveries != null)
{
// This is to make sure deliveries that were touched before the commit arrived will be delivered
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
for (LateDelivery pos : lateDeliveries)
{
pos.getA().redeliver(pos.getB());
pos.getSubscription().redeliver(pos.getIterator(), pos.getPagePosition());
}
lateDeliveries.clear();
}
@ -225,9 +225,9 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
if (lateDeliveries != null)
{
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
for (LateDelivery pos : lateDeliveries)
{
pos.getA().lateDeliveryRollback(pos.getB());
pos.getSubscription().lateDeliveryRollback(pos.getPagePosition());
}
lateDeliveries = null;
}
@ -245,12 +245,12 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
}
@Override
public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos)
public synchronized boolean deliverAfterCommit(PageIterator iterator, PageSubscription cursor, PagePosition cursorPos)
{
if (committed && useRedelivery)
{
cursor.addPendingDelivery(cursorPos);
cursor.redeliver(cursorPos);
cursor.redeliver(iterator, cursorPos);
return true;
}
else if (committed)
@ -267,10 +267,10 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
useRedelivery = true;
if (lateDeliveries == null)
{
lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();
lateDeliveries = new LinkedList<>();
}
cursor.addPendingDelivery(cursorPos);
lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor, cursorPos));
lateDeliveries.add(new LateDelivery(cursor, cursorPos, iterator));
return true;
}
}
@ -283,6 +283,41 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo
// Inner classes -------------------------------------------------
/** a Message shouldn't be delivered until it's committed
* For that reason the page-refernce will be written right away
* But in certain cases we can only deliver after the commit
* For that reason we will perform a late delivery
* through the method redeliver.
*/
private static class LateDelivery
{
final PageSubscription subscription;
final PagePosition pagePosition;
final PageIterator iterator;
public LateDelivery(PageSubscription subscription, PagePosition pagePosition, PageIterator iterator)
{
this.subscription = subscription;
this.pagePosition = pagePosition;
this.iterator = iterator;
}
public PageSubscription getSubscription()
{
return subscription;
}
public PagePosition getPagePosition()
{
return pagePosition;
}
public PageIterator getIterator()
{
return iterator;
}
}
private static class UpdatePageTXOperation extends TransactionOperationAbstract
{

View File

@ -16,10 +16,17 @@
*/
package org.apache.activemq.tests.integration.paging;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.Message;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
@ -27,9 +34,14 @@ import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.impl.ConfigurationImpl;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.tests.util.ServiceTestBase;
import org.apache.activemq.utils.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -57,7 +69,9 @@ public class PagingSendTest extends ServiceTestBase
public void setUp() throws Exception
{
super.setUp();
Configuration config = new ConfigurationImpl();
server = newActiveMQServer();
server.start();
waitForServer(server);
locator = createFactory(isNetty());
@ -224,4 +238,146 @@ public class PagingSendTest extends ServiceTestBase
assertEquals(0, errors.get());
}
}
@Test
public void testPagingDoesNotDuplicateBatchMessages() throws Exception
{
int batchSize = 20;
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false);
// Create a queue
SimpleString queueAddr = new SimpleString("testQueue");
session.createQueue(queueAddr, queueAddr, null, true);
// Set up paging on the queue address
AddressSettings addressSettings = new AddressSettings();
addressSettings.setPageSizeBytes(10 * 1024);
/** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent.
Presumably due to additional meta-data, message headers etc... **/
addressSettings.setMaxSizeBytes(16 * 1024);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
sendMessageBatch(batchSize, session, queueAddr);
Queue queue = server.locateQueue(queueAddr);
checkBatchMessagesAreNotPagedTwice(queue);
for (int i = 0; i < 10; i++)
{
// execute the same count a couple times. This is to make sure the iterators have no impact regardless
// the number of times they are called
assertEquals(batchSize, processCountThroughIterator(queue));
}
}
@Test
public void testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws Exception
{
int batchSize = 20;
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false);
// Create a queue
SimpleString queueAddr = new SimpleString("testQueue");
session.createQueue(queueAddr, queueAddr, null, true);
// Set up paging on the queue address
AddressSettings addressSettings = new AddressSettings();
addressSettings.setPageSizeBytes(10 * 1024);
/** This actually causes the address to start paging messages after 10 x messages with 1024 payload is sent.
Presumably due to additional meta-data, message headers etc... **/
addressSettings.setMaxSizeBytes(16 * 1024);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
int numberOfMessages = 0;
// ensure the server is paging
while (!server.getPagingManager().getPageStore(queueAddr).isPaging())
{
sendMessageBatch(batchSize, session, queueAddr);
numberOfMessages += batchSize;
}
sendMessageBatch(batchSize, session, queueAddr);
numberOfMessages += batchSize;
Queue queue = server.locateQueue(queueAddr);
checkBatchMessagesAreNotPagedTwice(queue);
for (int i = 0; i < 10; i++)
{
// execute the same count a couple times. This is to make sure the iterators have no impact regardless
// the number of times they are called
assertEquals(numberOfMessages, processCountThroughIterator(queue));
}
}
public List<String> sendMessageBatch(int batchSize, ClientSession session, SimpleString queueAddr) throws ActiveMQException
{
List<String> messageIds = new ArrayList<String>();
ClientProducer producer = session.createProducer(queueAddr);
for (int i = 0; i < batchSize; i++)
{
Message message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
String id = UUID.randomUUID().toString();
message.putStringProperty("id", id);
message.putIntProperty("seq", i); // this is to make the print-data easier to debug
messageIds.add(id);
producer.send(message);
}
session.commit();
return messageIds;
}
/**
* checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the address has started paging (ignoring any
* duplicates that may have happened before this point).
*/
public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception
{
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
Set<String> messageOrderSet = new HashSet<String>();
int duplicates = 0;
while (pageIterator.hasNext())
{
MessageReference reference = pageIterator.next();
String id = reference.getMessage().getStringProperty("id");
// If add(id) returns true it means that this id was already added to this set. Hence a duplicate is found.
if (!messageOrderSet.add(id))
{
duplicates++;
}
}
assertTrue(duplicates == 0);
}
/**
* checks that there are no message duplicates in the page. Any IDs found in the ignoreIds field will not be tested
* this allows us to test only those messages that have been sent after the address has started paging (ignoring any
* duplicates that may have happened before this point).
*/
protected int processCountThroughIterator(Queue queue) throws Exception
{
LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
int count = 0;
while (pageIterator.hasNext())
{
MessageReference reference = pageIterator.next();
count++;
}
return count;
}
}