This commit is contained in:
Clebert Suconic 2020-02-14 13:12:49 -05:00
commit fa6a008fa9
8 changed files with 202 additions and 88 deletions

View File

@ -26,7 +26,6 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
@ -43,8 +42,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
private static final Logger logger = Logger.getLogger(ClientProducerImpl.class);
private static boolean confirmationNotSetLogged = false;
private final SimpleString address;
private final ClientSessionInternal session;
@ -118,14 +115,14 @@ public class ClientProducerImpl implements ClientProducerInternal {
public void send(final Message msg) throws ActiveMQException {
checkClosed();
doSend(null, msg, null);
send(null, msg, sessionContext.getSendAcknowledgementHandler());
}
@Override
public void send(final SimpleString address1, final Message msg) throws ActiveMQException {
checkClosed();
doSend(address1, msg, null);
send(address1, msg, sessionContext.getSendAcknowledgementHandler());
}
@Override
@ -138,24 +135,20 @@ public class ClientProducerImpl implements ClientProducerInternal {
Message message,
SendAcknowledgementHandler handler) throws ActiveMQException {
checkClosed();
boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled();
if (confirmationWindowEnabled) {
doSend(address1, message, handler);
} else {
doSend(address1, message, null);
if (handler != null) {
if (logger.isDebugEnabled()) {
logger.debug("Handler was used on producing messages towards address " + (address1 == null ? null : address1.toString()) + " however there is no confirmationWindowEnabled");
}
if (!confirmationNotSetLogged) {
// will log thisonly once
ActiveMQClientLogger.LOGGER.confirmationNotSet();
}
if (handler != null) {
handler = new SendAcknowledgementHandlerWrapper(handler);
}
// if there is no confirmation enabled, we will at least call the handler after the sent is done
session.scheduleConfirmation(handler, message);
doSend(address1, message, handler);
if (handler != null && !session.isConfirmationWindowEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Handler was used on producing messages towards address " + address1 + " however there is no confirmationWindowEnabled");
}
// if there is no confirmation enabled, we will at least call the handler after the sent is done
session.scheduleConfirmation(handler, message);
}
}
@ -265,7 +258,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
// if Handler != null, we will send non blocking
final boolean sendBlocking = sendBlockingConfig && handler == null;
final boolean sendBlocking = sendBlockingConfig && handler == null && sessionContext.getSendAcknowledgementHandler() == null;
session.workDone();

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
public class SendAcknowledgementHandlerWrapper implements SendAcknowledgementHandler {
private SendAcknowledgementHandler wrapped;
/**
* It's possible that a SendAcknowledgementHandler might be called twice due to subsequent
* packet confirmations on the same connection. Using this boolean avoids that possibility.
* A new SendAcknowledgementHandlerWrapper is created for each message sent so once it's
* called once it will never be called again.
*/
private volatile boolean active = true;
public SendAcknowledgementHandlerWrapper(SendAcknowledgementHandler wrapped) {
this.wrapped = wrapped;
}
@Override
public void sendAcknowledged(Message message) {
if (active) {
try {
wrapped.sendAcknowledged(message);
} finally {
active = false;
}
}
}
@Override
public void sendFailed(Message message, Exception e) {
if (active) {
try {
wrapped.sendFailed(message, e);
} finally {
active = false;
}
}
}
}

View File

@ -233,12 +233,6 @@ public class ActiveMQSessionContext extends SessionContext {
} else {
handler.sendFailed(message, exception);
}
} else if (sendAckHandler != null) {
if (exception == null) {
sendAckHandler.sendAcknowledged(message);
} else {
sendAckHandler.sendFailed(message, exception);
}
}
}
};
@ -282,6 +276,11 @@ public class ActiveMQSessionContext extends SessionContext {
this.sendAckHandler = handler;
}
@Override
public SendAcknowledgementHandler getSendAcknowledgementHandler() {
return this.sendAckHandler;
}
@Override
public void createSharedQueue(SimpleString address,
SimpleString queueName,

View File

@ -164,6 +164,8 @@ public abstract class SessionContext {
public abstract void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
public abstract SendAcknowledgementHandler getSendAcknowledgementHandler();
/**
* Creates a shared queue using the routing type set by the Address. If the Address supports more than one type of delivery
* then the default delivery mode (MULTICAST) is used.

View File

@ -34,8 +34,6 @@ import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -512,14 +510,6 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
private final Message jmsMessage;
private final ActiveMQMessageProducer producer;
/**
* It's possible that this SendAcknowledgementHandler might be called twice due to subsequent
* packet confirmations on the same connection. Using this boolean avoids that possibility.
* A new CompletionListenerWrapper is created for each message sent so once it's called once
* it will never be called again.
*/
private AtomicBoolean active = new AtomicBoolean(true);
/**
* @param jmsMessage
* @param producer
@ -534,63 +524,57 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
@Override
public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
if (active.get()) {
if (jmsMessage instanceof StreamMessage) {
try {
((StreamMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
if (jmsMessage instanceof BytesMessage) {
try {
((BytesMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
if (jmsMessage instanceof StreamMessage) {
try {
producer.connection.getThreadAwareContext().setCurrentThread(true);
completionListener.onCompletion(jmsMessage);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
active.set(false);
((StreamMessage) jmsMessage).reset();
} catch (JMSException e) {
logger.debug("ignoring exception", e);
}
}
if (jmsMessage instanceof BytesMessage) {
try {
((BytesMessage) jmsMessage).reset();
} catch (JMSException e) {
logger.debug("ignoring exception", e);
}
}
try {
producer.connection.getThreadAwareContext().setCurrentThread(true);
completionListener.onCompletion(jmsMessage);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
}
}
@Override
public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
if (active.get()) {
if (jmsMessage instanceof StreamMessage) {
try {
((StreamMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
if (jmsMessage instanceof BytesMessage) {
try {
((BytesMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
if (jmsMessage instanceof StreamMessage) {
try {
producer.connection.getThreadAwareContext().setCurrentThread(true);
if (exception instanceof ActiveMQException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
} else if (exception instanceof ActiveMQInterruptedException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
}
completionListener.onException(jmsMessage, exception);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
active.set(false);
((StreamMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
if (jmsMessage instanceof BytesMessage) {
try {
((BytesMessage) jmsMessage).reset();
} catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
}
try {
producer.connection.getThreadAwareContext().setCurrentThread(true);
if (exception instanceof ActiveMQException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
} else if (exception instanceof ActiveMQInterruptedException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
}
completionListener.onException(jmsMessage, exception);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
}
}
@Override

View File

@ -41,6 +41,7 @@ import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
@ -803,6 +804,8 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
String name = "exampleQueue1";
// disable auto-delete as it causes thrashing during the test
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteQueues(false));
final int numMessages = 40;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -29,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -152,6 +154,76 @@ public class SessionSendAcknowledgementHandlerTest extends ActiveMQTestBase {
Assert.assertTrue("producer specific handler must have acked, " + producerHandler, producerHandler.latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testHandlerOnSend() throws Exception {
final int MSG_COUNT = 750;
ServerLocator locator = createInVMNonHALocator();
locator.setConfirmationWindowSize(256);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
ClientProducer producer = session.createProducer(address);
final AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < MSG_COUNT; i++) {
ClientMessage message = session.createMessage(true);
producer.send(message, message1 -> count.incrementAndGet());
}
Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
}
@Test
public void testHandlerOnSendWithAnonymousProducer() throws Exception {
final int MSG_COUNT = 750;
ServerLocator locator = createInVMNonHALocator();
locator.setConfirmationWindowSize(256);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
final AtomicInteger count = new AtomicInteger(0);
ClientProducer producer = session.createProducer();
for (int i = 0; i < MSG_COUNT; i++) {
ClientMessage message = session.createMessage(true);
producer.send(address, message, message1 -> count.incrementAndGet());
}
Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
}
@Test
public void testHandlerOnSession() throws Exception {
final int MSG_COUNT = 750;
ServerLocator locator = createInVMNonHALocator();
locator.setConfirmationWindowSize(256);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
final AtomicInteger count = new AtomicInteger(0);
session.setSendAcknowledgementHandler(message1 -> count.incrementAndGet());
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < MSG_COUNT; i++) {
ClientMessage message = session.createMessage(true);
producer.send(message);
}
Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
}
@Test
public void testHandlerOnSessionWithAnonymousProducer() throws Exception {
final int MSG_COUNT = 750;
ServerLocator locator = createInVMNonHALocator();
locator.setConfirmationWindowSize(256);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
final AtomicInteger count = new AtomicInteger(0);
session.setSendAcknowledgementHandler(message1 -> count.incrementAndGet());
ClientProducer producer = session.createProducer();
for (int i = 0; i < MSG_COUNT; i++) {
ClientMessage message = session.createMessage(true);
producer.send(address, message);
}
Wait.assertEquals(MSG_COUNT, () -> count.get(), 2000, 100);
}
public static final class LatchAckHandler implements SendAcknowledgementHandler {
public CountDownLatch latch;

View File

@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
@ -43,7 +44,7 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JmsProducerCompletionListenerTest extends JMSTestBase {
static final int TOTAL_MSGS = 20;
static final int TOTAL_MSGS = 200;
private JMSContext context;
private JMSProducer producer;
@ -85,7 +86,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
JMSConsumer consumer = context.createConsumer(queue);
sendMessages(context, producer, queue, TOTAL_MSGS);
receiveMessages(consumer, 0, TOTAL_MSGS, true);
assertEquals(TOTAL_MSGS, cl.completion.get());
context.close();
Assert.assertTrue("completion listener should be called", cl.completionLatch.await(3, TimeUnit.SECONDS));
}
@ -191,7 +192,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
public static final class CountingCompletionListener implements CompletionListener {
public int completion;
public AtomicInteger completion = new AtomicInteger(0);
public int error;
public CountDownLatch completionLatch;
public Message lastMessage;
@ -202,7 +203,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase {
@Override
public void onCompletion(Message message) {
completion++;
completion.incrementAndGet();
completionLatch.countDown();
lastMessage = message;
}