This closes #704 ARTEMIS-669 Do binding query on sender link attach

This commit is contained in:
Andy Taylor 2016-08-09 08:21:41 +01:00
commit d88ede9e3a
5 changed files with 63 additions and 15 deletions

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -223,6 +224,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
return queryResult;
}
@Override
public boolean bindingQuery(String address) throws Exception {
boolean queryResult = false;
BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
if (queueQuery.isExists()) {
queryResult = true;
}
else {
if (queueQuery.isAutoCreateJmsQueues()) {
serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
queryResult = true;
}
else {
queryResult = false;
}
}
return queryResult;
}
@Override
public void closeSender(final Object brokerConsumer) throws Exception {
Runnable runnable = new Runnable() {

View File

@ -50,6 +50,8 @@ public interface AMQPSessionCallback {
boolean queueQuery(String queueName) throws Exception;
boolean bindingQuery(String address) throws Exception;
void closeSender(Object brokerConsumer) throws Exception;
// This one can be a lot improved

View File

@ -88,7 +88,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
}
try {
if (!sessionSPI.queueQuery(address)) {
if (!sessionSPI.bindingQuery(address)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
}

View File

@ -104,6 +104,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
return true;
}
@Override
public boolean bindingQuery(String address) throws Exception {
return true;
}
@Override
public void closeSender(Object brokerConsumer) {
((Consumer) brokerConsumer).close();

View File

@ -102,6 +102,8 @@ public class ProtonTest extends ActiveMQTestBase {
private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
private int messagesSent = 0;
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}")
@ -401,7 +403,7 @@ public class ProtonTest extends ActiveMQTestBase {
// Use blocking send to ensure buffered messages do not interfere with credit.
sender.setSendTimeout(-1);
sendUntilFull(sender, destinationAddress);
sendUntilFull(sender);
// This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
assertTrue(sender.getSender().getCredit() == -1);
@ -421,7 +423,7 @@ public class ProtonTest extends ActiveMQTestBase {
setAddressFullBlockPolicy();
String destinationAddress = address + 1;
int messagesSent = fillAddress(destinationAddress);
fillAddress(destinationAddress);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = amqpConnection = client.connect();
@ -446,8 +448,7 @@ public class ProtonTest extends ActiveMQTestBase {
// Wait for address to unblock and flow frame to arrive
Thread.sleep(500);
assertTrue(sender.getSender().getCredit() == 0);
assertNotNull(receiver.receive());
assertTrue(sender.getSender().getCredit() >= 0);
}
finally {
amqpConnection.close();
@ -517,15 +518,14 @@ public class ProtonTest extends ActiveMQTestBase {
* @return
* @throws Exception
*/
private int fillAddress(String address) throws Exception {
private void fillAddress(String address) throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
int messagesSent = 0;
Exception exception = null;
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
messagesSent = sendUntilFull(sender, null);
sendUntilFull(sender);
}
catch (Exception e) {
exception = e;
@ -537,11 +537,9 @@ public class ProtonTest extends ActiveMQTestBase {
// Should receive a rejected error
assertNotNull(exception);
assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
return messagesSent;
}
private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception {
private void sendUntilFull(final AmqpSender sender) throws Exception {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[50 * 1024];
message.setBytes(payload);
@ -572,11 +570,10 @@ public class ProtonTest extends ActiveMQTestBase {
timeout.await(5, TimeUnit.SECONDS);
System.out.println("Messages Sent: " + sentMessages);
messagesSent = sentMessages.get();
if (errors[0] != null) {
throw errors[0];
}
return sentMessages.get();
}
@Test
@ -599,11 +596,32 @@ public class ProtonTest extends ActiveMQTestBase {
}
@Test
public void testReplyTo() throws Throwable {
public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
String queueName = "TestQueueName";
String address = "TestAddress";
server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(queueName);
receiver.flow(1);
AmqpMessage message = new AmqpMessage();
message.setText("TestPayload");
sender.send(message);
AmqpMessage receivedMessage = receiver.receive();
assertNotNull(receivedMessage);
}
@Test
public void testReplyTo() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
System.out.println("queue:" + queue.getQueueName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();