mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-08 18:29:20 +00:00
This closes #2455
This commit is contained in:
commit
76cce9037e
@ -170,4 +170,5 @@ public interface PageSubscription {
|
|||||||
|
|
||||||
void incrementDeliveredSize(long size);
|
void incrementDeliveredSize(long size);
|
||||||
|
|
||||||
|
void removePendingDelivery(PagePosition position);
|
||||||
}
|
}
|
||||||
|
@ -28,4 +28,20 @@ public interface PagedReference extends MessageReference {
|
|||||||
boolean isLargeMessage();
|
boolean isLargeMessage();
|
||||||
|
|
||||||
long getTransactionID();
|
long getTransactionID();
|
||||||
|
|
||||||
|
/** this method affects paging clean up
|
||||||
|
It adds to the flag that prevents its page from cleanup.
|
||||||
|
It's a helper method to call the proper {@link PageSubscription#addPendingDelivery(PagePosition)}
|
||||||
|
|
||||||
|
@see PageSubscription#addPendingDelivery(PagePosition)
|
||||||
|
*/
|
||||||
|
void addPendingFlag();
|
||||||
|
|
||||||
|
/** this method affects paging clean up
|
||||||
|
It adds to the flag that prevents its page from cleanup.
|
||||||
|
It's a helper method to call the proper {@link PageSubscription#addPendingDelivery(PagePosition)}
|
||||||
|
|
||||||
|
@see PageSubscription#addPendingDelivery(PagePosition)
|
||||||
|
*/
|
||||||
|
void removePendingFlag();
|
||||||
}
|
}
|
||||||
|
@ -324,6 +324,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
|
|||||||
return transactionID;
|
return transactionID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addPendingFlag() {
|
||||||
|
subscription.addPendingDelivery(position);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removePendingFlag() {
|
||||||
|
subscription.removePendingDelivery(position);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getMessageID() {
|
public long getMessageID() {
|
||||||
if (messageID < 0) {
|
if (messageID < 0) {
|
||||||
|
@ -59,7 +59,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
|||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
final class PageSubscriptionImpl implements PageSubscription {
|
public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(PageSubscriptionImpl.class);
|
private static final Logger logger = Logger.getLogger(PageSubscriptionImpl.class);
|
||||||
|
|
||||||
@ -555,6 +555,15 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removePendingDelivery(final PagePosition position) {
|
||||||
|
PageCursorInfo info = getPageInfo(position);
|
||||||
|
|
||||||
|
if (info != null) {
|
||||||
|
info.decrementPendingTX();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void redeliver(final PageIterator iterator, final PagePosition position) {
|
public void redeliver(final PageIterator iterator, final PagePosition position) {
|
||||||
iterator.redeliver(position);
|
iterator.redeliver(position);
|
||||||
@ -780,7 +789,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||||||
return getPageInfo(pos.getPageNr());
|
return getPageInfo(pos.getPageNr());
|
||||||
}
|
}
|
||||||
|
|
||||||
private PageCursorInfo getPageInfo(final long pageNr) {
|
public PageCursorInfo getPageInfo(final long pageNr) {
|
||||||
synchronized (consumedPages) {
|
synchronized (consumedPages) {
|
||||||
PageCursorInfo pageInfo = consumedPages.get(pageNr);
|
PageCursorInfo pageInfo = consumedPages.get(pageNr);
|
||||||
|
|
||||||
@ -916,7 +925,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||||||
* This instance will be released as soon as the entire page is consumed, releasing the memory at
|
* This instance will be released as soon as the entire page is consumed, releasing the memory at
|
||||||
* that point The ref counts are increased also when a message is ignored for any reason.
|
* that point The ref counts are increased also when a message is ignored for any reason.
|
||||||
*/
|
*/
|
||||||
private final class PageCursorInfo {
|
public final class PageCursorInfo {
|
||||||
|
|
||||||
// Number of messages existent on this page
|
// Number of messages existent on this page
|
||||||
private final int numberOfMessages;
|
private final int numberOfMessages;
|
||||||
@ -934,6 +943,7 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||||||
private final boolean wasLive;
|
private final boolean wasLive;
|
||||||
|
|
||||||
// There's a pending TX to add elements on this page
|
// There's a pending TX to add elements on this page
|
||||||
|
// also can be used to prevent the page from being deleted too soon.
|
||||||
private final AtomicInteger pendingTX = new AtomicInteger(0);
|
private final AtomicInteger pendingTX = new AtomicInteger(0);
|
||||||
|
|
||||||
// There's a pending delete on the async IO pipe
|
// There's a pending delete on the async IO pipe
|
||||||
@ -1108,6 +1118,9 @@ final class PageSubscriptionImpl implements PageSubscription {
|
|||||||
return cache != null ? cache.get() : null;
|
return cache != null ? cache.get() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getPendingTx() {
|
||||||
|
return pendingTX.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class PageCursorTX extends TransactionOperationAbstract {
|
private final class PageCursorTX extends TransactionOperationAbstract {
|
||||||
|
@ -67,6 +67,9 @@ public class RefsOperation extends TransactionOperationAbstract {
|
|||||||
pagedMessagesToPostACK = new ArrayList<>();
|
pagedMessagesToPostACK = new ArrayList<>();
|
||||||
}
|
}
|
||||||
pagedMessagesToPostACK.add(ref);
|
pagedMessagesToPostACK.add(ref);
|
||||||
|
//here we do something to prevent page file
|
||||||
|
//from being deleted until the operation is done.
|
||||||
|
((PagedReference)ref).addPendingFlag();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,6 +132,12 @@ public class RefsOperation extends TransactionOperationAbstract {
|
|||||||
message.incrementRefCount();
|
message.incrementRefCount();
|
||||||
}
|
}
|
||||||
ackedTX.commit(true);
|
ackedTX.commit(true);
|
||||||
|
|
||||||
|
if (pagedMessagesToPostACK != null) {
|
||||||
|
for (MessageReference refmsg : pagedMessagesToPostACK) {
|
||||||
|
((PagedReference)refmsg).removePendingFlag();
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.failedToProcessMessageReferenceAfterRollback(e);
|
ActiveMQServerLogger.LOGGER.failedToProcessMessageReferenceAfterRollback(e);
|
||||||
}
|
}
|
||||||
@ -160,6 +169,7 @@ public class RefsOperation extends TransactionOperationAbstract {
|
|||||||
|
|
||||||
if (pagedMessagesToPostACK != null) {
|
if (pagedMessagesToPostACK != null) {
|
||||||
for (MessageReference refmsg : pagedMessagesToPostACK) {
|
for (MessageReference refmsg : pagedMessagesToPostACK) {
|
||||||
|
((PagedReference)refmsg).removePendingFlag();
|
||||||
if (((PagedReference) refmsg).isLargeMessage()) {
|
if (((PagedReference) refmsg).isLargeMessage()) {
|
||||||
decrementRefCount(refmsg);
|
decrementRefCount(refmsg);
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,9 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|||||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
@ -63,6 +66,40 @@ public class PagingReceiveTest extends ActiveMQTestBase {
|
|||||||
assertEquals(numMsgs, queue.getMessagesAdded());
|
assertEquals(numMsgs, queue.getMessagesAdded());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReceiveTx() throws Exception {
|
||||||
|
receiveAllMessagesTxAndPageCheckPendingTx();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void receiveAllMessagesTxAndPageCheckPendingTx() throws Exception {
|
||||||
|
final ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
|
ClientSession session = sf.createSession(null, null, false, true, false, false, 0);
|
||||||
|
|
||||||
|
session.start();
|
||||||
|
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||||
|
for (int i = 0; i < numMsgs; i++) {
|
||||||
|
ClientMessage message = consumer.receive(2000);
|
||||||
|
assertNotNull(message);
|
||||||
|
message.acknowledge();
|
||||||
|
}
|
||||||
|
|
||||||
|
//before committing the pendingTx should be positive.
|
||||||
|
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
|
||||||
|
long qid = server.locateQueue(ADDRESS).getID();
|
||||||
|
PageSubscription pageSub = store.getCursorProvider().getSubscription(qid);
|
||||||
|
long pageNr = store.getCurrentWritingPage();
|
||||||
|
|
||||||
|
PageSubscriptionImpl.PageCursorInfo info = ((PageSubscriptionImpl)pageSub).getPageInfo(pageNr);
|
||||||
|
|
||||||
|
System.out.println("pendingTx: " + info.getPendingTx());
|
||||||
|
|
||||||
|
//The positive pendingTx will prevent a page being removed
|
||||||
|
//before ResOperation is completed.
|
||||||
|
assertTrue(info.getPendingTx() > 0);
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Before
|
@Before
|
||||||
|
Loading…
x
Reference in New Issue
Block a user