This commit is contained in:
Clebert Suconic 2017-11-22 13:58:31 -05:00
commit 245495196a
3 changed files with 28 additions and 10 deletions

View File

@ -94,9 +94,9 @@ public interface ClientProducer extends AutoCloseable {
/**
* Sends a message to the specified address instead of the ClientProducer's address. <br>
* <br>
* This message will be sent asynchronously.
* This message will be sent asynchronously as long as {@link ServerLocator#setConfirmationWindowSize(int)} was set.
* <p>
* The handler will only get called if {@link ServerLocator#setConfirmationWindowSize(int) -1}.
* Notice that if no confirmationWindowsize is set
*
* @param address the address where the message will be sent
* @param message the message to send

View File

@ -397,6 +397,12 @@ public interface ActiveMQClientLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void unableToCheckKQueueAvailability(@Cause Throwable e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 212074, value = "SendAcknowledgementHandler will not be asynchronous without setting up confirmation window size",
format = Message.Format.MESSAGE_FORMAT)
void confirmationNotSet();
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
void onMessageError(@Cause Throwable e);

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
@ -43,6 +44,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
private static final Logger logger = Logger.getLogger(ClientProducerImpl.class);
private static boolean confirmationNotSetLogged = false;
private final SimpleString address;
private final ClientSessionInternal session;
@ -116,14 +119,14 @@ public class ClientProducerImpl implements ClientProducerInternal {
public void send(final Message msg) throws ActiveMQException {
checkClosed();
doSend(null, msg, null, false);
doSend(null, msg, null);
}
@Override
public void send(final SimpleString address1, final Message msg) throws ActiveMQException {
checkClosed();
doSend(address1, msg, null, false);
doSend(address1, msg, null);
}
@Override
@ -138,10 +141,20 @@ public class ClientProducerImpl implements ClientProducerInternal {
checkClosed();
boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled();
if (confirmationWindowEnabled) {
doSend(address1, message, handler, true);
doSend(address1, message, handler);
} else {
doSend(address1, message, null, true);
doSend(address1, message, null);
if (handler != null) {
if (logger.isDebugEnabled()) {
logger.debug("Handler was used on producing messages towards address " + address1.toString() + " however there is no confirmationWindowEnabled");
}
if (!confirmationNotSetLogged) {
// will log thisonly once
ActiveMQClientLogger.LOGGER.confirmationNotSet();
}
// if there is no confirmation enabled, we will at least call the handler after the sent is done
session.scheduleConfirmation(handler, message);
}
}
@ -209,8 +222,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
private void doSend(SimpleString sendingAddress,
final Message msgToSend,
final SendAcknowledgementHandler handler,
final boolean forceAsync) throws ActiveMQException {
final SendAcknowledgementHandler handler) throws ActiveMQException {
if (sendingAddress == null) {
sendingAddress = this.address;
}
@ -253,8 +265,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
final boolean forceAsyncOverride = handler != null;
final boolean sendBlocking = sendBlockingConfig && !forceAsyncOverride;
// if Handler != null, we will send non blocking
final boolean sendBlocking = sendBlockingConfig && handler == null;
session.workDone();