ARTEMIS-3609 Do not use netty thread for thread completion listener

This commit is contained in:
Clebert Suconic 2022-12-15 09:19:32 -05:00 committed by clebertsuconic
parent af9bd7b84a
commit 1de10671f8
5 changed files with 21 additions and 7 deletions

View File

@ -136,7 +136,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
checkClosed();
if (handler != null) {
handler = new SendAcknowledgementHandlerWrapper(handler);
handler = new SendAcknowledgementHandlerWrapper(handler, session.getSessionExecutor());
}
doSend(address1, message, handler);

View File

@ -2222,4 +2222,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
public SessionContext getSessionContext() {
return sessionContext;
}
@Override
public Executor getSessionExecutor() {
return executor;
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -135,4 +137,6 @@ public interface ClientSessionInternal extends ClientSession {
boolean isWritable(ReadyListener callback);
SessionContext getSessionContext();
Executor getSessionExecutor();
}

View File

@ -16,8 +16,11 @@
*/
package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.utils.actors.Actor;
public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHandler {
@ -31,15 +34,19 @@ public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHan
*/
private volatile boolean active = true;
public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped) {
private final Actor<Message> messageActor;
public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped, Executor executor) {
this.wrapped = wrapped;
messageActor = new Actor<>(executor, wrapped::sendAcknowledged);
}
@Override
public void sendAcknowledged(Message message) {
if (active) {
try {
wrapped.sendAcknowledged(message);
messageActor.act(message);
} finally {
active = false;
}

View File

@ -23,7 +23,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Test;
import javax.jms.CompletionListener;
@ -33,7 +32,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class JMSTransactionTest extends JMSTestBase {
@ -71,7 +69,7 @@ public class JMSTransactionTest extends JMSTestBase {
@Override
public void onCompletion(Message message) {
try {
commitLatch.await(100, TimeUnit.MILLISECONDS); // can't block the netty thread. We will delay things, but can't block it otherwise the test just blocks
commitLatch.await();
sentMessages.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
@ -86,7 +84,7 @@ public class JMSTransactionTest extends JMSTestBase {
}
session.commit();
Assert.assertEquals(messages, sentMessages.get());
Wait.assertEquals(messages, sentMessages::get);
org.apache.activemq.artemis.core.server.Queue queueView = server.locateQueue(SimpleString.toSimpleString(queueName));
Wait.assertEquals(messages, queueView::getMessageCount);