ARTEMIS-669 Do binding query on sender link attach

QueueQuery was previously used instead of checking for bindings on a
particular address name.  This meant sending and receiving only worked
for those queues that happened to have the same queueName to address.
This patch replaces this with binding check.

There's also some minor ProtonTest fixes included.
This commit is contained in:
Martyn Taylor 2016-08-08 10:31:33 +01:00 committed by Andy Taylor
parent 1af6f5c5cc
commit 0af13e0d03
5 changed files with 63 additions and 15 deletions

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -223,6 +224,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
return queryResult;
}
@Override
public boolean bindingQuery(String address) throws Exception {
boolean queryResult = false;
BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
if (queueQuery.isExists()) {
queryResult = true;
}
else {
if (queueQuery.isAutoCreateJmsQueues()) {
serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
queryResult = true;
}
else {
queryResult = false;
}
}
return queryResult;
}
@Override
public void closeSender(final Object brokerConsumer) throws Exception {
Runnable runnable = new Runnable() {

View File

@ -50,6 +50,8 @@ public interface AMQPSessionCallback {
boolean queueQuery(String queueName) throws Exception;
boolean bindingQuery(String address) throws Exception;
void closeSender(Object brokerConsumer) throws Exception;
// This one can be a lot improved

View File

@ -88,7 +88,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
}
try {
if (!sessionSPI.queueQuery(address)) {
if (!sessionSPI.bindingQuery(address)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
}

View File

@ -104,6 +104,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
return true;
}
@Override
public boolean bindingQuery(String address) throws Exception {
return true;
}
@Override
public void closeSender(Object brokerConsumer) {
((Consumer) brokerConsumer).close();

View File

@ -102,6 +102,8 @@ public class ProtonTest extends ActiveMQTestBase {
private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
private int messagesSent = 0;
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}")
@ -401,7 +403,7 @@ public class ProtonTest extends ActiveMQTestBase {
// Use blocking send to ensure buffered messages do not interfere with credit.
sender.setSendTimeout(-1);
sendUntilFull(sender, destinationAddress);
sendUntilFull(sender);
// This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
assertTrue(sender.getSender().getCredit() == -1);
@ -421,7 +423,7 @@ public class ProtonTest extends ActiveMQTestBase {
setAddressFullBlockPolicy();
String destinationAddress = address + 1;
int messagesSent = fillAddress(destinationAddress);
fillAddress(destinationAddress);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = amqpConnection = client.connect();
@ -446,8 +448,7 @@ public class ProtonTest extends ActiveMQTestBase {
// Wait for address to unblock and flow frame to arrive
Thread.sleep(500);
assertTrue(sender.getSender().getCredit() == 0);
assertNotNull(receiver.receive());
assertTrue(sender.getSender().getCredit() >= 0);
}
finally {
amqpConnection.close();
@ -517,15 +518,14 @@ public class ProtonTest extends ActiveMQTestBase {
* @return
* @throws Exception
*/
private int fillAddress(String address) throws Exception {
private void fillAddress(String address) throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
int messagesSent = 0;
Exception exception = null;
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
messagesSent = sendUntilFull(sender, null);
sendUntilFull(sender);
}
catch (Exception e) {
exception = e;
@ -537,11 +537,9 @@ public class ProtonTest extends ActiveMQTestBase {
// Should receive a rejected error
assertNotNull(exception);
assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
return messagesSent;
}
private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception {
private void sendUntilFull(final AmqpSender sender) throws Exception {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[50 * 1024];
message.setBytes(payload);
@ -572,11 +570,10 @@ public class ProtonTest extends ActiveMQTestBase {
timeout.await(5, TimeUnit.SECONDS);
System.out.println("Messages Sent: " + sentMessages);
messagesSent = sentMessages.get();
if (errors[0] != null) {
throw errors[0];
}
return sentMessages.get();
}
@Test
@ -599,11 +596,32 @@ public class ProtonTest extends ActiveMQTestBase {
}
@Test
public void testReplyTo() throws Throwable {
public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
String queueName = "TestQueueName";
String address = "TestAddress";
server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(queueName);
receiver.flow(1);
AmqpMessage message = new AmqpMessage();
message.setText("TestPayload");
sender.send(message);
AmqpMessage receivedMessage = receiver.receive();
assertNotNull(receivedMessage);
}
@Test
public void testReplyTo() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();