diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 1667945e30..a6cad89a1b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -294,11 +294,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { return null; } - public void invokeIncomingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { - manager.invokeIncoming(message, connection); + public String invokeIncomingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { + return manager.invokeIncoming(message, connection); } - public void invokeOutgoingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { - manager.invokeOutgoing(message, connection); + public String invokeOutgoingInterceptors(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { + return manager.invokeOutgoing(message, connection); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index a65361daaa..dad5d54572 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -507,36 +507,39 @@ public class AMQPSessionCallback implements SessionCallback { final Receiver receiver, final RoutingContext routingContext) throws Exception { message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer()); - invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()); - serverSession.send(transaction, message, directDeliver, false, routingContext); + if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) { + serverSession.send(transaction, message, directDeliver, false, routingContext); - afterIO(new IOCallback() { - @Override - public void done() { - connection.runLater(() -> { - if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txAccepted = new TransactionalState(); - txAccepted.setOutcome(Accepted.getInstance()); - txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId()); + afterIO(new IOCallback() { + @Override + public void done() { + connection.runLater(() -> { + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(((TransactionalState) delivery.getRemoteState()).getTxnId()); - delivery.disposition(txAccepted); - } else { - delivery.disposition(Accepted.getInstance()); - } - delivery.settle(); - context.flow(); - connection.flush(); - }); - } + delivery.disposition(txAccepted); + } else { + delivery.disposition(Accepted.getInstance()); + } + delivery.settle(); + context.flow(); + connection.flush(); + }); + } - @Override - public void onError(int errorCode, String errorMessage) { - connection.runNow(() -> { - receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); - connection.flush(); - }); - } - }); + @Override + public void onError(int errorCode, String errorMessage) { + connection.runNow(() -> { + receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); + connection.flush(); + }); + } + }); + } else { + rejectMessage(delivery, Symbol.valueOf("failed"), "Interceptor rejected message"); + } } /** Will execute a Runnable on an Address when there's space in memory*/ @@ -692,12 +695,12 @@ public class AMQPSessionCallback implements SessionCallback { manager.getServer().getSecurityStore().check(address, checkType, session); } - public void invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { - protonSPI.invokeIncomingInterceptors(message, connection); + public String invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { + return protonSPI.invokeIncomingInterceptors(message, connection); } - public void invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { - protonSPI.invokeOutgoingInterceptors(message, connection); + public String invokeOutgoing(AMQPMessage message, ActiveMQProtonRemotingConnection connection) { + return protonSPI.invokeOutgoingInterceptors(message, connection); } public void addProducer(ServerProducer serverProducer) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 121020c566..e0af481645 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -294,12 +294,12 @@ public class ProtonProtocolManager extends AbstractProtocolManager, C private final Map prefixes = new HashMap<>(); - protected void invokeInterceptors(final List interceptors, final P message, final C connection) { + protected String invokeInterceptors(final List interceptors, final P message, final C connection) { if (interceptors != null && !interceptors.isEmpty()) { for (I interceptor : interceptors) { try { if (!interceptor.intercept(message, connection)) { - break; + return interceptor.getClass().getName(); } } catch (Exception e) { - ActiveMQServerLogger.LOGGER.failedToInvokeAninterceptor(e); + ActiveMQServerLogger.LOGGER.failedToInvokeAnInterceptor(e); } } } + + return null; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java index 8dcb2bfe2f..0dfcda2fb5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java @@ -80,6 +80,73 @@ public class AmqpSendReceiveInterceptorTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testRejectMessageWithIncomingInterceptor() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() { + @Override + public boolean intercept(AMQPMessage message, RemotingConnection connection) throws ActiveMQException { + latch.countDown(); + return false; + } + }); + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + 1); + message.setText("Test-Message"); + try { + sender.send(message); + fail("Sending message should have thrown exception here."); + } catch (Exception e) { + assertEquals("Interceptor rejected message [condition = failed]", e.getMessage()); + } + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNull(amqpMessage); + sender.close(); + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testRejectMessageWithOutgoingInterceptor() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + 1); + message.setText("Test-Message"); + sender.send(message); + + final CountDownLatch latch = new CountDownLatch(1); + server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() { + @Override + public boolean intercept(AMQPMessage packet, RemotingConnection connection) throws ActiveMQException { + latch.countDown(); + return false; + } + }); + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(2); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNull(amqpMessage); + assertEquals(latch.getCount(), 0); + sender.close(); + receiver.close(); + connection.close(); + } + private static final String ADDRESS = "address"; private static final String MESSAGE_ID = "messageId"; private static final String CORRELATION_ID = "correlationId"; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java new file mode 100644 index 0000000000..cda06c073f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTRejectingInterceptorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ErrorCollector; + +public class MQTTRejectingInterceptorTest extends MQTTTestSupport { + + @Rule + public ErrorCollector collector = new ErrorCollector(); + + @Test(timeout = 60000) + public void testRejectedMQTTMessage() throws Exception { + final String addressQueue = name.getMethodName(); + final String msgText = "Test rejected message"; + + final MQTTClientProvider subscribeProvider = getMQTTClientProvider(); + initializeConnection(subscribeProvider); + subscribeProvider.subscribe(addressQueue, AT_MOST_ONCE); + + MQTTInterceptor incomingInterceptor = new MQTTInterceptor() { + @Override + public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { + System.out.println("incoming"); + if (packet.getClass() == MqttPublishMessage.class) { + return false; + } else { + return true; + } + } + }; + + server.getRemotingService().addIncomingInterceptor(incomingInterceptor); + + final MQTTClientProvider publishProvider = getMQTTClientProvider(); + initializeConnection(publishProvider); + publishProvider.publish(addressQueue, msgText.getBytes(), AT_MOST_ONCE, false); + assertNull(subscribeProvider.receive(3000)); + + subscribeProvider.disconnect(); + publishProvider.disconnect(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java index fad0e1288e..23def66a8e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithInterceptorsTest.java @@ -38,8 +38,8 @@ public class StompWithInterceptorsTest extends StompTestBase { @Override public List getIncomingInterceptors() { List stompIncomingInterceptor = new ArrayList<>(); - stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$IncomingStompInterceptor"); - stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$CoreInterceptor"); + stompIncomingInterceptor.add(IncomingStompInterceptor.class.getName()); + stompIncomingInterceptor.add(CoreInterceptor.class.getName()); return stompIncomingInterceptor; } @@ -47,7 +47,7 @@ public class StompWithInterceptorsTest extends StompTestBase { @Override public List getOutgoingInterceptors() { List stompOutgoingInterceptor = new ArrayList<>(); - stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompWithInterceptorsTest$OutgoingStompInterceptor"); + stompOutgoingInterceptor.add(OutgoingStompInterceptor.class.getName()); return stompOutgoingInterceptor; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java new file mode 100644 index 0000000000..736a56a2eb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithRejectingInterceptorTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.stomp; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompFrame; +import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class StompWithRejectingInterceptorTest extends StompTestBase { + + @Override + public List getIncomingInterceptors() { + List stompIncomingInterceptor = new ArrayList<>(); + stompIncomingInterceptor.add(IncomingStompFrameRejectInterceptor.class.getName()); + + return stompIncomingInterceptor; + } + + @Test + public void stompFrameInterceptor() throws Exception { + IncomingStompFrameRejectInterceptor.interceptedFrames.clear(); + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + ClientStompFrame frame = conn.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.setBody("Hello World"); + conn.sendFrame(frame); + conn.disconnect(); + + assertTrue(Wait.waitFor(() -> IncomingStompFrameRejectInterceptor.interceptedFrames.size() == 3, 2000, 50)); + + List incomingCommands = new ArrayList<>(4); + incomingCommands.add("CONNECT"); + incomingCommands.add("SEND"); + incomingCommands.add("DISCONNECT"); + + for (int i = 0; i < IncomingStompFrameRejectInterceptor.interceptedFrames.size(); i++) { + Assert.assertEquals(incomingCommands.get(i), IncomingStompFrameRejectInterceptor.interceptedFrames.get(i).getCommand()); + } + + Wait.assertFalse(() -> server.locateQueue(SimpleString.toSimpleString(getQueuePrefix() + getQueueName())).getMessageCount() > 0, 1000, 100); + } + + public static class IncomingStompFrameRejectInterceptor implements StompFrameInterceptor { + + static List interceptedFrames = Collections.synchronizedList(new ArrayList<>()); + + @Override + public boolean intercept(StompFrame stompFrame, RemotingConnection connection) { + interceptedFrames.add(stompFrame); + if (stompFrame.getCommand() == Stomp.Commands.SEND) { + return false; + } + return true; + } + } +} \ No newline at end of file