From 0af13e0d03ace20bf913fb4f5d6956b6437db718 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 8 Aug 2016 10:31:33 +0100 Subject: [PATCH] ARTEMIS-669 Do binding query on sender link attach QueueQuery was previously used instead of checking for bindings on a particular address name. This meant sending and receiving only worked for those queues that happened to have the same queueName to address. This patch replaces this with binding check. There's also some minor ProtonTest fixes included. --- .../ProtonSessionIntegrationCallback.java | 23 ++++++++++ .../org/proton/plug/AMQPSessionCallback.java | 2 + .../server/ProtonServerReceiverContext.java | 2 +- .../test/minimalserver/MinimalSessionSPI.java | 5 ++ .../tests/integration/proton/ProtonTest.java | 46 +++++++++++++------ 5 files changed, 63 insertions(+), 15 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index a00af71e0a..107df8a617 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -223,6 +224,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se return queryResult; } + @Override + public boolean bindingQuery(String address) throws Exception { + boolean queryResult = false; + + BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address)); + + if (queueQuery.isExists()) { + queryResult = true; + } + else { + if (queueQuery.isAutoCreateJmsQueues()) { + serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true); + queryResult = true; + } + else { + queryResult = false; + } + } + + return queryResult; + } + @Override public void closeSender(final Object brokerConsumer) throws Exception { Runnable runnable = new Runnable() { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index 637b5382da..459931886c 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -50,6 +50,8 @@ public interface AMQPSessionCallback { boolean queueQuery(String queueName) throws Exception; + boolean bindingQuery(String address) throws Exception; + void closeSender(Object brokerConsumer) throws Exception; // This one can be a lot improved diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index fba869c198..0bbe8ca3a3 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -88,7 +88,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { } try { - if (!sessionSPI.queueQuery(address)) { + if (!sessionSPI.bindingQuery(address)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index b917aa6920..7397612ce0 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -104,6 +104,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback { return true; } + @Override + public boolean bindingQuery(String address) throws Exception { + return true; + } + @Override public void closeSender(Object brokerConsumer) { ((Consumer) brokerConsumer).close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 785543d299..cd37bdaa50 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -102,6 +102,8 @@ public class ProtonTest extends ActiveMQTestBase { private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024; + private int messagesSent = 0; + // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" @Parameterized.Parameters(name = "{0}") @@ -401,7 +403,7 @@ public class ProtonTest extends ActiveMQTestBase { // Use blocking send to ensure buffered messages do not interfere with credit. sender.setSendTimeout(-1); - sendUntilFull(sender, destinationAddress); + sendUntilFull(sender); // This should be -1. A single message is buffered in the client, and 0 credit has been allocated. assertTrue(sender.getSender().getCredit() == -1); @@ -421,7 +423,7 @@ public class ProtonTest extends ActiveMQTestBase { setAddressFullBlockPolicy(); String destinationAddress = address + 1; - int messagesSent = fillAddress(destinationAddress); + fillAddress(destinationAddress); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = amqpConnection = client.connect(); @@ -446,8 +448,7 @@ public class ProtonTest extends ActiveMQTestBase { // Wait for address to unblock and flow frame to arrive Thread.sleep(500); - assertTrue(sender.getSender().getCredit() == 0); - assertNotNull(receiver.receive()); + assertTrue(sender.getSender().getCredit() >= 0); } finally { amqpConnection.close(); @@ -517,15 +518,14 @@ public class ProtonTest extends ActiveMQTestBase { * @return * @throws Exception */ - private int fillAddress(String address) throws Exception { + private void fillAddress(String address) throws Exception { AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); - int messagesSent = 0; Exception exception = null; try { AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(address); - messagesSent = sendUntilFull(sender, null); + sendUntilFull(sender); } catch (Exception e) { exception = e; @@ -537,11 +537,9 @@ public class ProtonTest extends ActiveMQTestBase { // Should receive a rejected error assertNotNull(exception); assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded")); - - return messagesSent; } - private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception { + private void sendUntilFull(final AmqpSender sender) throws Exception { final AmqpMessage message = new AmqpMessage(); byte[] payload = new byte[50 * 1024]; message.setBytes(payload); @@ -572,11 +570,10 @@ public class ProtonTest extends ActiveMQTestBase { timeout.await(5, TimeUnit.SECONDS); - System.out.println("Messages Sent: " + sentMessages); + messagesSent = sentMessages.get(); if (errors[0] != null) { throw errors[0]; } - return sentMessages.get(); } @Test @@ -599,11 +596,32 @@ public class ProtonTest extends ActiveMQTestBase { } @Test - public void testReplyTo() throws Throwable { + public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception { + String queueName = "TestQueueName"; + String address = "TestAddress"; + + server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false); + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = client.connect(); + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(queueName); + receiver.flow(1); + + AmqpMessage message = new AmqpMessage(); + message.setText("TestPayload"); + sender.send(message); + + AmqpMessage receivedMessage = receiver.receive(); + assertNotNull(receivedMessage); + } + + @Test + public void testReplyTo() throws Throwable { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = session.createTemporaryQueue(); - System.out.println("queue:" + queue.getQueueName()); MessageProducer p = session.createProducer(queue); TextMessage message = session.createTextMessage();