This closes #2618
This commit is contained in:
commit
3589dd797a
|
@ -2252,4 +2252,14 @@ public interface AuditLogger extends BasicLogger {
|
||||||
@Message(id = 601500, value = "User {0} is sending a core message on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 601500, value = "User {0} is sending a core message on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void coreSendMessage(String user, Object source, Object... args);
|
void coreSendMessage(String user, Object source, Object... args);
|
||||||
|
|
||||||
|
|
||||||
|
static void getAcknowledgeAttempts(Object source) {
|
||||||
|
LOGGER.getMessagesAcknowledged(getCaller(), source);
|
||||||
|
}
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 601501, value = "User {0} is getting messages acknowledged attemps on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void getAcknowledgeAttempts(String user, Object source, Object... args);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,6 +168,13 @@ public interface QueueControl {
|
||||||
@Attribute(desc = "number of messages acknowledged from this queue since it was created")
|
@Attribute(desc = "number of messages acknowledged from this queue since it was created")
|
||||||
long getMessagesAcknowledged();
|
long getMessagesAcknowledged();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of messages added to this queue since it was created.
|
||||||
|
*/
|
||||||
|
@Attribute(desc = "number of messages acknowledged attempts from this queue since it was created")
|
||||||
|
long getAcknowledgeAttempts();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the number of messages expired from this queue since it was created.
|
* Returns the number of messages expired from this queue since it was created.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
||||||
|
@ -67,8 +68,8 @@ public class ProtonTransactionImpl extends TransactionImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RefsOperation createRefsOperation(Queue queue) {
|
public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
|
||||||
return new ProtonTransactionRefsOperation(queue, storageManager);
|
return new ProtonTransactionRefsOperation(queue, reason, storageManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
@ -36,8 +37,8 @@ import org.apache.qpid.proton.engine.Delivery;
|
||||||
*/
|
*/
|
||||||
public class ProtonTransactionRefsOperation extends RefsOperation {
|
public class ProtonTransactionRefsOperation extends RefsOperation {
|
||||||
|
|
||||||
public ProtonTransactionRefsOperation(final Queue queue, StorageManager storageManager) {
|
public ProtonTransactionRefsOperation(final Queue queue, AckReason reason, StorageManager storageManager) {
|
||||||
super(queue, storageManager);
|
super(queue, reason, storageManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -400,6 +400,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getAcknowledgeAttempts() {
|
||||||
|
if (AuditLogger.isEnabled()) {
|
||||||
|
AuditLogger.getMessagesAcknowledged(queue);
|
||||||
|
}
|
||||||
|
checkStarted();
|
||||||
|
|
||||||
|
clearIO();
|
||||||
|
try {
|
||||||
|
return queue.getAcknowledgeAttempts();
|
||||||
|
} finally {
|
||||||
|
blockOnIO();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getMessagesExpired() {
|
public long getMessagesExpired() {
|
||||||
if (AuditLogger.isEnabled()) {
|
if (AuditLogger.isEnabled()) {
|
||||||
|
|
|
@ -234,6 +234,8 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
|
|
||||||
long getMessagesAdded();
|
long getMessagesAdded();
|
||||||
|
|
||||||
|
long getAcknowledgeAttempts();
|
||||||
|
|
||||||
long getMessagesAcknowledged();
|
long getMessagesAcknowledged();
|
||||||
|
|
||||||
long getMessagesExpired();
|
long getMessagesExpired();
|
||||||
|
@ -393,7 +395,7 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
*/
|
*/
|
||||||
void deliverScheduledMessages() throws ActiveMQException;
|
void deliverScheduledMessages() throws ActiveMQException;
|
||||||
|
|
||||||
void postAcknowledge(MessageReference ref);
|
void postAcknowledge(MessageReference ref, AckReason reason);
|
||||||
|
|
||||||
float getRate();
|
float getRate();
|
||||||
|
|
||||||
|
|
|
@ -193,6 +193,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
private AtomicLong messagesAcknowledged = new AtomicLong(0);
|
private AtomicLong messagesAcknowledged = new AtomicLong(0);
|
||||||
|
|
||||||
|
private AtomicLong ackAttempts = new AtomicLong(0);
|
||||||
|
|
||||||
private AtomicLong messagesExpired = new AtomicLong(0);
|
private AtomicLong messagesExpired = new AtomicLong(0);
|
||||||
|
|
||||||
private AtomicLong messagesKilled = new AtomicLong(0);
|
private AtomicLong messagesKilled = new AtomicLong(0);
|
||||||
|
@ -1473,7 +1475,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
} else {
|
} else {
|
||||||
if (ref.isPaged()) {
|
if (ref.isPaged()) {
|
||||||
pageSubscription.ack((PagedReference) ref);
|
pageSubscription.ack((PagedReference) ref);
|
||||||
postAcknowledge(ref);
|
postAcknowledge(ref, reason);
|
||||||
} else {
|
} else {
|
||||||
Message message = ref.getMessage();
|
Message message = ref.getMessage();
|
||||||
|
|
||||||
|
@ -1482,18 +1484,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
if (durableRef) {
|
if (durableRef) {
|
||||||
storageManager.storeAcknowledge(id, message.getMessageID());
|
storageManager.storeAcknowledge(id, message.getMessageID());
|
||||||
}
|
}
|
||||||
postAcknowledge(ref);
|
postAcknowledge(ref, reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reason == AckReason.EXPIRED) {
|
ackAttempts.incrementAndGet();
|
||||||
messagesExpired.incrementAndGet();
|
|
||||||
} else if (reason == AckReason.KILLED) {
|
|
||||||
messagesKilled.incrementAndGet();
|
|
||||||
} else if (reason == AckReason.REPLACED) {
|
|
||||||
messagesReplaced.incrementAndGet();
|
|
||||||
} else {
|
|
||||||
messagesAcknowledged.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (server != null && server.hasBrokerMessagePlugins()) {
|
if (server != null && server.hasBrokerMessagePlugins()) {
|
||||||
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
|
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
|
||||||
|
@ -1508,10 +1502,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
|
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
|
||||||
|
RefsOperation refsOperation = getRefsOperation(tx, reason);
|
||||||
|
|
||||||
if (ref.isPaged()) {
|
if (ref.isPaged()) {
|
||||||
pageSubscription.ackTx(tx, (PagedReference) ref);
|
pageSubscription.ackTx(tx, (PagedReference) ref);
|
||||||
|
|
||||||
getRefsOperation(tx).addAck(ref);
|
refsOperation.addAck(ref);
|
||||||
} else {
|
} else {
|
||||||
Message message = ref.getMessage();
|
Message message = ref.getMessage();
|
||||||
|
|
||||||
|
@ -1523,15 +1519,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
tx.setContainsPersistent();
|
tx.setContainsPersistent();
|
||||||
}
|
}
|
||||||
|
|
||||||
getRefsOperation(tx).addAck(ref);
|
ackAttempts.incrementAndGet();
|
||||||
}
|
|
||||||
|
|
||||||
if (reason == AckReason.EXPIRED) {
|
refsOperation.addAck(ref);
|
||||||
messagesExpired.incrementAndGet();
|
|
||||||
} else if (reason == AckReason.KILLED) {
|
|
||||||
messagesKilled.incrementAndGet();
|
|
||||||
} else {
|
|
||||||
messagesAcknowledged.incrementAndGet();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server != null && server.hasBrokerMessagePlugins()) {
|
if (server != null && server.hasBrokerMessagePlugins()) {
|
||||||
|
@ -1547,7 +1537,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
tx.setContainsPersistent();
|
tx.setContainsPersistent();
|
||||||
}
|
}
|
||||||
|
|
||||||
getRefsOperation(tx).addAck(ref);
|
getRefsOperation(tx, AckReason.NORMAL).addAck(ref);
|
||||||
|
|
||||||
// https://issues.jboss.org/browse/HORNETQ-609
|
// https://issues.jboss.org/browse/HORNETQ-609
|
||||||
incDelivering(ref);
|
incDelivering(ref);
|
||||||
|
@ -1555,16 +1545,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
messagesAcknowledged.incrementAndGet();
|
messagesAcknowledged.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
private RefsOperation getRefsOperation(final Transaction tx) {
|
private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason) {
|
||||||
return getRefsOperation(tx, false);
|
return getRefsOperation(tx, ackReason, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private RefsOperation getRefsOperation(final Transaction tx, boolean ignoreRedlieveryCheck) {
|
private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason, boolean ignoreRedlieveryCheck) {
|
||||||
synchronized (tx) {
|
synchronized (tx) {
|
||||||
RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
|
RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
|
||||||
|
|
||||||
if (oper == null) {
|
if (oper == null) {
|
||||||
oper = tx.createRefsOperation(this);
|
oper = tx.createRefsOperation(this, ackReason);
|
||||||
|
|
||||||
tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
|
tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
|
||||||
|
|
||||||
|
@ -1586,7 +1576,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) {
|
public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) {
|
||||||
getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
|
getRefsOperation(tx, AckReason.NORMAL, ignoreRedeliveryCheck).addAck(reference);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1705,6 +1695,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
return messagesAcknowledged.get();
|
return messagesAcknowledged.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getAcknowledgeAttempts() {
|
||||||
|
return ackAttempts.get();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getMessagesExpired() {
|
public long getMessagesExpired() {
|
||||||
return messagesExpired.get();
|
return messagesExpired.get();
|
||||||
|
@ -3300,11 +3295,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postAcknowledge(final MessageReference ref) {
|
public void postAcknowledge(final MessageReference ref, AckReason reason) {
|
||||||
QueueImpl queue = (QueueImpl) ref.getQueue();
|
QueueImpl queue = (QueueImpl) ref.getQueue();
|
||||||
|
|
||||||
queue.decDelivering(ref);
|
queue.decDelivering(ref);
|
||||||
|
|
||||||
|
if (reason == AckReason.EXPIRED) {
|
||||||
|
messagesExpired.incrementAndGet();
|
||||||
|
} else if (reason == AckReason.KILLED) {
|
||||||
|
messagesKilled.incrementAndGet();
|
||||||
|
} else if (reason == AckReason.REPLACED) {
|
||||||
|
messagesReplaced.incrementAndGet();
|
||||||
|
} else {
|
||||||
|
messagesAcknowledged.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
if (ref.isPaged()) {
|
if (ref.isPaged()) {
|
||||||
// nothing to be done
|
// nothing to be done
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -38,6 +38,8 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(RefsOperation.class);
|
private static final Logger logger = Logger.getLogger(RefsOperation.class);
|
||||||
|
|
||||||
|
private final AckReason reason;
|
||||||
|
|
||||||
private final StorageManager storageManager;
|
private final StorageManager storageManager;
|
||||||
private Queue queue;
|
private Queue queue;
|
||||||
List<MessageReference> refsToAck = new ArrayList<>();
|
List<MessageReference> refsToAck = new ArrayList<>();
|
||||||
|
@ -50,11 +52,13 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
*/
|
*/
|
||||||
protected boolean ignoreRedeliveryCheck = false;
|
protected boolean ignoreRedeliveryCheck = false;
|
||||||
|
|
||||||
public RefsOperation(Queue queue, StorageManager storageManager) {
|
public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
|
this.reason = reason;
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// once turned on, we shouldn't turn it off, that's why no parameters
|
// once turned on, we shouldn't turn it off, that's why no parameters
|
||||||
public void setIgnoreRedeliveryCheck() {
|
public void setIgnoreRedeliveryCheck() {
|
||||||
ignoreRedeliveryCheck = true;
|
ignoreRedeliveryCheck = true;
|
||||||
|
@ -163,7 +167,7 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
public void afterCommit(final Transaction tx) {
|
public void afterCommit(final Transaction tx) {
|
||||||
for (MessageReference ref : refsToAck) {
|
for (MessageReference ref : refsToAck) {
|
||||||
synchronized (ref.getQueue()) {
|
synchronized (ref.getQueue()) {
|
||||||
queue.postAcknowledge(ref);
|
queue.postAcknowledge(ref, reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -95,5 +96,5 @@ public interface Transaction {
|
||||||
|
|
||||||
void setTimeout(int timeout);
|
void setTimeout(int timeout);
|
||||||
|
|
||||||
RefsOperation createRefsOperation(Queue queue);
|
RefsOperation createRefsOperation(Queue queue, AckReason reason);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.transaction.impl;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
||||||
|
|
||||||
public class BindingsTransactionImpl extends TransactionImpl {
|
public class BindingsTransactionImpl extends TransactionImpl {
|
||||||
|
@ -45,7 +46,7 @@ public class BindingsTransactionImpl extends TransactionImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RefsOperation createRefsOperation(Queue queue) {
|
public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
|
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
|
||||||
|
@ -162,8 +163,8 @@ public class TransactionImpl implements Transaction {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RefsOperation createRefsOperation(Queue queue) {
|
public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
|
||||||
return new RefsOperation(queue, storageManager);
|
return new RefsOperation(queue, reason, storageManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -793,6 +793,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getAcknowledgeAttempts() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean allowsReferenceCallback() {
|
public boolean allowsReferenceCallback() {
|
||||||
return false;
|
return false;
|
||||||
|
@ -1477,7 +1482,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postAcknowledge(MessageReference ref) {
|
public void postAcknowledge(MessageReference ref, AckReason reason) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.QueueConfig;
|
import org.apache.activemq.artemis.core.server.QueueConfig;
|
||||||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||||
|
@ -523,7 +524,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postAcknowledge(final MessageReference ref) {
|
public void postAcknowledge(final MessageReference ref, AckReason reason) {
|
||||||
System.out.println("Ignoring postACK on message " + ref);
|
System.out.println("Ignoring postACK on message " + ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,7 +105,7 @@ public class JmxConnectionTest extends ActiveMQTestBase {
|
||||||
logAndSystemOut("Successfully connected to: " + urlString);
|
logAndSystemOut("Successfully connected to: " + urlString);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logAndSystemOut("JMX connection failed: " + urlString, e);
|
logAndSystemOut("JMX connection failed: " + urlString, e);
|
||||||
Assert.fail();
|
Assert.fail(e.getMessage());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ import javax.json.JsonArray;
|
||||||
import javax.json.JsonObject;
|
import javax.json.JsonObject;
|
||||||
import javax.management.Notification;
|
import javax.management.Notification;
|
||||||
import javax.management.openmbean.CompositeData;
|
import javax.management.openmbean.CompositeData;
|
||||||
|
import javax.transaction.xa.XAResource;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -66,6 +67,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||||
import org.apache.activemq.artemis.junit.Wait;
|
import org.apache.activemq.artemis.junit.Wait;
|
||||||
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
|
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
|
||||||
import org.apache.activemq.artemis.utils.Base64;
|
import org.apache.activemq.artemis.utils.Base64;
|
||||||
|
@ -370,6 +372,92 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
session.deleteQueue(queue);
|
session.deleteQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetMessagesAcknowledgedOnXARollback() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
producer.send(session.createMessage(durable));
|
||||||
|
|
||||||
|
ClientSessionFactory xaFactory = createSessionFactory(locator);
|
||||||
|
ClientSession xaSession = addClientSession(xaFactory.createSession(true, false, false));
|
||||||
|
xaSession.start();
|
||||||
|
|
||||||
|
ClientConsumer consumer = xaSession.createConsumer(queue);
|
||||||
|
|
||||||
|
int tries = 10;
|
||||||
|
for (int i = 0; i < tries; i++) {
|
||||||
|
XidImpl xid = newXID();
|
||||||
|
xaSession.start(xid, XAResource.TMNOFLAGS);
|
||||||
|
ClientMessage message = consumer.receive(1000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
message.acknowledge();
|
||||||
|
Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
|
||||||
|
xaSession.end(xid, XAResource.TMSUCCESS);
|
||||||
|
Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
|
||||||
|
xaSession.prepare(xid);
|
||||||
|
Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
|
||||||
|
if (i + 1 == tries) {
|
||||||
|
xaSession.commit(xid, false);
|
||||||
|
} else {
|
||||||
|
xaSession.rollback(xid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertEquals(1, queueControl::getMessagesAcknowledged);
|
||||||
|
Wait.assertEquals(10, queueControl::getAcknowledgeAttempts);
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
session.deleteQueue(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetMessagesAcknowledgedOnRegularRollback() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
producer.send(session.createMessage(durable));
|
||||||
|
|
||||||
|
ClientSessionFactory xaFactory = createSessionFactory(locator);
|
||||||
|
ClientSession txSession = addClientSession(xaFactory.createSession(false, false, false));
|
||||||
|
txSession.start();
|
||||||
|
|
||||||
|
ClientConsumer consumer = txSession.createConsumer(queue);
|
||||||
|
|
||||||
|
int tries = 10;
|
||||||
|
for (int i = 0; i < tries; i++) {
|
||||||
|
ClientMessage message = consumer.receive(1000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
message.acknowledge();
|
||||||
|
Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
|
||||||
|
if (i + 1 == tries) {
|
||||||
|
txSession.commit();
|
||||||
|
} else {
|
||||||
|
txSession.rollback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertEquals(1, queueControl::getMessagesAcknowledged);
|
||||||
|
Wait.assertEquals(10, queueControl::getAcknowledgeAttempts);
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
session.deleteQueue(queue);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetScheduledCount() throws Exception {
|
public void testGetScheduledCount() throws Exception {
|
||||||
long delay = 500;
|
long delay = 500;
|
||||||
|
|
|
@ -216,6 +216,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
||||||
return (Integer) proxy.retrieveAttributeValue("messagesAcknowledged", Integer.class);
|
return (Integer) proxy.retrieveAttributeValue("messagesAcknowledged", Integer.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getAcknowledgeAttempts() {
|
||||||
|
return (Integer) proxy.retrieveAttributeValue("acknowledgeAttempts", Integer.class);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getMessagesExpired() {
|
public long getMessagesExpired() {
|
||||||
return (Long) proxy.retrieveAttributeValue("messagesExpired", Long.class);
|
return (Long) proxy.retrieveAttributeValue("messagesExpired", Long.class);
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
|
||||||
import org.apache.activemq.artemis.core.server.Bindable;
|
import org.apache.activemq.artemis.core.server.Bindable;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
@ -251,7 +252,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RefsOperation createRefsOperation(Queue queue) {
|
public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -206,6 +206,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getAcknowledgeAttempts() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) {
|
public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) {
|
||||||
// no-op
|
// no-op
|
||||||
|
@ -841,7 +846,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postAcknowledge(MessageReference ref) {
|
public void postAcknowledge(MessageReference ref, AckReason reason) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue