ARTEMIS-883 improving exceptions sending
This commit is contained in:
parent
ae90edfdb6
commit
81df93ebcf
|
@ -55,8 +55,11 @@ import org.apache.activemq.command.Response;
|
||||||
import org.apache.activemq.command.SessionInfo;
|
import org.apache.activemq.command.SessionInfo;
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.wireformat.WireFormat;
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
public class AMQSession implements SessionCallback {
|
public class AMQSession implements SessionCallback {
|
||||||
|
private final Logger logger = Logger.getLogger(AMQSession.class);
|
||||||
|
|
||||||
// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
|
// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
|
||||||
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
|
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
|
||||||
|
|
||||||
|
@ -305,7 +308,6 @@ public class AMQSession implements SessionCallback {
|
||||||
|
|
||||||
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
|
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
|
||||||
|
|
||||||
final Exception[] anyException = new Exception[] {null};
|
|
||||||
|
|
||||||
if (shouldBlockProducer) {
|
if (shouldBlockProducer) {
|
||||||
connection.getContext().setDontSendReponse(true);
|
connection.getContext().setDontSendReponse(true);
|
||||||
|
@ -329,6 +331,8 @@ public class AMQSession implements SessionCallback {
|
||||||
this.connection.disableTtl();
|
this.connection.disableTtl();
|
||||||
if (shouldBlockProducer) {
|
if (shouldBlockProducer) {
|
||||||
if (!store.checkMemory(() -> {
|
if (!store.checkMemory(() -> {
|
||||||
|
Exception exceptionToSend = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||||
|
|
||||||
|
@ -336,16 +340,15 @@ public class AMQSession implements SessionCallback {
|
||||||
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
|
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (anyException[0] == null) {
|
|
||||||
anyException[0] = e;
|
logger.warn(e.getMessage(), e);
|
||||||
}
|
exceptionToSend = e;
|
||||||
}
|
}
|
||||||
connection.enableTtl();
|
connection.enableTtl();
|
||||||
if (count.decrementAndGet() == 0) {
|
if (count.decrementAndGet() == 0) {
|
||||||
if (anyException[0] != null) {
|
if (exceptionToSend != null) {
|
||||||
this.connection.getContext().setDontSendReponse(false);
|
this.connection.getContext().setDontSendReponse(false);
|
||||||
ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
|
connection.sendException(exceptionToSend);
|
||||||
connection.sendException(anyException[0]);
|
|
||||||
} else {
|
} else {
|
||||||
if (sendProducerAck) {
|
if (sendProducerAck) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue