ARTEMIS-3093 Ordering on multiple consumers and core with rollback
This commit is contained in:
parent
898b406430
commit
12c8096a23
|
@ -398,7 +398,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
|
|
||||||
public void closeSender(final Object brokerConsumer) throws Exception {
|
public void closeSender(final Object brokerConsumer) throws Exception {
|
||||||
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
|
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
|
||||||
consumer.close(false, true);
|
consumer.close(false);
|
||||||
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
|
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -440,7 +440,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
|
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
|
||||||
OperationContext oldContext = recoverContext();
|
OperationContext oldContext = recoverContext();
|
||||||
try {
|
try {
|
||||||
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true);
|
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
|
||||||
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
|
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
|
||||||
} finally {
|
} finally {
|
||||||
resetContext(oldContext);
|
resetContext(oldContext);
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
|
||||||
private boolean discharged;
|
private boolean discharged;
|
||||||
|
|
||||||
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
|
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
|
||||||
super(xid, storageManager, timeoutSeconds, true);
|
super(xid, storageManager, timeoutSeconds);
|
||||||
addOperation(new TransactionOperationAbstract() {
|
addOperation(new TransactionOperationAbstract() {
|
||||||
@Override
|
@Override
|
||||||
public void afterCommit(Transaction tx) {
|
public void afterCommit(Transaction tx) {
|
||||||
|
|
|
@ -133,7 +133,7 @@ public class MQTTPublishManager {
|
||||||
sendServerMessage(mqttid, message, deliveryCount, qos);
|
sendServerMessage(mqttid, message, deliveryCount, qos);
|
||||||
} else {
|
} else {
|
||||||
// Client must have disconnected and it's Subscription QoS cleared
|
// Client must have disconnected and it's Subscription QoS cleared
|
||||||
consumer.individualCancel(message.getMessageID(), false, true);
|
consumer.individualCancel(message.getMessageID(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,8 +220,7 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
|
|
||||||
void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);
|
void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);
|
||||||
|
|
||||||
/** @param sorted it should use the messageID as a reference to where to add it in the queue */
|
void cancel(MessageReference reference, long timeBase) throws Exception;
|
||||||
void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception;
|
|
||||||
|
|
||||||
void deliverAsync();
|
void deliverAsync();
|
||||||
|
|
||||||
|
|
|
@ -64,8 +64,6 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
|
||||||
|
|
||||||
void close(boolean failed) throws Exception;
|
void close(boolean failed) throws Exception;
|
||||||
|
|
||||||
void close(boolean failed, boolean sorted) throws Exception;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is just to remove itself from Queues.
|
* This method is just to remove itself from Queues.
|
||||||
* If for any reason during a close an exception occurred, the exception treatment
|
* If for any reason during a close an exception occurred, the exception treatment
|
||||||
|
@ -101,7 +99,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
|
||||||
|
|
||||||
void reject(long messageID) throws Exception;
|
void reject(long messageID) throws Exception;
|
||||||
|
|
||||||
void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception;
|
void individualCancel(long messageID, boolean failed) throws Exception;
|
||||||
|
|
||||||
void forceDelivery(long sequence);
|
void forceDelivery(long sequence);
|
||||||
|
|
||||||
|
|
|
@ -355,7 +355,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
refqueue = ref.getQueue();
|
refqueue = ref.getQueue();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
refqueue.cancel(ref, timeBase, false);
|
refqueue.cancel(ref, timeBase);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// There isn't much we can do besides log an error
|
// There isn't much we can do besides log an error
|
||||||
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
|
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.artemis.core.server.impl;
|
package org.apache.activemq.artemis.core.server.impl;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -193,6 +194,18 @@ public class LastValueQueue extends QueueImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** LVQ has to use regular addHead due to last value queues calculations */
|
||||||
|
@Override
|
||||||
|
public void addSorted(MessageReference ref, boolean scheduling) {
|
||||||
|
this.addHead(ref, scheduling);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** LVQ has to use regular addHead due to last value queues calculations */
|
||||||
|
@Override
|
||||||
|
public void addSorted(List<MessageReference> refs, boolean scheduling) {
|
||||||
|
this.addHead(refs, scheduling);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
|
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
|
||||||
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
|
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
|
||||||
|
|
|
@ -1115,13 +1115,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
enterCritical(CRITICAL_PATH_ADD_HEAD);
|
enterCritical(CRITICAL_PATH_ADD_HEAD);
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
try {
|
try {
|
||||||
|
if (ringSize != -1) {
|
||||||
|
enforceRing(ref, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ref.isAlreadyAcked()) {
|
||||||
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
|
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
internalAddSorted(ref);
|
internalAddSorted(ref);
|
||||||
|
|
||||||
directDeliver = false;
|
directDeliver = false;
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
leaveCritical(CRITICAL_PATH_ADD_HEAD);
|
leaveCritical(CRITICAL_PATH_ADD_HEAD);
|
||||||
}
|
}
|
||||||
|
@ -1948,15 +1953,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
|
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
|
||||||
Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
|
Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
|
||||||
if (redeliveryResult.getA()) {
|
if (redeliveryResult.getA()) {
|
||||||
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
|
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
|
||||||
if (sorted) {
|
|
||||||
internalAddSorted(reference);
|
internalAddSorted(reference);
|
||||||
} else {
|
|
||||||
internalAddHead(reference);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
resetAllIterators();
|
resetAllIterators();
|
||||||
|
@ -2862,6 +2863,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
int priority = getPriority(ref);
|
int priority = getPriority(ref);
|
||||||
|
|
||||||
messageReferences.addSorted(ref, priority);
|
messageReferences.addSorted(ref, priority);
|
||||||
|
|
||||||
|
ref.setInDelivery(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getPriority(MessageReference ref) {
|
private int getPriority(MessageReference ref) {
|
||||||
|
@ -3933,10 +3936,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
void postRollback(final LinkedList<MessageReference> refs) {
|
void postRollback(final LinkedList<MessageReference> refs) {
|
||||||
postRollback(refs, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
|
|
||||||
//if we have purged then ignore adding the messages back
|
//if we have purged then ignore adding the messages back
|
||||||
if (purgeOnNoConsumers && getConsumerCount() == 0) {
|
if (purgeOnNoConsumers && getConsumerCount() == 0) {
|
||||||
purgeAfterRollback(refs);
|
purgeAfterRollback(refs);
|
||||||
|
@ -3946,11 +3945,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
// if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue
|
// if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue
|
||||||
if (!isNonDestructive()) {
|
if (!isNonDestructive()) {
|
||||||
if (sorted) {
|
|
||||||
addSorted(refs, false);
|
addSorted(refs, false);
|
||||||
} else {
|
|
||||||
addHead(refs, false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -84,11 +84,6 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterRollback(final Transaction tx) {
|
public void afterRollback(final Transaction tx) {
|
||||||
afterRollback(tx, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterRollback(final Transaction tx, boolean sorted) {
|
|
||||||
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
|
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
|
||||||
|
|
||||||
long timeBase = System.currentTimeMillis();
|
long timeBase = System.currentTimeMillis();
|
||||||
|
@ -121,7 +116,7 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
QueueImpl queue = entry.getKey();
|
QueueImpl queue = entry.getKey();
|
||||||
|
|
||||||
synchronized (queue) {
|
synchronized (queue) {
|
||||||
queue.postRollback(refs, sorted);
|
queue.postRollback(refs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -539,12 +539,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close(final boolean failed) throws Exception {
|
public synchronized void close(final boolean failed) throws Exception {
|
||||||
close(failed, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void close(final boolean failed, boolean sorted) throws Exception {
|
|
||||||
|
|
||||||
// Close should only ever be done once per consumer.
|
// Close should only ever be done once per consumer.
|
||||||
if (isClosed) return;
|
if (isClosed) return;
|
||||||
|
@ -570,7 +565,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
|
|
||||||
List<MessageReference> refs = cancelRefs(failed, false, null);
|
List<MessageReference> refs = cancelRefs(failed, false, null);
|
||||||
|
|
||||||
Transaction tx = new TransactionImpl(storageManager, sorted);
|
Transaction tx = new TransactionImpl(storageManager);
|
||||||
|
|
||||||
refs.forEach(ref -> {
|
refs.forEach(ref -> {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -1022,7 +1017,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception {
|
public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
|
||||||
if (browseOnly) {
|
if (browseOnly) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1037,7 +1032,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
ref.decrementDeliveryCount();
|
ref.decrementDeliveryCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted);
|
ref.getQueue().cancel(ref, System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1268,7 +1268,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
ServerConsumer consumer = locateConsumer(consumerID);
|
ServerConsumer consumer = locateConsumer(consumerID);
|
||||||
|
|
||||||
if (consumer != null) {
|
if (consumer != null) {
|
||||||
consumer.individualCancel(messageID, failed, false);
|
consumer.individualCancel(messageID, failed);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,10 +52,6 @@ public interface TransactionOperation {
|
||||||
*/
|
*/
|
||||||
void afterRollback(Transaction tx);
|
void afterRollback(Transaction tx);
|
||||||
|
|
||||||
default void afterRollback(Transaction tx, boolean sorted) {
|
|
||||||
afterRollback(tx);
|
|
||||||
}
|
|
||||||
|
|
||||||
List<MessageReference> getRelatedMessageReferences();
|
List<MessageReference> getRelatedMessageReferences();
|
||||||
|
|
||||||
List<MessageReference> getListOnConsumer(long consumerID);
|
List<MessageReference> getListOnConsumer(long consumerID);
|
||||||
|
|
|
@ -63,8 +63,6 @@ public class TransactionImpl implements Transaction {
|
||||||
|
|
||||||
private final long createTime;
|
private final long createTime;
|
||||||
|
|
||||||
private final boolean sorted;
|
|
||||||
|
|
||||||
private volatile boolean containsPersistent;
|
private volatile boolean containsPersistent;
|
||||||
|
|
||||||
private int timeoutSeconds = -1;
|
private int timeoutSeconds = -1;
|
||||||
|
@ -98,34 +96,23 @@ public class TransactionImpl implements Transaction {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
|
public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
|
||||||
this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
|
this(storageManager.generateID(), null, storageManager, timeoutSeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionImpl(final StorageManager storageManager) {
|
public TransactionImpl(final StorageManager storageManager) {
|
||||||
this(storageManager, false);
|
this(storageManager.generateID(), null, storageManager,-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionImpl(final StorageManager storageManager, boolean sorted) {
|
|
||||||
this(storageManager.generateID(), null, storageManager,-1, sorted);
|
|
||||||
}
|
|
||||||
|
|
||||||
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
|
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
|
||||||
this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
|
this(storageManager.generateID(), xid, storageManager, timeoutSeconds);
|
||||||
}
|
|
||||||
|
|
||||||
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
|
|
||||||
this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
|
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
|
||||||
this(id, xid, storageManager, -1, false);
|
this(id, xid, storageManager, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
|
private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
|
||||||
this(id, xid, storageManager, -1, sorted);
|
|
||||||
}
|
|
||||||
|
|
||||||
private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
|
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
|
|
||||||
this.xid = xid;
|
this.xid = xid;
|
||||||
|
@ -135,8 +122,6 @@ public class TransactionImpl implements Transaction {
|
||||||
this.createTime = System.currentTimeMillis();
|
this.createTime = System.currentTimeMillis();
|
||||||
|
|
||||||
this.timeoutSeconds = timeoutSeconds;
|
this.timeoutSeconds = timeoutSeconds;
|
||||||
|
|
||||||
this.sorted = sorted;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transaction implementation
|
// Transaction implementation
|
||||||
|
@ -217,7 +202,7 @@ public class TransactionImpl implements Transaction {
|
||||||
logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
|
logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
|
||||||
}
|
}
|
||||||
|
|
||||||
internalRollback(sorted);
|
internalRollback();
|
||||||
|
|
||||||
if (exception != null) {
|
if (exception != null) {
|
||||||
throw exception;
|
throw exception;
|
||||||
|
@ -276,7 +261,7 @@ public class TransactionImpl implements Transaction {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (state == State.ROLLBACK_ONLY) {
|
if (state == State.ROLLBACK_ONLY) {
|
||||||
internalRollback(sorted);
|
internalRollback();
|
||||||
|
|
||||||
if (exception != null) {
|
if (exception != null) {
|
||||||
throw exception;
|
throw exception;
|
||||||
|
@ -367,7 +352,7 @@ public class TransactionImpl implements Transaction {
|
||||||
}
|
}
|
||||||
if (state != State.PREPARED) {
|
if (state != State.PREPARED) {
|
||||||
try {
|
try {
|
||||||
internalRollback(sorted);
|
internalRollback();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// nothing we can do beyond logging
|
// nothing we can do beyond logging
|
||||||
// no need to special handler here as this was not even supposed to happen at this point
|
// no need to special handler here as this was not even supposed to happen at this point
|
||||||
|
@ -400,11 +385,11 @@ public class TransactionImpl implements Transaction {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internalRollback(sorted);
|
internalRollback();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void internalRollback(boolean sorted) throws Exception {
|
private void internalRollback() throws Exception {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("TransactionImpl::internalRollback " + this);
|
logger.trace("TransactionImpl::internalRollback " + this);
|
||||||
}
|
}
|
||||||
|
@ -439,7 +424,7 @@ public class TransactionImpl implements Transaction {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void done() {
|
public void done() {
|
||||||
afterRollback(operationsToComplete, sorted);
|
afterRollback(operationsToComplete);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -453,7 +438,7 @@ public class TransactionImpl implements Transaction {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void done() {
|
public void done() {
|
||||||
afterRollback(storeOperationsToComplete, sorted);
|
afterRollback(storeOperationsToComplete);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -583,10 +568,10 @@ public class TransactionImpl implements Transaction {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
|
private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
|
||||||
if (operationsToComplete != null) {
|
if (operationsToComplete != null) {
|
||||||
for (TransactionOperation operation : operationsToComplete) {
|
for (TransactionOperation operation : operationsToComplete) {
|
||||||
operation.afterRollback(this, sorted);
|
operation.afterRollback(this);
|
||||||
}
|
}
|
||||||
// Help out GC here
|
// Help out GC here
|
||||||
operationsToComplete.clear();
|
operationsToComplete.clear();
|
||||||
|
|
|
@ -1217,7 +1217,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception {
|
public void cancel(MessageReference reference, long timeBase) throws Exception {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -91,11 +91,6 @@ public class DummyServerConsumer implements ServerConsumer {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close(boolean failed, boolean sorted) throws Exception {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeItself() throws Exception {
|
public void removeItself() throws Exception {
|
||||||
|
|
||||||
|
@ -156,7 +151,7 @@ public class DummyServerConsumer implements ServerConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception {
|
public void individualCancel(long messageID, boolean failed) throws Exception {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,8 +30,15 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
@ -177,4 +184,101 @@ public class JMSOrderTest extends JMSTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleConsumersRollback() throws Exception {
|
||||||
|
internalMultipleConsumers(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleConsumersClose() throws Exception {
|
||||||
|
internalMultipleConsumers(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalMultipleConsumers(final boolean rollback) throws Exception {
|
||||||
|
|
||||||
|
|
||||||
|
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(false));
|
||||||
|
|
||||||
|
int numberOfMessages = 100;
|
||||||
|
int numberOfConsumers = 3;
|
||||||
|
|
||||||
|
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
|
||||||
|
final javax.jms.Queue jmsQueue;
|
||||||
|
|
||||||
|
try (Connection connection = factory.createConnection()) {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
jmsQueue = session.createQueue(getName());
|
||||||
|
MessageProducer producer = session.createProducer(jmsQueue);
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
TextMessage message = session.createTextMessage("test " + i);
|
||||||
|
message.setIntProperty("i", i);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
|
||||||
|
|
||||||
|
AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
Runnable r = () -> {
|
||||||
|
try (Connection c = factory.createConnection()) {
|
||||||
|
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageConsumer cs = s.createConsumer(jmsQueue);
|
||||||
|
c.start();
|
||||||
|
int rollbacks = 0;
|
||||||
|
while (running.get()) {
|
||||||
|
TextMessage txt = (TextMessage)cs.receive(500);
|
||||||
|
if (txt != null) {
|
||||||
|
if (rollback) {
|
||||||
|
s.rollback();
|
||||||
|
rollbacks++;
|
||||||
|
|
||||||
|
if (rollbacks >= 3) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
errors.incrementAndGet();
|
||||||
|
running.set(false);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Thread[] threads = new Thread[numberOfConsumers];
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfConsumers; i++) {
|
||||||
|
threads[i] = new Thread(r, "consumer " + i);
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Thread t : threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(0, errors.get());
|
||||||
|
|
||||||
|
Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
|
||||||
|
|
||||||
|
try (Connection c = factory.createConnection()) {
|
||||||
|
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageConsumer cs = s.createConsumer(jmsQueue);
|
||||||
|
c.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
TextMessage message = (TextMessage) cs.receive(1000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
Assert.assertEquals(i, message.getIntProperty("i"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertNull(cs.receiveNoWait());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,12 +149,12 @@ public class RingQueueTest extends ActiveMQTestBase {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
message = createTextMessage(clientSession, "hello1");
|
message = createTextMessage(clientSession, "hello1");
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
Wait.assertTrue(() -> queue.getMessageCount() == 2);
|
Wait.assertEquals(2, queue::getMessageCount);
|
||||||
Wait.assertTrue(() -> queue.getDeliveringCount() == 2);
|
Wait.assertEquals(2, queue::getDeliveringCount);
|
||||||
consumer.close();
|
consumer.close();
|
||||||
Wait.assertTrue(() -> queue.getMessageCount() == 1);
|
Wait.assertEquals(1, queue::getMessageCount);
|
||||||
Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
|
Wait.assertEquals(0, queue::getDeliveringCount);
|
||||||
Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
|
Wait.assertEquals(1, queue::getMessagesReplaced);
|
||||||
consumer = clientSession.createConsumer(qName);
|
consumer = clientSession.createConsumer(qName);
|
||||||
message = consumer.receiveImmediate();
|
message = consumer.receiveImmediate();
|
||||||
assertNotNull(message);
|
assertNotNull(message);
|
||||||
|
@ -242,13 +242,20 @@ public class RingQueueTest extends ActiveMQTestBase {
|
||||||
message.acknowledge();
|
message.acknowledge();
|
||||||
}
|
}
|
||||||
consumer.close();
|
consumer.close();
|
||||||
Wait.assertTrue(() -> queue.getMessageCount() == 5);
|
Wait.assertEquals(5, queue::getMessageCount);
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
producer.send(clientSession.createMessage(true));
|
producer.send(clientSession.createMessage(true));
|
||||||
}
|
}
|
||||||
Wait.assertTrue(() -> queue.getMessageCount() == 10);
|
Wait.assertEquals(10, queue::getMessageCount);
|
||||||
Wait.assertTrue(() -> queue.getMessagesReplaced() == 5);
|
|
||||||
|
// these sends will be replacing the old values
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
producer.send(clientSession.createMessage(true));
|
||||||
|
Wait.assertEquals(10, queue::getMessageCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertEquals(5, queue::getMessagesReplaced);
|
||||||
consumer = clientSession.createConsumer(qName);
|
consumer = clientSession.createConsumer(qName);
|
||||||
message = consumer.receiveImmediate();
|
message = consumer.receiveImmediate();
|
||||||
assertNotNull(message);
|
assertNotNull(message);
|
||||||
|
|
|
@ -423,7 +423,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
|
public void cancel(final MessageReference reference, final long timeBase) throws Exception {
|
||||||
// no-op
|
// no-op
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue