From a28b4fb34eb3cc178dd611d0cb2acc51d6b7a965 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 17 Jul 2018 10:53:21 -0500 Subject: [PATCH] ARTEMIS-1545 refactor & rework a few incompatible pieces Existing commit for ARTEMIS-1545 broke bridges and large messages. This commit fixes those, and refactors the solution a bit to be more clear. --- .../client/SendAcknowledgementHandler.java | 8 +- .../core/protocol/core/ResponseHandler.java | 6 +- .../core/impl/ActiveMQSessionContext.java | 37 +- .../core/protocol/core/impl/ChannelImpl.java | 15 +- .../core/protocol/core/impl/PacketImpl.java | 5 +- .../protocol/core/impl/ResponseCache.java | 6 +- .../protocol/core/impl/ChannelImplTest.java | 512 ++++++++++++++++++ .../jms/client/ActiveMQMessageProducer.java | 92 ++-- .../core/ServerSessionPacketHandler.java | 3 +- .../artemis/jms/tests/SecurityTest.java | 113 +++- 10 files changed, 722 insertions(+), 75 deletions(-) create mode 100644 artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java index 0f47536602..ad45a5f9e2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java @@ -43,9 +43,11 @@ public interface SendAcknowledgementHandler { void sendAcknowledged(Message message); default void sendFailed(Message message, Exception e) { - //This is to keep old behaviour that would ack even if error, - // if anyone custom implemented this interface but doesnt update. - sendAcknowledged(message); + /** + * By default ignore failures to preserve compatibility with existing implementations. + * If the message makes it to the broker and a failure occurs sendAcknowledge() will + * still be invoked just like it always was. + */ } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java index 21e9879e3b..f96ef13c10 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ResponseHandler.java @@ -17,14 +17,14 @@ package org.apache.activemq.artemis.core.protocol.core; /** - * A CommandConfirmationHandler is used by the channel to confirm confirmations of packets. + * A ResponseHandler is used by the channel to handle async responses. */ public interface ResponseHandler { /** - * called by channel after a confirmation has been received. + * called by channel after an async response has been received. * * @param packet the packet confirmed */ - void responseHandler(Packet packet, Packet response); + void handleResponse(Packet packet, Packet response); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 3c0647f52c..7306072ad9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -171,11 +171,7 @@ public class ActiveMQSessionContext extends SessionContext { sessionChannel.setHandler(handler); if (confirmationWindow >= 0) { - if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { - sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler); - } else { - sessionChannel.setResponseHandler(responseHandler); - } + setHandlers(); } } @@ -192,16 +188,24 @@ public class ActiveMQSessionContext extends SessionContext { this.killed = true; } + private void setHandlers() { + sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler); + + if (!sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { + sessionChannel.setResponseHandler(responseHandler); + } + } + private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() { @Override public void commandConfirmed(Packet packet) { - responseHandler.responseHandler(packet, null); + responseHandler.handleResponse(packet, null); } }; private final ResponseHandler responseHandler = new ResponseHandler() { @Override - public void responseHandler(Packet packet, Packet response) { + public void handleResponse(Packet packet, Packet response) { final ActiveMQException activeMQException; if (response != null && response.getType() == PacketImpl.EXCEPTION) { ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response; @@ -232,7 +236,7 @@ public class ActiveMQSessionContext extends SessionContext { if (exception == null) { sendAckHandler.sendAcknowledged(message); } else { - handler.sendFailed(message, exception); + sendAckHandler.sendFailed(message, exception); } } } @@ -272,11 +276,8 @@ public class ActiveMQSessionContext extends SessionContext { @Override public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { - if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { - sessionChannel.setCommandConfirmationHandler(commandConfirmationHandler); - } else { - sessionChannel.setResponseHandler(responseHandler); - } + setHandlers(); + this.sendAckHandler = handler; } @@ -946,12 +947,12 @@ public class ActiveMQSessionContext extends SessionContext { boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { - final boolean requiresResponse = lastChunk || confirmationWindow != -1; + final boolean requiresResponse = lastChunk && sendBlocking; final SessionSendContinuationMessage chunkPacket; if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); } else { - chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler); } final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); //perform a weak form of flow control to avoid OOM on tight loops @@ -969,11 +970,7 @@ public class ActiveMQSessionContext extends SessionContext { } if (requiresResponse) { // When sending it blocking, only the last chunk will be blocking. - if (sendBlocking) { - channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); - } else { - channel.send(chunkPacket); - } + channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); } else { channel.send(chunkPacket); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 9cb2a8351d..61268d6938 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -253,6 +253,10 @@ public final class ChannelImpl implements Channel { this.transferring = transferring; } + protected ResponseCache getCache() { + return responseAsyncCache; + } + /** * @param timeoutMsg message to log on blocking call failover timeout */ @@ -316,7 +320,7 @@ public final class ChannelImpl implements Channel { checkReconnectID(reconnectID); //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in, - //As the send could block if the response cache is cannot add, preventing responses to be handled. + //As the send could block if the response cache cannot add, preventing responses to be handled. if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) { while (!responseAsyncCache.add(packet)) { try { @@ -426,7 +430,7 @@ public final class ChannelImpl implements Channel { throw new ActiveMQInterruptedException(e); } - if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) { + if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) { ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace")); } @@ -642,7 +646,7 @@ public final class ChannelImpl implements Channel { } } - public void handleResponse(Packet packet) { + public void handleAsyncResponse(Packet packet) { if (responseAsyncCache != null && packet.isResponseAsync()) { responseAsyncCache.handleResponse(packet); } @@ -700,7 +704,7 @@ public final class ChannelImpl implements Channel { if (packet.isResponse()) { confirm(packet); - handleResponse(packet); + handleAsyncResponse(packet); lock.lock(); try { @@ -752,6 +756,9 @@ public final class ChannelImpl implements Channel { if (commandConfirmationHandler != null) { commandConfirmationHandler.commandConfirmed(packet); } + if (responseAsyncCache != null) { + responseAsyncCache.handleResponse(packet); + } } firstStoredCommandID += numberToClear; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 470e3aef94..0168a47c66 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -31,7 +31,8 @@ public class PacketImpl implements Packet { // 2.0.0 public static final int ADDRESSING_CHANGE_VERSION = 129; - public static final int SHARED_QUEUE_SECURITY_FIX_CHANGE_VERSION = 130; + + // 2.7.0 public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130; @@ -430,7 +431,7 @@ public class PacketImpl implements Packet { } protected String getParentString() { - return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName(); + return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", responseAsync=" + isResponseAsync() + ", requiresResponse=" + isRequiresResponse() + ", correlationID=" + getCorrelationID() + ", packetObject=" + this.getClass().getSimpleName(); } private int stringEncodeSize(final String str) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java index f9e85388d4..8ee73d7055 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ResponseCache.java @@ -53,7 +53,7 @@ public class ResponseCache { long correlationID = response.getCorrelationID(); Packet packet = remove(correlationID); if (packet != null) { - responseHandler.responseHandler(packet, response); + responseHandler.handleResponse(packet, response); } } @@ -67,4 +67,8 @@ public class ResponseCache { public void setResponseHandler(ResponseHandler responseHandler) { this.responseHandler = responseHandler; } + + public int size() { + return this.store.size(); + } } diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java new file mode 100644 index 0000000000..416c911f49 --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java @@ -0,0 +1,512 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl; + +import javax.security.auth.Subject; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.protocol.core.Channel; +import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; +import org.apache.activemq.artemis.core.remoting.CloseListener; +import org.apache.activemq.artemis.core.remoting.FailureListener; +import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ChannelImplTest { + + ChannelImpl channel; + + @Before + public void setUp() { + channel = new ChannelImpl(new CoreRR(), 1, 4000, null); + } + + @Test + public void testCorrelation() { + + AtomicInteger handleResponseCount = new AtomicInteger(); + + RequestPacket requestPacket = new RequestPacket((byte) 1); + setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); + + channel.send(requestPacket); + + assertEquals(1, channel.getCache().size()); + + ResponsePacket responsePacket = new ResponsePacket((byte) 1); + responsePacket.setCorrelationID(requestPacket.getCorrelationID()); + + channel.handlePacket(responsePacket); + + assertEquals(1, handleResponseCount.get()); + assertEquals(0, channel.getCache().size()); + } + + private void setResponseHandlerAsPerActiveMQSessionContext(ResponseHandler responseHandler) { + channel.setResponseHandler(responseHandler); + channel.setCommandConfirmationHandler(wrapAsPerActiveMQSessionContext(responseHandler)); + } + + private CommandConfirmationHandler wrapAsPerActiveMQSessionContext(ResponseHandler responseHandler) { + return new CommandConfirmationHandler() { + @Override + public void commandConfirmed(Packet packet) { + responseHandler.handleResponse(packet, null); + } + }; + } + + @Test + public void testPacketsConfirmedMessage() { + + AtomicInteger handleResponseCount = new AtomicInteger(); + + RequestPacket requestPacket = new RequestPacket((byte) 1); + setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); + + channel.send(requestPacket); + + PacketsConfirmedMessage responsePacket = new PacketsConfirmedMessage((byte) 2); + + channel.handlePacket(responsePacket); + + assertEquals(0, channel.getCache().size()); + } + + class RequestPacket extends PacketImpl { + + private long id; + + RequestPacket(byte type) { + super(type); + } + + @Override + public boolean isRequiresResponse() { + return true; + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public long getCorrelationID() { + return id; + } + + @Override + public void setCorrelationID(long id) { + this.id = id; + } + + @Override + public int getPacketSize() { + return 0; + } + } + + class ResponsePacket extends PacketImpl { + + private long id; + + ResponsePacket(byte type) { + super(type); + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public boolean isResponse() { + return true; + } + + @Override + public long getCorrelationID() { + return id; + } + + @Override + public void setCorrelationID(long id) { + this.id = id; + } + + @Override + public int getPacketSize() { + return 0; + } + } + + class CoreRR implements CoreRemotingConnection { + + @Override + public int getChannelVersion() { + return 0; + } + + @Override + public void setChannelVersion(int clientVersion) { + + } + + @Override + public Channel getChannel(long channelID, int confWindowSize) { + return null; + } + + @Override + public void putChannel(long channelID, Channel channel) { + + } + + @Override + public boolean removeChannel(long channelID) { + return false; + } + + @Override + public long generateChannelID() { + return 0; + } + + @Override + public void syncIDGeneratorSequence(long id) { + + } + + @Override + public long getIDGeneratorSequence() { + return 0; + } + + @Override + public long getBlockingCallTimeout() { + return 0; + } + + @Override + public long getBlockingCallFailoverTimeout() { + return 0; + } + + @Override + public Object getTransferLock() { + return null; + } + + @Override + public ActiveMQPrincipal getDefaultActiveMQPrincipal() { + return null; + } + + @Override + public boolean blockUntilWritable(int size, long timeout) { + return false; + } + + @Override + public Object getID() { + return null; + } + + @Override + public long getCreationTime() { + return 0; + } + + @Override + public String getRemoteAddress() { + return null; + } + + @Override + public void scheduledFlush() { + + } + + @Override + public void addFailureListener(FailureListener listener) { + + } + + @Override + public boolean removeFailureListener(FailureListener listener) { + return false; + } + + @Override + public void addCloseListener(CloseListener listener) { + + } + + @Override + public boolean removeCloseListener(CloseListener listener) { + return false; + } + + @Override + public List removeCloseListeners() { + return null; + } + + @Override + public void setCloseListeners(List listeners) { + + } + + @Override + public List getFailureListeners() { + return null; + } + + @Override + public List removeFailureListeners() { + return null; + } + + @Override + public void setFailureListeners(List listeners) { + + } + + @Override + public ActiveMQBuffer createTransportBuffer(int size) { + return new ChannelBufferWrapper(Unpooled.buffer(size)); + } + + @Override + public void fail(ActiveMQException me) { + + } + + @Override + public void fail(ActiveMQException me, String scaleDownTargetNodeID) { + + } + + @Override + public void destroy() { + + } + + @Override + public Connection getTransportConnection() { + return new Connection() { + @Override + public ActiveMQBuffer createTransportBuffer(int size) { + return null; + } + + @Override + public RemotingConnection getProtocolConnection() { + return null; + } + + @Override + public void setProtocolConnection(RemotingConnection connection) { + + } + + @Override + public boolean isWritable(ReadyListener listener) { + return false; + } + + @Override + public void fireReady(boolean ready) { + + } + + @Override + public void setAutoRead(boolean autoRead) { + + } + + @Override + public Object getID() { + return null; + } + + @Override + public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) { + + } + + @Override + public void write(ActiveMQBuffer buffer, + boolean flush, + boolean batched, + ChannelFutureListener futureListener) { + + } + + @Override + public void write(ActiveMQBuffer buffer) { + + } + + @Override + public void forceClose() { + + } + + @Override + public void close() { + + } + + @Override + public String getRemoteAddress() { + return null; + } + + @Override + public String getLocalAddress() { + return null; + } + + @Override + public void checkFlushBatchBuffer() { + + } + + @Override + public TransportConfiguration getConnectorConfig() { + return null; + } + + @Override + public ActiveMQPrincipal getDefaultActiveMQPrincipal() { + return null; + } + + @Override + public boolean isUsingProtocolHandling() { + return false; + } + + @Override + public boolean isSameTarget(TransportConfiguration... configs) { + return false; + } + }; + } + + @Override + public boolean isClient() { + return true; + } + + @Override + public boolean isDestroyed() { + return false; + } + + @Override + public void disconnect(boolean criticalError) { + + } + + @Override + public void disconnect(String scaleDownNodeID, boolean criticalError) { + + } + + @Override + public boolean checkDataReceived() { + return false; + } + + @Override + public void flush() { + + } + + @Override + public boolean isWritable(ReadyListener callback) { + return false; + } + + @Override + public void killMessage(SimpleString nodeID) { + + } + + @Override + public boolean isSupportReconnect() { + return false; + } + + @Override + public boolean isSupportsFlowControl() { + return false; + } + + @Override + public Subject getSubject() { + return null; + } + + @Override + public String getProtocolName() { + return null; + } + + @Override + public void setClientID(String cID) { + + } + + @Override + public String getClientID() { + return null; + } + + @Override + public String getTransportLocalAddress() { + return null; + } + + @Override + public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { + + } + } + +} \ No newline at end of file diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index fc15d5e757..ee4223cc57 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -34,6 +34,8 @@ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicPublisher; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -563,6 +565,14 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To private final Message jmsMessage; private final ActiveMQMessageProducer producer; + /** + * It's possible that this SendAcknowledgementHandler might be called twice due to subsequent + * packet confirmations on the same connection. Using this boolean avoids that possibility. + * A new CompletionListenerWrapper is created for each message sent so once it's called once + * it will never be called again. + */ + private AtomicBoolean active = new AtomicBoolean(true); + /** * @param jmsMessage * @param producer @@ -577,56 +587,62 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To @Override public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { - if (jmsMessage instanceof StreamMessage) { - try { - ((StreamMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? + if (active.get()) { + if (jmsMessage instanceof StreamMessage) { + try { + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } } - } - if (jmsMessage instanceof BytesMessage) { - try { - ((BytesMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? + if (jmsMessage instanceof BytesMessage) { + try { + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } } - } - try { - producer.connection.getThreadAwareContext().setCurrentThread(true); - completionListener.onCompletion(jmsMessage); - } finally { - producer.connection.getThreadAwareContext().clearCurrentThread(true); + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + completionListener.onCompletion(jmsMessage); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + active.set(false); + } } } @Override public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) { - if (jmsMessage instanceof StreamMessage) { - try { - ((StreamMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? + if (active.get()) { + if (jmsMessage instanceof StreamMessage) { + try { + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } } - } - if (jmsMessage instanceof BytesMessage) { - try { - ((BytesMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? + if (jmsMessage instanceof BytesMessage) { + try { + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? + } } - } - try { - producer.connection.getThreadAwareContext().setCurrentThread(true); - if (exception instanceof ActiveMQException) { - exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException)exception); - } else if (exception instanceof ActiveMQInterruptedException) { - exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + if (exception instanceof ActiveMQException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception); + } else if (exception instanceof ActiveMQInterruptedException) { + exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); + } + completionListener.onException(jmsMessage, exception); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + active.set(false); } - completionListener.onException(jmsMessage, exception); - } finally { - producer.connection.getThreadAwareContext().clearCurrentThread(true); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index f5756f2935..16a87d8e8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -893,7 +893,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { final Packet response, final boolean flush, final boolean closeChannel) { - if (confirmPacket != null) { + // don't confirm if the response is an exception + if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) { channel.confirm(confirmPacket); if (flush) { diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java index 0fd469c3fa..3ba58f0658 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java @@ -18,12 +18,15 @@ package org.apache.activemq.artemis.jms.tests; import static org.junit.Assert.fail; +import javax.jms.BytesMessage; import javax.jms.CompletionListener; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.IllegalStateException; +import javax.jms.JMSContext; +import javax.jms.JMSProducer; import javax.jms.JMSSecurityException; import javax.jms.Message; import javax.jms.MessageProducer; @@ -33,6 +36,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; @@ -188,10 +193,14 @@ public class SecurityTest extends JMSTestCase { */ @Test public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection("guest", "guest"); Session session = connection.createSession(); - Destination destination = session.createQueue("guest.cannot.send"); + Destination destination = session.createQueue(queueName.toString()); MessageProducer messageProducer = session.createProducer(destination); try { messageProducer.send(session.createTextMessage("hello")); @@ -209,18 +218,21 @@ public class SecurityTest extends JMSTestCase { */ @Test public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connectionFactory.setConfirmationWindowSize(100); connectionFactory.setBlockOnDurableSend(false); connectionFactory.setBlockOnNonDurableSend(false); Connection connection = connectionFactory.createConnection("guest", "guest"); Session session = connection.createSession(); - Destination destination = session.createQueue("guest.cannot.send"); + Destination destination = session.createQueue(queueName.toString()); MessageProducer messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); try { AtomicReference e = new AtomicReference<>(); - // messageProducer.send(session.createTextMessage("hello")); CountDownLatch countDownLatch = new CountDownLatch(1); messageProducer.send(session.createTextMessage("hello"), new CompletionListener() { @@ -242,6 +254,101 @@ public class SecurityTest extends JMSTestCase { fail("JMSSecurityException expected as guest is not allowed to send"); } catch (JMSSecurityException activeMQSecurityException) { activeMQSecurityException.printStackTrace(); + } finally { + connection.close(); + } + } + + /** + * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API. + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connectionFactory.setConfirmationWindowSize(100); + connectionFactory.setBlockOnDurableSend(false); + connectionFactory.setBlockOnNonDurableSend(false); + JMSContext context = connectionFactory.createContext("guest", "guest"); + Destination destination = context.createQueue(queueName.toString()); + JMSProducer messageProducer = context.createProducer(); + messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + try { + AtomicReference e = new AtomicReference<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + messageProducer.setAsync(new CompletionListener() { + @Override + public void onCompletion(Message message) { + countDownLatch.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + e.set(exception); + countDownLatch.countDown(); + } + }); + messageProducer.send(destination, context.createTextMessage("hello")); + countDownLatch.await(10, TimeUnit.SECONDS); + if (e.get() != null) { + throw e.get(); + } + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + activeMQSecurityException.printStackTrace(); + } finally { + context.close(); + } + } + + /** + * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message. + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception { + SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); + if (getJmsServer().locateQueue(queueName) == null) { + getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + } + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connectionFactory.setConfirmationWindowSize(100); + connectionFactory.setBlockOnDurableSend(false); + connectionFactory.setBlockOnNonDurableSend(false); + connectionFactory.setMinLargeMessageSize(1024); + Connection connection = connectionFactory.createConnection("guest", "guest"); + Session session = connection.createSession(); + Destination destination = session.createQueue(queueName.toString()); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + try { + AtomicReference e = new AtomicReference<>(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[10 * 1024]); + messageProducer.send(message, new CompletionListener() { + @Override + public void onCompletion(Message message) { + countDownLatch.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + e.set(exception); + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + if (e.get() != null) { + throw e.get(); + } + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + activeMQSecurityException.printStackTrace(); } connection.close(); }