ARTEMIS-4233 Large Message Issues After Failed Clients

- interrupted message breaking reference counting
After the server writing to the client is interrupted in AMQP, the reference counting was broken what would require the server restarted
in order to cleanup the files of any interrupted sends.

- Removed consumer during large message delivery damaging large messages
If the consumer failed to deliver messages for any reason, the message on the queue would be duplicated. what would wipe out the body of the message
and other journal errors would happen because of this.

extra debug capabilities added into RefCountMessage as part of ARTEMIS-4206 in order to identify these issues
This commit is contained in:
Clebert Suconic 2023-03-27 13:47:11 -04:00 committed by clebertsuconic
parent 23bbf76bdf
commit 6d3dbc4383
31 changed files with 1176 additions and 87 deletions

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.lang.ref.Cleaner;
public class ObjectCleaner {
private static final Cleaner cleaner;
static {
cleaner = Cleaner.create();
}
public static void register(Object obj, Runnable callback) {
cleaner.register(obj, callback);
}
}

View File

@ -16,13 +16,115 @@
*/
package org.apache.activemq.artemis.api.core;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
// import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; -- #ifdef DEBUG
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.ObjectCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;
/**
* RefCountMessage is a base-class for any message intending to do reference counting. Currently it is used for
* large message removal.
*
* Additional validation on reference counting will be done If you set a system property named "ARTEMIS_REF_DEBUG" and enable logging on this class.
* Additional logging output will be written when reference counting is broken and these debug options are applied.
* */
public class RefCountMessage {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final boolean REF_DEBUG = System.getProperty("ARTEMIS_REF_DEBUG") != null;
public static boolean isRefDebugEnabled() {
return REF_DEBUG && logger.isDebugEnabled();
}
public static boolean isRefTraceEnabled() {
return REF_DEBUG && logger.isTraceEnabled();
}
/** Sub classes constructors willing to debug reference counts,
* can register the objectCleaner through this method. */
protected void registerDebug() {
if (debugStatus == null) {
debugStatus = new DebugState(this.toString());
ObjectCleaner.register(this, debugStatus);
}
}
private static class DebugState implements Runnable {
private final ArrayList<Exception> debugCrumbs = new ArrayList<>();
// this means the object is accounted for and it should not print any warnings
volatile boolean accounted;
volatile boolean referenced;
String description;
/**
* Notice: This runnable cannot hold any reference back to message otherwise it won't ever happen and you will get a memory leak.
* */
Runnable runWhenLeaked;
DebugState(String description) {
this.description = description;
addDebug("registered");
}
/** this marks the Status as accounted for
* and no need to report an issue when DEBUG hits */
void accountedFor() {
accounted = true;
}
static String getTime() {
return Instant.now().toString();
}
void addDebug(String event) {
debugCrumbs.add(new Exception(event + " at " + getTime()));
if (accounted) {
logger.debug("Message Previously Released {}, {}, \n{}", description, event, debugLocations());
}
}
void up(String description) {
referenced = true;
debugCrumbs.add(new Exception("up:" + description + " at " + getTime()));
}
void down(String description) {
debugCrumbs.add(new Exception("down:" + description + " at " + getTime()));
}
@Override
public void run() {
if (!accounted && referenced) {
runWhenLeaked.run();
logger.debug("Message Leaked reference counting{}\n{}", description, debugLocations());
}
}
String debugLocations() {
StringWriter writer = new StringWriter();
PrintWriter outWriter = new PrintWriter(writer);
outWriter.println("Locations:");
debugCrumbs.forEach(e -> e.printStackTrace(outWriter));
return writer.toString();
}
}
private DebugState debugStatus;
private static final AtomicIntegerFieldUpdater<RefCountMessage> DURABLE_REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "durableRefCount");
private static final AtomicIntegerFieldUpdater<RefCountMessage> REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "refCount");
private static final AtomicIntegerFieldUpdater<RefCountMessage> REF_USAGE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "usageCount");
@ -35,7 +137,41 @@ public class RefCountMessage {
private volatile int usageCount = 0;
private volatile boolean fired = false;
private volatile boolean released = false;
/** has the refCount fired the action already? */
public boolean isReleased() {
return released;
}
public String debugLocations() {
if (debugStatus != null) {
return debugStatus.debugLocations();
} else {
return "";
}
}
public static void deferredDebug(Message message, String debugMessage, Object... args) {
if (message instanceof RefCountMessage && isRefDebugEnabled()) {
deferredDebug((RefCountMessage) message, debugMessage, args);
}
}
public static void deferredDebug(RefCountMessage message, String debugMessage, Object... args) {
String formattedDebug = MessageFormatter.arrayFormat(debugMessage, args).getMessage();
message.deferredDebug(formattedDebug);
}
/** Deferred debug, that will be used in case certain conditions apply to the RefCountMessage */
public void deferredDebug(String message) {
if (parentRef != null) {
parentRef.deferredDebug(message);
}
if (debugStatus != null) {
debugStatus.addDebug(message);
}
}
public int getRefCount() {
return REF_COUNT_UPDATER.get(this);
@ -58,55 +194,44 @@ public class RefCountMessage {
public RefCountMessage getParentRef() {
return parentRef;
}
// I am usually against keeping commented out code
// However this is very useful for me to debug referencing counting.
// Uncomment out anything between #ifdef DEBUG and #endif
// #ifdef DEBUG -- comment out anything before endif if you want to debug REFERENCE COUNTS
//final ConcurrentHashSet<Exception> upSet = new ConcurrentHashSet<>();
// #endif
private void onUp() {
// #ifdef DEBUG -- comment out anything before endif if you want to debug REFERENCE COUNTS
// upSet.add(new Exception("upEvent(" + debugString() + ")"));
// #endif
protected void onUp() {
if (debugStatus != null) {
debugStatus.up(counterString());
}
}
private void onDown() {
// #ifdef DEBUG -- comment out anything before endif if you want to debug REFERENCE COUNTS
// upSet.add(new Exception("upEvent(" + debugString() + ")"));
// #endif
if (getRefCount() <= 0 && getUsage() <= 0 && getDurableCount() <= 0 && !fired) {
protected void released() {
released = true;
accountedFor();
}
debugRefs();
fired = true;
void runOnLeak(Runnable run) {
if (debugStatus != null) {
debugStatus.runWhenLeaked = run;
}
}
public void accountedFor() {
if (debugStatus != null) {
debugStatus.accountedFor();
}
}
protected void onDown() {
if (debugStatus != null) {
debugStatus.down(counterString());
}
if (getRefCount() <= 0 && getUsage() <= 0 && getDurableCount() <= 0 && !released) {
released();
releaseComplete();
}
}
/**
*
* This method will be useful if you remove commented out code around #ifdef AND #endif COMMENTS
* */
public final void debugRefs() {
// #ifdef DEBUG -- comment out anything before endif if you want to debug REFERENCE COUNTS
// try {
// System.err.println("************************************************************************************************************************");
// System.err.println("Printing refcounts for " + debugString() + " this = " + this);
// for (Exception e : upSet) {
// e.printStackTrace();
// }
// System.err.println("************************************************************************************************************************");
// } catch (Throwable e) {
// e.printStackTrace();
// }
// #ifdef DEBUG -- comment out anything before endif if you want to debug REFERENCE COUNTS
}
public String debugString() {
protected String counterString() {
return "refCount=" + getRefCount() + ", durableRefCount=" + getDurableCount() + ", usageCount=" + getUsage() + ", parentRef=" + this.parentRef;
}
public void setParentRef(RefCountMessage origin) {
// if copy of a copy.. just go to the parent:
if (origin.getParentRef() != null) {
@ -127,6 +252,7 @@ public class RefCountMessage {
onUp();
return count;
}
public int usageDown() {
if (parentRef != null) {
return parentRef.usageDown();
@ -150,15 +276,28 @@ public class RefCountMessage {
return parentRef.durableDown();
}
int count = DURABLE_REF_COUNT_UPDATER.decrementAndGet(this);
if (count < 0) {
reportNegativeCount();
}
onDown();
return count;
}
private void reportNegativeCount() {
if (debugStatus != null) {
debugStatus.addDebug("Negative counter " + counterString());
}
ActiveMQClientLogger.LOGGER.negativeRefCount(String.valueOf(this), counterString(), debugLocations());
}
public int refDown() {
if (parentRef != null) {
return parentRef.refDown();
}
int count = REF_COUNT_UPDATER.decrementAndGet(this);
if (count < 0) {
reportNegativeCount();
}
onDown();
return count;
}
@ -186,5 +325,4 @@ public class RefCountMessage {
}
userContext.put(key, value);
}
}

View File

@ -344,4 +344,7 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 214033, value = "Cannot resolve host ", level = LogMessage.Level.ERROR)
void unableToResolveHost(UnknownHostException e);
@LogMessage(id = 214034, value = "{} has negative counts {}\n{}", level = LogMessage.Level.ERROR)
void negativeRefCount(String message, String count, String debugString);
}

View File

@ -92,6 +92,12 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
/** this is used to parse the initial packets from the buffer */
private CompositeReadableBuffer parsingBuffer;
private void checkDebug() {
if (isRefDebugEnabled()) {
registerDebug();
}
}
public AMQPLargeMessage(long id,
long messageFormat,
TypedProperties extraProperties,
@ -101,6 +107,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
this.setMessageID(id);
largeBody = new LargeBody(this, storageManager);
this.storageManager = storageManager;
checkDebug();
}
public AMQPLargeMessage(long id,
@ -113,6 +120,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
this.setMessageID(id);
this.largeBody = largeBody;
this.storageManager = storageManager;
checkDebug();
}
/**
@ -127,6 +135,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
this.storageManager = copy.largeBody.getStorageManager();
this.reencoded = copy.reencoded;
setMessageID(newID);
checkDebug();
}
public void releaseEncodedBuffer() {
@ -436,6 +445,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
@Override
public void deleteFile() throws Exception {
accountedFor(); // if LargeServerMessage.DEBUG this will make sure this message is not reported
largeBody.deleteFile();
}

View File

@ -674,8 +674,8 @@ public class AMQPSessionCallback implements SessionCallback {
}
@Override
public void disconnect(ServerConsumer consumer, SimpleString queueName) {
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
public void disconnect(ServerConsumer consumer, String errorMessage) {
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, errorMessage);
connection.runNow(() -> {
try {
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);

View File

@ -335,4 +335,3 @@ public class AMQPStandardMessage extends AMQPMessage {
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
@ -313,12 +314,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
protonSession.removeSender(sender);
connection.runLater(() -> {
connection.runNow(() -> {
sender.close();
try {
sessionSPI.closeSender(brokerConsumer);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
lmUsageDown();
}
sender.close();
connection.flush();
@ -358,6 +361,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} catch (Exception e) {
logger.warn(e.getMessage(), e);
throw new ActiveMQAMQPInternalErrorException(e.getMessage());
} finally {
// check if there is a pending large message
// and ref count down its usage
lmUsageDown();
}
}
@ -436,6 +443,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// we have to individual ack as we can't guarantee we will get the delivery
// (including acks) in order from dealer, a performance hit but a must
try {
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(message, "Adding ACK message to TX {}", (tx == null ? "no-tx" : tx.getID()));
}
sessionSPI.ack(tx, brokerConsumer, message);
tx.addDelivery(delivery, this);
} catch (Exception e) {
@ -808,7 +818,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (localRunnable != null) {
localRunnable.run();
}
pendingLargeMessage = null;
hasLarge = false;
brokerConsumer.promptDelivery();
}
@ -818,9 +827,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
AMQPLargeMessage lm = null;
if (pendingLargeMessage != null) {
lm = pendingLargeMessage.message;
pendingLargeMessage = null;
}
if (lm != null) {
lm.usageDown();
connection.runNow(lm::usageDown);
}
}

View File

@ -79,7 +79,7 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
public void disconnect(ServerConsumer consumer, SimpleString queueName) {
public void disconnect(ServerConsumer consumer, String errorMessage) {
try {
consumer.removeItself();
} catch (Exception e) {

View File

@ -354,11 +354,11 @@ public class AMQSession implements SessionCallback {
}
@Override
public void disconnect(ServerConsumer serverConsumer, SimpleString queueName) {
public void disconnect(ServerConsumer serverConsumer, String errorMessage) {
// for an openwire consumer this is fatal because unlike with activemq5 sending
// to the address will not auto create the consumer binding and it will be in limbo.
// forcing disconnect allows it to failover and recreate its binding.
final IOException forcePossibleFailoverReconnect = new IOException("Destination : " + queueName + " has been deleted.");
final IOException forcePossibleFailoverReconnect = new IOException(errorMessage);
try {
connection.serviceException(forcePossibleFailoverReconnect);
} catch (Exception ignored) {

View File

@ -226,7 +226,7 @@ public class StompSession implements SessionCallback {
}
@Override
public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
public void disconnect(ServerConsumer consumerId, String errorDescription) {
StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
if (stompSubscription != null) {
StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);

View File

@ -252,7 +252,7 @@ public class ManagementRemotingConnection implements RemotingConnection {
}
@Override
public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
public void disconnect(ServerConsumer consumerId, String message) {
}
@Override

View File

@ -116,15 +116,21 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
private final StorageManager storageManager;
public long getBodySize() throws ActiveMQException {
return largeBody.getBodySize();
}
private void checkDebug() {
if (isRefDebugEnabled()) {
registerDebug();
}
}
public LargeServerMessageImpl(final StorageManager storageManager) {
largeBody = new LargeBody(this, storageManager);
this.storageManager = storageManager;
}
public long getBodySize() throws ActiveMQException {
return largeBody.getBodySize();
}
/**
* Copy constructor
*
@ -160,7 +166,13 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
} else {
return new java.util.Date(timestamp).toString();
}
}
@Override
public LargeServerMessageImpl setMessageID(long messageID) {
super.setMessageID(messageID);
checkDebug();
return this;
}
@Override
@ -251,6 +263,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
@Override
public void deleteFile() throws Exception {
released();
synchronized (largeBody) {
largeBody.deleteFile();
}

View File

@ -104,6 +104,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
@Override
public void deleteFile() throws Exception {
released();
// nothing to be done here.. we don really have a file on this Storage
}

View File

@ -162,11 +162,11 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
public void disconnect(ServerConsumer consumerId, String errorMessage) {
if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {
channel.send(new DisconnectConsumerMessage(consumerId.getID()));
} else {
ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName.toString());
ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(errorMessage);
}
}

View File

@ -815,8 +815,8 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222168, value = "The '" + TransportConstants.PROTOCOL_PROP_NAME + "' property is deprecated. If you want this Acceptor to support multiple protocols, use the '" + TransportConstants.PROTOCOLS_PROP_NAME + "' property, e.g. with value 'CORE,AMQP,STOMP'", level = LogMessage.Level.WARN)
void warnDeprecatedProtocol();
@LogMessage(id = 222169, value = "You have old legacy clients connected to the queue {} and we can't disconnect them, these clients may just hang", level = LogMessage.Level.WARN)
void warnDisconnectOldClient(String queueName);
@LogMessage(id = 222169, value = "Server needs to disconnect the consumer because of ( {} ) but you have a legacy client connected and it cannot do so, these consumers may just hang", level = LogMessage.Level.WARN)
void warnDisconnectOldClient(String message);
@LogMessage(id = 222170, value = "Bridge {} forwarding address {} has confirmation-window-size ({}) greater than address' max-size-bytes' ({})", level = LogMessage.Level.WARN)
void bridgeConfirmationWindowTooSmall(String bridgeName, String address, int windowConfirmation, long maxSizeBytes);

View File

@ -93,6 +93,8 @@ public interface Consumer extends PriorityAware {
*/
void disconnect();
void failed(Throwable t);
/** an unique sequential ID for this consumer */
long sequentialID();

View File

@ -491,6 +491,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
@Override
public void failed(Throwable t) {
if (t instanceof ActiveMQException) {
connectionFailed((ActiveMQException) t, false);
} else {
ActiveMQException exception = new ActiveMQException(t.getMessage());
exception.initCause(t);
connectionFailed(exception, false);
}
}
/* Hook for processing message before forwarding */
protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
message = message.copy();
@ -549,6 +560,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override
public HandleStatus handle(final MessageReference ref) throws Exception {
if (RefCountMessage.isRefTraceEnabled() && ref.getMessage() instanceof RefCountMessage) {
RefCountMessage.deferredDebug(ref.getMessage(), "Going through the bridge");
}
if (filter != null && !filter.match(ref.getMessage())) {
logger.trace("message reference {} is no match for bridge {}", ref, configuration.getName());
return HandleStatus.NO_MATCH;

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@ -125,6 +126,10 @@ public class Redistributor implements Consumer {
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(reference.getMessage(), "redistributing");
}
ackRedistribution(reference, tx);
return HandleStatus.HANDLED;
@ -135,6 +140,12 @@ public class Redistributor implements Consumer {
// no op
}
@Override
public void failed(Throwable t) {
// no op... there's no proceedDeliver on this class
}
private void ackRedistribution(final MessageReference reference, final Transaction tx) throws Exception {
reference.handled();

View File

@ -50,6 +50,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
@ -1108,6 +1109,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
if (ringSize != -1) {
@ -1133,6 +1135,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (QueueImpl.this) {
if (ringSize != -1) {
@ -1248,6 +1251,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (scheduleIfPossible(ref)) {
return;
}
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(ref.getMessage(), "add tail queue {}", this.getName());
}
if (direct && supportsDirectDeliver && !directDeliver && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
logger.trace("Checking to re-enable direct deliver on queue {}", name);
@ -2943,6 +2949,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @param ref
*/
private void internalAddHead(final MessageReference ref) {
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(ref.getMessage(), "add head queue {}", this.getAddress());
}
queueMemorySize.addSize(ref.getMessageMemoryEstimate());
pendingMetrics.incrementMetrics(ref);
refAdded(ref);
@ -2962,6 +2971,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @param ref
*/
private void internalAddSorted(final MessageReference ref) {
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(ref.getMessage(), "add sorted queue {}", this.getAddress());
}
queueMemorySize.addSize(ref.getMessageMemoryEstimate());
pendingMetrics.incrementMetrics(ref);
refAdded(ref);
@ -3961,22 +3973,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/** This will print errors and decide what to do with the errored consumer from the protocol layer. */
@Override
public void errorProcessing(Consumer consumer, Throwable t, MessageReference reference) {
executor.execute(() -> internalErrorProcessing(consumer, t, reference));
}
private void internalErrorProcessing(Consumer consumer, Throwable t, MessageReference reference) {
synchronized (this) {
ActiveMQServerLogger.LOGGER.removingBadConsumer(consumer, reference, t);
// If the consumer throws an exception we remove the consumer
try {
removeConsumer(consumer);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
}
// The message failed to be delivered, hence we try again
addHead(reference, false);
}
ActiveMQServerLogger.LOGGER.removingBadConsumer(consumer, reference, t);
executor.execute(() -> consumer.failed(t));
}
private boolean checkExpired(final MessageReference reference) {
@ -4011,7 +4009,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// If the consumer throws an exception we remove the consumer
try {
removeConsumer(consumer);
errorProcessing(consumer, t, reference);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -165,6 +166,9 @@ public class RefsOperation extends TransactionOperationAbstract {
}
protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(ref.getMessage(), "RollbackDelivery");
}
// if ignore redelivery check, we just perform redelivery straight
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck).getA()) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@ -997,6 +998,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.markAsRollbackOnly(ils);
throw ils;
}
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(ref.getMessage(), "Individually acked on tx={}", tx.getID());
}
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), tx);
ref.acknowledge(tx, this);
@ -1061,6 +1067,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public synchronized void backToDelivering(MessageReference reference) {
synchronized (lock) {
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(reference.getMessage(), "Adding message back to delivering");
}
logger.trace("Message {} back to delivering", reference);
deliveringRefs.addFirst(reference);
metrics.addMessage(reference.getMessage().getEncodeSize());
}
@ -1078,21 +1088,28 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// This is an optimization, if the reference is the first one, we just poll it.
// But first we need to make sure deliveringRefs isn't empty
if (deliveringRefs.isEmpty()) {
logger.trace("removeReferenceByID {} return null", messageID);
return null;
}
if (deliveringRefs.peek().getMessage().getMessageID() == messageID) {
MessageReference ref = deliveringRefs.poll();
if (logger.isTraceEnabled()) {
logger.trace("Remove Message By ID {} return ref {} after peek call", messageID, ref);
}
return ref;
}
//slow path in a separate method
MessageReference ref = removeDeliveringRefById(messageID);
if (logger.isTraceEnabled()) {
logger.trace("Remove Message By ID {} return ref {} after scan call", messageID, ref);
}
return ref;
}
}
private MessageReference removeDeliveringRefById(long messageID) {
assert deliveringRefs.peek().getMessage().getMessageID() != messageID;
logger.trace("RemoveDeiveringRefByID {}", messageID);
Iterator<MessageReference> iter = deliveringRefs.iterator();
@ -1106,6 +1123,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref = theRef;
logger.trace("Returning {}", theRef);
break;
}
}
@ -1134,7 +1153,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void disconnect() {
callback.disconnect(this, getQueue().getName());
callback.disconnect(this, "Queue deleted: " + getQueue().getName());
}
@Override
public void failed(Throwable t) {
try {
this.close(true);
} catch (Throwable e2) {
logger.warn(e2.getMessage(), e2);
}
if (callback != null) {
callback.disconnect(this, t.getMessage());
}
}
public float getRate() {

View File

@ -90,7 +90,7 @@ public interface SessionCallback {
void closed();
void disconnect(ServerConsumer consumerId, SimpleString queueName);
void disconnect(ServerConsumer consumerId, String errorMessage);
boolean isWritable(ReadyListener callback, Object protocolContext);

View File

@ -284,8 +284,10 @@ public class ThreadLeakCheckRule extends TestWatcher {
} else if (threadName.contains("GC Daemon")) {
return true;
} else {
// validating for known stack traces
for (StackTraceElement element : thread.getStackTrace()) {
if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener") ||
element.getClassName().contains("jdk.internal.ref.CleanerImpl")) {
return true;
}
}

View File

@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.util;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** This Proxy is based in one of the Netty Examples:
* https://github.com/netty/netty/tree/ccc5e01f0444301561f055b02cd7c1f3e875bca7/example/src/main/java/io/netty/example/proxy
* */
public final class TcpProxy implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ArrayList<OutboundHandler> outbound = new ArrayList<>();
ArrayList<InboundHandler> inbound = new ArrayList();
public List<OutboundHandler> getOutbounddHandlers() {
return outbound;
}
public List<InboundHandler> getInboundHandlers() {
return inbound;
}
public void stopAllHandlers() {
inbound.forEach(i -> i.setReadable(false));
outbound.forEach(i -> i.setReadable(false));
}
/**
* Closes the specified channel after all queued write requests are flushed.
*/
public static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
int localPort;
String remoteHost;
int remotePort;
boolean logging;
public TcpProxy(String remoteHost, int remotePort, int localPort, boolean logging) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.localPort = localPort;
this.logging = logging;
}
/** Try a Core Protocol connection until successful */
public void tryCore(String user, String password) {
ConnectionFactory cf = CFUtil.createConnectionFactory("CORE", "tcp://" + remoteHost + ":" + localPort);
// try to connect a few time, to make sure the proxy is up
boolean succeeded = false;
for (int i = 0; i < 10; i++) {
try (Connection connection = cf.createConnection(user, password)) {
succeeded = true;
break;
} catch (Exception e) {
try {
Thread.sleep(100);
} catch (Exception ignored) {
}
}
}
if (!succeeded) {
throw new IllegalStateException("Proxy did not work as expected");
}
inbound.clear();
outbound.clear();
}
Thread thread;
public void startProxy() {
thread = new Thread(this);
thread.start();
}
public void stopProxy() throws Exception {
stopProxy(5000);
}
public void stopProxy(int timeoutMillis) throws Exception {
channelFuture.cancel(true);
thread.join(timeoutMillis);
if (thread.isAlive()) {
throw new RuntimeException("Proxy thread still alive");
}
}
ChannelFuture channelFuture;
@Override
public void run() {
logger.info("Proxying {} to {}", localPort, remotePort);
// Configure the bootstrap.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class);
if (logging) {
b.handler(new LoggingHandler(LogLevel.INFO));
}
channelFuture = b.childHandler(new ProxyInitializer(remoteHost, remotePort))
.childOption(ChannelOption.AUTO_READ, false)
.bind(localPort).sync().channel().closeFuture();
channelFuture.sync();
logger.info("done");
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
bossGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}
class ProxyInitializer extends ChannelInitializer<SocketChannel> {
private final String remoteHost;
private final int remotePort;
ProxyInitializer(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (logging) {
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
OutboundHandler outboundHandler = new OutboundHandler();
TcpProxy.this.outbound.add(outboundHandler);
pipeline.addLast(outboundHandler);
}
}
public class OutboundHandler extends ChannelInboundHandlerAdapter {
// As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
// the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
private Channel outboundChannel;
volatile boolean readable = true;
public OutboundHandler setReadable(boolean readable) {
this.readable = readable;
if (readable) {
outboundChannel.read();
}
return this;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
InboundHandler inboundHandler = new InboundHandler(inboundChannel);
TcpProxy.this.inbound.add(inboundHandler);
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(inboundHandler)
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(future -> {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
if (readable) {
ctx.channel().read();
}
} else {
new Exception("Closing").printStackTrace();
future.channel().close();
}
});
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (outboundChannel != null) {
TcpProxy.closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
TcpProxy.closeOnFlush(ctx.channel());
}
}
public class InboundHandler extends ChannelInboundHandlerAdapter {
private final Channel inboundChannel;
public InboundHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
volatile boolean readable = true;
public InboundHandler setReadable(boolean readable) {
this.readable = readable;
if (readable) {
inboundChannel.read();
}
return this;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
if (readable) {
ctx.channel().read();
}
} else {
new Exception("Closing").printStackTrace();
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
TcpProxy.closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
TcpProxy.closeOnFlush(ctx.channel());
}
}
}

View File

@ -36,6 +36,11 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
public void failed(Throwable t) {
}
@Override
public SlowConsumerDetectionListener getSlowConsumerDetecion() {
return null;

View File

@ -567,8 +567,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
//To change body of implemented methods use File | Settings | File Templates.
public void disconnect(ServerConsumer consumerId, String errorMessage) {
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
public class RefCountMessageAccessor {
public static void setRunOnLeak(RefCountMessage message, Runnable runnable) {
message.runOnLeak(runnable);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.leak;
import java.util.concurrent.TimeUnit;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RefCountMessageAccessor;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Test;
public class RefCountMessageLeakTest extends ActiveMQTestBase {
static class DebugMessage extends RefCountMessage {
final String string;
DebugMessage(String str) {
this.string = str;
registerDebug();
}
@Override
public String toString() {
return "debugMessage(" + string + ")";
}
public void fired() {
this.released();
}
}
@Test
public void testLeakRefCount() throws Exception {
String strMessage = RandomUtil.randomString();
String strMessageFired = RandomUtil.randomString();
ReusableLatch latchLeaked = new ReusableLatch(1);
DebugMessage message = new DebugMessage(strMessage);
message.refUp();
message.durableUp();
RefCountMessageAccessor.setRunOnLeak(message, latchLeaked::countDown);
message = null;
// I know it's null, I'm just doing this to make sure there are no optimizations from the JVM delaying the GC cleanup
Assert.assertNull(message);
MemoryAssertions.assertMemory(new CheckLeak(), 0, RefCountMessage.class.getName());
Assert.assertTrue(latchLeaked.await(1, TimeUnit.SECONDS));
DebugMessage message2 = new DebugMessage(strMessageFired);
message2.refUp();
latchLeaked.setCount(1);
RefCountMessageAccessor.setRunOnLeak(message2, latchLeaked::countDown);
message2.fired();
message2 = null;
// I know it's null, I'm just doing this to make sure there are no optimizations from the JVM delaying the GC cleanup
Assert.assertNull(message2);
MemoryAssertions.assertMemory(new CheckLeak(), 0, RefCountMessage.class.getName());
Assert.assertFalse(latchLeaked.await(100, TimeUnit.MILLISECONDS));
}
}

View File

@ -0,0 +1,384 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.interruptlm;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Locale;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.TcpProxy;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Test various scenarios with broker communication in large message */
public class LargeMessageFrozenTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
TcpProxy proxy;
ActiveMQServer server;
@Before
public void startServer() throws Exception {
server = createServer(true, true);
server.getConfiguration().addAcceptorConfiguration("alternate", "tcp://localhost:44444?amqpIdleTimeout=100");
server.start();
}
private void startProxy() {
proxy = new TcpProxy("localhost", 44444, 33333, false);
proxy.startProxy();
runAfter(proxy::stopProxy);
proxy.tryCore(null, null);
}
@Test
public void testFreezeCore() throws Exception {
testFreeze("CORE");
}
@Test
public void testFreezeAMQP() throws Exception {
testFreeze("AMQP");
}
public void testFreeze(String protocol) throws Exception {
startProxy();
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
Assert.assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
Assert.assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
Assert.assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Connection connection = factory.createConnection();
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
Assert.assertEquals(1, proxy.getInboundHandlers().size());
Assert.assertEquals(1, proxy.getOutbounddHandlers().size());
String body;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 10 * 1024 * 1024) {
buffer.append("Not so big, but big!!");
}
body = buffer.toString();
}
int NUMBER_OF_MESSAGES = 10;
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
}
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
boolean failed = false;
for (int repeat = 0; repeat < 5; repeat++) {
try {
for (int i = 0; i < 1; i++) {
Assert.assertNotNull(consumer.receive(1000));
}
proxy.stopAllHandlers();
consumer.receive(100);
connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // just to force an exception
} catch (Exception expected) {
logger.info(expected.getMessage(), expected);
failed = true;
}
Assert.assertTrue(failed);
server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure")));
connection = factory.createConnection();
connection.start();
runAfter(connection::close);
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(queue);
}
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(body, message.getText());
session.commit();
}
Wait.assertEquals(0, () -> {
System.gc();
return server.getConfiguration().getLargeMessagesLocation().listFiles().length;
});
}
@Test
public void testRemoveConsumerCORE() throws Exception {
testRemoveConsumer("CORE");
}
@Test
public void testRemoveConsumerAMQP() throws Exception {
testRemoveConsumer("AMQP");
}
@Test
public void testRemoveConsumerOpenWire() throws Exception {
testRemoveConsumer("OPENWIRE");
}
public void testRemoveConsumer(String protocol) throws Exception {
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:44444?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
Assert.assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
Assert.assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
Assert.assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:44444?amqp.idleTimeout=300&jms.prefetchPolicy.all=10");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:44444");
}
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Connection connection = factory.createConnection();
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
String body;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 300 * 1024) {
buffer.append("Not so big, but big!!");
}
body = buffer.toString();
}
int NUMBER_OF_MESSAGES = 10;
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
}
session.commit();
ArrayList<MessageReference> queueMessages = new ArrayList<>();
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount);
serverQueue.forEach(queueMessages::add);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Assert.assertEquals(1, serverQueue.getConsumers().size());
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) serverQueue.getConsumers().iterator().next();
TextMessage message = (TextMessage) consumer.receive(100);
Assert.assertNotNull(message);
Assert.assertEquals(body, message.getText());
serverConsumer.errorProcessing(new Exception("Dumb error"), queueMessages.get(0));
try {
consumer.receiveNoWait();
} catch (Exception e) {
e.printStackTrace();
}
server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure")));
connection = factory.createConnection();
runAfter(connection::close);
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(queue);
connection.start();
long recCount = serverQueue.getMessageCount();
for (int i = 0; i < recCount; i++) {
TextMessage recMessage = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(recMessage);
Assert.assertEquals(body, recMessage.getText());
session.commit();
}
Assert.assertNull(consumer.receiveNoWait());
// I could have done this assert before the loop
// but I also wanted to see a condition where messages get damaged
Assert.assertEquals(NUMBER_OF_MESSAGES, recCount);
Wait.assertEquals(0, serverQueue::getMessageCount);
Wait.assertEquals(0, () -> {
System.gc();
return server.getConfiguration().getLargeMessagesLocation().listFiles().length;
});
}
@Test
public void testFreezeAutoAckAMQP() throws Exception {
testFreezeAutoAck("AMQP");
}
public void testFreezeAutoAck(String protocol) throws Exception {
startProxy();
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
Assert.assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
Assert.assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
Assert.assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Connection connection = factory.createConnection();
runAfter(connection::close);
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionConsumer.createQueue(getName());
Assert.assertEquals(1, proxy.getInboundHandlers().size());
Assert.assertEquals(1, proxy.getOutbounddHandlers().size());
String body;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 10 * 1024 * 1024) {
buffer.append("Not so big, but big!!");
}
body = buffer.toString();
}
int NUMBER_OF_MESSAGES = 40;
try (Session sessionProducer = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
MessageProducer producer = sessionProducer.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(sessionConsumer.createTextMessage(body));
}
sessionProducer.commit();
}
MessageConsumer consumer = sessionConsumer.createConsumer(queue);
connection.start();
boolean failed = false;
try {
for (int i = 0; i < 10; i++) {
consumer.receive(5000);
}
proxy.stopAllHandlers();
consumer.receive(100);
connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // just to force an exception
} catch (Exception expected) {
logger.info(expected.getMessage(), expected);
failed = true;
}
Wait.assertEquals(0, () -> server.getActiveMQServerControl().getConnectionCount());
long numberOfMessages = serverQueue.getMessageCount();
Assert.assertTrue(failed);
connection = factory.createConnection();
connection.start();
runAfter(connection::close);
sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = sessionConsumer.createQueue(getName());
consumer = sessionConsumer.createConsumer(queue);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(body, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
Assert.assertEquals(0L, serverQueue.getMessageCount());
Wait.assertEquals(0, () -> {
System.gc();
return server.getConfiguration().getLargeMessagesLocation().listFiles().length;
});
}
}

View File

@ -183,13 +183,13 @@ public class SoakReplicatedPagingTest extends SoakTestBase {
}
logger.debug("Awaiting producers...");
if (!producersLatch.await(30000, TimeUnit.MILLISECONDS)) {
if (!producersLatch.await(60000, TimeUnit.MILLISECONDS)) {
System.err.println("Awaiting producers timeout");
System.exit(0);
}
logger.debug("Awaiting consumers...");
if (!consumersLatch.await(30000, TimeUnit.MILLISECONDS)) {
if (!consumersLatch.await(60000, TimeUnit.MILLISECONDS)) {
System.err.println("Awaiting consumers timeout");
System.exit(0);
}

View File

@ -41,6 +41,11 @@ public class FakeConsumer implements Consumer {
filter = null;
}
@Override
public void failed(Throwable t) {
// no op
}
public FakeConsumer(final Filter filter) {
this.filter = filter;
}