This commit is contained in:
Clebert Suconic 2017-03-17 15:09:56 -04:00
commit 861c231551
1 changed files with 64 additions and 8 deletions

View File

@ -16,7 +16,11 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.JMSException; import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -26,10 +30,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -43,15 +49,11 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Sender;
import org.jgroups.util.UUID;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
/** /**
* Test basic send and receive scenarios using only AMQP sender and receiver links. * Test basic send and receive scenarios using only AMQP sender and receiver links.
*/ */
@ -1049,6 +1051,61 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Test(timeout = 60000)
public void testMessageWithToFieldSetToSenderAddress() throws Exception {
doTestMessageWithToFieldSet(false, getTestName());
}
@Test(timeout = 60000)
public void testMessageWithToFieldSetToRandomAddress() throws Exception {
doTestMessageWithToFieldSet(false, UUID.randomUUID().toString());
}
@Test(timeout = 60000)
public void testMessageWithToFieldSetToEmpty() throws Exception {
doTestMessageWithToFieldSet(false, "");
}
@Test(timeout = 60000)
public void testMessageWithToFieldSetToNull() throws Exception {
doTestMessageWithToFieldSet(false, null);
}
@Test(timeout = 60000)
public void testMessageWithToFieldSetWithAnonymousSender() throws Exception {
doTestMessageWithToFieldSet(true, getTestName());
}
private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final String address = getTestName();
AmqpSender sender = session.createSender(anonymous ? null : address);
AmqpMessage message = new AmqpMessage();
message.setAddress(expected);
message.setMessageId("msg:1");
sender.send(message);
AmqpReceiver receiver = session.createReceiver(address);
Queue queueView = getProxyToQueue(address);
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(expected, received.getAddress());
receiver.close();
assertEquals(1, queueView.getMessageCount());
connection.close();
}
public void sendMessages(String destinationName, int count) throws Exception { public void sendMessages(String destinationName, int count) throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
@ -1066,7 +1123,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
} }
} }
public void sendMessages(String destinationName, int count, boolean durable) throws Exception { public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());