ARTEMIS-3743 / ARTEMIS-3766 Use ACKReason on Mirror to determine target operations and fixing Delivering statistics on Mirror

I merged these two JIRAs into one as I was doing an overal check on Mirroring
This commit is contained in:
Clebert Suconic 2022-06-28 16:41:08 -04:00 committed by clebertsuconic
parent 1bcf0f35f8
commit 68f6d8263d
21 changed files with 655 additions and 128 deletions

View File

@ -17,12 +17,15 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ACK_REASON;
/** <b>Warning:</b> do not use this class outside of the broker implementation.
* This is exposing package methods on this package that are not meant to be used on user's application. */
public class AMQPMessageBrokerAccessor {
@ -37,6 +40,11 @@ public class AMQPMessageBrokerAccessor {
return message.getMessageAnnotation(symbol);
}
public static AckReason getMessageAnnotationAckReason(AMQPMessage message) {
Number reasonVal = (Number) getMessageAnnotationProperty(message, ACK_REASON);
return reasonVal == null ? AckReason.NORMAL : AckReason.fromValue(reasonVal.byteValue());
}
/** Warning: this is a method specific to the broker. Do not use it on user's application. */
public static Header getCurrentHeader(AMQPMessage message) {
return message.getCurrentHeader();

View File

@ -34,7 +34,6 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@ -52,6 +51,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
private static final Logger logger = Logger.getLogger(AMQPMirrorControllerSource.class);
public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-amq-mr-ev-type");
public static final Symbol ACK_REASON = Symbol.getSymbol("x-opt-amq-mr-ack-reason");
public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr");
public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
@ -74,7 +74,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());
private static final ThreadLocal<MirrorControlRouting> mirrorControlRouting = ThreadLocal.withInitial(() -> new MirrorControlRouting(null));
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorDisabled(true));
final Queue snfQueue;
final ActiveMQServer server;
@ -350,32 +350,24 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
if (logger.isTraceEnabled()) {
logger.trace(server + " sending ack message from server " + nodeID + " with messageID=" + internalID);
}
Message message = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID);
Message message = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
route(server, message);
ref.getMessage().usageDown();
}
private Message createMessage(SimpleString address, SimpleString queue, Object event, String brokerID, Object body) {
return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body);
return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body, null);
}
private Message createMessage(SimpleString address, SimpleString queue, Object event, String brokerID, Object body, AckReason ackReason) {
return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body, ackReason);
}
public static void route(ActiveMQServer server, Message message) throws Exception {
message.setMessageID(server.getStorageManager().generateID());
MirrorControlRouting ctx = mirrorControlRouting.get();
RoutingContext ctx = mirrorControlRouting.get();
ctx.clear();
server.getPostOffice().route(message, ctx, false);
}
private static class MirrorControlRouting extends RoutingContextImpl {
MirrorControlRouting(Transaction transaction) {
super(transaction);
}
@Override
public boolean isMirrorController() {
return true;
}
}
}

View File

@ -226,6 +226,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
} else if (eventType.equals(POST_ACK)) {
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(message);
if (nodeID == null) {
nodeID = getRemoteMirrorId(); // not sending the nodeID means it's data generated on that broker
}
@ -235,7 +238,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
if (logger.isDebugEnabled()) {
logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
}
if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation)) {
if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation, ackReason)) {
messageAckOperation = null;
}
}
@ -333,7 +336,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
}
public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage) throws Exception {
public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
final Queue targetQueue = server.locateQueue(queue);
if (targetQueue == null) {
@ -352,9 +355,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue);
}
performAck(nodeID, messageID, targetQueue, ackMessage, true);
performAck(nodeID, messageID, targetQueue, ackMessage, reason, true);
return true;
}
@ -363,18 +365,19 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, boolean retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName());
}
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null && retry) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID);
}
targetQueue.flushOnIntermediate(() -> {
recoverContext();
performAck(nodeID, messageID, targetQueue, ackMessageOperation, false);
performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, false);
});
return;
}
@ -383,13 +386,24 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
logger.trace("Post ack Server " + server + " worked well for messageID=" + messageID + " nodeID=" + nodeID);
}
try {
targetQueue.acknowledge(reference);
switch (reason) {
case EXPIRED:
targetQueue.expire(reference, null, false);
break;
default:
targetQueue.acknowledge(null, reference, reason, null, false);
break;
}
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
} else {
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
if (reason != AckReason.EXPIRED) {
// if expired, we don't need to check on paging
// as the message will expire again when depaged (if on paging)
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
}
}
}
@ -508,7 +522,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
if (reference == null) {
return false;
} else {
targetQueue.acknowledge(reference);
targetQueue.acknowledge(null, reference, AckReason.NORMAL, null, false);
OperationContextImpl.getContext().executeOnCompletion(operation);
return true;
}

View File

@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@ -35,6 +36,7 @@ import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ACK_REASON;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
@ -49,12 +51,19 @@ public class AMQPMirrorMessageFactory {
* This method is open to make it testable,
* do not use on your applications.
*/
public static Message createMessage(String to, SimpleString address, SimpleString queue, Object event, String brokerID, Object body) {
public static Message createMessage(String to, SimpleString address, SimpleString queue, Object event, String brokerID, Object body, AckReason ackReason) {
Header header = new Header();
header.setDurable(true);
Map<Symbol, Object> annotations = new HashMap<>();
annotations.put(EVENT_TYPE, event);
if (ackReason != null && ackReason != AckReason.NORMAL) {
// if the ack reason is normal, we just send it null as the target will assume it as normal
// just to save some bits
annotations.put(ACK_REASON, ackReason.getVal());
}
if (address != null) {
annotations.put(ADDRESS, address.toString());
}

View File

@ -327,7 +327,7 @@ public class AMQConsumer {
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {
ref.getQueue().expire(ref, serverConsumer);
ref.getQueue().expire(ref, serverConsumer, true);
}
} else if (removeReferences) {

View File

@ -282,11 +282,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
if (tx == null) {
getQueue().acknowledge(this, reason, consumer);
} else {
getQueue().acknowledge(tx, this, reason, consumer);
}
getQueue().acknowledge(tx, this, reason, consumer, true);
}
/* (non-Javadoc)

View File

@ -245,7 +245,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
if (mirrorControllerSource != null) {
if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // we don't send replacements on LVQ as they are replaced themselves on the target
try {
mirrorControllerSource.postAcknowledge(ref, reason);
} catch (Exception e) {
@ -1573,8 +1573,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
if (mirrorControllerSource != null && !context.isMirrorController()) {
// we check for isMirrorController as to avoid recursive loop from there
if (mirrorControllerSource != null && !context.isMirrorDisabled()) {
// we check for isMirrorDisabled as to avoid recursive loop from there
mirrorControllerSource.sendMessage(message, context, refs);
}

View File

@ -220,7 +220,7 @@ public interface Queue extends Bindable,CriticalComponent {
void acknowledge(Transaction tx, MessageReference ref) throws Exception;
void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception;
void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception;
void reacknowledge(Transaction tx, MessageReference ref) throws Exception;
@ -341,7 +341,7 @@ public interface Queue extends Bindable,CriticalComponent {
void expire(MessageReference ref) throws Exception;
void expire(MessageReference ref, ServerConsumer consumer) throws Exception;
void expire(MessageReference ref, ServerConsumer consumer, boolean delivering) throws Exception;
boolean sendMessageToDeadLetterAddress(long messageID) throws Exception;
@ -506,6 +506,8 @@ public interface Queue extends Bindable,CriticalComponent {
void postAcknowledge(MessageReference ref, AckReason reason);
void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering);
/**
* @return the user associated with this queue
*/

View File

@ -40,7 +40,9 @@ public interface RoutingContext {
/** If the routing is from MirrorController, we don't redo mirrorController
* to avoid*/
boolean isMirrorController();
boolean isMirrorDisabled();
RoutingContext setMirrorDisabled(boolean mirrorDisabled);
/** return true if every queue routed is internal */
boolean isInternal();

View File

@ -18,5 +18,33 @@
package org.apache.activemq.artemis.core.server.impl;
public enum AckReason {
KILLED, EXPIRED, NORMAL, REPLACED
NORMAL((byte)0), KILLED((byte)1), EXPIRED((byte)2), REPLACED((byte)3);
private byte value;
AckReason(byte value) {
this.value = value;
}
public byte getVal() {
return value;
}
public static AckReason fromValue(byte value) {
switch (value) {
case 0:
return NORMAL;
case 1:
return KILLED;
case 2:
return EXPIRED;
case 3:
return REPLACED;
default:
// in case a newer version connects with a not known type
// this will just play safe and use the NORMAL ack mode
return NORMAL;
}
}
}

View File

@ -281,7 +281,7 @@ public class LastValueQueue extends QueueImpl {
@Override
public QueueConfiguration getQueueConfiguration() {
return super.getQueueConfiguration().setLastValue(true);
return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey);
}
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
@ -319,11 +319,11 @@ public class LastValueQueue extends QueueImpl {
public void acknowledge(Transaction tx,
MessageReference ref,
AckReason reason,
ServerConsumer consumer) throws Exception {
ServerConsumer consumer, boolean delivering) throws Exception {
if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
removeIfCurrent(ref);
}
super.acknowledge(tx, ref, reason, consumer);
super.acknowledge(tx, ref, reason, consumer, delivering);
}
@Override

View File

@ -275,7 +275,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
if (tx == null) {
getQueue().acknowledge(this, reason, consumer);
} else {
getQueue().acknowledge(tx, this, reason, consumer);
getQueue().acknowledge(tx, this, reason, consumer, true);
}
}

View File

@ -1825,20 +1825,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
acknowledge(null, ref, reason, consumer);
acknowledge(null, ref, reason, consumer, true);
}
@Override
public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
acknowledge(tx, ref, AckReason.NORMAL, null);
acknowledge(tx, ref, AckReason.NORMAL, null, true);
}
/** The parameter delivering can be sent as false in situation where the ack is coming outside of the context of delivering.
* Example: Mirror replication will call the ack here without any consumer involved. On that case no previous delivery happened,
* hence no information about delivering statistics should be updated. */
@Override
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer, boolean delivering) throws Exception {
boolean transactional = tx != null;
RefsOperation refsOperation = null;
if (transactional) {
refsOperation = getRefsOperation(tx, reason);
refsOperation = getRefsOperation(tx, reason, false, delivering);
}
if (logger.isTraceEnabled()) {
@ -1861,7 +1864,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refsOperation.addAck(ref);
} else {
pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref, reason);
postAcknowledge(ref, reason, delivering);
}
} else {
Message message = ref.getMessage();
@ -1880,7 +1883,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ackAttempts.incrementAndGet();
refsOperation.addAck(ref);
} else {
postAcknowledge(ref, reason);
postAcknowledge(ref, reason, delivering);
}
}
@ -1919,10 +1922,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason) {
return getRefsOperation(tx, ackReason, false);
return getRefsOperation(tx, ackReason, false, true);
}
private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason, boolean ignoreRedlieveryCheck) {
private RefsOperation getRefsOperation(final Transaction tx, AckReason ackReason, boolean ignoreRedlieveryCheck, boolean delivering) {
synchronized (tx) {
RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
@ -1938,6 +1941,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
oper.setIgnoreRedeliveryCheck();
}
oper.setDelivering(delivering);
return oper;
}
}
@ -1949,7 +1954,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) {
getRefsOperation(tx, AckReason.NORMAL, ignoreRedeliveryCheck).addAck(reference);
getRefsOperation(tx, AckReason.NORMAL, ignoreRedeliveryCheck, true).addAck(reference);
}
@Override
@ -1968,11 +1973,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void expire(final MessageReference ref) throws Exception {
expire(ref, null);
expire(ref, null, true);
}
/** The parameter delivering can be sent as false in situation where the ack is coming outside of the context of delivering.
* Example: Mirror replication will call the ack here without any consumer involved. On that case no previous delivery happened,
* hence no information about delivering statistics should be updated. */
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
public void expire(final MessageReference ref, final ServerConsumer consumer, boolean delivering) throws Exception {
if (addressSettings.getExpiryAddress() != null) {
createExpiryResources();
@ -1980,12 +1988,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("moving expired reference " + ref + " to address = " + addressSettings.getExpiryAddress() + " from queue=" + this.getName());
}
move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer);
move(null, addressSettings.getExpiryAddress(), null, ref, false, AckReason.EXPIRED, consumer, null, delivering);
} else {
if (logger.isTraceEnabled()) {
logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
}
acknowledge(ref, AckReason.EXPIRED, consumer);
acknowledge(null, ref, AckReason.EXPIRED, consumer, delivering);
}
// potentially auto-delete this queue if this expired the last message
@ -2087,7 +2095,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering(ref);
acknowledge(tx, ref, ackReason, null);
acknowledge(tx, ref, ackReason, null, true);
return true;
}
};
@ -2341,7 +2349,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering(ref);
expire(tx, ref);
expire(tx, ref, true);
iter.remove();
refRemoved(ref);
count++;
@ -2466,7 +2474,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final Transaction tx = new TransactionImpl(storageManager);
for (MessageReference ref : expiredMessages) {
try {
expire(tx, ref);
expire(tx, ref, true);
refRemoved(ref);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
@ -2538,7 +2546,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refRemoved(ref);
incDelivering(ref);
try {
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null);
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
} catch (Exception e) {
decDelivering(ref);
throw e;
@ -2598,7 +2606,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (!ignored) {
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null);
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
}
return true;
@ -2670,7 +2678,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue);
move(tx, SimpleString.toSimpleString(originalMessageAddress), null, ref, false, AckReason.NORMAL, null, targetQueue, true);
return true;
}
@ -2929,7 +2937,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// page cleanup
continue;
}
acknowledge(tx, ref, AckReason.KILLED, null);
acknowledge(tx, ref, AckReason.KILLED, null, true);
iter.remove();
refRemoved(ref);
txCount++;
@ -3297,7 +3305,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
checkIDSupplier(nodeStore);
return messageReferences.removeWithID(serverID, id);
MessageReference reference = messageReferences.removeWithID(serverID, id);
if (reference != null) {
refRemoved(reference);
}
return reference;
}
private void internalAddRedistributor() {
@ -3374,13 +3386,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return messageReferences.size();
}
private void move(final SimpleString toAddress,
final Transaction tx,
final MessageReference ref,
final boolean expiry,
final boolean rejectDuplicate,
final Long queueID) throws Exception {
Message copyMessage = makeCopy(ref, expiry, toAddress);
private RoutingStatus move(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref,
final boolean rejectDuplicate,
final AckReason reason,
final ServerConsumer consumer,
final Long queueID,
boolean delivering) throws Exception {
Transaction tx;
if (originalTX != null) {
tx = originalTX;
} else {
// if no TX we create a new one to commit at the end
tx = new TransactionImpl(storageManager);
}
Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
@ -3394,14 +3418,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), encodedBuffer);
}
postOffice.route(copyMessage, tx, false, rejectDuplicate);
RoutingStatus routingStatus;
{
RoutingContext context = new RoutingContextImpl(tx);
if (reason == AckReason.EXPIRED) {
// we Disable mirror on expiration as the target might be also expiring it
// and this could cause races
// we will only send the ACK for the expiration with the reason=EXPIRE and the expire will be played on the mirror side
context.setMirrorDisabled(true);
}
if (expiry) {
acknowledge(tx, ref, AckReason.EXPIRED, null);
} else {
acknowledge(tx, ref);
routingStatus = postOffice.route(copyMessage, context, false, rejectDuplicate, binding);
}
acknowledge(tx, ref, reason, consumer, delivering);
if (originalTX == null) {
tx.commit();
}
return routingStatus;
}
@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
@ -3563,7 +3599,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return LargeServerMessageImpl.checkLargeMessage(copy, storageManager);
}
private void expire(final Transaction tx, final MessageReference ref) throws Exception {
private void expire(final Transaction tx, final MessageReference ref, boolean delivering) throws Exception {
SimpleString expiryAddress = addressSettings.getExpiryAddress();
if (expiryAddress != null && expiryAddress.length() != 0) {
@ -3574,9 +3610,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (bindingList == null || bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
acknowledge(tx, ref, AckReason.EXPIRED, null);
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
} else {
move(expiryAddress, tx, ref, true, false, null);
move(tx, expiryAddress, null, ref, false, AckReason.EXPIRED, null, null, delivering);
}
} else {
if (!printErrorExpiring) {
@ -3585,7 +3621,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoAddress(name);
}
acknowledge(tx, ref, AckReason.EXPIRED, null);
acknowledge(tx, ref, AckReason.EXPIRED, null, delivering);
}
if (server != null && server.hasBrokerMessagePlugins()) {
@ -3648,7 +3684,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ref.acknowledge(tx, AckReason.KILLED, null);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
RoutingStatus status = move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
RoutingStatus status = move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null, null, true);
// this shouldn't happen, but in case it does it's better to log a message than just drop the message silently
if (status.equals(RoutingStatus.NO_BINDINGS) && server.getAddressSettingsRepository().getMatch(getAddress().toString()).isAutoCreateDeadLetterResources()) {
@ -3658,7 +3694,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
ref.acknowledge(tx, AckReason.KILLED, null);
}
@ -3689,35 +3724,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private RoutingStatus move(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref,
final boolean rejectDuplicate,
final AckReason reason,
final ServerConsumer consumer) throws Exception {
Transaction tx;
if (originalTX != null) {
tx = originalTX;
} else {
// if no TX we create a new one to commit at the end
tx = new TransactionImpl(storageManager);
}
Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
acknowledge(tx, ref, reason, consumer);
if (originalTX == null) {
tx.commit();
}
return routingStatus;
}
/*
* This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue
*/
@ -3913,10 +3919,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void postAcknowledge(final MessageReference ref, AckReason reason) {
postAcknowledge(ref, reason, true);
}
/** The parameter delivering can be sent as false in situation where the ack is coming outside of the context of delivering.
* Example: Mirror replication will call the ack here without any consumer involved. On that case no previous delivery happened,
* hence no information about delivering statistics should be updated. */
@Override
public void postAcknowledge(final MessageReference ref, AckReason reason, boolean delivering) {
QueueImpl queue = (QueueImpl) ref.getQueue();
try {
queue.decDelivering(ref);
if (delivering) {
queue.decDelivering(ref);
}
if (nonDestructive && reason == AckReason.NORMAL) {
// this is done to tell the difference between actual acks and just a closed consumer in the non-destructive use-case
ref.setInDelivery(false);
@ -4003,7 +4019,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Transaction transaction = new TransactionImpl(storageManager);
for (MessageReference reference : refs) {
incDelivering(reference); // post ack will decrement this, so need to inc
acknowledge(transaction, reference, AckReason.KILLED, null);
acknowledge(transaction, reference, AckReason.KILLED, null, true);
}
transaction.commit();
} catch (Exception e) {

View File

@ -51,6 +51,8 @@ public class RefsOperation extends TransactionOperationAbstract {
*/
protected boolean ignoreRedeliveryCheck = false;
private boolean delivering = true;
private String lingerSessionId = null;
public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
@ -60,6 +62,15 @@ public class RefsOperation extends TransactionOperationAbstract {
}
public RefsOperation setDelivering(boolean delivering) {
this.delivering = delivering;
return this;
}
public boolean isDelivering() {
return delivering;
}
// once turned on, we shouldn't turn it off, that's why no parameters
public void setIgnoreRedeliveryCheck() {
ignoreRedeliveryCheck = true;
@ -175,7 +186,7 @@ public class RefsOperation extends TransactionOperationAbstract {
clearLingerRef(ref);
synchronized (ref.getQueue()) {
ref.getQueue().postAcknowledge(ref, reason);
ref.getQueue().postAcknowledge(ref, reason, delivering);
}
}

View File

@ -67,6 +67,8 @@ public class RoutingContextImpl implements RoutingContext {
volatile int version;
boolean mirrorDisabled = false;
private final Executor executor;
private boolean duplicateDetection = true;
@ -92,8 +94,14 @@ public class RoutingContextImpl implements RoutingContext {
}
@Override
public boolean isMirrorController() {
return false;
public boolean isMirrorDisabled() {
return mirrorDisabled;
}
@Override
public RoutingContextImpl setMirrorDisabled(boolean mirrorDisabled) {
this.mirrorDisabled = mirrorDisabled;
return this;
}
@Override

View File

@ -1306,7 +1306,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
MessageReference ref = consumer.removeReferenceByID(messageID);
if (ref != null) {
ref.getQueue().expire(ref, consumer);
ref.getQueue().expire(ref, consumer, true);
}
}

View File

@ -441,7 +441,8 @@ public class RoutingContextTest {
public void acknowledge(Transaction tx,
MessageReference ref,
AckReason reason,
ServerConsumer consumer) throws Exception {
ServerConsumer consumer,
boolean delivering) throws Exception {
}
@ -661,7 +662,7 @@ public class RoutingContextTest {
}
@Override
public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
public void expire(MessageReference ref, ServerConsumer consumer, boolean delivering) throws Exception {
}
@ -899,6 +900,11 @@ public class RoutingContextTest {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering) {
}
@Override
public SimpleString getUser() {
return null;

View File

@ -1213,7 +1213,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception {
}
@ -1418,7 +1418,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
public void expire(MessageReference ref, ServerConsumer consumer, boolean delivering) throws Exception {
}
@ -1633,6 +1633,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering) {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {

View File

@ -26,18 +26,27 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.PrintStream;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.StringPrintStream;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -54,9 +63,7 @@ import org.junit.Test;
public class BrokerInSyncTest extends AmqpClientTestSupport {
public static final int TIME_BEFORE_RESTART = 1000;
protected static final int AMQP_PORT_2 = 5673;
protected static final int AMQP_PORT_3 = 5674;
private static final Logger logger = Logger.getLogger(BrokerInSyncTest.class);
ActiveMQServer server_2;
@ -221,6 +228,322 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
}
private org.apache.activemq.artemis.core.server.Queue locateQueueWithWait(ActiveMQServer server, String queueName) throws Exception {
Wait.assertTrue(() -> server.locateQueue(queueName) != null);
org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(queueName);
Assert.assertNotNull(queue);
return queue;
}
@Test
public void testExpiryNoReaper() throws Exception {
internalExpiry(false);
}
@Test
public void testExpiry() throws Exception {
internalExpiry(true);
}
private void internalExpiry(boolean useReaper) throws Exception {
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.getConfiguration().addAddressSetting("#", new AddressSettings().setExpiryAddress(SimpleString.toSimpleString("expiryQueue")));
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("expiryQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("expiryQueue").setRoutingType(RoutingType.ANYCAST));
if (!useReaper) {
server.getConfiguration().setMessageExpiryScanPeriod(-1);
}
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("expiryQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("expiryQueue").setRoutingType(RoutingType.ANYCAST));
if (!useReaper) {
server_2.getConfiguration().setMessageExpiryScanPeriod(-1);
}
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.getConfiguration().addAddressSetting("#", new AddressSettings().setExpiryAddress(SimpleString.toSimpleString("expiryQueue")));
server_2.start();
org.apache.activemq.artemis.core.server.Queue to1 = locateQueueWithWait(server_2, "$ACTIVEMQ_ARTEMIS_MIRROR_to_1");
org.apache.activemq.artemis.core.server.Queue to2 = locateQueueWithWait(server, "$ACTIVEMQ_ARTEMIS_MIRROR_to_2");
org.apache.activemq.artemis.core.server.Queue expiry1 = locateQueueWithWait(server, "expiryQueue");
org.apache.activemq.artemis.core.server.Queue expiry2 = locateQueueWithWait(server_2, "expiryQueue");
server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = locateQueueWithWait(server, getQueueName());
org.apache.activemq.artemis.core.server.Queue queueOnServer2 = locateQueueWithWait(server_2, getQueueName());
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
connection1.start();
Queue queue = session1.createQueue(getQueueName());
MessageProducer producerServer1 = session1.createProducer(queue);
producerServer1.setTimeToLive(1000);
TextMessage message = session1.createTextMessage("test");
message.setIntProperty("i", 0);
message.setStringProperty("server", server.getIdentity());
producerServer1.send(message);
session1.commit();
Wait.assertEquals(1L, queueOnServer1::getMessageCount, 1000, 100);
Wait.assertEquals(1L, queueOnServer2::getMessageCount, 1000, 100);
if (useReaper) {
// if using the reaper, I want to test what would happen with races between the reaper and the Mirror target
// for that reason we only pause the SNFs if we are using the reaper
to1.pause();
to2.pause();
}
Thread.sleep(1500);
if (!useReaper) {
queueOnServer1.expireReferences(); // we will expire in just on queue hoping the other gets through mirror
}
Wait.assertEquals(0L, queueOnServer1::getMessageCount, 2000, 100);
Wait.assertEquals(0L, queueOnServer2::getMessageCount, 2000, 100);
Wait.assertEquals(1L, expiry1::getMessageCount, 1000, 100);
Wait.assertEquals(1L, expiry2::getMessageCount, 1000, 100);
to1.resume();
to2.resume();
if (!useReaper) {
queueOnServer1.expireReferences(); // in just one queue
}
Wait.assertEquals(1L, expiry1::getMessageCount, 1000, 100);
Wait.assertEquals(1L, expiry2::getMessageCount, 1000, 100);
connection1.close();
server_2.stop();
server.stop();
}
@Test
public void testDLA() throws Exception {
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2));
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST));
server.getConfiguration().setMessageExpiryScanPeriod(-1);
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST));
server_2.getConfiguration().setMessageExpiryScanPeriod(-1);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2));
server_2.start();
org.apache.activemq.artemis.core.server.Queue to1 = locateQueueWithWait(server_2, "$ACTIVEMQ_ARTEMIS_MIRROR_to_1");
org.apache.activemq.artemis.core.server.Queue to2 = locateQueueWithWait(server, "$ACTIVEMQ_ARTEMIS_MIRROR_to_2");
org.apache.activemq.artemis.core.server.Queue dlq1 = locateQueueWithWait(server, "deadLetterQueue");
org.apache.activemq.artemis.core.server.Queue dlq2 = locateQueueWithWait(server_2, "deadLetterQueue");
server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = locateQueueWithWait(server, getQueueName());
org.apache.activemq.artemis.core.server.Queue queueOnServer2 = locateQueueWithWait(server_2, getQueueName());
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
connection1.start();
Queue queue = session1.createQueue(getQueueName());
MessageProducer producerServer1 = session1.createProducer(queue);
TextMessage message = session1.createTextMessage("test");
message.setIntProperty("i", 0);
message.setStringProperty("server", server.getIdentity());
producerServer1.send(message);
session1.commit();
MessageConsumer consumer1 = session1.createConsumer(queue);
connection1.start();
for (int i = 0; i < 2; i++) {
TextMessage messageToCancel = (TextMessage) consumer1.receive(1000);
Assert.assertNotNull(messageToCancel);
session1.rollback();
}
Assert.assertNull(consumer1.receiveNoWait());
Wait.assertEquals(0L, queueOnServer1::getMessageCount, 1000, 100);
Wait.assertEquals(0L, queueOnServer2::getMessageCount, 1000, 100);
Wait.assertEquals(1L, dlq1::getMessageCount, 1000, 100);
Wait.assertEquals(1L, dlq2::getMessageCount, 1000, 100);
dlq1.retryMessages(new Filter() {
@Override
public boolean match(Message message) {
return true;
}
@Override
public boolean match(Map<String, String> map) {
return true;
}
@Override
public boolean match(Filterable filterable) {
return true;
}
@Override
public SimpleString getFilterString() {
return SimpleString.toSimpleString("Test");
}
});
Wait.assertEquals(1L, queueOnServer1::getMessageCount, 1000, 100);
Wait.assertEquals(1L, queueOnServer2::getMessageCount, 1000, 100);
Wait.assertEquals(0L, dlq1::getMessageCount, 1000, 100);
Wait.assertEquals(0L, dlq2::getMessageCount, 1000, 100);
connection1.close();
server_2.stop();
server.stop();
}
@Test
public void testLVQ() throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2));
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST));
String lvqName = "testLVQ_" + RandomUtil.randomString();
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(lvqName));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration(lvqName).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setLastValueKey("KEY"));
server.getConfiguration().setMessageExpiryScanPeriod(-1);
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST));
server_2.getConfiguration().setMessageExpiryScanPeriod(-1);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2));
server_2.start();
org.apache.activemq.artemis.core.server.Queue lvqQueue1 = locateQueueWithWait(server, lvqName);
org.apache.activemq.artemis.core.server.Queue lvqQueue2 = locateQueueWithWait(server, lvqName);
Assert.assertTrue(lvqQueue1.isLastValue());
Assert.assertTrue(lvqQueue2.isLastValue());
Assert.assertTrue(lvqQueue1 instanceof LastValueQueue);
Assert.assertTrue(lvqQueue2 instanceof LastValueQueue);
Assert.assertEquals("KEY", lvqQueue1.getQueueConfiguration().getLastValueKey().toString());
Assert.assertEquals("KEY", lvqQueue2.getQueueConfiguration().getLastValueKey().toString());
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
Queue queue = session1.createQueue(lvqName);
MessageProducer producerServer1 = session1.createProducer(queue);
for (int i = 0; i < 1000; i++) {
TextMessage message = session1.createTextMessage("test");
message.setIntProperty("i", 0);
message.setStringProperty("KEY", "" + (i % 10));
producerServer1.send(message);
}
Wait.assertEquals(10L, lvqQueue1::getMessageCount, 2000, 100);
Wait.assertEquals(10L, lvqQueue2::getMessageCount, 2000, 100);
connection1.close();
server_2.stop();
server.stop();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222153"));
}
@Test
public void testSyncData() throws Exception {
int NUMBER_OF_MESSAGES = 100;
@ -324,6 +647,105 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
System.out.println("Queue on Server 1 = " + queueOnServer1.getMessageCount());
System.out.println("Queue on Server 2 = " + queueOnServer2.getMessageCount());
Assert.assertEquals(0, queueOnServer1.getDeliveringCount());
Assert.assertEquals(0, queueOnServer2.getDeliveringCount());
Assert.assertEquals(0, queueOnServer1.getDurableDeliveringCount());
Assert.assertEquals(0, queueOnServer2.getDurableDeliveringCount());
Assert.assertEquals(0, queueOnServer1.getDurableDeliveringSize());
Assert.assertEquals(0, queueOnServer2.getDurableDeliveringSize());
Assert.assertEquals(0, queueOnServer1.getDeliveringSize());
Assert.assertEquals(0, queueOnServer2.getDeliveringSize());
server_2.stop();
server.stop();
}
@Test
public void testStats() throws Exception {
int NUMBER_OF_MESSAGES = 1;
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.start();
server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
connection1.start();
ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection2 = cf2.createConnection();
Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
connection2.start();
Queue queue = session1.createQueue(getQueueName());
MessageProducer producerServer1 = session1.createProducer(queue);
MessageProducer producerServer2 = session2.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = session1.createTextMessage("test " + i);
message.setIntProperty("i", i);
message.setStringProperty("server", server.getIdentity());
producerServer1.send(message);
}
session1.commit();
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = server.locateQueue(getQueueName());
org.apache.activemq.artemis.core.server.Queue queueOnServer2 = server_2.locateQueue(getQueueName());
Assert.assertNotNull(queueOnServer1);
Assert.assertNotNull(queueOnServer2);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);
Assert.assertEquals(0, queueOnServer1.getDeliveringSize());
Assert.assertEquals(0, queueOnServer2.getDeliveringSize());
MessageConsumer consumerOn1 = session1.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumerOn1.receive(5000);
logger.debug("### Client acking message(" + i + ") on server 1, a message that was original sent on " + message.getStringProperty("server") + " text = " + message.getText());
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
Assert.assertEquals("test " + i, message.getText());
session1.commit();
int fi = i;
}
Wait.assertEquals(0L, queueOnServer1::getMessageCount, 2000, 100);
Wait.assertEquals(0L, queueOnServer2::getMessageCount, 2000, 100);
Assert.assertEquals(0, queueOnServer1.getDeliveringCount());
Assert.assertEquals(0, queueOnServer2.getDeliveringCount());
Assert.assertEquals(0, queueOnServer1.getDurableDeliveringCount());
Assert.assertEquals(0, queueOnServer2.getDurableDeliveringCount());
Assert.assertEquals(0, queueOnServer1.getDurableDeliveringSize());
Assert.assertEquals(0, queueOnServer2.getDurableDeliveringSize());
Assert.assertEquals(0, queueOnServer1.getDeliveringSize());
Assert.assertEquals(0, queueOnServer2.getDeliveringSize());
server_2.stop();
server.stop();
}
@ -500,7 +922,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
try (Connection connection = factory1.createConnection()) {
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
Wait.assertEquals(0, serverQueue::getMessageCount);
Wait.assertEquals(0L, serverQueue::getMessageCount, 2000, 100);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorMessageFactory;
@ -92,7 +93,7 @@ public class MirrorControllerBasicTest extends ActiveMQTestBase {
server.addAddressInfo(new AddressInfo("test").addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration("test").setAddress("test").setRoutingType(RoutingType.ANYCAST));
Message message = AMQPMirrorMessageFactory.createMessage("test", SimpleString.toSimpleString("ad1"), SimpleString.toSimpleString("qu1"), "test", "someUID", "body-test");
Message message = AMQPMirrorMessageFactory.createMessage("test", SimpleString.toSimpleString("ad1"), SimpleString.toSimpleString("qu1"), "test", "someUID", "body-test", AckReason.KILLED);
AMQPMirrorControllerSource.route(server, message);
AmqpClient client = new AmqpClient(new URI("tcp://localhost:61616"), null, null);
@ -108,6 +109,9 @@ public class MirrorControllerBasicTest extends ActiveMQTestBase {
Assert.assertEquals("qu1", amqpMessage.getMessageAnnotation(AMQPMirrorControllerSource.QUEUE.toString()));
Assert.assertEquals("someUID", amqpMessage.getMessageAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString()));
Assert.assertEquals("test", amqpMessage.getMessageAnnotation(AMQPMirrorControllerSource.EVENT_TYPE.toString()));
Number ackReason = (Number)amqpMessage.getMessageAnnotation("x-opt-amq-mr-ack-reason");
Assert.assertEquals(AckReason.KILLED.getVal(), ackReason.byteValue());
Assert.assertEquals(AckReason.KILLED, AckReason.fromValue(ackReason.byteValue()));
connection.close();

View File

@ -396,7 +396,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer, boolean decDel) throws Exception {
// no-op
}
@ -492,7 +492,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
public void expire(final MessageReference ref, final ServerConsumer consumer, boolean decDel) throws Exception {
// no-op
}
@ -966,6 +966,10 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return null;
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering) {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
}