diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index a00af71e0a..107df8a617 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -223,6 +224,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se return queryResult; } + @Override + public boolean bindingQuery(String address) throws Exception { + boolean queryResult = false; + + BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address)); + + if (queueQuery.isExists()) { + queryResult = true; + } + else { + if (queueQuery.isAutoCreateJmsQueues()) { + serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true); + queryResult = true; + } + else { + queryResult = false; + } + } + + return queryResult; + } + @Override public void closeSender(final Object brokerConsumer) throws Exception { Runnable runnable = new Runnable() { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index 637b5382da..459931886c 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -50,6 +50,8 @@ public interface AMQPSessionCallback { boolean queueQuery(String queueName) throws Exception; + boolean bindingQuery(String address) throws Exception; + void closeSender(Object brokerConsumer) throws Exception; // This one can be a lot improved diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index fba869c198..0bbe8ca3a3 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -88,7 +88,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { } try { - if (!sessionSPI.queueQuery(address)) { + if (!sessionSPI.bindingQuery(address)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } } diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index b917aa6920..7397612ce0 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -104,6 +104,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback { return true; } + @Override + public boolean bindingQuery(String address) throws Exception { + return true; + } + @Override public void closeSender(Object brokerConsumer) { ((Consumer) brokerConsumer).close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 785543d299..cd37bdaa50 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -102,6 +102,8 @@ public class ProtonTest extends ActiveMQTestBase { private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024; + private int messagesSent = 0; + // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" @Parameterized.Parameters(name = "{0}") @@ -401,7 +403,7 @@ public class ProtonTest extends ActiveMQTestBase { // Use blocking send to ensure buffered messages do not interfere with credit. sender.setSendTimeout(-1); - sendUntilFull(sender, destinationAddress); + sendUntilFull(sender); // This should be -1. A single message is buffered in the client, and 0 credit has been allocated. assertTrue(sender.getSender().getCredit() == -1); @@ -421,7 +423,7 @@ public class ProtonTest extends ActiveMQTestBase { setAddressFullBlockPolicy(); String destinationAddress = address + 1; - int messagesSent = fillAddress(destinationAddress); + fillAddress(destinationAddress); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = amqpConnection = client.connect(); @@ -446,8 +448,7 @@ public class ProtonTest extends ActiveMQTestBase { // Wait for address to unblock and flow frame to arrive Thread.sleep(500); - assertTrue(sender.getSender().getCredit() == 0); - assertNotNull(receiver.receive()); + assertTrue(sender.getSender().getCredit() >= 0); } finally { amqpConnection.close(); @@ -517,15 +518,14 @@ public class ProtonTest extends ActiveMQTestBase { * @return * @throws Exception */ - private int fillAddress(String address) throws Exception { + private void fillAddress(String address) throws Exception { AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); - int messagesSent = 0; Exception exception = null; try { AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(address); - messagesSent = sendUntilFull(sender, null); + sendUntilFull(sender); } catch (Exception e) { exception = e; @@ -537,11 +537,9 @@ public class ProtonTest extends ActiveMQTestBase { // Should receive a rejected error assertNotNull(exception); assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded")); - - return messagesSent; } - private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception { + private void sendUntilFull(final AmqpSender sender) throws Exception { final AmqpMessage message = new AmqpMessage(); byte[] payload = new byte[50 * 1024]; message.setBytes(payload); @@ -572,11 +570,10 @@ public class ProtonTest extends ActiveMQTestBase { timeout.await(5, TimeUnit.SECONDS); - System.out.println("Messages Sent: " + sentMessages); + messagesSent = sentMessages.get(); if (errors[0] != null) { throw errors[0]; } - return sentMessages.get(); } @Test @@ -599,11 +596,32 @@ public class ProtonTest extends ActiveMQTestBase { } @Test - public void testReplyTo() throws Throwable { + public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception { + String queueName = "TestQueueName"; + String address = "TestAddress"; + + server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false); + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = client.connect(); + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(queueName); + receiver.flow(1); + + AmqpMessage message = new AmqpMessage(); + message.setText("TestPayload"); + sender.send(message); + + AmqpMessage receivedMessage = receiver.receive(); + assertNotNull(receivedMessage); + } + + @Test + public void testReplyTo() throws Throwable { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = session.createTemporaryQueue(); - System.out.println("queue:" + queue.getQueueName()); MessageProducer p = session.createProducer(queue); TextMessage message = session.createTextMessage();