ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (fix)

This commit is contained in:
Clebert Suconic 2018-01-26 23:05:52 -05:00
parent 59d2ac53ff
commit c10b74412a
14 changed files with 131 additions and 52 deletions

View File

@ -51,7 +51,9 @@ public interface PageTransactionInfo extends EncodingSupport {
int increment) throws Exception;
// To be used after the update was stored or reload
void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
boolean onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
boolean checkSize(StorageManager storageManager, PagingManager pagingManager);
void increment(int durableSize, int nonDurableSize);

View File

@ -89,16 +89,30 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
}
@Override
public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
if (sizeAfterUpdate == 0 && storageManager != null) {
try {
storageManager.deletePageTransactional(this.recordID);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
}
public boolean onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
int afterUpdate = numberOfMessages.addAndGet(-update);
return internalCheckSize(storageManager, pagingManager, afterUpdate);
}
pagingManager.removeTransaction(this.transactionID);
@Override
public boolean checkSize(StorageManager storageManager, PagingManager pagingManager) {
return internalCheckSize(storageManager, pagingManager, numberOfMessages.get());
}
public boolean internalCheckSize(StorageManager storageManager, PagingManager pagingManager, int size) {
if (size <= 0) {
if (storageManager != null) {
try {
storageManager.deletePageTransactional(this.recordID);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
}
pagingManager.removeTransaction(this.transactionID);
}
return false;
} else {
return true;
}
}

View File

@ -839,6 +839,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
Set<PageTransactionInfo> invalidPageTransactions = null;
Map<Long, Message> messages = new HashMap<>();
readLock();
try {
@ -971,6 +973,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
break;
}
case JournalRecordIds.PAGE_TRANSACTION: {
PageTransactionInfo invalidPGTx = null;
if (record.isUpdate) {
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
@ -981,7 +984,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (pageTX == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindPageTX(pageUpdate.pageTX);
} else {
pageTX.onUpdate(pageUpdate.recods, null, null);
if (!pageTX.onUpdate(pageUpdate.recods, null, null)) {
invalidPGTx = pageTX;
}
}
} else {
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
@ -991,6 +996,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
pageTransactionInfo.setRecordID(record.id);
pagingManager.addTransaction(pageTransactionInfo);
if (!pageTransactionInfo.checkSize(null, null)) {
invalidPGTx = pageTransactionInfo;
}
}
if (invalidPGTx != null) {
if (invalidPageTransactions == null) {
invalidPageTransactions = new HashSet<>();
}
invalidPageTransactions.add(invalidPGTx);
}
break;
@ -1170,6 +1186,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap);
checkInvalidPageTransactions(pagingManager, invalidPageTransactions);
journalLoaded = true;
return info;
} finally {
@ -1177,6 +1196,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
public void checkInvalidPageTransactions(PagingManager pagingManager,
Set<PageTransactionInfo> invalidPageTransactions) {
if (invalidPageTransactions != null) {
for (PageTransactionInfo pginfo : invalidPageTransactions) {
pginfo.checkSize(this, pagingManager);
}
}
}
/**
* @param queueID
* @param pageSubscriptions

View File

@ -1464,7 +1464,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
for (MessageReference ref : refs) {
Message message = ref.getMessage();
if (message.isDurable() && ref.getQueue().isDurable()) {
if (message.isDurable() && ref.getQueue().isDurableMessage()) {
message.decrementDurableRefCount();
}

View File

@ -73,6 +73,34 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
*/
public interface ActiveMQServer extends ServiceComponent {
enum SERVER_STATE {
/**
* start() has been called but components are not initialized. The whole point of this state,
* is to be in a state which is different from {@link SERVER_STATE#STARTED} and
* {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as
* {@link #stop(boolean)} worked as intended.
*/
STARTING, /**
* server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
* about it hold.
*/
STARTED, /**
* stop() was called but has not finished yet. Meant to avoids starting components while
* stop() is executing.
*/
STOPPING, /**
* Stopped: either stop() has been called and has finished running, or start() has never been
* called.
*/
STOPPED
}
void setState(SERVER_STATE state);
SERVER_STATE getState();
/**
* Sets the server identity.
* <p>

View File

@ -53,6 +53,13 @@ public interface Queue extends Bindable,CriticalComponent {
boolean isDurable();
/**
* The queue definition could be durable, but the messages could eventually be considered non durable.
* (e.g. purgeOnNoConsumers)
* @return
*/
boolean isDurableMessage();
boolean isTemporary();
boolean isAutoCreated();
@ -161,7 +168,11 @@ public interface Queue extends Bindable,CriticalComponent {
int deleteMatchingReferences(Filter filter) throws Exception;
int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception;
default int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
return deleteMatchingReferences(flushLImit, filter, AckReason.NORMAL);
}
int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception;
boolean expireReference(long messageID) throws Exception;

View File

@ -207,28 +207,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private HAPolicy haPolicy;
enum SERVER_STATE {
/**
* start() has been called but components are not initialized. The whole point of this state,
* is to be in a state which is different from {@link SERVER_STATE#STARTED} and
* {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as
* {@link #stop(boolean)} worked as intended.
*/
STARTING, /**
* server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
* about it hold.
*/
STARTED, /**
* stop() was called but has not finished yet. Meant to avoids starting components while
* stop() is executing.
*/
STOPPING, /**
* Stopped: either stop() has been called and has finished running, or start() has never been
* called.
*/
STOPPED
}
private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
private final Version version;
@ -712,10 +690,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
super.finalize();
}
@Override
public void setState(SERVER_STATE state) {
this.state = state;
}
@Override
public SERVER_STATE getState() {
return state;
}

View File

@ -139,7 +139,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile Filter filter;
private final boolean durable;
private final boolean propertyDurable;
private final boolean temporary;
@ -405,7 +405,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.pageSubscription = pageSubscription;
this.durable = durable;
this.propertyDurable = durable;
this.temporary = temporary;
@ -495,7 +495,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public boolean isDurable() {
return durable;
return propertyDurable;
}
@Override
public boolean isDurableMessage() {
return propertyDurable && !purgeOnNoConsumers;
}
@Override
@ -1126,7 +1131,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
Message message = ref.getMessage();
boolean durableRef = message.isDurable() && durable;
boolean durableRef = message.isDurable() && isDurableMessage();
if (durableRef) {
storageManager.storeAcknowledge(id, message.getMessageID());
@ -1161,7 +1166,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
Message message = ref.getMessage();
boolean durableRef = message.isDurable() && durable;
boolean durableRef = message.isDurable() && isDurableMessage();
if (durableRef) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
@ -1189,7 +1194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
Message message = ref.getMessage();
if (message.isDurable() && durable) {
if (message.isDurable() && isDurableMessage()) {
tx.setContainsPersistent();
}
@ -1372,12 +1377,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1) throws Exception {
public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
return iterQueue(flushLimit, filter1, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering();
acknowledge(tx, ref);
acknowledge(tx, ref, ackReason);
refRemoved(ref);
}
});
@ -2385,7 +2390,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
if (!internalQueue && message.isDurable() && durable && !reference.isPaged()) {
if (!internalQueue && message.isDurable() && isDurableMessage() && !reference.isPaged()) {
storageManager.updateDeliveryCount(reference);
}
@ -2414,7 +2419,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
if (!reference.isPaged() && message.isDurable() && durable) {
if (!reference.isPaged() && message.isDurable() && isDurableMessage()) {
storageManager.updateScheduledDeliveryTime(reference);
}
}
@ -2858,7 +2863,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (message == null)
return;
boolean durableRef = message.isDurable() && queue.durable;
boolean durableRef = message.isDurable() && queue.isDurableMessage();
try {
message.decrementRefCount();

View File

@ -59,7 +59,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
}
try {
queue.deleteAllReferences();
queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
}

View File

@ -60,7 +60,7 @@ public final class RoutingContextImpl implements RoutingContext {
RouteContextList listing = getContextListing(address);
if (queue.isDurable()) {
if (queue.isDurableMessage()) {
listing.getDurableQueues().add(queue);
} else {
listing.getNonDurableQueues().add(queue);

View File

@ -409,7 +409,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCountAfterCancel would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged()) {
if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
if (ref.getMessage().isDurable() && ref.getQueue().isDurableMessage() &&
!ref.getQueue().isInternalQueue() &&
!ref.isPaged()) {
storageManager.updateDeliveryCount(ref);

View File

@ -247,7 +247,7 @@ public class ManagementServiceImpl implements ManagementService {
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);
messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
}

View File

@ -859,6 +859,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return false;
}
@Override
public boolean isDurableMessage() {
return false;
}
@Override
public boolean isTemporary() {
return false;
@ -1087,7 +1092,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
return 0;
}

View File

@ -441,6 +441,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return null;
}
@Override
public boolean isDurableMessage() {
// no-op
return false;
}
@Override
public boolean isDurable() {
// no-op
@ -601,7 +607,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
return 0;
}