ARTEMIS-1123 Clean up and add new AMQP tests

Adds some new AMQP protocol handling tests brought forward from
ActiveMQ 5.x as well as cleaning up some of th existing tests
code to make adding some other tests easier.
This commit is contained in:
Timothy Bish 2017-04-18 16:50:08 -04:00 committed by Clebert Suconic
parent cc4c3957b1
commit 0260a304b4
20 changed files with 881 additions and 587 deletions

View File

@ -87,6 +87,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final String password; private final String password;
private final URI remoteURI; private final URI remoteURI;
private final String connectionId; private final String connectionId;
private List<Symbol> desiredCapabilities = Collections.emptyList();
private List<Symbol> offeredCapabilities = Collections.emptyList(); private List<Symbol> offeredCapabilities = Collections.emptyList();
private Map<Symbol, Object> offeredProperties = Collections.emptyMap(); private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
@ -146,6 +147,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
getEndpoint().setContainer(safeGetContainerId()); getEndpoint().setContainer(safeGetContainerId());
} }
getEndpoint().setHostname(remoteURI.getHost()); getEndpoint().setHostname(remoteURI.getHost());
if (!getDesiredCapabilities().isEmpty()) {
getEndpoint().setDesiredCapabilities(getDesiredCapabilities().toArray(new Symbol[0]));
}
if (!getOfferedCapabilities().isEmpty()) { if (!getOfferedCapabilities().isEmpty()) {
getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0])); getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
} }
@ -393,12 +397,24 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.drainTimeout = drainTimeout; this.drainTimeout = drainTimeout;
} }
public List<Symbol> getDesiredCapabilities() {
return desiredCapabilities;
}
public void setDesiredCapabilities(List<Symbol> desiredCapabilities) {
if (desiredCapabilities == null) {
desiredCapabilities = Collections.emptyList();
}
this.desiredCapabilities = desiredCapabilities;
}
public List<Symbol> getOfferedCapabilities() { public List<Symbol> getOfferedCapabilities() {
return offeredCapabilities; return offeredCapabilities;
} }
public void setOfferedCapabilities(List<Symbol> offeredCapabilities) { public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
if (offeredCapabilities != null) { if (offeredCapabilities == null) {
offeredCapabilities = Collections.emptyList(); offeredCapabilities = Collections.emptyList();
} }
@ -410,7 +426,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
} }
public void setOfferedProperties(Map<Symbol, Object> offeredProperties) { public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
if (offeredProperties != null) { if (offeredProperties == null) {
offeredProperties = Collections.emptyMap(); offeredProperties = Collections.emptyMap();
} }

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
public class AmqpAnonymousRelayTest extends AmqpClientTestSupport {
@Override
protected boolean isAutoCreateQueues() {
return false;
}
@Override
protected boolean isAutoCreateAddresses() {
return false;
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(getQueueName());
message.setMessageId("msg" + 1);
message.setText("Test-Message");
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals("msg1", received.getMessageId());
received.accept();
receiver.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress("exampleQueu-not-in-service");
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
}

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Set; import java.util.Set;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
@ -26,7 +26,6 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
@ -48,9 +47,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
protected static Symbol SHARED = Symbol.getSymbol("shared"); protected static Symbol SHARED = Symbol.getSymbol("shared");
protected static Symbol GLOBAL = Symbol.getSymbol("global"); protected static Symbol GLOBAL = Symbol.getSymbol("global");
protected JMSServerManager serverManager; protected JMSServerManager serverManager;
protected ActiveMQServer server; protected ActiveMQServer server;
@Before @Before
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
@ -84,6 +83,22 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
super.tearDown(); super.tearDown();
} }
protected boolean isAutoCreateQueues() {
return true;
}
protected boolean isAutoCreateAddresses() {
return true;
}
protected String getDeadLetterAddress() {
return "ActiveMQ.DLQ";
}
protected int getPrecreatedQueueSize() {
return 10;
}
protected ActiveMQServer createServer() throws Exception { protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = createServer(true, true); ActiveMQServer server = createServer(true, true);
serverManager = new JMSServerManagerImpl(server); serverManager = new JMSServerManagerImpl(server);
@ -91,21 +106,30 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
// Address 1 // Address 1
CoreAddressConfiguration address = new CoreAddressConfiguration(); CoreAddressConfiguration address = new CoreAddressConfiguration();
address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST); address.setName(getQueueName()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(); CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST); queueConfig.setName(getQueueName()).setAddress(getQueueName()).setRoutingType(RoutingType.ANYCAST);
address.getQueueConfigurations().add(queueConfig); address.getQueueConfigurations().add(queueConfig);
serverConfig.addAddressConfiguration(address); serverConfig.addAddressConfiguration(address);
// Address 2 // Address 1....N
for (int i = 0; i < getPrecreatedQueueSize(); ++i) {
CoreAddressConfiguration address2 = new CoreAddressConfiguration(); CoreAddressConfiguration address2 = new CoreAddressConfiguration();
address2.setName(getTestName2()).getRoutingTypes().add(RoutingType.ANYCAST); address2.setName(getQueueName(i)).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration(); CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration();
queueConfig2.setName(getTestName2()).setAddress(getTestName2()).setRoutingType(RoutingType.ANYCAST); queueConfig2.setName(getQueueName(i)).setAddress(getQueueName(i)).setRoutingType(RoutingType.ANYCAST);
address2.getQueueConfigurations().add(queueConfig2); address2.getQueueConfigurations().add(queueConfig2);
serverConfig.addAddressConfiguration(address2); serverConfig.addAddressConfiguration(address2);
}
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); // Address configuration
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateQueues(isAutoCreateQueues());
addressSettings.setAutoCreateAddresses(isAutoCreateQueues());
addressSettings.setDeadLetterAddress(new SimpleString(getDeadLetterAddress()));
serverConfig.getAddressesSettings().put("#", addressSettings);
serverConfig.setSecurityEnabled(false); serverConfig.setSecurityEnabled(false);
Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations(); Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) { for (TransportConfiguration tc : acceptors) {
@ -127,8 +151,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return getName(); return getName();
} }
public String getTestName2() { public String getQueueName() {
return getName() + "2"; return getName();
}
public String getQueueName(int index) {
return getName() + "-" + index;
} }
public AmqpClientTestSupport() { public AmqpClientTestSupport() {

View File

@ -40,18 +40,18 @@ public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName()); message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getQueueName());
sender.send(message); sender.send(message);
receiver.flow(1); receiver.flow(1);
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);

View File

@ -51,16 +51,16 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter()); message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message); sender.send(message);
sender.close(); sender.close();
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
@ -77,14 +77,14 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter()); message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message); sender.send(message);
sender.close(); sender.close();
connection.close(); connection.close();
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
@ -111,13 +111,13 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
// Send with AMQP client. // Send with AMQP client.
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter()); message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message); sender.send(message);
sender.close(); sender.close();
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
// Receive and resend with OpenWire JMS client // Receive and resend with OpenWire JMS client
@ -142,7 +142,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
// Now lets receive it with AMQP and see that we get back what we expected. // Now lets receive it with AMQP and see that we get back what we expected.
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(returned); assertNotNull(returned);

View File

@ -35,10 +35,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -50,7 +50,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received); assertNull(received);
@ -66,10 +66,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -81,7 +81,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
@ -97,10 +97,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -114,7 +114,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received); assertNull(received);
@ -130,10 +130,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -149,7 +149,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
Thread.sleep(1000); Thread.sleep(1000);
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received); assertNull(received);
@ -165,10 +165,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -184,7 +184,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
@ -200,10 +200,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -215,7 +215,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
@ -231,10 +231,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -248,7 +248,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received); assertNull(received);

View File

@ -16,6 +16,17 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
@ -25,34 +36,22 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class AmqpNettyFailoverTest extends FailoverTestBase { public class AmqpFailoverEndpointDiscoveryTest extends FailoverTestBase {
// this will ensure that all tests in this class are run twice, // this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false" // once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
public static Collection getParameters() { public static Collection<?> getParameters() {
// these 3 are for comparison // these 3 are for comparison
return Arrays.asList(new Object[][]{{"NON_SSL", 0} return Arrays.asList(new Object[][]{{"NON_SSL", 0}
/*, {"SSL", 1} */ }); /*, {"SSL", 1} */ });
} }
private final int protocol; private final int protocol;
public AmqpNettyFailoverTest(String name, int protocol) { public AmqpFailoverEndpointDiscoveryTest(String name, int protocol) {
this.protocol = protocol; this.protocol = protocol;
} }
@ -66,7 +65,6 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
return getNettyConnectorTransportConfig(live); return getNettyConnectorTransportConfig(live);
} }
@Test(timeout = 120000) @Test(timeout = 120000)
public void testFailoverListWithAMQP() throws Exception { public void testFailoverListWithAMQP() throws Exception {
JmsConnectionFactory factory = getJmsConnectionFactory(); JmsConnectionFactory factory = getJmsConnectionFactory();
@ -94,7 +92,6 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
} else { } else {
String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile(); String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile();
String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile(); String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile();
// return new JmsConnectionFactory("amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false");
return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)"); return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)");
} }
} }
@ -108,14 +105,12 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample"); server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks"); server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks");
server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample"); server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
//server1Params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
} }
if (live) { if (live) {
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
} }
server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1); server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
@ -137,5 +132,4 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1); server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params); return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
} }
} }

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONTAINER_ID;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.INVALID_FIELD;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for behaviors expected of the broker when clients connect to the broker
*/
public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpInboundConnectionTest.class);
private static final String BROKER_NAME = "localhost";
private static final String PRODUCT_NAME = "apache-activemq-artemis";
@Test
public void testBrokerContainerId() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
if (!BROKER_NAME.equals(connection.getRemoteContainer())) {
markAsInvalid("Broker did not send the expected container ID");
}
}
});
AmqpConnection connection = addConnection(client.connect());
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test
public void testBrokerConnectionProperties() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
Map<Symbol, Object> properties = connection.getRemoteProperties();
if (!properties.containsKey(PRODUCT)) {
markAsInvalid("Broker did not send a queue product name value");
return;
}
if (!properties.containsKey(VERSION)) {
markAsInvalid("Broker did not send a queue version value");
return;
}
if (!PRODUCT_NAME.equals(properties.get(PRODUCT))) {
markAsInvalid("Broker did not send a the expected product name");
return;
}
String brokerVersion = VersionLoader.getVersion().getFullVersion();
if (!brokerVersion.equals(properties.get(VERSION))) {
markAsInvalid("Broker did not send a the expected product version");
return;
}
}
});
AmqpConnection connection = addConnection(client.connect());
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testConnectionCarriesExpectedCapabilities() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
Symbol[] offered = connection.getRemoteOfferedCapabilities();
if (!contains(offered, ANONYMOUS_RELAY)) {
markAsInvalid("Broker did not indicate it support anonymous relay");
return;
}
if (!contains(offered, DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
return;
}
}
});
AmqpConnection connection = addConnection(client.connect());
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testCanConnectWithDifferentContainerIds() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection1 = addConnection(client.createConnection());
AmqpConnection connection2 = addConnection(client.createConnection());
connection1.setContainerId(getTestName() + "-Client:1");
connection2.setContainerId(getTestName() + "-Client:2");
connection1.connect();
assertEquals(1, server.getConnectionCount());
connection2.connect();
assertEquals(2, server.getConnectionCount());
connection1.close();
assertEquals(1, server.getConnectionCount());
connection2.close();
assertEquals(0, server.getConnectionCount());
}
@Test(timeout = 60000)
public void testCannotConnectWithSameContainerId() throws Exception {
AmqpClient client = createAmqpClient();
List<Symbol> desiredCapabilities = new ArrayList<>(1);
desiredCapabilities.add(AmqpSupport.SOLE_CONNECTION_CAPABILITY);
assertNotNull(client);
AmqpConnection connection1 = addConnection(client.createConnection());
AmqpConnection connection2 = addConnection(client.createConnection());
connection1.setDesiredCapabilities(desiredCapabilities);
connection2.setDesiredCapabilities(desiredCapabilities);
connection1.setContainerId(getTestName());
connection2.setContainerId(getTestName());
connection1.connect();
assertEquals(1, server.getConnectionCount());
connection2.setStateInspector(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
markAsInvalid("Broker did not set connection establishment failed property");
}
}
@Override
public void inspectClosedResource(Connection connection) {
ErrorCondition remoteError = connection.getRemoteCondition();
if (remoteError == null || remoteError.getCondition() == null) {
markAsInvalid("Broker did not add error condition for duplicate client ID");
} else {
if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) {
markAsInvalid("Broker did not set condition to " + AmqpError.INVALID_FIELD);
}
if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) {
markAsInvalid("Broker did not set condition to " + AmqpError.INVALID_FIELD);
}
}
// Validate the info map contains a hint that the container/client id was the
// problem
Map<?, ?> infoMap = remoteError.getInfo();
if (infoMap == null) {
markAsInvalid("Broker did not set an info map on condition");
} else if (!infoMap.containsKey(INVALID_FIELD)) {
markAsInvalid("Info map does not contain expected key");
} else {
Object value = infoMap.get(INVALID_FIELD);
if (!CONTAINER_ID.equals(value)) {
markAsInvalid("Info map does not contain expected value: " + value);
}
}
}
});
try {
connection2.connect();
fail("Should not be able to connect with same container Id.");
} catch (Exception ex) {
LOG.info("Second connection with same container Id failed as expected.");
}
connection2.getStateInspector().assertValid();
connection2.close();
assertTrue(Wait.waitFor(() -> server.getConnectionCount() == 1));
connection1.close();
assertEquals(0, server.getConnectionCount());
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.junit.Assert;
import org.junit.Test;
public class AmqpManagementTest extends AmqpClientTestSupport {
@Test
public void testManagementQueryOverAMQP() throws Throwable {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
String destinationAddress = getQueueName(1);
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("activemq.management");
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(10);
// Create request message for getQueueNames query
AmqpMessage request = new AmqpMessage();
request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER);
request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
request.setReplyToAddress(destinationAddress);
request.setText("[]");
sender.send(request);
AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
assertNotNull(response);
Object section = response.getWrappedMessage().getBody();
assertTrue(section instanceof AmqpValue);
Object value = ((AmqpValue) section).getValue();
assertTrue(value instanceof String);
assertTrue(((String) value).length() > 0);
assertTrue(((String) value).contains(destinationAddress));
response.accept();
} finally {
connection.close();
}
}
}

View File

@ -42,7 +42,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1"); message.setMessageId("MessageID:1");
@ -51,9 +51,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -73,7 +73,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setDurable(true); message.setDurable(true);
@ -91,9 +91,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
connection = addConnection(client.connect()); connection = addConnection(client.connect());
session = connection.createSession(); session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -114,7 +114,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1"); message.setMessageId("MessageID:1");
@ -123,9 +123,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -146,7 +146,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1"); message.setMessageId("MessageID:1");
@ -155,9 +155,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -178,7 +178,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1"); message.setMessageId("MessageID:1");
@ -186,9 +186,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -208,7 +208,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (short i = 0; i <= 9; ++i) { for (short i = 0; i <= 9; ++i) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -219,9 +219,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(10, queueView.getMessageCount()); assertEquals(10, queueView.getMessageCount());
receiver.flow(10); receiver.flow(10);

View File

@ -35,16 +35,16 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPresettledReceiverAndNonPresettledReceiverOnSameQueue() throws Exception { public void testPresettledReceiverAndNonPresettledReceiverOnSameQueue() throws Exception {
final int MSG_COUNT = 2; final int MSG_COUNT = 2;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName(), null, false, true); AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, true);
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(1); receiver1.flow(1);
@ -68,7 +68,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
// Should be nothing left on the Queue // Should be nothing left on the Queue
AmqpReceiver receiver3 = session.createReceiver(getTestName()); AmqpReceiver receiver3 = session.createReceiver(getQueueName());
receiver3.flow(1); receiver3.flow(1);
AmqpMessage received = receiver3.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver3.receive(5, TimeUnit.SECONDS);
@ -85,15 +85,15 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPresettledReceiverReadsAllMessages() throws Exception { public void testPresettledReceiverReadsAllMessages() throws Exception {
final int MSG_COUNT = 100; final int MSG_COUNT = 100;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver.flow(MSG_COUNT); receiver.flow(MSG_COUNT);
@ -105,7 +105,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
// Open a new receiver and see if any message are left on the Queue // Open a new receiver and see if any message are left on the Queue
receiver = session.createReceiver(getTestName()); receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
if (received != null) { if (received != null) {
@ -121,15 +121,15 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPresettledReceiverReadsAllMessagesInWhenReadInBatches() throws Exception { public void testPresettledReceiverReadsAllMessagesInWhenReadInBatches() throws Exception {
final int MSG_COUNT = 100; final int MSG_COUNT = 100;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
// Consume all 100 but do so in batches by flowing only limited credit. // Consume all 100 but do so in batches by flowing only limited credit.
@ -157,7 +157,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
System.out.println("Message Count after all consumed: " + queueView.getMessageCount()); System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
// Open a new receiver and see if any message are left on the Queue // Open a new receiver and see if any message are left on the Queue
receiver = session.createReceiver(getTestName()); receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
if (received != null) { if (received != null) {
@ -185,8 +185,8 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
@ -194,7 +194,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
session.begin(); session.begin();
@ -221,8 +221,8 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
@ -230,7 +230,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true); AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
session.begin(); session.begin();

View File

@ -34,19 +34,18 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testReleasedDisposition() throws Exception { public void testReleasedDisposition() throws Exception {
sendMessages(getTestName(), 1); sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1); receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertNotNull("did not receive message first time", message); assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId()); assertEquals("MessageID:0", message.getMessageId());
@ -75,13 +74,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testRejectedDisposition() throws Exception { public void testRejectedDisposition() throws Exception {
sendMessages(getTestName(), 1); sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1); receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
@ -101,7 +100,7 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
assertNull("Should not receive message again", message); assertNull("Should not receive message again", message);
// Attempt to Read the message again with another receiver to validate it is archived. // Attempt to Read the message again with another receiver to validate it is archived.
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1); receiver2.flow(1);
assertNull(receiver2.receiveNoWait()); assertNull(receiver2.receiveNoWait());
@ -129,13 +128,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
} }
private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
sendMessages(getTestName(), 1); sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1); receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
@ -154,7 +153,7 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
assertNull("Should not receive message again", message); assertNull("Should not receive message again", message);
} }
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1); receiver2.flow(1);
message = receiver2.receive(5, TimeUnit.SECONDS); message = receiver2.receive(5, TimeUnit.SECONDS);

View File

@ -35,15 +35,15 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testReceiverCanDrainMessages() throws Exception { public void testReceiverCanDrainMessages() throws Exception {
int MSG_COUNT = 20; int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect(); AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver.drain(MSG_COUNT); receiver.drain(MSG_COUNT);
@ -66,11 +66,11 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
AmqpConnection connection = client.connect(); AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(10); receiver.flow(10);
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(0, queueView.getMessageCount()); assertEquals(0, queueView.getMessageCount());
assertEquals(0, queueView.getDeliveringCount()); assertEquals(0, queueView.getDeliveringCount());
@ -86,15 +86,15 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPullOneFromRemote() throws Exception { public void testPullOneFromRemote() throws Exception {
int MSG_COUNT = 20; int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect(); AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
assertEquals(0, receiver.getReceiver().getRemoteCredit()); assertEquals(0, receiver.getReceiver().getRemoteCredit());
@ -119,11 +119,11 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
AmqpConnection connection = client.connect(); AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(10); receiver.flow(10);
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(0, queueView.getMessageCount()); assertEquals(0, queueView.getMessageCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit()); assertEquals(10, receiver.getReceiver().getRemoteCredit());

View File

@ -74,7 +74,7 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER); filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
Source source = new Source(); Source source = new Source();
source.setAddress(getTestName()); source.setAddress(getQueueName());
source.setFilter(filters); source.setFilter(filters);
source.setDurable(TerminusDurability.NONE); source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
@ -116,13 +116,12 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "color = red"); session.createReceiver(getQueueName(), "color = red");
connection.getStateInspector().assertValid(); connection.getStateInspector().assertValid();
connection.close(); connection.close();
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testReceivedUnsignedFilter() throws Exception { public void testReceivedUnsignedFilter() throws Exception {
final int NUM_MESSAGES = 100; final int NUM_MESSAGES = 100;
@ -131,10 +130,9 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
AmqpConnection connection = client.connect(); AmqpConnection connection = client.connect();
try { try {
// Normal Session which won't create an TXN itself // Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) { for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -144,7 +142,7 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
} }
// Read all messages from the Queue, do not accept them yet. // Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName(), "myNewID < " + (NUM_MESSAGES / 2)); AmqpReceiver receiver = session.createReceiver(getQueueName(), "myNewID < " + (NUM_MESSAGES / 2));
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2); receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES / 2; ++i) { for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
@ -161,6 +159,4 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
} }
} }

View File

@ -36,12 +36,14 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
public void testSendWithDeliveryTimeIsScheduled() throws Exception { public void testSendWithDeliveryTimeIsScheduled() throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -54,24 +56,27 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getScheduledCount()); assertEquals(1, queueView.getScheduledCount());
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNull(received); assertNull(received);
} finally {
connection.close(); connection.close();
} }
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSendRecvWithDeliveryTime() throws Exception { public void testSendRecvWithDeliveryTime() throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -83,7 +88,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getScheduledCount()); assertEquals(1, queueView.getScheduledCount());
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
// Now try and get the message, should not due to being scheduled. // Now try and get the message, should not due to being scheduled.
@ -94,20 +99,23 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
received = receiver.receive(10, TimeUnit.SECONDS); received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
received.accept(); received.accept();
} finally {
connection.close(); connection.close();
} }
}
@Test @Test
public void testScheduleWithDelay() throws Exception { public void testScheduleWithDelay() throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery. // Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView); assertNotNull(queueView);
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -119,7 +127,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getScheduledCount()); assertEquals(1, queueView.getScheduledCount());
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
// Now try and get the message, should not due to being scheduled. // Now try and get the message, should not due to being scheduled.
@ -130,7 +138,68 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
received = receiver.receive(10, TimeUnit.SECONDS); received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
received.accept(); received.accept();
} finally {
connection.close(); connection.close();
} }
} }
@Test(timeout = 60000)
public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
// Shouldn't get this since we delayed the message.
assertNull(receiver.receive(1, TimeUnit.SECONDS));
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
assertNotNull(msgDeliveryTime);
assertEquals(deliveryTime, msgDeliveryTime.longValue());
} finally {
connection.close();
}
}
}

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -22,11 +22,11 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -57,7 +57,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository(); HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>(); HashSet<Role> value = new HashSet<>();
value.add(new Role("none", false, true, true, true, true, true, true, true)); value.add(new Role("none", false, true, true, true, true, true, true, true));
securityRepository.addMatch(getTestName(), value); securityRepository.addMatch(getQueueName(), value);
serverManager = new JMSServerManagerImpl(server); serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration(); Configuration serverConfig = server.getConfiguration();
@ -135,7 +135,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
connection = addConnection(client.connect()); connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1); message.setMessageId("msg" + 1);
@ -154,8 +154,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception { public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST)); server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false); server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false);
AmqpClient client = createAmqpClient(user1, password1); AmqpClient client = createAmqpClient(user1, password1);
AmqpConnection connection = client.connect(); AmqpConnection connection = client.connect();
@ -165,7 +165,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
AmqpSender sender = session.createAnonymousSender(); AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setAddress(getTestName()); message.setAddress(getQueueName());
message.setMessageId("msg" + 1); message.setMessageId("msg" + 1);
message.setText("Test-Message"); message.setText("Test-Message");

View File

@ -69,9 +69,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue); assertNotNull(queue);
receiver.close(); receiver.close();
@ -84,9 +84,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
sendMessages(getTestName(), 10); sendMessages(getQueueName(), 10);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
receiver.flow(1); receiver.flow(1);
@ -98,7 +98,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.close(); receiver.close();
connection.close(); connection.close();
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue); assertNotNull(queue);
assertEquals(0, queue.getMessageCount()); assertEquals(0, queue.getMessageCount());
} }
@ -130,7 +130,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "JMSPriority > 8"); session.createReceiver(getQueueName(), "JMSPriority > 8");
connection.getStateInspector().assertValid(); connection.getStateInspector().assertValid();
connection.close(); connection.close();
@ -163,7 +163,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), null, true); session.createReceiver(getQueueName(), null, true);
connection.getStateInspector().assertValid(); connection.getStateInspector().assertValid();
connection.close(); connection.close();
@ -177,7 +177,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
try { try {
session.createReceiver(getTestName(), "null = 'f''", true); session.createReceiver(getQueueName(), "null = 'f''", true);
fail("should throw exception"); fail("should throw exception");
} catch (Exception e) { } catch (Exception e) {
assertTrue(e.getCause() instanceof JMSException); assertTrue(e.getCause() instanceof JMSException);
@ -189,15 +189,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception { public void testQueueReceiverReadMessage() throws Exception {
sendMessages(getTestName(), 1); sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -211,11 +211,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testQueueReceiverReadMessageWithDivert() throws Exception { public void testQueueReceiverReadMessageWithDivert() throws Exception {
final String forwardingAddress = getTestName() + "Divert"; final String forwardingAddress = getQueueName() + "Divert";
final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress); final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false); server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
server.getActiveMQServerControl().createDivert("name", "routingName", getTestName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
sendMessages(getTestName(), 1); sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
@ -313,15 +313,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception { public void testMessageDurableFalse() throws Exception {
sendMessages(getTestName(), 1, false); sendMessages(getQueueName(), 1, false);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -337,15 +337,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testMessageDurableTrue() throws Exception { public void testMessageDurableTrue() throws Exception {
sendMessages(getTestName(), 1, true); sendMessages(getQueueName(), 1, true);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); assertEquals(1, queueView.getMessageCount());
receiver.flow(1); receiver.flow(1);
@ -362,22 +362,22 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4; int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2); receiver1.flow(2);
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(2, server.getTotalConsumerCount()); assertEquals(2, server.getTotalConsumerCount());
@ -398,15 +398,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
int MSG_COUNT = 4; int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2); receiver1.flow(2);
@ -425,7 +425,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
} }
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(2, server.getTotalConsumerCount()); assertEquals(2, server.getTotalConsumerCount());
@ -456,15 +456,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception { public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
int MSG_COUNT = 20; int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT); sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
final Queue queueView = getProxyToQueue(getTestName()); final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount()); assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(20); receiver1.flow(20);
@ -479,7 +479,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver1.close(); receiver1.close();
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(1, server.getTotalConsumerCount()); assertEquals(1, server.getTotalConsumerCount());
@ -513,7 +513,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -525,7 +525,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close(); sender.close();
LOG.info("Attempting to read message with receiver"); LOG.info("Attempting to read message with receiver");
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2); receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received); assertNotNull("Should have read message", received);
@ -544,7 +544,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) { for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -560,17 +560,17 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close(); sender.close();
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queue.getMessageCount()); assertEquals(MSG_COUNT, queue.getMessageCount());
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(MSG_COUNT); receiver1.flow(MSG_COUNT);
AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have got a message", received); assertNotNull("Should have got a message", received);
assertEquals("msg0", received.getMessageId()); assertEquals("msg0", received.getMessageId());
receiver1.close(); receiver1.close();
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(200); receiver2.flow(200);
for (int i = 0; i < MSG_COUNT; ++i) { for (int i = 0; i < MSG_COUNT; ++i) {
received = receiver2.receive(5, TimeUnit.SECONDS); received = receiver2.receive(5, TimeUnit.SECONDS);
@ -597,12 +597,12 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.setGroupId("hijklm"); message2.setGroupId("hijklm");
message2.setApplicationProperty("sn", 200); message2.setApplicationProperty("sn", 200);
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
sender.send(message1); sender.send(message1);
sender.send(message2); sender.send(message2);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100"); AmqpReceiver receiver = session.createReceiver(getQueueName(), "sn = 100");
receiver.flow(2); receiver.flow(2);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received); assertNotNull("Should have read a message", received);
@ -624,7 +624,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) { for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -639,7 +639,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close(); sender.close();
LOG.info("Attempting to read first two messages with receiver #1"); LOG.info("Attempting to read first two messages with receiver #1");
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(2); receiver1.flow(2);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS); AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
@ -651,7 +651,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.accept(); message2.accept();
LOG.info("Attempting to read next two messages with receiver #2"); LOG.info("Attempting to read next two messages with receiver #2");
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(2); receiver2.flow(2);
AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS); AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS); AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
@ -685,7 +685,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) { for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -699,10 +699,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close(); sender.close();
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1); receiver1.flow(1);
AmqpReceiver receiver2 = session.createReceiver(getTestName()); AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1); receiver2.flow(1);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
@ -759,7 +759,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
final String address = getTestName(); final String address = getQueueName();
AmqpReceiver receiver = session.createReceiver(address); AmqpReceiver receiver = session.createReceiver(address);
AmqpSender sender = session.createSender(address); AmqpSender sender = session.createSender(address);
@ -793,7 +793,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final CountDownLatch receiverReady = new CountDownLatch(1); final CountDownLatch receiverReady = new CountDownLatch(1);
ExecutorService executorService = Executors.newCachedThreadPool(); ExecutorService executorService = Executors.newCachedThreadPool();
final String address = getTestName(); final String address = getQueueName();
executorService.submit(new Runnable() { executorService.submit(new Runnable() {
@Override @Override
@ -858,10 +858,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver1 = session.createReceiver(getTestName()); AmqpReceiver receiver1 = session.createReceiver(getQueueName());
Queue queue = getProxyToQueue(getTestName()); Queue queue = getProxyToQueue(getQueueName());
// Create default message that should be sent as non-durable // Create default message that should be sent as non-durable
AmqpMessage message1 = new AmqpMessage(); AmqpMessage message1 = new AmqpMessage();
@ -904,7 +904,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
final String address = getTestName(); final String address = getQueueName();
AmqpReceiver receiver = session.createReceiver(address); AmqpReceiver receiver = session.createReceiver(address);
AmqpSender sender = session.createSender(address); AmqpSender sender = session.createSender(address);
@ -957,7 +957,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
final String address = getTestName(); final String address = getQueueName();
AmqpSender sender = session.createSender(address); AmqpSender sender = session.createSender(address);
AmqpReceiver receiver1 = session.createReceiver(address, null, false, true); AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
@ -1036,7 +1036,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
assertNotNull(sender); assertNotNull(sender);
connection.getStateInspector().assertValid(); connection.getStateInspector().assertValid();
@ -1047,7 +1047,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testMessageWithToFieldSetToSenderAddress() throws Exception { public void testMessageWithToFieldSetToSenderAddress() throws Exception {
doTestMessageWithToFieldSet(false, getTestName()); doTestMessageWithToFieldSet(false, getQueueName());
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -1067,7 +1067,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testMessageWithToFieldSetWithAnonymousSender() throws Exception { public void testMessageWithToFieldSetWithAnonymousSender() throws Exception {
doTestMessageWithToFieldSet(true, getTestName()); doTestMessageWithToFieldSet(true, getQueueName());
} }
private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception { private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception {
@ -1075,7 +1075,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
final String address = getTestName(); final String address = getQueueName();
AmqpSender sender = session.createSender(anonymous ? null : address); AmqpSender sender = session.createSender(anonymous ? null : address);

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI; import java.net.URI;
@ -25,8 +24,12 @@ import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.After; import org.junit.After;
/** This will only add methods to support AMQP Testing without creating servers or anything */ /**
* Base test support class providing client support methods to aid in
* creating and configuration the AMQP test client.
*/
public class AmqpTestSupport extends ActiveMQTestBase { public class AmqpTestSupport extends ActiveMQTestBase {
protected LinkedList<AmqpConnection> connections = new LinkedList<>(); protected LinkedList<AmqpConnection> connections = new LinkedList<>();
protected boolean useSSL; protected boolean useSSL;
@ -121,7 +124,4 @@ public class AmqpTestSupport extends ActiveMQTestBase {
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password); return new AmqpClient(brokerURI, username, password);
} }
} }

View File

@ -94,7 +94,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
assertNotNull(session); assertNotNull(session);
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
sender.setStateInspector(new AmqpValidator() { sender.setStateInspector(new AmqpValidator() {
@Override @Override
@ -148,8 +148,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
session.begin(); session.begin();
@ -173,8 +173,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
session.begin(); session.begin();
@ -198,8 +198,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
@ -207,7 +207,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin(); session.begin();
@ -230,8 +230,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
@ -239,7 +239,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin(); session.begin();
@ -253,7 +253,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection = addConnection(client.connect()); connection = addConnection(client.connect());
session = connection.createSession(); session = connection.createSession();
receiver = session.createReceiver(getTestName()); receiver = session.createReceiver(getQueueName());
session.begin(); session.begin();
receiver.flow(1); receiver.flow(1);
@ -274,8 +274,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect()); AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
@ -283,7 +283,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount()); assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin(); session.begin();
@ -308,7 +308,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Load up the Queue with some messages // Load up the Queue with some messages
{ {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
sender.send(message); sender.send(message);
@ -326,11 +326,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession(); AmqpSession session3 = connection.createSession();
// Sender linked to each session // Sender linked to each session
AmqpReceiver receiver1 = session1.createReceiver(getTestName()); AmqpReceiver receiver1 = session1.createReceiver(getQueueName());
AmqpReceiver receiver2 = session2.createReceiver(getTestName()); AmqpReceiver receiver2 = session2.createReceiver(getQueueName());
AmqpReceiver receiver3 = session3.createReceiver(getTestName()); AmqpReceiver receiver3 = session3.createReceiver(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
assertEquals(3, queue.getMessageCount()); assertEquals(3, queue.getMessageCount());
// Begin the transaction that all senders will operate in. // Begin the transaction that all senders will operate in.
@ -365,7 +365,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Load up the Queue with some messages // Load up the Queue with some messages
{ {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
sender.send(message); sender.send(message);
@ -383,11 +383,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession(); AmqpSession session3 = connection.createSession();
// Sender linked to each session // Sender linked to each session
AmqpReceiver receiver1 = session1.createReceiver(getTestName()); AmqpReceiver receiver1 = session1.createReceiver(getQueueName());
AmqpReceiver receiver2 = session2.createReceiver(getTestName()); AmqpReceiver receiver2 = session2.createReceiver(getQueueName());
AmqpReceiver receiver3 = session3.createReceiver(getTestName()); AmqpReceiver receiver3 = session3.createReceiver(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
assertEquals(3, queue.getMessageCount()); assertEquals(3, queue.getMessageCount());
// Begin the transaction that all senders will operate in. // Begin the transaction that all senders will operate in.
@ -428,11 +428,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession(); AmqpSession session3 = connection.createSession();
// Sender linked to each session // Sender linked to each session
AmqpSender sender1 = session1.createSender(getTestName()); AmqpSender sender1 = session1.createSender(getQueueName());
AmqpSender sender2 = session2.createSender(getTestName()); AmqpSender sender2 = session2.createSender(getQueueName());
AmqpSender sender3 = session3.createSender(getTestName()); AmqpSender sender3 = session3.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
assertEquals(0, queue.getMessageCount()); assertEquals(0, queue.getMessageCount());
// Begin the transaction that all senders will operate in. // Begin the transaction that all senders will operate in.
@ -468,11 +468,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession(); AmqpSession session3 = connection.createSession();
// Sender linked to each session // Sender linked to each session
AmqpSender sender1 = session1.createSender(getTestName()); AmqpSender sender1 = session1.createSender(getQueueName());
AmqpSender sender2 = session2.createSender(getTestName()); AmqpSender sender2 = session2.createSender(getQueueName());
AmqpSender sender3 = session3.createSender(getTestName()); AmqpSender sender3 = session3.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName()); final Queue queue = getProxyToQueue(getQueueName());
assertEquals(0, queue.getMessageCount()); assertEquals(0, queue.getMessageCount());
// Begin the transaction that all senders will operate in. // Begin the transaction that all senders will operate in.
@ -509,7 +509,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself // Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
// Commit TXN work from a sender. // Commit TXN work from a sender.
txnSession.begin(); txnSession.begin();
@ -538,7 +538,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
} }
txnSession.commit(); txnSession.commit();
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(NUM_MESSAGES * 2); receiver.flow(NUM_MESSAGES * 2);
for (int i = 0; i < NUM_MESSAGES * 2; ++i) { for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
@ -563,7 +563,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself // Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) { for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -573,7 +573,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
} }
// Read all messages from the Queue, do not accept them yet. // Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2); receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) { for (int i = 0; i < NUM_MESSAGES; ++i) {
@ -629,7 +629,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself // Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES; ++i) { for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -639,7 +639,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
} }
// Read all messages from the Queue, do not accept them yet. // Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2); receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
@ -700,7 +700,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself // Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) { for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -710,7 +710,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
} }
// Read all messages from the Queue, do not accept them yet. // Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2); receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) { for (int i = 0; i < NUM_MESSAGES; ++i) {
@ -787,7 +787,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself // Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES; ++i) { for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
@ -797,7 +797,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
} }
// Read all messages from the Queue, do not accept them yet. // Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2); receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
@ -930,12 +930,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession(); AmqpSession session = connection.createSession();
assertNotNull(session); assertNotNull(session);
AmqpSender sender = session.createSender(getTestName()); AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setText("Test-Message"); message.setText("Test-Message");
sender.send(message); sender.send(message);
AmqpReceiver receiver = session.createReceiver(getTestName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.setStateInspector(new AmqpValidator() { receiver.setStateInspector(new AmqpValidator() {
@Override @Override

View File

@ -16,6 +16,22 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -41,26 +57,10 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MBeanServerFactory; import javax.management.MBeanServerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
@ -84,17 +84,14 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils; import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.junit.After; import org.junit.After;
@ -104,18 +101,12 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class ProtonTest extends ProtonTestBase { public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672"; private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
private static final String brokerName = "localhost";
private static final long maxSizeBytes = 1 * 1024 * 1024; private static final long maxSizeBytes = 1 * 1024 * 1024;
@ -370,132 +361,6 @@ public class ProtonTest extends ProtonTestBase {
} }
} }
@Test
public void testBrokerContainerId() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
assertTrue(brokerName.equals(amqpConnection.getEndpoint().getRemoteContainer()));
} finally {
amqpConnection.close();
}
}
@Test
public void testBrokerConnectionProperties() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
Map<Symbol, Object> properties = amqpConnection.getEndpoint().getRemoteProperties();
assertTrue(properties != null);
if (properties != null) {
assertTrue("apache-activemq-artemis".equals(properties.get(Symbol.valueOf("product"))));
assertTrue(VersionLoader.getVersion().getFullVersion().equals(properties.get(Symbol.valueOf("version"))));
}
} finally {
amqpConnection.close();
}
}
@Test(timeout = 60000)
public void testConnectionCarriesExpectedCapabilities() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) {
Symbol[] offered = connection.getRemoteOfferedCapabilities();
if (!contains(offered, DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
return;
}
Map<Symbol, Object> properties = connection.getRemoteProperties();
if (!properties.containsKey(PRODUCT)) {
markAsInvalid("Broker did not send a queue product name value");
return;
}
if (!properties.containsKey(VERSION)) {
markAsInvalid("Broker did not send a queue version value");
return;
}
}
});
AmqpConnection connection = client.connect();
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
// Shouldn't get this since we delayed the message.
assertNull(receiver.receive(1, TimeUnit.SECONDS));
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
assertNotNull(msgDeliveryTime);
assertEquals(deliveryTime, msgDeliveryTime.longValue());
} finally {
connection.close();
}
}
@Test @Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
@ -983,41 +848,6 @@ public class ProtonTest extends ProtonTestBase {
amqpConnection.close(); amqpConnection.close();
} }
@Test
public void testManagementQueryOverAMQP() throws Throwable {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
String destinationAddress = address + 1;
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender("activemq.management");
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(10);
//create request message for getQueueNames query
AmqpMessage request = new AmqpMessage();
request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER);
request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
request.setReplyToAddress(destinationAddress);
request.setText("[]");
sender.send(request);
AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
assertNotNull(response);
Object section = response.getWrappedMessage().getBody();
assertTrue(section instanceof AmqpValue);
Object value = ((AmqpValue) section).getValue();
assertTrue(value instanceof String);
assertTrue(((String) value).length() > 0);
assertTrue(((String) value).contains(destinationAddress));
response.accept();
} finally {
amqpConnection.close();
}
}
@Test @Test
public void testReplyTo() throws Throwable { public void testReplyTo() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -1792,93 +1622,6 @@ public class ProtonTest extends ProtonTestBase {
} }
} }
@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(address);
message.setMessageId("msg" + 1);
message.setText("Test-Message");
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals("msg1", received.getMessageId());
received.accept();
receiver.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(address + "-not-in-service");
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
private javax.jms.Queue createQueue(String address) throws Exception { private javax.jms.Queue createQueue(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try { try {