ARTEMIS-3609 Using a different thread for the completion listener

This commit is contained in:
Clebert Suconic 2022-12-16 18:06:58 -05:00 committed by clebertsuconic
parent 4550fcf47c
commit bc1258ab25
8 changed files with 48 additions and 35 deletions

View File

@ -127,4 +127,12 @@ public interface ICoreMessage extends Message {
return map;
}
default boolean isConfirmed() {
return false;
}
default void setConfirmed(boolean confirmed) {
}
}

View File

@ -46,6 +46,8 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
private int flowControlSize = -1;
private boolean confirmed;
/**
* Used on LargeMessages only
*/
@ -414,4 +416,13 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
return new ClientMessageImpl(this);
}
@Override
public boolean isConfirmed() {
return confirmed;
}
@Override
public void setConfirmed(boolean confirmed) {
this.confirmed = confirmed;
}
}

View File

@ -136,7 +136,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
checkClosed();
if (handler != null) {
handler = new SendAcknowledgementHandlerWrapper(handler, session.getSessionExecutor());
handler = session.wrap(handler);
}
doSend(address1, message, handler);
@ -145,7 +145,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
logger.debug("Handler was used on producing messages towards address {} however there is no confirmationWindowEnabled", address1);
// if there is no confirmation enabled, we will at least call the handler after the sent is done
session.scheduleConfirmation(handler, message);
handler.sendAcknowledged(message); // this is asynchronous as we wrapped with an executor
}
}
@ -257,6 +257,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
session.workDone();
msg.setConfirmed(false);
if (isLarge) {
largeMessageSend(sendBlocking, msg, theCredits, handler);
} else {

View File

@ -732,7 +732,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {

View File

@ -80,6 +80,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final Executor executor;
private final Executor confirmationExecutor;
// to be sent to consumers as consumers will need a separate consumer for flow control
private final Executor flowControlExecutor;
@ -184,6 +186,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final String groupID,
final SessionContext sessionContext,
final Executor executor,
final Executor confirmationExecutor,
final Executor flowControlExecutor,
final Executor closeExecutor) throws ActiveMQException {
this.sessionFactory = sessionFactory;
@ -196,6 +199,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
this.executor = executor;
this.confirmationExecutor = confirmationExecutor;
this.flowControlExecutor = flowControlExecutor;
this.xa = xa;
@ -2208,23 +2213,17 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return true;
}
@Override
public void scheduleConfirmation(final SendAcknowledgementHandler handler, final Message message) {
executor.execute(new Runnable() {
@Override
public void run() {
handler.sendAcknowledged(message);
}
});
}
@Override
public SessionContext getSessionContext() {
return sessionContext;
}
@Override
public Executor getSessionExecutor() {
return executor;
public SendAcknowledgementHandler wrap(SendAcknowledgementHandler handler) {
if (!(handler instanceof SendAcknowledgementHandlerWrapper)) {
handler = new SendAcknowledgementHandlerWrapper(handler, confirmationExecutor);
}
return handler;
}
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -125,11 +123,6 @@ public interface ClientSessionInternal extends ClientSession {
boolean isConfirmationWindowEnabled();
/**
* @param handler
*/
void scheduleConfirmation(SendAcknowledgementHandler handler, Message message);
boolean isClosing();
String getNodeId();
@ -138,5 +131,5 @@ public interface ClientSessionInternal extends ClientSession {
SessionContext getSessionContext();
Executor getSessionExecutor();
SendAcknowledgementHandler wrap(SendAcknowledgementHandler handler);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.utils.actors.Actor;
@ -26,13 +27,7 @@ public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHan
private SendAcknowledgementHandler wrapped;
/**
* It's possible that a SendAcknowledgementHandler might be called twice due to subsequent
* packet confirmations on the same connection. Using this boolean avoids that possibility.
* A new SendAcknowledgementHandlerWrapper is created for each message sent so once it's
* called once it will never be called again.
*/
private volatile boolean active = true;
private final Actor<Message> messageActor;
@ -44,22 +39,27 @@ public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHan
@Override
public void sendAcknowledged(Message message) {
if (active) {
ICoreMessage msg = message.toCore();
// It is possible that a SendAcknowledgementHandler might be called twice due to subsequent
// packet confirmations on the same connection. Using this boolean avoids that possibility.
if (!msg.isConfirmed()) {
try {
messageActor.act(message);
} finally {
active = false;
msg.setConfirmed(true);
}
}
}
@Override
public void sendFailed(Message message, Exception e) {
if (active) {
ICoreMessage msg = message.toCore();
if (!msg.isConfirmed()) {
try {
wrapped.sendFailed(message, e);
} finally {
active = false;
msg.setConfirmed(true);
}
}
}

View File

@ -278,7 +278,7 @@ public class ActiveMQSessionContext extends SessionContext {
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
setHandlers();
this.sendAckHandler = handler;
this.sendAckHandler = session.wrap(handler);
}
@Override