diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index b015be6c12..a55a5b1cf1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -26,7 +26,6 @@ import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -43,8 +42,6 @@ public class ClientProducerImpl implements ClientProducerInternal { private static final Logger logger = Logger.getLogger(ClientProducerImpl.class); - private static boolean confirmationNotSetLogged = false; - private final SimpleString address; private final ClientSessionInternal session; @@ -118,14 +115,14 @@ public class ClientProducerImpl implements ClientProducerInternal { public void send(final Message msg) throws ActiveMQException { checkClosed(); - doSend(null, msg, null); + send(null, msg, sessionContext.getSendAcknowledgementHandler()); } @Override public void send(final SimpleString address1, final Message msg) throws ActiveMQException { checkClosed(); - doSend(address1, msg, null); + send(address1, msg, sessionContext.getSendAcknowledgementHandler()); } @Override @@ -138,24 +135,20 @@ public class ClientProducerImpl implements ClientProducerInternal { Message message, SendAcknowledgementHandler handler) throws ActiveMQException { checkClosed(); - boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled(); - if (confirmationWindowEnabled) { - doSend(address1, message, handler); - } else { - doSend(address1, message, null); - if (handler != null) { - if (logger.isDebugEnabled()) { - logger.debug("Handler was used on producing messages towards address " + (address1 == null ? null : address1.toString()) + " however there is no confirmationWindowEnabled"); - } - if (!confirmationNotSetLogged) { - // will log thisonly once - ActiveMQClientLogger.LOGGER.confirmationNotSet(); - } + if (handler != null) { + handler = new SendAcknowledgementHandlerWrapper(handler); + } - // if there is no confirmation enabled, we will at least call the handler after the sent is done - session.scheduleConfirmation(handler, message); + doSend(address1, message, handler); + + if (handler != null && !session.isConfirmationWindowEnabled()) { + if (logger.isDebugEnabled()) { + logger.debug("Handler was used on producing messages towards address " + address1 + " however there is no confirmationWindowEnabled"); } + + // if there is no confirmation enabled, we will at least call the handler after the sent is done + session.scheduleConfirmation(handler, message); } } @@ -265,7 +258,7 @@ public class ClientProducerImpl implements ClientProducerInternal { final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend; // if Handler != null, we will send non blocking - final boolean sendBlocking = sendBlockingConfig && handler == null; + final boolean sendBlocking = sendBlockingConfig && handler == null && sessionContext.getSendAcknowledgementHandler() == null; session.workDone(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java new file mode 100644 index 0000000000..e935b10395 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/SendAcknowledgementHandlerWrapper.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; + +public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHandler { + + private SendAcknowledgementHandler wrapped; + + /** + * It's possible that a SendAcknowledgementHandler might be called twice due to subsequent + * packet confirmations on the same connection. Using this boolean avoids that possibility. + * A new SendAcknowledgementHandlerWrapper is created for each message sent so once it's + * called once it will never be called again. + */ + private volatile boolean active = true; + + public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped) { + this.wrapped = wrapped; + } + + @Override + public void sendAcknowledged(Message message) { + if (active) { + try { + wrapped.sendAcknowledged(message); + } finally { + active = false; + } + } + } + + @Override + public void sendFailed(Message message, Exception e) { + if (active) { + try { + wrapped.sendFailed(message, e); + } finally { + active = false; + } + } + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 26c405c013..4843aaf9e2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -233,12 +233,6 @@ public class ActiveMQSessionContext extends SessionContext { } else { handler.sendFailed(message, exception); } - } else if (sendAckHandler != null) { - if (exception == null) { - sendAckHandler.sendAcknowledged(message); - } else { - sendAckHandler.sendFailed(message, exception); - } } } }; @@ -282,6 +276,11 @@ public class ActiveMQSessionContext extends SessionContext { this.sendAckHandler = handler; } + @Override + public SendAcknowledgementHandler getSendAcknowledgementHandler() { + return this.sendAckHandler; + } + @Override public void createSharedQueue(SimpleString address, SimpleString queueName, diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 93352049be..db27511659 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -164,6 +164,8 @@ public abstract class SessionContext { public abstract void setSendAcknowledgementHandler(SendAcknowledgementHandler handler); + public abstract SendAcknowledgementHandler getSendAcknowledgementHandler(); + /** * Creates a shared queue using the routing type set by the Address. If the Address supports more than one type of delivery * then the default delivery mode (MULTICAST) is used. diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index ccc5d7c2ae..0c771320d7 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -34,8 +34,6 @@ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicPublisher; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; @@ -512,14 +510,6 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To private final Message jmsMessage; private final ActiveMQMessageProducer producer; - /** - * It's possible that this SendAcknowledgementHandler might be called twice due to subsequent - * packet confirmations on the same connection. Using this boolean avoids that possibility. - * A new CompletionListenerWrapper is created for each message sent so once it's called once - * it will never be called again. - */ - private AtomicBoolean active = new AtomicBoolean(true); - /** * @param jmsMessage * @param producer @@ -534,63 +524,57 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To @Override public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { - if (active.get()) { - if (jmsMessage instanceof StreamMessage) { - try { - ((StreamMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - if (jmsMessage instanceof BytesMessage) { - try { - ((BytesMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - + if (jmsMessage instanceof StreamMessage) { try { - producer.connection.getThreadAwareContext().setCurrentThread(true); - completionListener.onCompletion(jmsMessage); - } finally { - producer.connection.getThreadAwareContext().clearCurrentThread(true); - active.set(false); + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + logger.debug("ignoring exception", e); } } + if (jmsMessage instanceof BytesMessage) { + try { + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + logger.debug("ignoring exception", e); + } + } + + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + completionListener.onCompletion(jmsMessage); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + } } @Override public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) { - if (active.get()) { - if (jmsMessage instanceof StreamMessage) { - try { - ((StreamMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - if (jmsMessage instanceof BytesMessage) { - try { - ((BytesMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - + if (jmsMessage instanceof StreamMessage) { try { - producer.connection.getThreadAwareContext().setCurrentThread(true); - if (exception instanceof ActiveMQException) { - exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception); - } else if (exception instanceof ActiveMQInterruptedException) { - exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); - } - completionListener.onException(jmsMessage, exception); - } finally { - producer.connection.getThreadAwareContext().clearCurrentThread(true); - active.set(false); + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? } } + if (jmsMessage instanceof BytesMessage) { + try { + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } + } + + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + if (exception instanceof ActiveMQException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception); + } else if (exception instanceof ActiveMQInterruptedException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); + } + completionListener.onException(jmsMessage, exception); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + } } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index a634b44b21..14c019b75b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -41,6 +41,7 @@ import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; @@ -803,6 +804,8 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { @Test(timeout = 30000) public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable { String name = "exampleQueue1"; + // disable auto-delete as it causes thrashing during the test + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteQueues(false)); final int numMessages = 40; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java index 95ef668fd7..8b88073b94 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionSendAcknowledgementHandlerTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -29,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -152,6 +154,76 @@ public class SessionSendAcknowledgementHandlerTest extends ActiveMQTestBase { Assert.assertTrue("producer specific handler must have acked, " + producerHandler, producerHandler.latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testHandlerOnSend() throws Exception { + final int MSG_COUNT = 750; + ServerLocator locator = createInVMNonHALocator(); + locator.setConfirmationWindowSize(256); + + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(); + ClientProducer producer = session.createProducer(address); + final AtomicInteger count = new AtomicInteger(0); + for (int i = 0; i < MSG_COUNT; i++) { + ClientMessage message = session.createMessage(true); + producer.send(message, message1 -> count.incrementAndGet()); + } + Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100); + } + + @Test + public void testHandlerOnSendWithAnonymousProducer() throws Exception { + final int MSG_COUNT = 750; + ServerLocator locator = createInVMNonHALocator(); + locator.setConfirmationWindowSize(256); + + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(); + final AtomicInteger count = new AtomicInteger(0); + ClientProducer producer = session.createProducer(); + for (int i = 0; i < MSG_COUNT; i++) { + ClientMessage message = session.createMessage(true); + producer.send(address, message, message1 -> count.incrementAndGet()); + } + Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100); + } + + @Test + public void testHandlerOnSession() throws Exception { + final int MSG_COUNT = 750; + ServerLocator locator = createInVMNonHALocator(); + locator.setConfirmationWindowSize(256); + + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(); + final AtomicInteger count = new AtomicInteger(0); + session.setSendAcknowledgementHandler(message1 -> count.incrementAndGet()); + ClientProducer producer = session.createProducer(address); + for (int i = 0; i < MSG_COUNT; i++) { + ClientMessage message = session.createMessage(true); + producer.send(message); + } + Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100); + } + + @Test + public void testHandlerOnSessionWithAnonymousProducer() throws Exception { + final int MSG_COUNT = 750; + ServerLocator locator = createInVMNonHALocator(); + locator.setConfirmationWindowSize(256); + + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(); + final AtomicInteger count = new AtomicInteger(0); + session.setSendAcknowledgementHandler(message1 -> count.incrementAndGet()); + ClientProducer producer = session.createProducer(); + for (int i = 0; i < MSG_COUNT; i++) { + ClientMessage message = session.createMessage(true); + producer.send(address, message); + } + Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100); + } + public static final class LatchAckHandler implements SendAcknowledgementHandler { public CountDownLatch latch; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java index 3020310a9e..804978398c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration; import org.apache.activemq.artemis.tests.util.JMSTestBase; @@ -43,7 +44,7 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class JmsProducerCompletionListenerTest extends JMSTestBase { - static final int TOTAL_MSGS = 20; + static final int TOTAL_MSGS = 200; private JMSContext context; private JMSProducer producer; @@ -85,7 +86,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase { JMSConsumer consumer = context.createConsumer(queue); sendMessages(context, producer, queue, TOTAL_MSGS); receiveMessages(consumer, 0, TOTAL_MSGS, true); - + assertEquals(TOTAL_MSGS, cl.completion.get()); context.close(); Assert.assertTrue("completion listener should be called", cl.completionLatch.await(3, TimeUnit.SECONDS)); } @@ -191,7 +192,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase { public static final class CountingCompletionListener implements CompletionListener { - public int completion; + public AtomicInteger completion = new AtomicInteger(0); public int error; public CountDownLatch completionLatch; public Message lastMessage; @@ -202,7 +203,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase { @Override public void onCompletion(Message message) { - completion++; + completion.incrementAndGet(); completionLatch.countDown(); lastMessage = message; }