This commit is contained in:
Clebert Suconic 2017-04-19 00:50:13 -04:00
commit fc4d5edefa
20 changed files with 881 additions and 587 deletions

View File

@ -87,6 +87,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final String password;
private final URI remoteURI;
private final String connectionId;
private List<Symbol> desiredCapabilities = Collections.emptyList();
private List<Symbol> offeredCapabilities = Collections.emptyList();
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
@ -146,6 +147,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
getEndpoint().setContainer(safeGetContainerId());
}
getEndpoint().setHostname(remoteURI.getHost());
if (!getDesiredCapabilities().isEmpty()) {
getEndpoint().setDesiredCapabilities(getDesiredCapabilities().toArray(new Symbol[0]));
}
if (!getOfferedCapabilities().isEmpty()) {
getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0]));
}
@ -393,12 +397,24 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.drainTimeout = drainTimeout;
}
public List<Symbol> getDesiredCapabilities() {
return desiredCapabilities;
}
public void setDesiredCapabilities(List<Symbol> desiredCapabilities) {
if (desiredCapabilities == null) {
desiredCapabilities = Collections.emptyList();
}
this.desiredCapabilities = desiredCapabilities;
}
public List<Symbol> getOfferedCapabilities() {
return offeredCapabilities;
}
public void setOfferedCapabilities(List<Symbol> offeredCapabilities) {
if (offeredCapabilities != null) {
if (offeredCapabilities == null) {
offeredCapabilities = Collections.emptyList();
}
@ -410,7 +426,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
public void setOfferedProperties(Map<Symbol, Object> offeredProperties) {
if (offeredProperties != null) {
if (offeredProperties == null) {
offeredProperties = Collections.emptyMap();
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
public class AmqpAnonymousRelayTest extends AmqpClientTestSupport {
@Override
protected boolean isAutoCreateQueues() {
return false;
}
@Override
protected boolean isAutoCreateAddresses() {
return false;
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(getQueueName());
message.setMessageId("msg" + 1);
message.setText("Test-Message");
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals("msg1", received.getMessageId());
received.accept();
receiver.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress("exampleQueu-not-in-service");
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Set;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
@ -26,7 +26,6 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
@ -48,9 +47,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
protected static Symbol SHARED = Symbol.getSymbol("shared");
protected static Symbol GLOBAL = Symbol.getSymbol("global");
protected JMSServerManager serverManager;
protected ActiveMQServer server;
@Before
@Override
public void setUp() throws Exception {
@ -84,6 +83,22 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
super.tearDown();
}
protected boolean isAutoCreateQueues() {
return true;
}
protected boolean isAutoCreateAddresses() {
return true;
}
protected String getDeadLetterAddress() {
return "ActiveMQ.DLQ";
}
protected int getPrecreatedQueueSize() {
return 10;
}
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = createServer(true, true);
serverManager = new JMSServerManagerImpl(server);
@ -91,21 +106,30 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
// Address 1
CoreAddressConfiguration address = new CoreAddressConfiguration();
address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST);
address.setName(getQueueName()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig = new CoreQueueConfiguration();
queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST);
queueConfig.setName(getQueueName()).setAddress(getQueueName()).setRoutingType(RoutingType.ANYCAST);
address.getQueueConfigurations().add(queueConfig);
serverConfig.addAddressConfiguration(address);
// Address 2
CoreAddressConfiguration address2 = new CoreAddressConfiguration();
address2.setName(getTestName2()).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration();
queueConfig2.setName(getTestName2()).setAddress(getTestName2()).setRoutingType(RoutingType.ANYCAST);
address2.getQueueConfigurations().add(queueConfig2);
serverConfig.addAddressConfiguration(address2);
// Address 1....N
for (int i = 0; i < getPrecreatedQueueSize(); ++i) {
CoreAddressConfiguration address2 = new CoreAddressConfiguration();
address2.setName(getQueueName(i)).getRoutingTypes().add(RoutingType.ANYCAST);
CoreQueueConfiguration queueConfig2 = new CoreQueueConfiguration();
queueConfig2.setName(getQueueName(i)).setAddress(getQueueName(i)).setRoutingType(RoutingType.ANYCAST);
address2.getQueueConfigurations().add(queueConfig2);
serverConfig.addAddressConfiguration(address2);
}
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
// Address configuration
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateQueues(isAutoCreateQueues());
addressSettings.setAutoCreateAddresses(isAutoCreateQueues());
addressSettings.setDeadLetterAddress(new SimpleString(getDeadLetterAddress()));
serverConfig.getAddressesSettings().put("#", addressSettings);
serverConfig.setSecurityEnabled(false);
Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
@ -127,8 +151,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return getName();
}
public String getTestName2() {
return getName() + "2";
public String getQueueName() {
return getName();
}
public String getQueueName(int index) {
return getName() + "-" + index;
}
public AmqpClientTestSupport() {

View File

@ -40,18 +40,18 @@ public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getQueueName());
sender.send(message);
receiver.flow(1);
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount());
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);

View File

@ -51,16 +51,16 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message);
sender.close();
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
@ -77,14 +77,14 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message);
sender.close();
connection.close();
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount());
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
@ -111,13 +111,13 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
// Send with AMQP client.
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setDescribedType(new AmqpNoLocalFilter());
sender.send(message);
sender.close();
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
assertEquals(1, queue.getMessageCount());
// Receive and resend with OpenWire JMS client
@ -142,7 +142,7 @@ public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
// Now lets receive it with AMQP and see that we get back what we expected.
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(returned);

View File

@ -35,10 +35,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
@ -50,7 +50,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
@ -66,10 +66,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
@ -81,7 +81,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
@ -97,10 +97,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
@ -114,7 +114,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
@ -130,10 +130,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
@ -149,7 +149,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
Thread.sleep(1000);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
@ -165,10 +165,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
@ -184,7 +184,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
@ -200,10 +200,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
@ -215,7 +215,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
@ -231,10 +231,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
@ -248,7 +248,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
assertEquals(1, queueView.getMessageCount());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);

View File

@ -16,6 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
@ -25,34 +36,22 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@RunWith(Parameterized.class)
public class AmqpNettyFailoverTest extends FailoverTestBase {
public class AmqpFailoverEndpointDiscoveryTest extends FailoverTestBase {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}")
public static Collection getParameters() {
public static Collection<?> getParameters() {
// these 3 are for comparison
return Arrays.asList(new Object[][]{{"NON_SSL", 0}
/*, {"SSL", 1} */ });
}
private final int protocol;
public AmqpNettyFailoverTest(String name, int protocol) {
public AmqpFailoverEndpointDiscoveryTest(String name, int protocol) {
this.protocol = protocol;
}
@ -66,7 +65,6 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
return getNettyConnectorTransportConfig(live);
}
@Test(timeout = 120000)
public void testFailoverListWithAMQP() throws Exception {
JmsConnectionFactory factory = getJmsConnectionFactory();
@ -94,7 +92,6 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
} else {
String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile();
String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile();
// return new JmsConnectionFactory("amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false");
return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)");
}
}
@ -108,14 +105,12 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks");
server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
//server1Params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
}
if (live) {
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
}
server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
@ -137,5 +132,4 @@ public class AmqpNettyFailoverTest extends FailoverTestBase {
server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
}
}

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONTAINER_ID;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.INVALID_FIELD;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for behaviors expected of the broker when clients connect to the broker
*/
public class AmqpInboundConnectionTest extends AmqpClientTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpInboundConnectionTest.class);
private static final String BROKER_NAME = "localhost";
private static final String PRODUCT_NAME = "apache-activemq-artemis";
@Test
public void testBrokerContainerId() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
if (!BROKER_NAME.equals(connection.getRemoteContainer())) {
markAsInvalid("Broker did not send the expected container ID");
}
}
});
AmqpConnection connection = addConnection(client.connect());
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test
public void testBrokerConnectionProperties() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
Map<Symbol, Object> properties = connection.getRemoteProperties();
if (!properties.containsKey(PRODUCT)) {
markAsInvalid("Broker did not send a queue product name value");
return;
}
if (!properties.containsKey(VERSION)) {
markAsInvalid("Broker did not send a queue version value");
return;
}
if (!PRODUCT_NAME.equals(properties.get(PRODUCT))) {
markAsInvalid("Broker did not send a the expected product name");
return;
}
String brokerVersion = VersionLoader.getVersion().getFullVersion();
if (!brokerVersion.equals(properties.get(VERSION))) {
markAsInvalid("Broker did not send a the expected product version");
return;
}
}
});
AmqpConnection connection = addConnection(client.connect());
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testConnectionCarriesExpectedCapabilities() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
Symbol[] offered = connection.getRemoteOfferedCapabilities();
if (!contains(offered, ANONYMOUS_RELAY)) {
markAsInvalid("Broker did not indicate it support anonymous relay");
return;
}
if (!contains(offered, DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
return;
}
}
});
AmqpConnection connection = addConnection(client.connect());
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testCanConnectWithDifferentContainerIds() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection1 = addConnection(client.createConnection());
AmqpConnection connection2 = addConnection(client.createConnection());
connection1.setContainerId(getTestName() + "-Client:1");
connection2.setContainerId(getTestName() + "-Client:2");
connection1.connect();
assertEquals(1, server.getConnectionCount());
connection2.connect();
assertEquals(2, server.getConnectionCount());
connection1.close();
assertEquals(1, server.getConnectionCount());
connection2.close();
assertEquals(0, server.getConnectionCount());
}
@Test(timeout = 60000)
public void testCannotConnectWithSameContainerId() throws Exception {
AmqpClient client = createAmqpClient();
List<Symbol> desiredCapabilities = new ArrayList<>(1);
desiredCapabilities.add(AmqpSupport.SOLE_CONNECTION_CAPABILITY);
assertNotNull(client);
AmqpConnection connection1 = addConnection(client.createConnection());
AmqpConnection connection2 = addConnection(client.createConnection());
connection1.setDesiredCapabilities(desiredCapabilities);
connection2.setDesiredCapabilities(desiredCapabilities);
connection1.setContainerId(getTestName());
connection2.setContainerId(getTestName());
connection1.connect();
assertEquals(1, server.getConnectionCount());
connection2.setStateInspector(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
markAsInvalid("Broker did not set connection establishment failed property");
}
}
@Override
public void inspectClosedResource(Connection connection) {
ErrorCondition remoteError = connection.getRemoteCondition();
if (remoteError == null || remoteError.getCondition() == null) {
markAsInvalid("Broker did not add error condition for duplicate client ID");
} else {
if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) {
markAsInvalid("Broker did not set condition to " + AmqpError.INVALID_FIELD);
}
if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) {
markAsInvalid("Broker did not set condition to " + AmqpError.INVALID_FIELD);
}
}
// Validate the info map contains a hint that the container/client id was the
// problem
Map<?, ?> infoMap = remoteError.getInfo();
if (infoMap == null) {
markAsInvalid("Broker did not set an info map on condition");
} else if (!infoMap.containsKey(INVALID_FIELD)) {
markAsInvalid("Info map does not contain expected key");
} else {
Object value = infoMap.get(INVALID_FIELD);
if (!CONTAINER_ID.equals(value)) {
markAsInvalid("Info map does not contain expected value: " + value);
}
}
}
});
try {
connection2.connect();
fail("Should not be able to connect with same container Id.");
} catch (Exception ex) {
LOG.info("Second connection with same container Id failed as expected.");
}
connection2.getStateInspector().assertValid();
connection2.close();
assertTrue(Wait.waitFor(() -> server.getConnectionCount() == 1));
connection1.close();
assertEquals(0, server.getConnectionCount());
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.junit.Assert;
import org.junit.Test;
public class AmqpManagementTest extends AmqpClientTestSupport {
@Test
public void testManagementQueryOverAMQP() throws Throwable {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
String destinationAddress = getQueueName(1);
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("activemq.management");
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(10);
// Create request message for getQueueNames query
AmqpMessage request = new AmqpMessage();
request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER);
request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
request.setReplyToAddress(destinationAddress);
request.setText("[]");
sender.send(request);
AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
assertNotNull(response);
Object section = response.getWrappedMessage().getBody();
assertTrue(section instanceof AmqpValue);
Object value = ((AmqpValue) section).getValue();
assertTrue(value instanceof String);
assertTrue(((String) value).length() > 0);
assertTrue(((String) value).contains(destinationAddress));
response.accept();
} finally {
connection.close();
}
}
}

View File

@ -42,7 +42,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1");
@ -51,9 +51,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -73,7 +73,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
@ -91,9 +91,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
connection = addConnection(client.connect());
session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -114,7 +114,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1");
@ -123,9 +123,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -146,7 +146,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1");
@ -155,9 +155,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -178,7 +178,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:1");
@ -186,9 +186,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -208,7 +208,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (short i = 0; i <= 9; ++i) {
AmqpMessage message = new AmqpMessage();
@ -219,9 +219,9 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(10, queueView.getMessageCount());
receiver.flow(10);

View File

@ -35,16 +35,16 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testPresettledReceiverAndNonPresettledReceiverOnSameQueue() throws Exception {
final int MSG_COUNT = 2;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName(), null, false, true);
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, true);
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(1);
@ -68,7 +68,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
// Should be nothing left on the Queue
AmqpReceiver receiver3 = session.createReceiver(getTestName());
AmqpReceiver receiver3 = session.createReceiver(getQueueName());
receiver3.flow(1);
AmqpMessage received = receiver3.receive(5, TimeUnit.SECONDS);
@ -85,15 +85,15 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testPresettledReceiverReadsAllMessages() throws Exception {
final int MSG_COUNT = 100;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver.flow(MSG_COUNT);
@ -105,7 +105,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
// Open a new receiver and see if any message are left on the Queue
receiver = session.createReceiver(getTestName());
receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
if (received != null) {
@ -121,15 +121,15 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testPresettledReceiverReadsAllMessagesInWhenReadInBatches() throws Exception {
final int MSG_COUNT = 100;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
// Consume all 100 but do so in batches by flowing only limited credit.
@ -157,7 +157,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
// Open a new receiver and see if any message are left on the Queue
receiver = session.createReceiver(getTestName());
receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
if (received != null) {
@ -185,8 +185,8 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
final Queue queue = getProxyToQueue(getTestName());
AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@ -194,7 +194,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
session.begin();
@ -221,8 +221,8 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
final Queue queue = getProxyToQueue(getTestName());
AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@ -230,7 +230,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
session.begin();

View File

@ -34,19 +34,18 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testReleasedDisposition() throws Exception {
sendMessages(getTestName(), 1);
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
@ -75,13 +74,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testRejectedDisposition() throws Exception {
sendMessages(getTestName(), 1);
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
@ -101,7 +100,7 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
assertNull("Should not receive message again", message);
// Attempt to Read the message again with another receiver to validate it is archived.
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
assertNull(receiver2.receiveNoWait());
@ -129,13 +128,13 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
}
private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
sendMessages(getTestName(), 1);
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
@ -154,7 +153,7 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
assertNull("Should not receive message again", message);
}
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
message = receiver2.receive(5, TimeUnit.SECONDS);

View File

@ -35,15 +35,15 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testReceiverCanDrainMessages() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver.drain(MSG_COUNT);
@ -66,11 +66,11 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(10);
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(0, queueView.getMessageCount());
assertEquals(0, queueView.getDeliveringCount());
@ -86,15 +86,15 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testPullOneFromRemote() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
assertEquals(0, receiver.getReceiver().getRemoteCredit());
@ -119,11 +119,11 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(10);
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(0, queueView.getMessageCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit());

View File

@ -74,7 +74,7 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
Source source = new Source();
source.setAddress(getTestName());
source.setAddress(getQueueName());
source.setFilter(filters);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
@ -116,13 +116,12 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "color = red");
session.createReceiver(getQueueName(), "color = red");
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testReceivedUnsignedFilter() throws Exception {
final int NUM_MESSAGES = 100;
@ -131,10 +130,9 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
AmqpConnection connection = client.connect();
try {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
@ -144,7 +142,7 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName(), "myNewID < " + (NUM_MESSAGES / 2));
AmqpReceiver receiver = session.createReceiver(getQueueName(), "myNewID < " + (NUM_MESSAGES / 2));
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
@ -161,6 +159,4 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport {
connection.close();
}
}
}

View File

@ -36,101 +36,170 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
public void testSendWithDeliveryTimeIsScheduled() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
try {
AmqpSession session = connection.createSession();
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(1, queueView.getScheduledCount());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNull(received);
assertEquals(1, queueView.getScheduledCount());
connection.close();
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNull(received);
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendRecvWithDeliveryTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
try {
AmqpSession session = connection.createSession();
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 6000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(1, queueView.getScheduledCount());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 6000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
assertEquals(1, queueView.getScheduledCount());
// Now try and get the message, should not due to being scheduled.
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
// Now try and get the message, should get it now
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
// Now try and get the message, should not due to being scheduled.
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
connection.close();
// Now try and get the message, should get it now
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
} finally {
connection.close();
}
}
@Test
public void testScheduleWithDelay() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
try {
AmqpSession session = connection.createSession();
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
long delay = 6000;
message.setMessageAnnotation("x-opt-delivery-delay", delay);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(1, queueView.getScheduledCount());
AmqpMessage message = new AmqpMessage();
long delay = 6000;
message.setMessageAnnotation("x-opt-delivery-delay", delay);
message.setText("Test-Message");
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
assertEquals(1, queueView.getScheduledCount());
// Now try and get the message, should not due to being scheduled.
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
// Now try and get the message, should get it now
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
// Now try and get the message, should not due to being scheduled.
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNull(received);
connection.close();
// Now try and get the message, should get it now
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
// Shouldn't get this since we delayed the message.
assertNull(receiver.receive(1, TimeUnit.SECONDS));
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
assertNotNull(msgDeliveryTime);
assertEquals(deliveryTime, msgDeliveryTime.longValue());
} finally {
connection.close();
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -22,11 +22,11 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -57,7 +57,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>();
value.add(new Role("none", false, true, true, true, true, true, true, true));
securityRepository.addMatch(getTestName(), value);
securityRepository.addMatch(getQueueName(), value);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
@ -135,7 +135,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
@ -154,8 +154,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST));
server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false);
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false);
AmqpClient client = createAmqpClient(user1, password1);
AmqpConnection connection = client.connect();
@ -165,7 +165,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(getTestName());
message.setAddress(getQueueName());
message.setMessageId("msg" + 1);
message.setText("Test-Message");

View File

@ -69,9 +69,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue);
receiver.close();
@ -84,9 +84,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
sendMessages(getTestName(), 10);
sendMessages(getQueueName(), 10);
for (int i = 0; i < 10; i++) {
receiver.flow(1);
@ -98,7 +98,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue);
assertEquals(0, queue.getMessageCount());
}
@ -130,7 +130,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), "JMSPriority > 8");
session.createReceiver(getQueueName(), "JMSPriority > 8");
connection.getStateInspector().assertValid();
connection.close();
@ -163,7 +163,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
session.createReceiver(getTestName(), null, true);
session.createReceiver(getQueueName(), null, true);
connection.getStateInspector().assertValid();
connection.close();
@ -177,7 +177,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
try {
session.createReceiver(getTestName(), "null = 'f''", true);
session.createReceiver(getQueueName(), "null = 'f''", true);
fail("should throw exception");
} catch (Exception e) {
assertTrue(e.getCause() instanceof JMSException);
@ -189,15 +189,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception {
sendMessages(getTestName(), 1);
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -211,11 +211,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testQueueReceiverReadMessageWithDivert() throws Exception {
final String forwardingAddress = getTestName() + "Divert";
final String forwardingAddress = getQueueName() + "Divert";
final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
server.getActiveMQServerControl().createDivert("name", "routingName", getTestName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
sendMessages(getTestName(), 1);
server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
@ -313,15 +313,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
sendMessages(getTestName(), 1, false);
sendMessages(getQueueName(), 1, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -337,15 +337,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageDurableTrue() throws Exception {
sendMessages(getTestName(), 1, true);
sendMessages(getQueueName(), 1, true);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@ -362,22 +362,22 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getTestName());
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(2, server.getTotalConsumerCount());
@ -398,15 +398,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
@ -425,7 +425,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(2, server.getTotalConsumerCount());
@ -456,15 +456,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT);
sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
final Queue queueView = getProxyToQueue(getTestName());
final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(20);
@ -479,7 +479,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver1.close();
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(1, server.getTotalConsumerCount());
@ -513,7 +513,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
@ -525,7 +525,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
LOG.info("Attempting to read message with receiver");
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
@ -544,7 +544,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@ -560,17 +560,17 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queue.getMessageCount());
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(MSG_COUNT);
AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have got a message", received);
assertEquals("msg0", received.getMessageId());
receiver1.close();
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(200);
for (int i = 0; i < MSG_COUNT; ++i) {
received = receiver2.receive(5, TimeUnit.SECONDS);
@ -597,12 +597,12 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.setGroupId("hijklm");
message2.setApplicationProperty("sn", 200);
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
sender.send(message1);
sender.send(message2);
sender.close();
AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100");
AmqpReceiver receiver = session.createReceiver(getQueueName(), "sn = 100");
receiver.flow(2);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
@ -624,7 +624,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@ -639,7 +639,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
LOG.info("Attempting to read first two messages with receiver #1");
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(2);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
@ -651,7 +651,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.accept();
LOG.info("Attempting to read next two messages with receiver #2");
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(2);
AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
@ -685,7 +685,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@ -699,10 +699,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpReceiver receiver2 = session.createReceiver(getTestName());
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
@ -759,7 +759,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final String address = getTestName();
final String address = getQueueName();
AmqpReceiver receiver = session.createReceiver(address);
AmqpSender sender = session.createSender(address);
@ -793,7 +793,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final CountDownLatch receiverReady = new CountDownLatch(1);
ExecutorService executorService = Executors.newCachedThreadPool();
final String address = getTestName();
final String address = getQueueName();
executorService.submit(new Runnable() {
@Override
@ -858,10 +858,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpReceiver receiver1 = session.createReceiver(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
Queue queue = getProxyToQueue(getTestName());
Queue queue = getProxyToQueue(getQueueName());
// Create default message that should be sent as non-durable
AmqpMessage message1 = new AmqpMessage();
@ -904,7 +904,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final String address = getTestName();
final String address = getQueueName();
AmqpReceiver receiver = session.createReceiver(address);
AmqpSender sender = session.createSender(address);
@ -957,7 +957,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final String address = getTestName();
final String address = getQueueName();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
@ -1036,7 +1036,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
assertNotNull(sender);
connection.getStateInspector().assertValid();
@ -1047,7 +1047,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageWithToFieldSetToSenderAddress() throws Exception {
doTestMessageWithToFieldSet(false, getTestName());
doTestMessageWithToFieldSet(false, getQueueName());
}
@Test(timeout = 60000)
@ -1067,7 +1067,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageWithToFieldSetWithAnonymousSender() throws Exception {
doTestMessageWithToFieldSet(true, getTestName());
doTestMessageWithToFieldSet(true, getQueueName());
}
private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception {
@ -1075,7 +1075,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final String address = getTestName();
final String address = getQueueName();
AmqpSender sender = session.createSender(anonymous ? null : address);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
@ -25,8 +24,12 @@ import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.After;
/** This will only add methods to support AMQP Testing without creating servers or anything */
/**
* Base test support class providing client support methods to aid in
* creating and configuration the AMQP test client.
*/
public class AmqpTestSupport extends ActiveMQTestBase {
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
protected boolean useSSL;
@ -121,7 +124,4 @@ public class AmqpTestSupport extends ActiveMQTestBase {
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password);
}
}

View File

@ -94,7 +94,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
assertNotNull(session);
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
sender.setStateInspector(new AmqpValidator() {
@Override
@ -148,8 +148,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
final Queue queue = getProxyToQueue(getTestName());
AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getQueueName());
session.begin();
@ -173,8 +173,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
final Queue queue = getProxyToQueue(getTestName());
AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getQueueName());
session.begin();
@ -198,8 +198,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
final Queue queue = getProxyToQueue(getTestName());
AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@ -207,7 +207,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin();
@ -230,8 +230,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
final Queue queue = getProxyToQueue(getTestName());
AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@ -239,7 +239,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin();
@ -253,7 +253,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection = addConnection(client.connect());
session = connection.createSession();
receiver = session.createReceiver(getTestName());
receiver = session.createReceiver(getQueueName());
session.begin();
receiver.flow(1);
@ -274,8 +274,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
final Queue queue = getProxyToQueue(getTestName());
AmqpSender sender = session.createSender(getQueueName());
final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@ -283,7 +283,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin();
@ -308,7 +308,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Load up the Queue with some messages
{
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
@ -326,11 +326,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpReceiver receiver1 = session1.createReceiver(getTestName());
AmqpReceiver receiver2 = session2.createReceiver(getTestName());
AmqpReceiver receiver3 = session3.createReceiver(getTestName());
AmqpReceiver receiver1 = session1.createReceiver(getQueueName());
AmqpReceiver receiver2 = session2.createReceiver(getQueueName());
AmqpReceiver receiver3 = session3.createReceiver(getQueueName());
final Queue queue = getProxyToQueue(getTestName());
final Queue queue = getProxyToQueue(getQueueName());
assertEquals(3, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@ -365,7 +365,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Load up the Queue with some messages
{
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
@ -383,11 +383,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpReceiver receiver1 = session1.createReceiver(getTestName());
AmqpReceiver receiver2 = session2.createReceiver(getTestName());
AmqpReceiver receiver3 = session3.createReceiver(getTestName());
AmqpReceiver receiver1 = session1.createReceiver(getQueueName());
AmqpReceiver receiver2 = session2.createReceiver(getQueueName());
AmqpReceiver receiver3 = session3.createReceiver(getQueueName());
final Queue queue = getProxyToQueue(getTestName());
final Queue queue = getProxyToQueue(getQueueName());
assertEquals(3, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@ -428,11 +428,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpSender sender1 = session1.createSender(getTestName());
AmqpSender sender2 = session2.createSender(getTestName());
AmqpSender sender3 = session3.createSender(getTestName());
AmqpSender sender1 = session1.createSender(getQueueName());
AmqpSender sender2 = session2.createSender(getQueueName());
AmqpSender sender3 = session3.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName());
final Queue queue = getProxyToQueue(getQueueName());
assertEquals(0, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@ -468,11 +468,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
AmqpSender sender1 = session1.createSender(getTestName());
AmqpSender sender2 = session2.createSender(getTestName());
AmqpSender sender3 = session3.createSender(getTestName());
AmqpSender sender1 = session1.createSender(getQueueName());
AmqpSender sender2 = session2.createSender(getQueueName());
AmqpSender sender3 = session3.createSender(getQueueName());
final Queue queue = getProxyToQueue(getTestName());
final Queue queue = getProxyToQueue(getQueueName());
assertEquals(0, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@ -509,7 +509,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
// Commit TXN work from a sender.
txnSession.begin();
@ -538,7 +538,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
txnSession.commit();
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(NUM_MESSAGES * 2);
for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
@ -563,7 +563,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
@ -573,7 +573,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
@ -629,7 +629,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
@ -639,7 +639,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
@ -700,7 +700,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
@ -710,7 +710,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
@ -787,7 +787,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
@ -797,7 +797,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
@ -930,12 +930,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
assertNotNull(session);
AmqpSender sender = session.createSender(getTestName());
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
AmqpReceiver receiver = session.createReceiver(getTestName());
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.setStateInspector(new AmqpValidator() {
@Override

View File

@ -16,6 +16,22 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -41,26 +57,10 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.remoting.CloseListener;
@ -84,17 +84,14 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.junit.After;
@ -104,18 +101,12 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
@RunWith(Parameterized.class)
public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
private static final String brokerName = "localhost";
private static final long maxSizeBytes = 1 * 1024 * 1024;
@ -370,132 +361,6 @@ public class ProtonTest extends ProtonTestBase {
}
}
@Test
public void testBrokerContainerId() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
assertTrue(brokerName.equals(amqpConnection.getEndpoint().getRemoteContainer()));
} finally {
amqpConnection.close();
}
}
@Test
public void testBrokerConnectionProperties() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
Map<Symbol, Object> properties = amqpConnection.getEndpoint().getRemoteProperties();
assertTrue(properties != null);
if (properties != null) {
assertTrue("apache-activemq-artemis".equals(properties.get(Symbol.valueOf("product"))));
assertTrue(VersionLoader.getVersion().getFullVersion().equals(properties.get(Symbol.valueOf("version"))));
}
} finally {
amqpConnection.close();
}
}
@Test(timeout = 60000)
public void testConnectionCarriesExpectedCapabilities() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) {
Symbol[] offered = connection.getRemoteOfferedCapabilities();
if (!contains(offered, DELAYED_DELIVERY)) {
markAsInvalid("Broker did not indicate it support delayed message delivery");
return;
}
Map<Symbol, Object> properties = connection.getRemoteProperties();
if (!properties.containsKey(PRODUCT)) {
markAsInvalid("Broker did not send a queue product name value");
return;
}
if (!properties.containsKey(VERSION)) {
markAsInvalid("Broker did not send a queue version value");
return;
}
}
});
AmqpConnection connection = client.connect();
try {
assertNotNull(connection);
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
// Shouldn't get this since we delayed the message.
assertNull(receiver.receive(1, TimeUnit.SECONDS));
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
assertNotNull(client);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver = session.createReceiver(address);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 2000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
sender.send(message);
// Now try and get the message
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
assertNotNull(msgDeliveryTime);
assertEquals(deliveryTime, msgDeliveryTime.longValue());
} finally {
connection.close();
}
}
@Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
@ -983,41 +848,6 @@ public class ProtonTest extends ProtonTestBase {
amqpConnection.close();
}
@Test
public void testManagementQueryOverAMQP() throws Throwable {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
String destinationAddress = address + 1;
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender("activemq.management");
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(10);
//create request message for getQueueNames query
AmqpMessage request = new AmqpMessage();
request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER);
request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
request.setReplyToAddress(destinationAddress);
request.setText("[]");
sender.send(request);
AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
assertNotNull(response);
Object section = response.getWrappedMessage().getBody();
assertTrue(section instanceof AmqpValue);
Object value = ((AmqpValue) section).getValue();
assertTrue(value instanceof String);
assertTrue(((String) value).length() > 0);
assertTrue(((String) value).contains(destinationAddress));
response.accept();
} finally {
amqpConnection.close();
}
}
@Test
public void testReplyTo() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -1792,93 +1622,6 @@ public class ProtonTest extends ProtonTestBase {
}
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(address);
message.setMessageId("msg" + 1);
message.setText("Test-Message");
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals("msg1", received.getMessageId());
received.accept();
receiver.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(address + "-not-in-service");
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
private javax.jms.Queue createQueue(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {