ARTEMIS-1545 refactor & rework a few incompatible pieces

Existing commit for ARTEMIS-1545 broke bridges and large messages. This
commit fixes those, and refactors the solution a bit to be more clear.
This commit is contained in:
Justin Bertram 2018-07-17 10:53:21 -05:00 committed by Clebert Suconic
parent e4ba48a311
commit a28b4fb34e
10 changed files with 722 additions and 75 deletions

View File

@ -43,9 +43,11 @@ public interface SendAcknowledgementHandler {
void sendAcknowledged(Message message); void sendAcknowledged(Message message);
default void sendFailed(Message message, Exception e) { default void sendFailed(Message message, Exception e) {
//This is to keep old behaviour that would ack even if error, /**
// if anyone custom implemented this interface but doesnt update. * By default ignore failures to preserve compatibility with existing implementations.
sendAcknowledged(message); * If the message makes it to the broker and a failure occurs sendAcknowledge() will
* still be invoked just like it always was.
*/
} }
} }

View File

@ -17,14 +17,14 @@
package org.apache.activemq.artemis.core.protocol.core; package org.apache.activemq.artemis.core.protocol.core;
/** /**
* A CommandConfirmationHandler is used by the channel to confirm confirmations of packets. * A ResponseHandler is used by the channel to handle async responses.
*/ */
public interface ResponseHandler { public interface ResponseHandler {
/** /**
* called by channel after a confirmation has been received. * called by channel after an async response has been received.
* *
* @param packet the packet confirmed * @param packet the packet confirmed
*/ */
void responseHandler(Packet packet, Packet response); void handleResponse(Packet packet, Packet response);
} }

View File

@ -171,11 +171,7 @@ public class ActiveMQSessionContext extends SessionContext {
sessionChannel.setHandler(handler); sessionChannel.setHandler(handler);
if (confirmationWindow >= 0) { if (confirmationWindow >= 0) {
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { setHandlers();
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
} else {
sessionChannel.setResponseHandler(responseHandler);
}
} }
} }
@ -192,16 +188,24 @@ public class ActiveMQSessionContext extends SessionContext {
this.killed = true; this.killed = true;
} }
private void setHandlers() {
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
sessionChannel.setResponseHandler(responseHandler);
}
}
private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() { private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
@Override @Override
public void commandConfirmed(Packet packet) { public void commandConfirmed(Packet packet) {
responseHandler.responseHandler(packet, null); responseHandler.handleResponse(packet, null);
} }
}; };
private final ResponseHandler responseHandler = new ResponseHandler() { private final ResponseHandler responseHandler = new ResponseHandler() {
@Override @Override
public void responseHandler(Packet packet, Packet response) { public void handleResponse(Packet packet, Packet response) {
final ActiveMQException activeMQException; final ActiveMQException activeMQException;
if (response != null && response.getType() == PacketImpl.EXCEPTION) { if (response != null && response.getType() == PacketImpl.EXCEPTION) {
ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response; ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
@ -232,7 +236,7 @@ public class ActiveMQSessionContext extends SessionContext {
if (exception == null) { if (exception == null) {
sendAckHandler.sendAcknowledged(message); sendAckHandler.sendAcknowledged(message);
} else { } else {
handler.sendFailed(message, exception); sendAckHandler.sendFailed(message, exception);
} }
} }
} }
@ -272,11 +276,8 @@ public class ActiveMQSessionContext extends SessionContext {
@Override @Override
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) {
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { setHandlers();
sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler);
} else {
sessionChannel.setResponseHandler(responseHandler);
}
this.sendAckHandler = handler; this.sendAckHandler = handler;
} }
@ -946,12 +947,12 @@ public class ActiveMQSessionContext extends SessionContext {
boolean lastChunk, boolean lastChunk,
byte[] chunk, byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException { SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk || confirmationWindow != -1; final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket; final SessionSendContinuationMessage chunkPacket;
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
} else { } else {
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
} }
final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops //perform a weak form of flow control to avoid OOM on tight loops
@ -969,11 +970,7 @@ public class ActiveMQSessionContext extends SessionContext {
} }
if (requiresResponse) { if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking. // When sending it blocking, only the last chunk will be blocking.
if (sendBlocking) { channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
} else {
channel.send(chunkPacket);
}
} else { } else {
channel.send(chunkPacket); channel.send(chunkPacket);
} }

View File

@ -253,6 +253,10 @@ public final class ChannelImpl implements Channel {
this.transferring = transferring; this.transferring = transferring;
} }
protected ResponseCache getCache() {
return responseAsyncCache;
}
/** /**
* @param timeoutMsg message to log on blocking call failover timeout * @param timeoutMsg message to log on blocking call failover timeout
*/ */
@ -316,7 +320,7 @@ public final class ChannelImpl implements Channel {
checkReconnectID(reconnectID); checkReconnectID(reconnectID);
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache is cannot add, preventing responses to be handled. //As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) { while (!responseAsyncCache.add(packet)) {
try { try {
@ -426,7 +430,7 @@ public final class ChannelImpl implements Channel {
throw new ActiveMQInterruptedException(e); throw new ActiveMQInterruptedException(e);
} }
if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) { if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace")); ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
} }
@ -642,7 +646,7 @@ public final class ChannelImpl implements Channel {
} }
} }
public void handleResponse(Packet packet) { public void handleAsyncResponse(Packet packet) {
if (responseAsyncCache != null && packet.isResponseAsync()) { if (responseAsyncCache != null && packet.isResponseAsync()) {
responseAsyncCache.handleResponse(packet); responseAsyncCache.handleResponse(packet);
} }
@ -700,7 +704,7 @@ public final class ChannelImpl implements Channel {
if (packet.isResponse()) { if (packet.isResponse()) {
confirm(packet); confirm(packet);
handleResponse(packet); handleAsyncResponse(packet);
lock.lock(); lock.lock();
try { try {
@ -752,6 +756,9 @@ public final class ChannelImpl implements Channel {
if (commandConfirmationHandler != null) { if (commandConfirmationHandler != null) {
commandConfirmationHandler.commandConfirmed(packet); commandConfirmationHandler.commandConfirmed(packet);
} }
if (responseAsyncCache != null) {
responseAsyncCache.handleResponse(packet);
}
} }
firstStoredCommandID += numberToClear; firstStoredCommandID += numberToClear;

View File

@ -31,7 +31,8 @@ public class PacketImpl implements Packet {
// 2.0.0 // 2.0.0
public static final int ADDRESSING_CHANGE_VERSION = 129; public static final int ADDRESSING_CHANGE_VERSION = 129;
public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130;
// 2.7.0
public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130; public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
@ -430,7 +431,7 @@ public class PacketImpl implements Packet {
} }
protected String getParentString() { protected String getParentString() {
return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName(); return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName();
} }
private int stringEncodeSize(final String str) { private int stringEncodeSize(final String str) {

View File

@ -53,7 +53,7 @@ public class ResponseCache {
long correlationID = response.getCorrelationID(); long correlationID = response.getCorrelationID();
Packet packet = remove(correlationID); Packet packet = remove(correlationID);
if (packet != null) { if (packet != null) {
responseHandler.responseHandler(packet, response); responseHandler.handleResponse(packet, response);
} }
} }
@ -67,4 +67,8 @@ public class ResponseCache {
public void setResponseHandler(ResponseHandler responseHandler) { public void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler; this.responseHandler = responseHandler;
} }
public int size() {
return this.store.size();
}
} }

View File

@ -0,0 +1,512 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import javax.security.auth.Subject;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ChannelImplTest {
ChannelImpl channel;
@Before
public void setUp() {
channel = new ChannelImpl(new CoreRR(), 1, 4000, null);
}
@Test
public void testCorrelation() {
AtomicInteger handleResponseCount = new AtomicInteger();
RequestPacket requestPacket = new RequestPacket((byte) 1);
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
channel.send(requestPacket);
assertEquals(1, channel.getCache().size());
ResponsePacket responsePacket = new ResponsePacket((byte) 1);
responsePacket.setCorrelationID(requestPacket.getCorrelationID());
channel.handlePacket(responsePacket);
assertEquals(1, handleResponseCount.get());
assertEquals(0, channel.getCache().size());
}
private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
channel.setResponseHandler(responseHandler);
channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler));
}
private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) {
return new CommandConfirmationHandler() {
@Override
public void commandConfirmed(Packet packet) {
responseHandler.handleResponse(packet, null);
}
};
}
@Test
public void testPacketsConfirmedMessage() {
AtomicInteger handleResponseCount = new AtomicInteger();
RequestPacket requestPacket = new RequestPacket((byte) 1);
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
channel.send(requestPacket);
PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2);
channel.handlePacket(responsePacket);
assertEquals(0, channel.getCache().size());
}
class RequestPacket extends PacketImpl {
private long id;
RequestPacket(byte type) {
super(type);
}
@Override
public boolean isRequiresResponse() {
return true;
}
@Override
public boolean isResponseAsync() {
return true;
}
@Override
public long getCorrelationID() {
return id;
}
@Override
public void setCorrelationID(long id) {
this.id = id;
}
@Override
public int getPacketSize() {
return 0;
}
}
class ResponsePacket extends PacketImpl {
private long id;
ResponsePacket(byte type) {
super(type);
}
@Override
public boolean isResponseAsync() {
return true;
}
@Override
public boolean isResponse() {
return true;
}
@Override
public long getCorrelationID() {
return id;
}
@Override
public void setCorrelationID(long id) {
this.id = id;
}
@Override
public int getPacketSize() {
return 0;
}
}
class CoreRR implements CoreRemotingConnection {
@Override
public int getChannelVersion() {
return 0;
}
@Override
public void setChannelVersion(int clientVersion) {
}
@Override
public Channel getChannel(long channelID, int confWindowSize) {
return null;
}
@Override
public void putChannel(long channelID, Channel channel) {
}
@Override
public boolean removeChannel(long channelID) {
return false;
}
@Override
public long generateChannelID() {
return 0;
}
@Override
public void syncIDGeneratorSequence(long id) {
}
@Override
public long getIDGeneratorSequence() {
return 0;
}
@Override
public long getBlockingCallTimeout() {
return 0;
}
@Override
public long getBlockingCallFailoverTimeout() {
return 0;
}
@Override
public Object getTransferLock() {
return null;
}
@Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;
}
@Override
public boolean blockUntilWritable(int size, long timeout) {
return false;
}
@Override
public Object getID() {
return null;
}
@Override
public long getCreationTime() {
return 0;
}
@Override
public String getRemoteAddress() {
return null;
}
@Override
public void scheduledFlush() {
}
@Override
public void addFailureListener(FailureListener listener) {
}
@Override
public boolean removeFailureListener(FailureListener listener) {
return false;
}
@Override
public void addCloseListener(CloseListener listener) {
}
@Override
public boolean removeCloseListener(CloseListener listener) {
return false;
}
@Override
public List<CloseListener> removeCloseListeners() {
return null;
}
@Override
public void setCloseListeners(List<CloseListener> listeners) {
}
@Override
public List<FailureListener> getFailureListeners() {
return null;
}
@Override
public List<FailureListener> removeFailureListeners() {
return null;
}
@Override
public void setFailureListeners(List<FailureListener> listeners) {
}
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
}
@Override
public void fail(ActiveMQException me) {
}
@Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
}
@Override
public void destroy() {
}
@Override
public Connection getTransportConnection() {
return new Connection() {
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return null;
}
@Override
public RemotingConnection getProtocolConnection() {
return null;
}
@Override
public void setProtocolConnection(RemotingConnection connection) {
}
@Override
public boolean isWritable(ReadyListener listener) {
return false;
}
@Override
public void fireReady(boolean ready) {
}
@Override
public void setAutoRead(boolean autoRead) {
}
@Override
public Object getID() {
return null;
}
@Override
public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
}
@Override
public void write(ActiveMQBuffer buffer,
boolean flush,
boolean batched,
ChannelFutureListener futureListener) {
}
@Override
public void write(ActiveMQBuffer buffer) {
}
@Override
public void forceClose() {
}
@Override
public void close() {
}
@Override
public String getRemoteAddress() {
return null;
}
@Override
public String getLocalAddress() {
return null;
}
@Override
public void checkFlushBatchBuffer() {
}
@Override
public TransportConfiguration getConnectorConfig() {
return null;
}
@Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;
}
@Override
public boolean isUsingProtocolHandling() {
return false;
}
@Override
public boolean isSameTarget(TransportConfiguration... configs) {
return false;
}
};
}
@Override
public boolean isClient() {
return true;
}
@Override
public boolean isDestroyed() {
return false;
}
@Override
public void disconnect(boolean criticalError) {
}
@Override
public void disconnect(String scaleDownNodeID, boolean criticalError) {
}
@Override
public boolean checkDataReceived() {
return false;
}
@Override
public void flush() {
}
@Override
public boolean isWritable(ReadyListener callback) {
return false;
}
@Override
public void killMessage(SimpleString nodeID) {
}
@Override
public boolean isSupportReconnect() {
return false;
}
@Override
public boolean isSupportsFlowControl() {
return false;
}
@Override
public Subject getSubject() {
return null;
}
@Override
public String getProtocolName() {
return null;
}
@Override
public void setClientID(String cID) {
}
@Override
public String getClientID() {
return null;
}
@Override
public String getTransportLocalAddress() {
return null;
}
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
}
}
}

View File

@ -34,6 +34,8 @@ import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@ -563,6 +565,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
private final Message jmsMessage; private final Message jmsMessage;
private final ActiveMQMessageProducer producer; private final ActiveMQMessageProducer producer;
/**
* It's possible that this SendAcknowledgementHandler might be called twice due to subsequent
* packet confirmations on the same connection. Using this boolean avoids that possibility.
* A new CompletionListenerWrapper is created for each message sent so once it's called once
* it will never be called again.
*/
private AtomicBoolean active = new AtomicBoolean(true);
/** /**
* @param jmsMessage * @param jmsMessage
* @param producer * @param producer
@ -577,56 +587,62 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
@Override @Override
public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
if (jmsMessage instanceof StreamMessage) { if (active.get()) {
try { if (jmsMessage instanceof StreamMessage) {
((StreamMessage) jmsMessage).reset(); try {
} catch (JMSException e) { ((StreamMessage) jmsMessage).reset();
// HORNETQ-1209 XXX ignore? } catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
} }
} if (jmsMessage instanceof BytesMessage) {
if (jmsMessage instanceof BytesMessage) { try {
try { ((BytesMessage) jmsMessage).reset();
((BytesMessage) jmsMessage).reset(); } catch (JMSException e) {
} catch (JMSException e) { // HORNETQ-1209 XXX ignore?
// HORNETQ-1209 XXX ignore? }
} }
}
try { try {
producer.connection.getThreadAwareContext().setCurrentThread(true); producer.connection.getThreadAwareContext().setCurrentThread(true);
completionListener.onCompletion(jmsMessage); completionListener.onCompletion(jmsMessage);
} finally { } finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true); producer.connection.getThreadAwareContext().clearCurrentThread(true);
active.set(false);
}
} }
} }
@Override @Override
public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) { public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) {
if (jmsMessage instanceof StreamMessage) { if (active.get()) {
try { if (jmsMessage instanceof StreamMessage) {
((StreamMessage) jmsMessage).reset(); try {
} catch (JMSException e) { ((StreamMessage) jmsMessage).reset();
// HORNETQ-1209 XXX ignore? } catch (JMSException e) {
// HORNETQ-1209 XXX ignore?
}
} }
} if (jmsMessage instanceof BytesMessage) {
if (jmsMessage instanceof BytesMessage) { try {
try { ((BytesMessage) jmsMessage).reset();
((BytesMessage) jmsMessage).reset(); } catch (JMSException e) {
} catch (JMSException e) { // HORNETQ-1209 XXX ignore?
// HORNETQ-1209 XXX ignore? }
} }
}
try { try {
producer.connection.getThreadAwareContext().setCurrentThread(true); producer.connection.getThreadAwareContext().setCurrentThread(true);
if (exception instanceof ActiveMQException) { if (exception instanceof ActiveMQException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception); exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception);
} else if (exception instanceof ActiveMQInterruptedException) { } else if (exception instanceof ActiveMQInterruptedException) {
exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception);
}
completionListener.onException(jmsMessage, exception);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
active.set(false);
} }
completionListener.onException(jmsMessage, exception);
} finally {
producer.connection.getThreadAwareContext().clearCurrentThread(true);
} }
} }

View File

@ -893,7 +893,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
final Packet response, final Packet response,
final boolean flush, final boolean flush,
final boolean closeChannel) { final boolean closeChannel) {
if (confirmPacket != null) { // don't confirm if the response is an exception
if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
channel.confirm(confirmPacket); channel.confirm(confirmPacket);
if (flush) { if (flush) {

View File

@ -18,12 +18,15 @@ package org.apache.activemq.artemis.jms.tests;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import javax.jms.BytesMessage;
import javax.jms.CompletionListener; import javax.jms.CompletionListener;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.IllegalStateException; import javax.jms.IllegalStateException;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;
import javax.jms.JMSSecurityException; import javax.jms.JMSSecurityException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
@ -33,6 +36,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
@ -188,10 +193,14 @@ public class SecurityTest extends JMSTestCase {
*/ */
@Test @Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception { public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
if (getJmsServer().locateQueue(queueName) == null) {
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection("guest", "guest"); Connection connection = connectionFactory.createConnection("guest", "guest");
Session session = connection.createSession(); Session session = connection.createSession();
Destination destination = session.createQueue("guest.cannot.send"); Destination destination = session.createQueue(queueName.toString());
MessageProducer messageProducer = session.createProducer(destination); MessageProducer messageProducer = session.createProducer(destination);
try { try {
messageProducer.send(session.createTextMessage("hello")); messageProducer.send(session.createTextMessage("hello"));
@ -209,18 +218,21 @@ public class SecurityTest extends JMSTestCase {
*/ */
@Test @Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception { public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception {
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
if (getJmsServer().locateQueue(queueName) == null) {
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory.setConfirmationWindowSize(100); connectionFactory.setConfirmationWindowSize(100);
connectionFactory.setBlockOnDurableSend(false); connectionFactory.setBlockOnDurableSend(false);
connectionFactory.setBlockOnNonDurableSend(false); connectionFactory.setBlockOnNonDurableSend(false);
Connection connection = connectionFactory.createConnection("guest", "guest"); Connection connection = connectionFactory.createConnection("guest", "guest");
Session session = connection.createSession(); Session session = connection.createSession();
Destination destination = session.createQueue("guest.cannot.send"); Destination destination = session.createQueue(queueName.toString());
MessageProducer messageProducer = session.createProducer(destination); MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
try { try {
AtomicReference<Exception> e = new AtomicReference<>(); AtomicReference<Exception> e = new AtomicReference<>();
// messageProducer.send(session.createTextMessage("hello"));
CountDownLatch countDownLatch = new CountDownLatch(1); CountDownLatch countDownLatch = new CountDownLatch(1);
messageProducer.send(session.createTextMessage("hello"), new CompletionListener() { messageProducer.send(session.createTextMessage("hello"), new CompletionListener() {
@ -242,6 +254,101 @@ public class SecurityTest extends JMSTestCase {
fail("JMSSecurityException expected as guest is not allowed to send"); fail("JMSSecurityException expected as guest is not allowed to send");
} catch (JMSSecurityException activeMQSecurityException) { } catch (JMSSecurityException activeMQSecurityException) {
activeMQSecurityException.printStackTrace(); activeMQSecurityException.printStackTrace();
} finally {
connection.close();
}
}
/**
* Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API.
*/
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception {
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
if (getJmsServer().locateQueue(queueName) == null) {
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory.setConfirmationWindowSize(100);
connectionFactory.setBlockOnDurableSend(false);
connectionFactory.setBlockOnNonDurableSend(false);
JMSContext context = connectionFactory.createContext("guest", "guest");
Destination destination = context.createQueue(queueName.toString());
JMSProducer messageProducer = context.createProducer();
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
try {
AtomicReference<Exception> e = new AtomicReference<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
messageProducer.setAsync(new CompletionListener() {
@Override
public void onCompletion(Message message) {
countDownLatch.countDown();
}
@Override
public void onException(Message message, Exception exception) {
e.set(exception);
countDownLatch.countDown();
}
});
messageProducer.send(destination, context.createTextMessage("hello"));
countDownLatch.await(10, TimeUnit.SECONDS);
if (e.get() != null) {
throw e.get();
}
fail("JMSSecurityException expected as guest is not allowed to send");
} catch (JMSSecurityException activeMQSecurityException) {
activeMQSecurityException.printStackTrace();
} finally {
context.close();
}
}
/**
* Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message.
*/
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception {
SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
if (getJmsServer().locateQueue(queueName) == null) {
getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory.setConfirmationWindowSize(100);
connectionFactory.setBlockOnDurableSend(false);
connectionFactory.setBlockOnNonDurableSend(false);
connectionFactory.setMinLargeMessageSize(1024);
Connection connection = connectionFactory.createConnection("guest", "guest");
Session session = connection.createSession();
Destination destination = session.createQueue(queueName.toString());
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
try {
AtomicReference<Exception> e = new AtomicReference<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[10 * 1024]);
messageProducer.send(message, new CompletionListener() {
@Override
public void onCompletion(Message message) {
countDownLatch.countDown();
}
@Override
public void onException(Message message, Exception exception) {
e.set(exception);
countDownLatch.countDown();
}
});
countDownLatch.await(10, TimeUnit.SECONDS);
if (e.get() != null) {
throw e.get();
}
fail("JMSSecurityException expected as guest is not allowed to send");
} catch (JMSSecurityException activeMQSecurityException) {
activeMQSecurityException.printStackTrace();
} }
connection.close(); connection.close();
} }