ARTEMIS-2197 Page deleted before transaction finishes

When a receiving transaction is committed in a paging situation,
if a page happens to be completed and it will be deleted in a
transaction operation (PageCursorTx). The other tx operation
RefsOperation needs to access the page (in PageCache) to finish
its job. There is a chance that the PageCursorTx removes the
page before RefsOperation and it will cause the RefsOperation
failed to find a message in a page.
This commit is contained in:
Howard Gao 2018-12-10 13:35:04 +08:00 committed by Clebert Suconic
parent f5b509b962
commit b36dc37c15
6 changed files with 90 additions and 3 deletions

View File

@ -170,4 +170,5 @@ public interface PageSubscription {
void incrementDeliveredSize(long size);
void removePendingDelivery(PagePosition position);
}

View File

@ -28,4 +28,20 @@ public interface PagedReference extends MessageReference {
boolean isLargeMessage();
long getTransactionID();
/** this method affects paging clean up
It adds to the flag that prevents its page from cleanup.
It's a helper method to call the proper {@link PageSubscription#addPendingDelivery(PagePosition)}
@see PageSubscription#addPendingDelivery(PagePosition)
*/
void addPendingFlag();
/** this method affects paging clean up
It adds to the flag that prevents its page from cleanup.
It's a helper method to call the proper {@link PageSubscription#addPendingDelivery(PagePosition)}
@see PageSubscription#addPendingDelivery(PagePosition)
*/
void removePendingFlag();
}

View File

@ -324,6 +324,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
return transactionID;
}
@Override
public void addPendingFlag() {
subscription.addPendingDelivery(position);
}
@Override
public void removePendingFlag() {
subscription.removePendingDelivery(position);
}
@Override
public long getMessageID() {
if (messageID < 0) {

View File

@ -59,7 +59,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
final class PageSubscriptionImpl implements PageSubscription {
public final class PageSubscriptionImpl implements PageSubscription {
private static final Logger logger = Logger.getLogger(PageSubscriptionImpl.class);
@ -555,6 +555,15 @@ final class PageSubscriptionImpl implements PageSubscription {
}
}
@Override
public void removePendingDelivery(final PagePosition position) {
PageCursorInfo info = getPageInfo(position);
if (info != null) {
info.decrementPendingTX();
}
}
@Override
public void redeliver(final PageIterator iterator, final PagePosition position) {
iterator.redeliver(position);
@ -780,7 +789,7 @@ final class PageSubscriptionImpl implements PageSubscription {
return getPageInfo(pos.getPageNr());
}
private PageCursorInfo getPageInfo(final long pageNr) {
public PageCursorInfo getPageInfo(final long pageNr) {
synchronized (consumedPages) {
PageCursorInfo pageInfo = consumedPages.get(pageNr);
@ -916,7 +925,7 @@ final class PageSubscriptionImpl implements PageSubscription {
* This instance will be released as soon as the entire page is consumed, releasing the memory at
* that point The ref counts are increased also when a message is ignored for any reason.
*/
private final class PageCursorInfo {
public final class PageCursorInfo {
// Number of messages existent on this page
private final int numberOfMessages;
@ -934,6 +943,7 @@ final class PageSubscriptionImpl implements PageSubscription {
private final boolean wasLive;
// There's a pending TX to add elements on this page
// also can be used to prevent the page from being deleted too soon.
private final AtomicInteger pendingTX = new AtomicInteger(0);
// There's a pending delete on the async IO pipe
@ -1108,6 +1118,9 @@ final class PageSubscriptionImpl implements PageSubscription {
return cache != null ? cache.get() : null;
}
public int getPendingTx() {
return pendingTX.get();
}
}
private final class PageCursorTX extends TransactionOperationAbstract {

View File

@ -67,6 +67,9 @@ public class RefsOperation extends TransactionOperationAbstract {
pagedMessagesToPostACK = new ArrayList<>();
}
pagedMessagesToPostACK.add(ref);
//here we do something to prevent page file
//from being deleted until the operation is done.
((PagedReference)ref).addPendingFlag();
}
}
@ -129,6 +132,12 @@ public class RefsOperation extends TransactionOperationAbstract {
message.incrementRefCount();
}
ackedTX.commit(true);
if (pagedMessagesToPostACK != null) {
for (MessageReference refmsg : pagedMessagesToPostACK) {
((PagedReference)refmsg).removePendingFlag();
}
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToProcessMessageReferenceAfterRollback(e);
}
@ -160,6 +169,7 @@ public class RefsOperation extends TransactionOperationAbstract {
if (pagedMessagesToPostACK != null) {
for (MessageReference refmsg : pagedMessagesToPostACK) {
((PagedReference)refmsg).removePendingFlag();
if (((PagedReference) refmsg).isLargeMessage()) {
decrementRefCount(refmsg);
}

View File

@ -23,6 +23,9 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -63,6 +66,40 @@ public class PagingReceiveTest extends ActiveMQTestBase {
assertEquals(numMsgs, queue.getMessagesAdded());
}
@Test
public void testReceiveTx() throws Exception {
receiveAllMessagesTxAndPageCheckPendingTx();
}
private void receiveAllMessagesTxAndPageCheckPendingTx() throws Exception {
final ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, false, false, 0);
session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS);
for (int i = 0; i < numMsgs; i++) {
ClientMessage message = consumer.receive(2000);
assertNotNull(message);
message.acknowledge();
}
//before committing the pendingTx should be positive.
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
long qid = server.locateQueue(ADDRESS).getID();
PageSubscription pageSub = store.getCursorProvider().getSubscription(qid);
long pageNr = store.getCurrentWritingPage();
PageSubscriptionImpl.PageCursorInfo info = ((PageSubscriptionImpl)pageSub).getPageInfo(pageNr);
System.out.println("pendingTx: " + info.getPendingTx());
//The positive pendingTx will prevent a page being removed
//before ResOperation is completed.
assertTrue(info.getPendingTx() > 0);
session.commit();
session.close();
}
@Override
@Before