This closes #789

This commit is contained in:
Clebert Suconic 2016-09-26 17:58:30 -04:00
commit c86e41d4fa
31 changed files with 4298 additions and 0 deletions

70
artemis-junit/pom.xml Normal file
View File

@ -0,0 +1,70 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>artemis-junit</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JUnit Rules</name>
<properties>
<activemq.basedir>${project.basedir}/..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>provided</scope>
</dependency>
<!--
-->
<dependency>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractActiveMQClientResource extends ExternalResource {
Logger log = LoggerFactory.getLogger(this.getClass());
boolean autoCreateQueue = true;
ServerLocator serverLocator;
ClientSessionFactory sessionFactory;
ClientSession session;
public AbstractActiveMQClientResource(String url) {
if (url == null) {
throw new IllegalArgumentException(String.format("Error creating {} - url cannot be null", this.getClass().getSimpleName()));
}
try {
this.serverLocator = ActiveMQClient.createServerLocator(url);
}
catch (Exception ex) {
throw new RuntimeException(String.format("Error creating {} - createServerLocator( {} ) failed", this.getClass().getSimpleName(), url.toString()), ex);
}
}
public AbstractActiveMQClientResource(ServerLocator serverLocator) {
if (serverLocator == null) {
throw new IllegalArgumentException(String.format("Error creating {} - ServerLocator cannot be null", this.getClass().getSimpleName()));
}
this.serverLocator = serverLocator;
}
/**
* Adds properties to a ClientMessage
*
* @param message
* @param properties
*/
public static void addMessageProperties(ClientMessage message, Map<String, Object> properties) {
if (properties != null && properties.size() > 0) {
for (Map.Entry<String, Object> property : properties.entrySet()) {
message.putObjectProperty(property.getKey(), property.getValue());
}
}
}
@Override
protected void before() throws Throwable {
super.before();
start();
}
@Override
protected void after() {
stop();
super.after();
}
void start() {
log.info("Starting {}", this.getClass().getSimpleName());
try {
sessionFactory = serverLocator.createSessionFactory();
session = sessionFactory.createSession();
}
catch (RuntimeException runtimeEx) {
throw runtimeEx;
}
catch (Exception ex) {
throw new ActiveMQClientResourceException(String.format("%s initialisation failure", this.getClass().getSimpleName()), ex);
}
createClient();
try {
session.start();
}
catch (ActiveMQException amqEx) {
throw new ActiveMQClientResourceException(String.format("%s startup failure", this.getClass().getSimpleName()), amqEx);
}
}
void stop() {
stopClient();
if (session != null) {
try {
session.close();
}
catch (ActiveMQException amqEx) {
log.warn("ActiveMQException encountered closing InternalClient ClientSession - ignoring", amqEx);
}
finally {
session = null;
}
}
if (sessionFactory != null) {
sessionFactory.close();
sessionFactory = null;
}
if (serverLocator != null) {
serverLocator.close();
serverLocator = null;
}
}
protected abstract void createClient();
protected abstract void stopClient();
public boolean isAutoCreateQueue() {
return autoCreateQueue;
}
public void setAutoCreateQueue(boolean autoCreateQueue) {
this.autoCreateQueue = autoCreateQueue;
}
public static class ActiveMQClientResourceException extends RuntimeException {
public ActiveMQClientResourceException(String message) {
super(message);
}
public ActiveMQClientResourceException(String message, Exception cause) {
super(message, cause);
}
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
/**
* A JUnit Rule that embeds an ActiveMQ Artemis ClientConsumer into a test.
*
* This JUnit Rule is designed to simplify using ActiveMQ Artemis clients in unit tests. Adding the rule to a test will startup
* a ClientConsumer, which can then be used to consume messages from an ActiveMQ Artemis server.
*
* <pre><code>
* public class SimpleTest {
* {@code @Rule}
* public ActiveMQConsumerResource client = new ActiveMQProducerResource( "vm://0", "test.queue" );
*
* {@code @Test}
* public void testSomething() throws Exception {
* // Use the embedded client here
* ClientMessage message = client.receiveMessage();
* }
* }
* </code></pre>
*/
public class ActiveMQConsumerResource extends AbstractActiveMQClientResource {
long defaultReceiveTimeout = 50;
SimpleString queueName;
ClientConsumer consumer;
public ActiveMQConsumerResource(String url, String queueName) {
this(url, SimpleString.toSimpleString(queueName));
}
public ActiveMQConsumerResource(String url, SimpleString queueName) {
super(url);
this.queueName = queueName;
}
public ActiveMQConsumerResource(ServerLocator serverLocator, String queueName) {
this(serverLocator, SimpleString.toSimpleString(queueName));
}
public ActiveMQConsumerResource(ServerLocator serverLocator, SimpleString queueName) {
super(serverLocator);
this.queueName = queueName;
}
public long getDefaultReceiveTimeout() {
return defaultReceiveTimeout;
}
/**
* Sets the default timeout in milliseconds used when receiving messages. Defaults to 50 milliseconds
*
* @param defaultReceiveTimeout received timeout in milliseconds
*/
public void setDefaultReceiveTimeout(long defaultReceiveTimeout) {
this.defaultReceiveTimeout = defaultReceiveTimeout;
}
@Override
protected void createClient() {
boolean browseOnly = false;
try {
if (!session.queueQuery(queueName).isExists() && autoCreateQueue) {
log.warn("{}: queue does not exist - creating queue: address = {}, name = {}", this.getClass().getSimpleName(), queueName.toString(), queueName.toString());
session.createQueue(queueName, queueName);
}
consumer = session.createConsumer(queueName, browseOnly);
}
catch (ActiveMQException amqEx) {
throw new ActiveMQClientResourceException(String.format("Error creating consumer for queueName %s", queueName.toString()), amqEx);
}
}
@Override
protected void stopClient() {
if (consumer != null) {
try {
consumer.close();
}
catch (ActiveMQException amqEx) {
log.warn("Exception encountered closing consumer - ignoring", amqEx);
}
finally {
consumer = null;
}
}
}
public boolean isAutoCreateQueue() {
return autoCreateQueue;
}
/**
* Enable/Disable the automatic creation of non-existant queues. The default is to automatically create non-existant queues
*
* @param autoCreateQueue
*/
public void setAutoCreateQueue(boolean autoCreateQueue) {
this.autoCreateQueue = autoCreateQueue;
}
public ClientMessage receiveMessage() {
return receiveMessage(defaultReceiveTimeout);
}
public ClientMessage receiveMessage(long timeout) {
ClientMessage message = null;
if (timeout > 0) {
try {
message = consumer.receive(timeout);
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed", timeout, queueName.toString()), amqEx);
}
}
else if (timeout == 0) {
try {
message = consumer.receiveImmediate();
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed", queueName.toString()), amqEx);
}
}
else {
try {
message = consumer.receive();
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed", queueName.toString()), amqEx);
}
}
return message;
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
/**
* A JUnit Rule that embeds an dynamic (i.e. unbound) ActiveMQ Artemis ClientProducer into a test.
*
* This JUnit Rule is designed to simplify using ActiveMQ Artemis clients in unit tests. Adding the rule to a test will startup
* an unbound ClientProducer, which can then be used to feed messages to any address on the ActiveMQ Artemis server.
*
* <pre><code>
* public class SimpleTest {
* {@code @Rule}
* public ActiveMQDynamicProducerResource producer = new ActiveMQDynamicProducerResource( "vm://0");
*
* {@code @Test}
* public void testSomething() throws Exception {
* // Use the embedded ClientProducer here
* producer.sendMessage( "test.address", "String Body" );
* }
* }
* </code></pre>
*/
public class ActiveMQDynamicProducerResource extends ActiveMQProducerResource {
public ActiveMQDynamicProducerResource(String url) {
super(url);
}
public ActiveMQDynamicProducerResource(ServerLocator serverLocator) {
super(serverLocator);
}
public ActiveMQDynamicProducerResource(String url, SimpleString address) {
super(url, address);
}
public ActiveMQDynamicProducerResource(ServerLocator serverLocator, SimpleString address) {
super(serverLocator, address);
}
@Override
protected void createClient() {
try {
if (address != null && !session.addressQuery(address).isExists() && autoCreateQueue) {
log.warn("queue does not exist - creating queue: address = {}, name = {}", address.toString(), address.toString());
session.createQueue(address, address);
}
producer = session.createProducer((SimpleString) null);
}
catch (ActiveMQException amqEx) {
if (address == null) {
throw new ActiveMQClientResourceException(String.format("Error creating producer for address %s", address.toString()), amqEx);
}
else {
throw new ActiveMQClientResourceException("Error creating producer", amqEx);
}
}
}
/**
* Send a ClientMessage to the default address on the server
*
* @param message the message to send
*/
@Override
public void sendMessage(ClientMessage message) {
sendMessage(address, message);
}
/**
* Send a ClientMessage to the specified address on the server
*
* @param targetAddress the target address
* @param message the message to send
*/
public void sendMessage(SimpleString targetAddress, ClientMessage message) {
if (targetAddress == null) {
throw new IllegalArgumentException(String.format("%s error - address cannot be null", this.getClass().getSimpleName()));
}
try {
if (autoCreateQueue && !session.addressQuery(targetAddress).isExists()) {
log.warn("queue does not exist - creating queue: address = {}, name = {}", address.toString(), address.toString());
session.createQueue(targetAddress, targetAddress);
}
}
catch (ActiveMQException amqEx) {
throw new ActiveMQClientResourceException(String.format("Queue creation failed for queue: address = %s, name = %s", address.toString(), address.toString()));
}
try {
producer.send(targetAddress, message);
}
catch (ActiveMQException amqEx) {
throw new ActiveMQClientResourceException(String.format("Failed to send message to %s", targetAddress.toString()), amqEx);
}
}
/**
* Create a new ClientMessage with the specified body and send to the specified address on the server
*
* @param targetAddress the target address
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(SimpleString targetAddress, byte[] body) {
ClientMessage message = createMessage(body);
sendMessage(targetAddress, message);
return message;
}
/**
* Create a new ClientMessage with the specified body and send to the server
*
* @param targetAddress the target address
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(SimpleString targetAddress, String body) {
ClientMessage message = createMessage(body);
sendMessage(targetAddress, message);
return message;
}
/**
* Create a new ClientMessage with the specified properties and send to the server
*
* @param targetAddress the target address
* @param properties the properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(SimpleString targetAddress, Map<String, Object> properties) {
ClientMessage message = createMessage(properties);
sendMessage(targetAddress, message);
return message;
}
/**
* Create a new ClientMessage with the specified body and and properties and send to the server
*
* @param targetAddress the target address
* @param properties the properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(SimpleString targetAddress, byte[] body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
sendMessage(targetAddress, message);
return message;
}
/**
* Create a new ClientMessage with the specified body and and properties and send to the server
*
* @param targetAddress the target address
* @param properties the properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(SimpleString targetAddress, String body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
sendMessage(targetAddress, message);
return message;
}
}

View File

@ -0,0 +1,300 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
/**
* A JUnit Rule that embeds an ActiveMQ Artemis ClientProducer bound to a specific address into a test.
*
* This JUnit Rule is designed to simplify using ActiveMQ Artemis clients in unit tests. Adding the rule to a test will startup
* a ClientProducer, which can then be used to feed messages to the bound address on an ActiveMQ Artemis server.
*
* <pre><code>
* public class SimpleTest {
* {@code @Rule}
* public ActiveMQProducerResource producer = new ActiveMQProducerResource( "vm://0", "test.queue");
*
* {@code @Test}
* public void testSomething() throws Exception {
* // Use the embedded ClientProducer here
* producer.sendMessage( "String Body" );
* }
* }
* </code></pre>
*/
public class ActiveMQProducerResource extends AbstractActiveMQClientResource {
boolean useDurableMessage = true;
SimpleString address = null;
ClientProducer producer;
protected ActiveMQProducerResource(String url) {
super(url);
}
protected ActiveMQProducerResource(ServerLocator serverLocator) {
super(serverLocator);
}
public ActiveMQProducerResource(String url, String address) {
this(url, SimpleString.toSimpleString(address));
}
public ActiveMQProducerResource(String url, SimpleString address) {
super(url);
if (address == null) {
throw new IllegalArgumentException(String.format("%s construction error - address cannot be null", this.getClass().getSimpleName()));
}
this.address = address;
}
public ActiveMQProducerResource(ServerLocator serverLocator, String address) {
this(serverLocator, SimpleString.toSimpleString(address));
}
public ActiveMQProducerResource(ServerLocator serverLocator, SimpleString address) {
super(serverLocator);
if (address == null) {
throw new IllegalArgumentException(String.format("%s construction error - address cannot be null", this.getClass().getSimpleName()));
}
this.address = address;
}
public boolean isUseDurableMessage() {
return useDurableMessage;
}
/**
* Disables/Enables creating durable messages. By default, durable messages are created
*
* @param useDurableMessage if true, durable messages will be created
*/
public void setUseDurableMessage(boolean useDurableMessage) {
this.useDurableMessage = useDurableMessage;
}
@Override
protected void createClient() {
try {
if (!session.addressQuery(address).isExists() && autoCreateQueue) {
log.warn("{}: queue does not exist - creating queue: address = {}, name = {}", this.getClass().getSimpleName(), address.toString(), address.toString());
session.createQueue(address, address);
}
producer = session.createProducer(address);
}
catch (ActiveMQException amqEx) {
throw new ActiveMQClientResourceException(String.format("Error creating producer for address %s", address.toString()), amqEx);
}
}
@Override
protected void stopClient() {
if (producer != null) {
try {
producer.close();
}
catch (ActiveMQException amqEx) {
log.warn("ActiveMQException encountered closing InternalClient ClientProducer - ignoring", amqEx);
}
finally {
producer = null;
}
}
}
/**
* Create a ClientMessage
* <p>
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @return a new ClientMessage
*/
public ClientMessage createMessage() {
if (session == null) {
throw new IllegalStateException("ClientSession is null");
}
return session.createMessage(isUseDurableMessage());
}
/**
* Create a ClientMessage with the specified body
* <p>
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @return a new ClientMessage with the specified body
*/
public ClientMessage createMessage(byte[] body) {
ClientMessage message = createMessage();
if (body != null) {
message.writeBodyBufferBytes(body);
}
return message;
}
/**
* Create a ClientMessage with the specified body
* <p>
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @return a new ClientMessage with the specified body
*/
public ClientMessage createMessage(String body) {
ClientMessage message = createMessage();
if (body != null) {
message.writeBodyBufferString(body);
}
return message;
}
/**
* Create a ClientMessage with the specified message properties
* <p>
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param properties message properties for the new message
* @return a new ClientMessage with the specified message properties
*/
public ClientMessage createMessage(Map<String, Object> properties) {
ClientMessage message = createMessage();
addMessageProperties(message, properties);
return message;
}
/**
* Create a ClientMessage with the specified body and message properties
* <p>
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @param properties message properties for the new message
* @return a new ClientMessage with the specified body and message properties
*/
public ClientMessage createMessage(byte[] body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
addMessageProperties(message, properties);
return message;
}
/**
* Create a ClientMessage with the specified body and message properties
* <p>
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @param properties message properties for the new message
* @return a new ClientMessage with the specified body and message properties
*/
public ClientMessage createMessage(String body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
addMessageProperties(message, properties);
return message;
}
/**
* Send a ClientMessage to the server
*
* @param message the message to send
*/
public void sendMessage(ClientMessage message) {
try {
producer.send(message);
}
catch (ActiveMQException amqEx) {
throw new ActiveMQClientResourceException(String.format("Failed to send message to %s", producer.getAddress().toString()), amqEx);
}
}
/**
* Create a new ClientMessage with the specified body and send to the server
*
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(byte[] body) {
ClientMessage message = createMessage(body);
sendMessage(message);
return message;
}
/**
* Create a new ClientMessage with the specified body and send to the server
*
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(String body) {
ClientMessage message = createMessage(body);
sendMessage(message);
return message;
}
/**
* Create a new ClientMessage with the specified properties and send to the server
*
* @param properties the properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(Map<String, Object> properties) {
ClientMessage message = createMessage(properties);
sendMessage(message);
return message;
}
/**
* Create a new ClientMessage with the specified body and and properties and send to the server
*
* @param properties the properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(byte[] body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
sendMessage(message);
return message;
}
/**
* Create a new ClientMessage with the specified body and and properties and send to the server
*
* @param properties the properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(String body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
sendMessage(message);
return message;
}
}

View File

@ -0,0 +1,892 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A JUnit Rule that embeds an ActiveMQ Artemis server into a test.
*
* This JUnit Rule is designed to simplify using embedded servers in unit tests. Adding the rule to a test will startup
* an embedded server, which can then be used by client applications.
*
* <pre><code>
* public class SimpleTest {
* {@code @Rule}
* public EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
*
* {@code @Test}
* public void testSomething() throws Exception {
* // Use the embedded server here
* }
* }
* </code></pre>
*/
public class EmbeddedActiveMQResource extends ExternalResource {
static final String SERVER_NAME = "embedded-server";
Logger log = LoggerFactory.getLogger(this.getClass());
boolean useDurableMessage = true;
boolean useDurableQueue = true;
long defaultReceiveTimeout = 50;
Configuration configuration;
EmbeddedActiveMQ server;
InternalClient internalClient;
/**
* Create a default EmbeddedActiveMQResource
*/
public EmbeddedActiveMQResource() {
configuration = new ConfigurationImpl().setName(SERVER_NAME).setPersistenceEnabled(false).setSecurityEnabled(false).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
init();
}
/**
* Create a default EmbeddedActiveMQResource with the specified serverId
*
* @param serverId server id
*/
public EmbeddedActiveMQResource(int serverId) {
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.SERVER_ID_PROP_NAME, serverId);
TransportConfiguration transportConfiguration = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
configuration = new ConfigurationImpl().setName(SERVER_NAME + "-" + serverId).setPersistenceEnabled(false).setSecurityEnabled(false).addAcceptorConfiguration(transportConfiguration);
init();
}
/**
* Creates an EmbeddedActiveMQResource using the specified configuration
*
* @param configuration ActiveMQServer configuration
*/
public EmbeddedActiveMQResource(Configuration configuration) {
this.configuration = configuration;
init();
}
/**
* Creates an EmbeddedActiveMQResource using the specified configuration file
*
* @param filename ActiveMQServer configuration file name
*/
public EmbeddedActiveMQResource(String filename) {
if (filename == null) {
throw new IllegalArgumentException("ActiveMQServer configuration file name cannot be null");
}
FileDeploymentManager deploymentManager = new FileDeploymentManager(filename);
FileConfiguration config = new FileConfiguration();
deploymentManager.addDeployable(config);
try {
deploymentManager.readConfiguration();
}
catch (Exception ex) {
throw new EmbeddedActiveMQResourceException(String.format("Failed to read configuration file %s", filename), ex);
}
this.configuration = config;
init();
}
/**
* Adds properties to a ClientMessage
*
* @param message
* @param properties
*/
public static void addMessageProperties(ClientMessage message, Map<String, Object> properties) {
if (properties != null && properties.size() > 0) {
for (Map.Entry<String, Object> property : properties.entrySet()) {
message.putObjectProperty(property.getKey(), property.getValue());
}
}
}
private void init() {
if (server == null) {
server = new EmbeddedActiveMQ().setConfiguration(configuration);
}
}
/**
* Start the embedded ActiveMQ Artemis server.
*
* The server will normally be started by JUnit using the before() method. This method allows the server to
* be started manually to support advanced testing scenarios.
*/
public void start() {
try {
server.start();
}
catch (Exception ex) {
throw new RuntimeException(String.format("Exception encountered starting %s: %s", server.getClass().getName(), this.getServerName()), ex);
}
configuration = server.getActiveMQServer().getConfiguration();
}
/**
* Stop the embedded ActiveMQ Artemis server
*
* The server will normally be stopped by JUnit using the after() method. This method allows the server to
* be stopped manually to support advanced testing scenarios.
*/
public void stop() {
if (internalClient != null) {
internalClient.stop();
internalClient = null;
}
if (server != null) {
try {
server.stop();
}
catch (Exception ex) {
log.warn(String.format("Exception encountered stopping %s: %s", server.getClass().getSimpleName(), this.getServerName()), ex);
}
}
}
/**
* Invoked by JUnit to setup the resource - start the embedded ActiveMQ Artemis server
*/
@Override
protected void before() throws Throwable {
log.info("Starting {}: {}", this.getClass().getSimpleName(), getServerName());
this.start();
super.before();
}
/**
* Invoked by JUnit to tear down the resource - stops the embedded ActiveMQ Artemis server
*/
@Override
protected void after() {
log.info("Stopping {}: {}", this.getClass().getSimpleName(), getServerName());
super.after();
this.stop();
}
public boolean isUseDurableMessage() {
return useDurableMessage;
}
/**
* Disables/Enables creating durable messages. By default, durable messages are created
*
* @param useDurableMessage if true, durable messages will be created
*/
public void setUseDurableMessage(boolean useDurableMessage) {
this.useDurableMessage = useDurableMessage;
}
public boolean isUseDurableQueue() {
return useDurableQueue;
}
/**
* Disables/Enables creating durable queues. By default, durable queues are created
*
* @param useDurableQueue if true, durable messages will be created
*/
public void setUseDurableQueue(boolean useDurableQueue) {
this.useDurableQueue = useDurableQueue;
}
public long getDefaultReceiveTimeout() {
return defaultReceiveTimeout;
}
/**
* Sets the default timeout in milliseconds used when receiving messages. Defaults to 50 milliseconds
*
* @param defaultReceiveTimeout received timeout in milliseconds
*/
public void setDefaultReceiveTimeout(long defaultReceiveTimeout) {
this.defaultReceiveTimeout = defaultReceiveTimeout;
}
/**
* Get the EmbeddedActiveMQ server.
*
* This may be required for advanced configuration of the EmbeddedActiveMQ server.
*
* @return the embedded ActiveMQ broker
*/
public EmbeddedActiveMQ getServer() {
return server;
}
/**
* Get the name of the EmbeddedActiveMQ server
*
* @return name of the embedded server
*/
public String getServerName() {
String name = "unknown";
ActiveMQServer activeMQServer = server.getActiveMQServer();
if (activeMQServer != null) {
name = activeMQServer.getConfiguration().getName();
}
else if (configuration != null) {
name = configuration.getName();
}
return name;
}
/**
* Get the VM URL for the embedded EmbeddedActiveMQ server
*
* @return the VM URL for the embedded server
*/
public String getVmURL() {
String vmURL = "vm://0";
for (TransportConfiguration transportConfiguration : configuration.getAcceptorConfigurations()) {
Map<String, Object> params = transportConfiguration.getParams();
if (params != null && params.containsKey(TransportConstants.SERVER_ID_PROP_NAME)) {
vmURL = "vm://" + params.get(TransportConstants.SERVER_ID_PROP_NAME);
}
}
return vmURL;
}
public long getMessageCount(String queueName) {
return getMessageCount(SimpleString.toSimpleString(queueName));
}
/**
* Get the number of messages in a specific queue.
*
* @param queueName the name of the queue
* @return the number of messages in the queue; -1 if queue is not found
*/
public long getMessageCount(SimpleString queueName) {
Queue queue = locateQueue(queueName);
if (queue == null) {
log.warn("getMessageCount(queueName) - queue {} not found; returning -1", queueName.toString());
return -1;
}
return queue.getMessageCount();
}
public Queue locateQueue(String queueName) {
return locateQueue(SimpleString.toSimpleString(queueName));
}
public Queue locateQueue(SimpleString queueName) {
return server.getActiveMQServer().locateQueue(queueName);
}
public List<Queue> getBoundQueues(String address) {
return getBoundQueues(SimpleString.toSimpleString(address));
}
public List<Queue> getBoundQueues(SimpleString address) {
if (address == null) {
throw new IllegalArgumentException("getBoundQueues( address ) - address cannot be null");
}
List<Queue> boundQueues = new java.util.LinkedList<>();
BindingQueryResult bindingQueryResult = null;
try {
bindingQueryResult = server.getActiveMQServer().bindingQuery(address);
}
catch (Exception e) {
throw new EmbeddedActiveMQResourceException(String.format("getBoundQueues( %s ) - bindingQuery( %s ) failed", address.toString(), address.toString()));
}
if (bindingQueryResult.isExists()) {
for (SimpleString queueName : bindingQueryResult.getQueueNames()) {
boundQueues.add(server.getActiveMQServer().locateQueue(queueName));
}
}
return boundQueues;
}
public Queue createQueue(String name) {
return createQueue(SimpleString.toSimpleString(name), SimpleString.toSimpleString(name));
}
public Queue createQueue(String address, String name) {
return createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(name));
}
public Queue createQueue(SimpleString address, SimpleString name) {
SimpleString filter = null;
boolean temporary = false;
Queue queue = null;
try {
queue = server.getActiveMQServer().createQueue(address, name, filter, isUseDurableQueue(), temporary);
}
catch (Exception ex) {
throw new EmbeddedActiveMQResourceException(String.format("Failed to create queue: queueName = %s, name = %s", address.toString(), name.toString()), ex);
}
return queue;
}
public void createSharedQueue(String name, String user) {
createSharedQueue(SimpleString.toSimpleString(name), SimpleString.toSimpleString(name), SimpleString.toSimpleString(user));
}
public void createSharedQueue(String address, String name, String user) {
createSharedQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(name), SimpleString.toSimpleString(user));
}
public void createSharedQueue(SimpleString address, SimpleString name, SimpleString user) {
SimpleString filter = null;
try {
server.getActiveMQServer().createSharedQueue(address, name, filter, user, isUseDurableQueue());
}
catch (Exception ex) {
throw new EmbeddedActiveMQResourceException(String.format("Failed to create shared queue: queueName = %s, name = %s, user = %s", address.toString(), name.toString(), user.toString()), ex);
}
}
/**
* Create a ClientMessage
*
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @return a new ClientMessage
*/
public ClientMessage createMessage() {
getInternalClient();
return internalClient.createMessage(isUseDurableMessage());
}
/**
* Create a ClientMessage with the specified body
*
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @return a new ClientMessage with the specified body
*/
public ClientMessage createMessage(byte[] body) {
getInternalClient();
ClientMessage message = internalClient.createMessage(isUseDurableMessage());
if (body != null) {
message.writeBodyBufferBytes(body);
}
return message;
}
/**
* Create a ClientMessage with the specified body
*
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @return a new ClientMessage with the specified body
*/
public ClientMessage createMessage(String body) {
getInternalClient();
ClientMessage message = internalClient.createMessage(isUseDurableMessage());
if (body != null) {
message.writeBodyBufferString(body);
}
return message;
}
/**
* Create a ClientMessage with the specified message properties
*
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param properties message properties for the new message
* @return a new ClientMessage with the specified message properties
*/
public ClientMessage createMessageWithProperties(Map<String, Object> properties) {
getInternalClient();
ClientMessage message = internalClient.createMessage(isUseDurableMessage());
addMessageProperties(message, properties);
return message;
}
/**
* Create a ClientMessage with the specified body and message properties
*
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @param properties message properties for the new message
* @return a new ClientMessage with the specified body and message properties
*/
public ClientMessage createMessageWithProperties(byte[] body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
addMessageProperties(message, properties);
return message;
}
/**
* Create a ClientMessage with the specified body and message properties
*
* If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
*
* @param body the body for the new message
* @param properties message properties for the new message
* @return a new ClientMessage with the specified body and message properties
*/
public ClientMessage createMessageWithProperties(String body, Map<String, Object> properties) {
ClientMessage message = createMessage(body);
addMessageProperties(message, properties);
return message;
}
/**
* Send a message to an address
*
* @param address the target queueName for the message
* @param message the message to send
*/
public void sendMessage(String address, ClientMessage message) {
sendMessage(SimpleString.toSimpleString(address), message);
}
/**
* Create a new message with the specified body, and send the message to an address
*
* @param address the target queueName for the message
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(String address, byte[] body) {
return sendMessage(SimpleString.toSimpleString(address), body);
}
/**
* Create a new message with the specified body, and send the message to an address
*
* @param address the target queueName for the message
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(String address, String body) {
return sendMessage(SimpleString.toSimpleString(address), body);
}
/**
* Create a new message with the specified properties, and send the message to an address
*
* @param address the target queueName for the message
* @param properties message properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessageWithProperties(String address, Map<String, Object> properties) {
return sendMessageWithProperties(SimpleString.toSimpleString(address), properties);
}
/**
* Create a new message with the specified body and properties, and send the message to an address
*
* @param address the target queueName for the message
* @param body the body for the new message
* @param properties message properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessageWithProperties(String address, byte[] body, Map<String, Object> properties) {
return sendMessageWithProperties(SimpleString.toSimpleString(address), body, properties);
}
/**
* Create a new message with the specified body and properties, and send the message to an address
*
* @param address the target queueName for the message
* @param body the body for the new message
* @param properties message properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessageWithProperties(String address, String body, Map<String, Object> properties) {
return sendMessageWithProperties(SimpleString.toSimpleString(address), body, properties);
}
/**
* Send a message to an queueName
*
* @param address the target queueName for the message
* @param message the message to send
*/
public void sendMessage(SimpleString address, ClientMessage message) {
if (address == null) {
throw new IllegalArgumentException("sendMessage failure - queueName is required");
}
else if (message == null) {
throw new IllegalArgumentException("sendMessage failure - a ClientMessage is required");
}
getInternalClient();
internalClient.sendMessage(address, message);
}
/**
* Create a new message with the specified body, and send the message to an queueName
*
* @param address the target queueName for the message
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(SimpleString address, byte[] body) {
ClientMessage message = createMessage(body);
sendMessage(address, message);
return message;
}
/**
* Create a new message with the specified body, and send the message to an queueName
*
* @param address the target queueName for the message
* @param body the body for the new message
* @return the message that was sent
*/
public ClientMessage sendMessage(SimpleString address, String body) {
ClientMessage message = createMessage(body);
sendMessage(address, message);
return message;
}
/**
* Create a new message with the specified properties, and send the message to an queueName
*
* @param address the target queueName for the message
* @param properties message properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessageWithProperties(SimpleString address, Map<String, Object> properties) {
ClientMessage message = createMessageWithProperties(properties);
sendMessage(address, message);
return message;
}
/**
* Create a new message with the specified body and properties, and send the message to an queueName
*
* @param address the target queueName for the message
* @param body the body for the new message
* @param properties message properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessageWithProperties(SimpleString address, byte[] body, Map<String, Object> properties) {
ClientMessage message = createMessageWithProperties(body, properties);
sendMessage(address, message);
return message;
}
/**
* Create a new message with the specified body and properties, and send the message to an queueName
*
* @param address the target queueName for the message
* @param body the body for the new message
* @param properties message properties for the new message
* @return the message that was sent
*/
public ClientMessage sendMessageWithProperties(SimpleString address, String body, Map<String, Object> properties) {
ClientMessage message = createMessageWithProperties(body, properties);
sendMessage(address, message);
return message;
}
/**
* Receive a message from the specified queue using the default receive timeout
*
* @param queueName name of the source queue
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage receiveMessage(String queueName) {
return receiveMessage(SimpleString.toSimpleString(queueName));
}
/**
* Receive a message from the specified queue using the specified receive timeout
*
* @param queueName name of the source queue
* @param timeout receive timeout in milliseconds
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage receiveMessage(String queueName, long timeout) {
return receiveMessage(SimpleString.toSimpleString(queueName), timeout);
}
/**
* Receive a message from the specified queue using the default receive timeout
*
* @param queueName name of the source queue
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage receiveMessage(SimpleString queueName) {
final boolean browseOnly = false;
return getInternalClient().receiveMessage(queueName, defaultReceiveTimeout, browseOnly);
}
/**
* Receive a message from the specified queue using the specified receive timeout
*
* @param queueName name of the source queue
* @param timeout receive timeout in milliseconds
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage receiveMessage(SimpleString queueName, long timeout) {
final boolean browseOnly = false;
return getInternalClient().receiveMessage(queueName, timeout, browseOnly);
}
/**
* Browse a message (receive but do not consume) from the specified queue using the default receive timeout
*
* @param queueName name of the source queue
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage browseMessage(String queueName) {
return browseMessage(SimpleString.toSimpleString(queueName), defaultReceiveTimeout);
}
/**
* Browse a message (receive but do not consume) a message from the specified queue using the specified receive timeout
*
* @param queueName name of the source queue
* @param timeout receive timeout in milliseconds
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage browseMessage(String queueName, long timeout) {
return browseMessage(SimpleString.toSimpleString(queueName), timeout);
}
/**
* Browse a message (receive but do not consume) from the specified queue using the default receive timeout
*
* @param queueName name of the source queue
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage browseMessage(SimpleString queueName) {
final boolean browseOnly = true;
return getInternalClient().receiveMessage(queueName, defaultReceiveTimeout, browseOnly);
}
/**
* Browse a message (receive but do not consume) a message from the specified queue using the specified receive timeout
*
* @param queueName name of the source queue
* @param timeout receive timeout in milliseconds
* @return the received ClientMessage, null if the receive timed-out
*/
public ClientMessage browseMessage(SimpleString queueName, long timeout) {
final boolean browseOnly = true;
return getInternalClient().receiveMessage(queueName, timeout, browseOnly);
}
private InternalClient getInternalClient() {
if (internalClient == null) {
log.info("Creating Internal Client");
internalClient = new InternalClient();
internalClient.start();
}
return internalClient;
}
public static class EmbeddedActiveMQResourceException extends RuntimeException {
public EmbeddedActiveMQResourceException(String message) {
super(message);
}
public EmbeddedActiveMQResourceException(String message, Exception cause) {
super(message, cause);
}
}
private class InternalClient {
ServerLocator serverLocator;
ClientSessionFactory sessionFactory;
ClientSession session;
ClientProducer producer;
InternalClient() {
}
void start() {
log.info("Starting {}", this.getClass().getSimpleName());
try {
serverLocator = ActiveMQClient.createServerLocator(getVmURL());
sessionFactory = serverLocator.createSessionFactory();
}
catch (RuntimeException runtimeEx) {
throw runtimeEx;
}
catch (Exception ex) {
throw new EmbeddedActiveMQResourceException("Internal Client creation failure", ex);
}
try {
session = sessionFactory.createSession();
producer = session.createProducer((String) null);
session.start();
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException("Internal Client creation failure", amqEx);
}
}
void stop() {
if (producer != null) {
try {
producer.close();
}
catch (ActiveMQException amqEx) {
log.warn("ActiveMQException encountered closing InternalClient ClientProducer - ignoring", amqEx);
}
finally {
producer = null;
}
}
if (session != null) {
try {
session.close();
}
catch (ActiveMQException amqEx) {
log.warn("ActiveMQException encountered closing InternalClient ClientSession - ignoring", amqEx);
}
finally {
session = null;
}
}
if (sessionFactory != null) {
sessionFactory.close();
sessionFactory = null;
}
if (serverLocator != null) {
serverLocator.close();
serverLocator = null;
}
}
public ClientMessage createMessage(boolean durable) {
checkSession();
return session.createMessage(durable);
}
public void sendMessage(SimpleString address, ClientMessage message) {
checkSession();
if (producer == null) {
throw new IllegalStateException("ClientProducer is null - has the InternalClient been started?");
}
try {
producer.send(address, message);
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("Failed to send message to %s", address.toString()), amqEx);
}
}
public ClientMessage receiveMessage(SimpleString address, long timeout, boolean browseOnly) {
checkSession();
ClientConsumer consumer = null;
try {
consumer = session.createConsumer(address, browseOnly);
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("Failed to create consumer for %s", address.toString()), amqEx);
}
ClientMessage message = null;
if (timeout > 0) {
try {
message = consumer.receive(timeout);
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed", timeout, address.toString()), amqEx);
}
}
else if (timeout == 0) {
try {
message = consumer.receiveImmediate();
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed", address.toString()), amqEx);
}
}
else {
try {
message = consumer.receive();
}
catch (ActiveMQException amqEx) {
throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed", address.toString()), amqEx);
}
}
return message;
}
void checkSession() {
getInternalClient();
if (session == null) {
throw new IllegalStateException("ClientSession is null - has the InternalClient been started?");
}
}
}
}

View File

@ -0,0 +1,764 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A JUnit Rule that embeds an ActiveMQ Artemis JMS server into a test.
*
* This JUnit Rule is designed to simplify using embedded servers in unit tests. Adding the rule to a test will startup
* an embedded JMS server, which can then be used by client applications.
*
* <pre><code>
* public class SimpleTest {
* {@code @Rule}
* public EmbeddedJMSResource server = new EmbeddedJMSResource();
*
* {@code @Test}
* public void testSomething() throws Exception {
* // Use the embedded server here
* }
* }
* </code></pre>
*/
public class EmbeddedJMSResource extends ExternalResource {
static final String SERVER_NAME = "embedded-jms-server";
Logger log = LoggerFactory.getLogger(this.getClass());
Integer serverId = null;
Configuration configuration;
JMSConfiguration jmsConfiguration;
EmbeddedJMS jmsServer;
InternalClient internalClient;
/**
* Create a default EmbeddedJMSResource
*/
public EmbeddedJMSResource() {
configuration = new ConfigurationImpl().setName(SERVER_NAME).setPersistenceEnabled(false).setSecurityEnabled(false).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
jmsConfiguration = new JMSConfigurationImpl();
init();
}
/**
* Create a default EmbeddedJMSResource with the specified server id
*/
public EmbeddedJMSResource(int serverId) {
this.serverId = serverId;
Map<String, Object> props = new HashMap<>();
props.put(TransportConstants.SERVER_ID_PROP_NAME, serverId);
configuration = new ConfigurationImpl().setName(SERVER_NAME + "-" + serverId).setPersistenceEnabled(false).setSecurityEnabled(false).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName(), props));
jmsConfiguration = new JMSConfigurationImpl();
init();
}
/**
* Create an EmbeddedJMSResource with the specified configurations
*
* @param configuration ActiveMQServer configuration
* @param jmsConfiguration JMSServerManager configuration
*/
public EmbeddedJMSResource(Configuration configuration, JMSConfiguration jmsConfiguration) {
this.configuration = configuration;
this.jmsConfiguration = jmsConfiguration;
init();
}
/**
* Create an EmbeddedJMSResource with the specified configuration file
*
* @param filename configuration file name
*/
public EmbeddedJMSResource(String filename) {
this(filename, filename);
}
/**
* Create an EmbeddedJMSResource with the specified configuration file
*
* @param serverConfigurationFileName ActiveMQServer configuration file name
* @param jmsConfigurationFileName JMSServerManager configuration file name
*/
public EmbeddedJMSResource(String serverConfigurationFileName, String jmsConfigurationFileName) {
if (serverConfigurationFileName == null) {
throw new IllegalArgumentException("ActiveMQServer configuration file name cannot be null");
}
if (jmsConfigurationFileName == null) {
throw new IllegalArgumentException("JMSServerManager configuration file name cannot be null");
}
FileDeploymentManager coreDeploymentManager = new FileDeploymentManager(serverConfigurationFileName);
FileConfiguration coreConfiguration = new FileConfiguration();
coreDeploymentManager.addDeployable(coreConfiguration);
try {
coreDeploymentManager.readConfiguration();
}
catch (Exception readCoreConfigEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to read ActiveMQServer configuration from file %s", serverConfigurationFileName), readCoreConfigEx);
}
this.configuration = coreConfiguration;
FileJMSConfiguration jmsConfiguration = new FileJMSConfiguration();
FileDeploymentManager jmsDeploymentManager = new FileDeploymentManager(jmsConfigurationFileName);
jmsDeploymentManager.addDeployable(jmsConfiguration);
try {
jmsDeploymentManager.readConfiguration();
}
catch (Exception readJmsConfigEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to read JMSServerManager configuration from file %s", jmsConfigurationFileName), readJmsConfigEx);
}
this.jmsConfiguration = jmsConfiguration;
init();
}
public static void setMessageProperties(Message message, Map<String, Object> properties) {
if (properties != null && properties.size() > 0) {
for (Map.Entry<String, Object> property : properties.entrySet()) {
try {
message.setObjectProperty(property.getKey(), property.getValue());
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to set property {%s = %s}", property.getKey(), property.getValue().toString()), jmsEx);
}
}
}
}
private void init() {
if (jmsServer == null) {
jmsServer = new EmbeddedJMS().setConfiguration(configuration).setJmsConfiguration(jmsConfiguration);
}
}
/**
* Start the embedded EmbeddedJMSResource.
* <p/>
* The server will normally be started by JUnit using the before() method. This method allows the server to
* be started manually to support advanced testing scenarios.
*/
public void start() {
log.info("Starting {}: {}", this.getClass().getSimpleName(), this.getServerName());
try {
jmsServer.start();
}
catch (Exception ex) {
throw new RuntimeException(String.format("Exception encountered starting %s: %s", jmsServer.getClass().getSimpleName(), this.getServerName()), ex);
}
}
/**
* Stop the embedded ActiveMQ broker, blocking until the broker has stopped.
* <p/>
* The broker will normally be stopped by JUnit using the after() method. This method allows the broker to
* be stopped manually to support advanced testing scenarios.
*/
public void stop() {
log.info("Stopping {}: {}", this.getClass().getSimpleName(), this.getServerName());
if (internalClient != null) {
internalClient.stop();
internalClient = null;
}
if (jmsServer != null) {
try {
jmsServer.stop();
}
catch (Exception ex) {
log.warn(String.format("Exception encountered stopping %s: %s - ignoring", jmsServer.getClass().getSimpleName(), this.getServerName()), ex);
}
}
}
/**
* Start the embedded ActiveMQ Broker
* <p/>
* Invoked by JUnit to setup the resource
*/
@Override
protected void before() throws Throwable {
log.info("Starting {}: {}", this.getClass().getSimpleName(), getServerName());
this.start();
super.before();
}
/**
* Stop the embedded ActiveMQ Broker
* <p/>
* Invoked by JUnit to tear down the resource
*/
@Override
protected void after() {
log.info("Stopping {}: {}", this.getClass().getSimpleName(), getServerName());
super.after();
this.stop();
}
/**
* Get the EmbeddedJMS server.
* <p/>
* This may be required for advanced configuration of the EmbeddedJMS server.
*
* @return
*/
public EmbeddedJMS getJmsServer() {
return jmsServer;
}
/**
* Get the name of the EmbeddedJMS server
*
* @return name of the server
*/
public String getServerName() {
String name = "unknown";
ActiveMQServer activeMQServer = jmsServer.getActiveMQServer();
if (activeMQServer != null) {
name = activeMQServer.getConfiguration().getName();
}
else if (configuration != null) {
name = configuration.getName();
}
return name;
}
/**
* Get the VM URL for the embedded EmbeddedActiveMQ server
*
* @return the VM URL for the embedded server
*/
public String getVmURL() {
String vmURL = "vm://0";
for (TransportConfiguration transportConfiguration : configuration.getAcceptorConfigurations()) {
Map<String, Object> params = transportConfiguration.getParams();
if (params != null && params.containsKey(TransportConstants.SERVER_ID_PROP_NAME)) {
vmURL = "vm://" + params.get(TransportConstants.SERVER_ID_PROP_NAME);
}
}
return vmURL;
}
/**
* Get the Queue corresponding to a JMS Destination.
* <p/>
* The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
* or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix
* of "queue://" is assumed.
*
* @param destinationName the full name of the JMS Destination
* @return the number of messages in the JMS Destination
*/
public Queue getDestinationQueue(String destinationName) {
Queue queue = null;
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
String address = destination.getAddress();
String name = destination.getName();
if (destination.isQueue()) {
queue = jmsServer.getActiveMQServer().locateQueue(destination.getSimpleAddress());
}
else {
BindingQueryResult bindingQueryResult = null;
try {
bindingQueryResult = jmsServer.getActiveMQServer().bindingQuery(destination.getSimpleAddress());
}
catch (Exception ex) {
log.error(String.format("getDestinationQueue( %s ) - bindingQuery for %s failed", destinationName, destination.getAddress()), ex);
return null;
}
if (bindingQueryResult.isExists()) {
List<SimpleString> queueNames = bindingQueryResult.getQueueNames();
if (queueNames.size() > 0) {
queue = jmsServer.getActiveMQServer().locateQueue(queueNames.get(0));
}
}
}
return queue;
}
/**
* Get the Queues corresponding to a JMS Topic.
* <p/>
* The full name of the JMS Topic including the prefix should be provided - i.e. topic://myTopic. If the destination type prefix
* is not included in the destination name, a prefix of "topic://" is assumed.
*
* @param topicName the full name of the JMS Destination
* @return the number of messages in the JMS Destination
*/
public List<Queue> getTopicQueues(String topicName) {
List<Queue> queues = new LinkedList<>();
ActiveMQDestination destination = ActiveMQDestination.createDestination(topicName, ActiveMQDestination.TOPIC_TYPE);
if (!destination.isQueue()) {
BindingQueryResult bindingQueryResult = null;
try {
bindingQueryResult = jmsServer.getActiveMQServer().bindingQuery(destination.getSimpleAddress());
}
catch (Exception ex) {
log.error(String.format("getTopicQueues( %s ) - bindingQuery for %s failed", topicName, destination.getAddress()), ex);
return queues;
}
if (bindingQueryResult.isExists()) {
ActiveMQServer activeMQServer = jmsServer.getActiveMQServer();
for (SimpleString queueName : bindingQueryResult.getQueueNames()) {
queues.add(activeMQServer.locateQueue(queueName));
}
}
}
return queues;
}
/**
* Get the number of messages in a specific JMS Destination.
* <p/>
* The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
* or topic://myTopic. If the destination type prefix is not included in the destination name, a prefix
* of "queue://" is assumed.
*
* NOTE: For JMS Topics, this returned count will be the total number of messages for all subscribers. For
* example, if there are two subscribers on the topic and a single message is published, the returned count will
* be two (one message for each subscriber).
*
* @param destinationName the full name of the JMS Destination
* @return the number of messages in the JMS Destination
*/
public long getMessageCount(String destinationName) {
long count = 0;
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
if (destination.isQueue()) {
Queue queue = getDestinationQueue(destinationName);
if (queue == null) {
log.warn("getMessageCount(destinationName) - destination {} not found; returning -1", destinationName);
count = -1;
}
else {
count = queue.getMessageCount();
}
}
else {
for (Queue topicQueue : getTopicQueues(destinationName)) {
count += topicQueue.getMessageCount();
}
}
return count;
}
public BytesMessage createBytesMessage() {
return getInternalClient().createBytesMessage();
}
public TextMessage createTextMessage() {
return getInternalClient().createTextMessage();
}
public MapMessage createMapMessage() {
return getInternalClient().createMapMessage();
}
public ObjectMessage createObjectMessage() {
return getInternalClient().createObjectMessage();
}
public StreamMessage createStreamMessage() {
return getInternalClient().createStreamMessage();
}
public BytesMessage createMessage(byte[] body) {
return createMessage(body, null);
}
public TextMessage createMessage(String body) {
return createMessage(body, null);
}
public MapMessage createMessage(Map<String, Object> body) {
return createMessage(body, null);
}
public ObjectMessage createMessage(Serializable body) {
return createMessage(body, null);
}
public BytesMessage createMessage(byte[] body, Map<String, Object> properties) {
BytesMessage message = this.createBytesMessage();
if (body != null) {
try {
message.writeBytes(body);
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to set body {%s} on BytesMessage", new String(body)), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public TextMessage createMessage(String body, Map<String, Object> properties) {
TextMessage message = this.createTextMessage();
if (body != null) {
try {
message.setText(body);
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to set body {%s} on TextMessage", body), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) {
MapMessage message = this.createMapMessage();
if (body != null) {
for (Map.Entry<String, Object> entry : body.entrySet()) {
try {
message.setObject(entry.getKey(), entry.getValue());
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue().toString()), jmsEx);
}
}
}
setMessageProperties(message, properties);
return message;
}
public ObjectMessage createMessage(Serializable body, Map<String, Object> properties) {
ObjectMessage message = this.createObjectMessage();
if (body != null) {
try {
message.setObject(body);
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to set body {%s} on ObjectMessage", body.toString()), jmsEx);
}
}
setMessageProperties(message, properties);
return message;
}
public void pushMessage(String destinationName, Message message) {
if (destinationName == null) {
throw new IllegalArgumentException("sendMessage failure - destination name is required");
}
else if (message == null) {
throw new IllegalArgumentException("sendMessage failure - a Message is required");
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
getInternalClient().pushMessage(destination, message);
}
public BytesMessage pushMessage(String destinationName, byte[] body) {
BytesMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public TextMessage pushMessage(String destinationName, String body) {
TextMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public MapMessage pushMessage(String destinationName, Map<String, Object> body) {
MapMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public ObjectMessage pushMessage(String destinationName, Serializable body) {
ObjectMessage message = createMessage(body, null);
pushMessage(destinationName, message);
return message;
}
public BytesMessage pushMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) {
BytesMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public TextMessage pushMessageWithProperties(String destinationName, String body, Map<String, Object> properties) {
TextMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public MapMessage pushMessageWithProperties(String destinationName,
Map<String, Object> body,
Map<String, Object> properties) {
MapMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public ObjectMessage pushMessageWithProperties(String destinationName,
Serializable body,
Map<String, Object> properties) {
ObjectMessage message = createMessage(body, properties);
pushMessage(destinationName, message);
return message;
}
public Message peekMessage(String destinationName) {
if (null == jmsServer) {
throw new NullPointerException("peekMessage failure - BrokerService is null");
}
if (destinationName == null) {
throw new IllegalArgumentException("peekMessage failure - destination name is required");
}
throw new UnsupportedOperationException("Not yet implemented");
}
public BytesMessage peekBytesMessage(String destinationName) {
return (BytesMessage) peekMessage(destinationName);
}
public TextMessage peekTextMessage(String destinationName) {
return (TextMessage) peekMessage(destinationName);
}
public MapMessage peekMapMessage(String destinationName) {
return (MapMessage) peekMessage(destinationName);
}
public ObjectMessage peekObjectMessage(String destinationName) {
return (ObjectMessage) peekMessage(destinationName);
}
public StreamMessage peekStreamMessage(String destinationName) {
return (StreamMessage) peekMessage(destinationName);
}
private InternalClient getInternalClient() {
if (internalClient == null) {
log.info("Creating InternalClient");
internalClient = new InternalClient();
internalClient.start();
}
return internalClient;
}
public static class EmbeddedJMSResourceException extends RuntimeException {
public EmbeddedJMSResourceException(String message) {
super(message);
}
public EmbeddedJMSResourceException(String message, Exception cause) {
super(message, cause);
}
}
private class InternalClient {
ConnectionFactory connectionFactory;
Connection connection;
Session session;
MessageProducer producer;
InternalClient() {
}
void start() {
connectionFactory = new ActiveMQConnectionFactory(getVmURL());
try {
connection = connectionFactory.createConnection();
session = connection.createSession();
producer = session.createProducer(null);
connection.start();
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException("InternalClient creation failure", jmsEx);
}
}
void stop() {
try {
producer.close();
}
catch (JMSException jmsEx) {
log.warn("JMSException encounter closing InternalClient Session - MessageProducer", jmsEx);
}
finally {
producer = null;
}
try {
session.close();
}
catch (JMSException jmsEx) {
log.warn("JMSException encounter closing InternalClient Session - ignoring", jmsEx);
}
finally {
session = null;
}
if (null != connection) {
try {
connection.close();
}
catch (JMSException jmsEx) {
log.warn("JMSException encounter closing InternalClient Connection - ignoring", jmsEx);
}
finally {
connection = null;
}
}
}
public BytesMessage createBytesMessage() {
checkSession();
try {
return session.createBytesMessage();
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException("Failed to create BytesMessage", jmsEx);
}
}
public TextMessage createTextMessage() {
checkSession();
try {
return session.createTextMessage();
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException("Failed to create TextMessage", jmsEx);
}
}
public MapMessage createMapMessage() {
checkSession();
try {
return session.createMapMessage();
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException("Failed to create MapMessage", jmsEx);
}
}
public ObjectMessage createObjectMessage() {
checkSession();
try {
return session.createObjectMessage();
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException("Failed to create ObjectMessage", jmsEx);
}
}
public StreamMessage createStreamMessage() {
checkSession();
try {
return session.createStreamMessage();
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException("Failed to create StreamMessage", jmsEx);
}
}
public void pushMessage(ActiveMQDestination destination, Message message) {
if (producer == null) {
throw new IllegalStateException("JMS MessageProducer is null - has the InternalClient been started?");
}
try {
producer.send(destination, message);
}
catch (JMSException jmsEx) {
throw new EmbeddedJMSResourceException(String.format("Failed to push %s to %s", message.getClass().getSimpleName(), destination.toString()), jmsEx);
}
}
void checkSession() {
if (session == null) {
throw new IllegalStateException("JMS Session is null - has the InternalClient been started?");
}
}
}
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.lang.ref.WeakReference;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
/**
* Messaging tests are usually Thread intensive and a thread leak or a server leakage may affect future tests.
* This Rule will prevent Threads leaking from one test into another by checking left over threads.
* This will also clear Client Thread Pools from ActiveMQClient.
*/
public class ThreadLeakCheckRule extends ExternalResource {
private static Logger log = Logger.getLogger(ThreadLeakCheckRule.class);
private static Set<String> knownThreads = new HashSet<>();
boolean enabled = true;
private Map<Thread, StackTraceElement[]> previousThreads;
public void disable() {
enabled = false;
}
/**
* Override to set up your specific external resource.
*
* @throws if setup fails (which will disable {@code after}
*/
@Override
protected void before() throws Throwable {
// do nothing
previousThreads = Thread.getAllStackTraces();
}
/**
* Override to tear down your specific external resource.
*/
@Override
protected void after() {
ActiveMQClient.clearThreadPools();
InVMConnector.resetThreadPool();
try {
if (enabled) {
boolean failed = true;
boolean failedOnce = false;
long timeout = System.currentTimeMillis() + 60000;
while (failed && timeout > System.currentTimeMillis()) {
failed = checkThread();
if (failed) {
failedOnce = true;
forceGC();
try {
Thread.sleep(500);
}
catch (Throwable e) {
}
}
}
if (failed) {
Assert.fail("Thread leaked");
}
else if (failedOnce) {
System.out.println("******************** Threads cleared after retries ********************");
System.out.println();
}
}
else {
enabled = true;
}
}
finally {
// clearing just to help GC
previousThreads = null;
}
}
private static int failedGCCalls = 0;
public static void forceGC() {
if (failedGCCalls >= 10) {
log.info("ignoring forceGC call since it seems System.gc is not working anyways");
return;
}
log.info("#test forceGC");
CountDownLatch finalized = new CountDownLatch(1);
WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
long timeout = System.currentTimeMillis() + 1000;
// A loop that will wait GC, using the minimal time as possible
while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
System.gc();
System.runFinalization();
try {
finalized.await(100, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
}
}
if (dumbReference.get() != null) {
failedGCCalls++;
log.info("It seems that GC is disabled at your VM");
}
else {
// a success would reset the count
failedGCCalls = 0;
}
log.info("#test forceGC Done ");
}
public static void removeKownThread(String name) {
knownThreads.remove(name);
}
public static void addKownThread(String name) {
knownThreads.add(name);
}
private boolean checkThread() {
boolean failedThread = false;
Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) {
for (Thread aliveThread : postThreads.keySet()) {
if (aliveThread.isAlive() && !isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) {
if (!failedThread) {
System.out.println("*********************************************************************************");
System.out.println("LEAKING THREADS");
}
failedThread = true;
System.out.println("=============================================================================");
System.out.println("Thread " + aliveThread + " is still alive with the following stackTrace:");
StackTraceElement[] elements = postThreads.get(aliveThread);
for (StackTraceElement el : elements) {
System.out.println(el);
}
}
}
if (failedThread) {
System.out.println("*********************************************************************************");
}
}
return failedThread;
}
/**
* if it's an expected thread... we will just move along ignoring it
*
* @param thread
* @return
*/
private boolean isExpectedThread(Thread thread) {
for (String known: knownThreads) {
if (thread.getName().contains(known)) {
return true;
}
}
return false;
}
protected static class DumbReference {
private CountDownLatch finalized;
public DumbReference(CountDownLatch finalized) {
this.finalized = finalized;
}
@Override
public void finalize() throws Throwable {
finalized.countDown();
super.finalize();
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.concurrent.TimeUnit;
/**
* Utility adapted from: org.apache.activemq.util.Wait
*/
public class Wait {
public static final long MAX_WAIT_MILLIS = 30 * 1000;
public static final int SLEEP_MILLIS = 1000;
public interface Condition {
boolean isSatisfied() throws Exception;
}
public static boolean waitFor(Condition condition) throws Exception {
return waitFor(condition, MAX_WAIT_MILLIS);
}
public static boolean waitFor(final Condition condition, final long duration) throws Exception {
return waitFor(condition, duration, SLEEP_MILLIS);
}
public static boolean waitFor(final Condition condition,
final long duration,
final long sleepMillis) throws Exception {
final long expiry = System.currentTimeMillis() + duration;
boolean conditionSatisified = condition.isSatisfied();
while (!conditionSatisified && System.currentTimeMillis() < expiry) {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
conditionSatisified = condition.isSatisfied();
}
return conditionSatisified;
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ActiveMQConsumerResourceTest {
static final SimpleString TEST_QUEUE = new SimpleString("test.queue");
static final SimpleString TEST_ADDRESS = new SimpleString("test.queue");
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_PROPERTIES;
static final String ASSERT_SENT_FORMAT = "Message should have been sent to %s";
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
static {
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
ActiveMQConsumerResource consumer = new ActiveMQConsumerResource(server.getVmURL(), TEST_QUEUE);
@Rule
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).outerRule(server).around(consumer);
ClientMessage sent = null;
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getMessageCount("TEST_QUEUE") == 1;
}
}, 5000, 100);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE), 1, server.getMessageCount(TEST_QUEUE));
ClientMessage received = consumer.receiveMessage();
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
}
@Test
public void testSendBytes() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
}
@Test
public void testSendString() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
}
@Test
public void testSendBytesAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test
public void testSendStringAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ActiveMQDynamicProducerResourceTest {
static final SimpleString TEST_QUEUE_ONE = new SimpleString("test.queue.one");
static final SimpleString TEST_QUEUE_TWO = new SimpleString("test.queue.two");
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_PROPERTIES;
static final String ASSERT_SENT_FORMAT = "Message should have been sent to %s";
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
static {
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
ActiveMQDynamicProducerResource producer = new ActiveMQDynamicProducerResource(server.getVmURL(), TEST_QUEUE_ONE);
@Rule
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server).around(producer);
ClientMessage sentOne = null;
ClientMessage sentTwo = null;
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getMessageCount(TEST_QUEUE_ONE) == 1 && server.getMessageCount(TEST_QUEUE_TWO) == 1;
}
}, 5000, 100);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE_ONE), 1, server.getMessageCount(TEST_QUEUE_ONE));
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE_TWO), 1, server.getMessageCount(TEST_QUEUE_TWO));
ClientMessage receivedOne = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), receivedOne);
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedTwo);
}
@Test
public void testSendBytes() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes());
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
}
@Test
public void testSendString() throws Exception {
sentOne = producer.sendMessage(TEST_BODY);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
}
@Test
public void testSendBytesAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test
public void testSendStringAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
static final SimpleString TEST_QUEUE_ONE = new SimpleString("test.queue.one");
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_PROPERTIES;
static {
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
ActiveMQDynamicProducerResource producer = new ActiveMQDynamicProducerResource(server.getVmURL());
@Rule
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server).around(producer);
ClientMessage sentOne = null;
@Before
public void setUp() throws Exception {
producer.setAutoCreateQueue(false);
server.createQueue(TEST_QUEUE_ONE, TEST_QUEUE_ONE);
}
@Test(expected = IllegalArgumentException.class)
public void testSendBytesToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes());
}
@Test(expected = IllegalArgumentException.class)
public void testSendStringToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY);
}
@Test(expected = IllegalArgumentException.class)
public void testSendBytesAndPropertiesToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test(expected = IllegalArgumentException.class)
public void testSendStringAndPropertiesToDefaultAddress() throws Exception {
sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ActiveMQDynamicProducerResourceWithoutAddressTest {
static final SimpleString TEST_QUEUE_ONE = new SimpleString("test.queue.one");
static final SimpleString TEST_QUEUE_TWO = new SimpleString("test.queue.two");
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_PROPERTIES;
static final String ASSERT_SENT_FORMAT = "Message should have been sent to %s";
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
static {
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
ActiveMQDynamicProducerResource producer = new ActiveMQDynamicProducerResource(server.getVmURL());
@Rule
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server).around(producer);
ClientMessage sentOne = null;
ClientMessage sentTwo = null;
@Before
public void setUp() throws Exception {
producer.setAutoCreateQueue(false);
server.createQueue(TEST_QUEUE_ONE, TEST_QUEUE_ONE);
server.createQueue(TEST_QUEUE_TWO, TEST_QUEUE_TWO);
}
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentTwo);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getMessageCount(TEST_QUEUE_ONE) == 1 && server.getMessageCount(TEST_QUEUE_TWO) == 1;
}
}, 5000, 100);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE_ONE), 1, server.getMessageCount(TEST_QUEUE_ONE));
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE_TWO), 1, server.getMessageCount(TEST_QUEUE_TWO));
ClientMessage receivedOne = server.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_ONE), receivedOne);
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedTwo);
}
@Test
public void testSendBytes() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes());
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes());
}
@Test
public void testSendString() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY);
}
@Test
public void testSendBytesAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY.getBytes(), TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test
public void testSendStringAndProperties() throws Exception {
sentOne = producer.sendMessage(TEST_QUEUE_ONE, TEST_BODY, TEST_PROPERTIES);
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertNotNull;
public class ActiveMQProducerResourceTest {
static final SimpleString TEST_QUEUE = new SimpleString("test.queue");
static final SimpleString TEST_ADDRESS = new SimpleString("test.queue");
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_PROPERTIES;
static final String ASSERT_SENT_FORMAT = "Message should have been sent to %s";
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
static {
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
ActiveMQProducerResource producer = new ActiveMQProducerResource(server.getVmURL(), TEST_ADDRESS);
@Rule
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server).around(producer);
ClientMessage sent = null;
@After
public void checkResults() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE), received);
}
@Test
public void testSendBytes() throws Exception {
sent = producer.sendMessage(TEST_BODY.getBytes());
}
@Test
public void testSendString() throws Exception {
sent = producer.sendMessage(TEST_BODY);
}
@Test
public void testSendBytesAndProperties() throws Exception {
sent = producer.sendMessage(TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test
public void testSendStringAndProperties() throws Exception {
sent = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.List;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.Queue;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class EmbeddedActiveMQResourceCustomConfigurationTest {
static final String TEST_QUEUE = "test.queue";
static final String TEST_ADDRESS = "test.address";
CoreQueueConfiguration queueConfiguration = new CoreQueueConfiguration().setAddress(TEST_ADDRESS).setName(TEST_QUEUE);
Configuration customConfiguration = new ConfigurationImpl().setPersistenceEnabled(false).setSecurityEnabled(true).addQueueConfiguration(queueConfiguration);
private EmbeddedActiveMQResource server = new EmbeddedActiveMQResource(customConfiguration);
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server);
@Test
public void testCustomConfiguration() throws Exception {
Configuration configuration = server.getServer().getActiveMQServer().getConfiguration();
assertFalse("Persistence should have been disabled", configuration.isPersistenceEnabled());
assertTrue( "Security should have been enabled", configuration.isSecurityEnabled());
assertNotNull(TEST_QUEUE + " should exist", server.locateQueue(TEST_QUEUE));
List<Queue> boundQueues = server.getBoundQueues(TEST_ADDRESS);
assertNotNull("List should never be null", boundQueues);
assertEquals("Should have one queue bound to address " + TEST_ADDRESS, 1, boundQueues.size());
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.List;
import org.apache.activemq.artemis.core.server.Queue;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class EmbeddedActiveMQResourceFileConfigurationTest {
// These values must match the contents of the configuration file
static final String TEST_QUEUE = "test.queue";
static final String TEST_ADDRESS = "test.address";
private EmbeddedActiveMQResource server = new EmbeddedActiveMQResource("embedded-artemis-server.xml");
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server);
@Test
public void testConfiguredQueue() throws Exception {
assertNotNull(TEST_QUEUE + " should exist", server.locateQueue(TEST_QUEUE));
List<Queue> boundQueues = server.getBoundQueues(TEST_ADDRESS);
assertNotNull("List should never be null", boundQueues);
assertEquals("Should have one queue bound to address " + TEST_ADDRESS, 1, boundQueues.size());
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class EmbeddedActiveMQResourceTest {
static final SimpleString TEST_QUEUE = new SimpleString("test.queue");
static final SimpleString TEST_ADDRESS = new SimpleString("test.queueName");
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_PROPERTIES;
static final String ASSERT_SENT_FORMAT = "Message should have been sent to %s";
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
static {
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
public EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server);
ClientMessage sent = null;
@Before
public void setUp() throws Exception {
server.createQueue(TEST_ADDRESS, TEST_QUEUE);
}
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_ADDRESS), sent);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getMessageCount(TEST_QUEUE) == 1;
}
}, 5000, 100);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE), 1, server.getMessageCount(TEST_QUEUE));
ClientMessage received = server.receiveMessage(TEST_QUEUE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
}
@Test
public void testSendBytes() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY.getBytes());
}
@Test
public void testSendString() throws Exception {
sent = server.sendMessage(TEST_ADDRESS, TEST_BODY);
}
@Test
public void testSendBytesAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test
public void testSendStringAndProperties() throws Exception {
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import java.util.List;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class EmbeddedJMSResourceMultipleFileConfigurationTest {
static final String TEST_QUEUE = "queue://test.queue";
static final String TEST_TOPIC = "topic://test.topic";
static final String TEST_BODY = "Test Message";
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-minimal-server.xml", "embedded-artemis-jms-only.xml");
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(jmsServer);
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ActiveMQMessageConsumer consumer;
@Before
public void setUp() throws Exception {
connectionFactory = new ActiveMQConnectionFactory(jmsServer.getVmURL());
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = (ActiveMQMessageConsumer) session.createConsumer(ActiveMQDestination.createDestination(TEST_TOPIC, ActiveMQDestination.TOPIC_TYPE));
connection.start();
}
@After
public void tearDown() throws Exception {
consumer.close();
session.close();
connection.close();
}
@Test
public void testConfiguration() throws Exception {
assertNotNull("Queue should have been created", jmsServer.getDestinationQueue(TEST_QUEUE));
assertNotNull("Topic should have been created", jmsServer.getDestinationQueue(TEST_TOPIC));
List<Queue> boundQueues = jmsServer.getTopicQueues(TEST_TOPIC);
assertNotNull("List should never be null", boundQueues);
assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size());
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import javax.jms.Message;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class EmbeddedJMSResourceQueueTest {
static final String TEST_DESTINATION_NAME = "queue://test.queue";
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_MAP_BODY;
static final Map<String, Object> TEST_PROPERTIES;
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
static {
TEST_MAP_BODY = new HashMap<>(2);
TEST_MAP_BODY.put("Element 1", "Value 1");
TEST_MAP_BODY.put("Element 2", "Value 2");
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(jmsServer);
Message pushed = null;
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_PUSHED_FORMAT, TEST_DESTINATION_NAME), pushed);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return jmsServer.getMessageCount(TEST_DESTINATION_NAME) == 1;
}
}, 5000, 100);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_DESTINATION_NAME), 1, jmsServer.getMessageCount(TEST_DESTINATION_NAME));
}
@Test
public void testPushBytesMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, TEST_BODY.getBytes());
}
@Test
public void testPushTextMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, TEST_BODY);
}
@Test
public void testPushMapMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, TEST_MAP_BODY);
}
@Test
public void testPushObjectMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, (Serializable) TEST_BODY);
}
@Test
public void testPushBytesMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test
public void testPushTextMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, TEST_BODY, TEST_PROPERTIES);
}
@Test
public void testPushMapMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, TEST_MAP_BODY, TEST_PROPERTIES);
}
@Test
public void testPushObjectMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, (Serializable) TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import java.util.List;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class EmbeddedJMSResourceSingleFileConfigurationTest {
static final String TEST_QUEUE = "queue://test.queue";
static final String TEST_TOPIC = "topic://test.topic";
static final String TEST_BODY = "Test Message";
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-jms-server.xml");
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(jmsServer);
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ActiveMQMessageConsumer consumer;
@Before
public void setUp() throws Exception {
connectionFactory = new ActiveMQConnectionFactory(jmsServer.getVmURL());
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = (ActiveMQMessageConsumer) session.createConsumer(ActiveMQDestination.createDestination(TEST_TOPIC, ActiveMQDestination.TOPIC_TYPE));
connection.start();
}
@After
public void tearDown() throws Exception {
consumer.close();
session.close();
connection.close();
}
@Test
public void testConfiguration() throws Exception {
assertNotNull("Queue should have been created", jmsServer.getDestinationQueue(TEST_QUEUE));
assertNotNull("Topic should have been created", jmsServer.getDestinationQueue(TEST_TOPIC));
List<Queue> boundQueues = jmsServer.getTopicQueues(TEST_TOPIC);
assertNotNull("List should never be null", boundQueues);
assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size());
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class EmbeddedJMSResourceTopicTest {
static final String TEST_DESTINATION_NAME = "topic://test.topic";
static final String TEST_BODY = "Test Message";
static final Map<String, Object> TEST_MAP_BODY;
static final Map<String, Object> TEST_PROPERTIES;
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
static {
TEST_MAP_BODY = new HashMap<>(2);
TEST_MAP_BODY.put("Element 1", "Value 1");
TEST_MAP_BODY.put("Element 2", "Value 2");
TEST_PROPERTIES = new HashMap<String, Object>(2);
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
}
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(jmsServer);
Message pushed = null;
ConnectionFactory connectionFactory;
Connection connection;
Session session;
MessageConsumer consumer;
@Before
public void setUp() throws Exception {
connectionFactory = new ActiveMQConnectionFactory(jmsServer.getVmURL());
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(ActiveMQDestination.createDestination(TEST_DESTINATION_NAME, ActiveMQDestination.TOPIC_TYPE));
connection.start();
}
@After
public void tearDown() throws Exception {
assertNotNull(String.format(ASSERT_PUSHED_FORMAT, TEST_DESTINATION_NAME), pushed);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return jmsServer.getMessageCount(TEST_DESTINATION_NAME) == 1;
}
}, 5000, 100);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_DESTINATION_NAME), 1, jmsServer.getMessageCount(TEST_DESTINATION_NAME));
consumer.close();
session.close();
connection.close();
}
@Test
public void testPushBytesMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, TEST_BODY.getBytes());
}
@Test
public void testPushTextMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, TEST_BODY);
}
@Test
public void testPushMapMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, TEST_MAP_BODY);
}
@Test
public void testPushObjectMessage() throws Exception {
pushed = jmsServer.pushMessage(TEST_DESTINATION_NAME, (Serializable) TEST_BODY);
}
@Test
public void testPushBytesMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, TEST_BODY.getBytes(), TEST_PROPERTIES);
}
@Test
public void testPushTextMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, TEST_BODY, TEST_PROPERTIES);
}
@Test
public void testPushMapMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, TEST_MAP_BODY, TEST_PROPERTIES);
}
@Test
public void testPushObjectMessageWithProperties() throws Exception {
pushed = jmsServer.pushMessageWithProperties(TEST_DESTINATION_NAME, (Serializable) TEST_BODY, TEST_PROPERTIES);
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertNotNull;
public class MultipleEmbeddedActiveMQResourcesTest {
static final SimpleString TEST_QUEUE_ONE = new SimpleString("test.queue.one");
static final SimpleString TEST_QUEUE_TWO = new SimpleString("test.queue.two");
static final SimpleString TEST_ADDRESS_ONE = new SimpleString("test.address.one");
static final SimpleString TEST_ADDRESS_TWO = new SimpleString("test.address.two");
static final String TEST_BODY = "Test Message";
static final String ASSERT_SENT_FORMAT = "Message should have been sent to %s";
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
public EmbeddedActiveMQResource serverOne = new EmbeddedActiveMQResource(0);
public EmbeddedActiveMQResource serverTwo = new EmbeddedActiveMQResource(1);
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(serverOne).around(serverTwo);
@Before
public void setUp() throws Exception {
serverOne.createQueue(TEST_ADDRESS_ONE, TEST_QUEUE_ONE);
serverTwo.createQueue(TEST_ADDRESS_TWO, TEST_QUEUE_TWO);
}
@Test
public void testMultipleServers() throws Exception {
ClientMessage sentOne = serverOne.sendMessage(TEST_ADDRESS_ONE, TEST_BODY);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_ONE), sentOne);
ClientMessage receivedOne = serverOne.receiveMessage(TEST_QUEUE_ONE);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedOne);
ClientMessage sentTwo = serverTwo.sendMessage(TEST_ADDRESS_TWO, TEST_BODY);
assertNotNull(String.format(ASSERT_SENT_FORMAT, TEST_QUEUE_TWO), sentOne);
ClientMessage receivedTwo = serverTwo.receiveMessage(TEST_QUEUE_TWO);
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedOne);
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.junit;
import javax.jms.Message;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class MultipleEmbeddedJMSResourcesTest {
static final String TEST_QUEUE_ONE = "queue://test.queue.one";
static final String TEST_QUEUE_TWO = "queue://test.queue.two";
static final String TEST_BODY = "Test Message";
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
public EmbeddedJMSResource jmsServerOne = new EmbeddedJMSResource(0);
public EmbeddedJMSResource jmsServerTwo = new EmbeddedJMSResource(1);
@Rule
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(jmsServerOne).around(jmsServerTwo);
@Test
public void testMultipleServers() throws Exception {
Message pushedOne = jmsServerOne.pushMessage(TEST_QUEUE_ONE, TEST_BODY);
assertNotNull(String.format(ASSERT_PUSHED_FORMAT, TEST_QUEUE_ONE), pushedOne);
Message pushedTwo = jmsServerTwo.pushMessage(TEST_QUEUE_TWO, TEST_BODY);
assertNotNull(String.format(ASSERT_PUSHED_FORMAT, TEST_QUEUE_TWO), pushedTwo);
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE_ONE), 1, jmsServerOne.getMessageCount(TEST_QUEUE_ONE));
assertEquals(String.format(ASSERT_COUNT_FORMAT, TEST_QUEUE_TWO), 1, jmsServerTwo.getMessageCount(TEST_QUEUE_TWO));
}
}

View File

@ -0,0 +1,29 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
urn:activemq /schema/artemis-server.xsd
urn:activemq:jms /schema/artemis-jms.xsd">
<jms xmlns="urn:activemq:jms">
<queue name="test.queue" >
<durable>true</durable>
</queue>
<topic name="test.topic" />
</jms>
</configuration>

View File

@ -0,0 +1,40 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
urn:activemq /schema/artemis-server.xsd
urn:activemq:core /schema/artemis-configuration.xsd
urn:activemq:jms /schema/artemis-jms.xsd">
<core xmlns="urn:activemq:core">
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- Acceptors -->
<acceptors>
<acceptor name="in-vm">vm://0</acceptor>
</acceptors>
</core>
<jms xmlns="urn:activemq:jms">
<queue name="test.queue" >
<durable>true</durable>
</queue>
<topic name="test.topic" />
</jms>
</configuration>

View File

@ -0,0 +1,31 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
urn:activemq /schema/artemis-server.xsd
urn:activemq:core /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- Acceptors -->
<acceptors>
<acceptor name="in-vm">vm://0</acceptor>
</acceptors>
</core>
</configuration>

View File

@ -0,0 +1,41 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
urn:activemq /schema/artemis-server.xsd
urn:activemq:core /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- Acceptors -->
<acceptors>
<!-- In VM acceptor -->
<acceptor name="in-vm">vm://0</acceptor>
</acceptors>
<!-- Queues -->
<queues>
<queue name="test.queue">
<address>test.address</address>
<durable>true</durable>
</queue>
</queues>
</core>
</configuration>

View File

@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Additional logger names to configure (root logger is always configured)
# Root logger option
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests
# Root logger level
logger.level=INFO
# ActiveMQ Artemis logger levels
logger.org.apache.activemq.artemis.core.server.level=INFO
logger.org.apache.activemq.artemis.journal.level=INFO
logger.org.apache.activemq.artemis.utils.level=INFO
logger.org.apache.activemq.artemis.jms.level=INFO
logger.org.apache.activemq.artemis.ra.level=INFO
logger.org.apache.activemq.artemis.tests.unit.level=INFO
logger.org.apache.activemq.artemis.tests.integration.level=DEBUG
logger.org.apache.activemq.artemis.jms.tests.level=INFO
# Root logger handlers
logger.handlers=CONSOLE,TEST
#logger.handlers=CONSOLE,FILE
# Console handler configuration
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
handler.CONSOLE.properties=autoFlush
handler.CONSOLE.level=FINE
handler.CONSOLE.autoFlush=true
handler.CONSOLE.formatter=PATTERN
# File handler configuration
handler.FILE=org.jboss.logmanager.handlers.FileHandler
handler.FILE.level=FINE
handler.FILE.properties=autoFlush,fileName
handler.FILE.autoFlush=true
handler.FILE.fileName=target/activemq.log
handler.FILE.formatter=PATTERN
# Console handler configuration
handler.TEST=org.apache.activemq.artemis.logs.AssertionLoggerHandler
handler.TEST.level=TRACE
handler.TEST.formatter=PATTERN
# Formatter pattern configuration
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.PATTERN.properties=pattern
formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n

View File

@ -56,6 +56,7 @@
* [Protocols and Interoperability](protocols-interoperability.md)
* [Tools](tools.md)
* [Maven Plugin](maven-plugin.md)
* [Unit Testing](unit-testing.md)
* [Troubleshooting and Performance Tuning](perf-tuning.md)
* [Configuration Reference](configuration-index.md)

View File

@ -0,0 +1,77 @@
# Unit Testing
The package ```artemis-junit``` provides tools to facilitate how to run Artemis resources inside Junit Tests.
These are provided as junit rules and can make it easier to embed Messaging functionality on your tests.
## Example
### Import this on your pom.xml
```xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-junit</artifactId>
<!-- replace this for the version you are using -->
<version>1.5.0</version>
<scope>test</scope>
</dependency>
```
### Declare a rule on your JUnit Test
```java
import org.apache.activemq.artemis.junit.EmbeddedJMSResource;
import org.junit.Rule;
import org.junit.Test;
public class MyTest {
@Rule
public EmbeddedJMSResource resource = new EmbeddedJMSResource();
@Test
public void myTest() {
}
}
```
This will start a server that will be available for your test:
```
ain] 17:00:16,644 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
[main] 17:00:16,666 INFO [org.apache.activemq.artemis.core.server] AMQ221045: libaio is not available, switching the configuration into NIO
[main] 17:00:16,688 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
[main] 17:00:16,801 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
[main] 17:00:16,801 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 1.5.0-SNAPSHOT [embedded-jms-server, nodeID=39e78380-842c-11e6-9e43-f45c8992f3c7]
[main] 17:00:16,891 INFO [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 1.5.0-SNAPSHOT [39e78380-842c-11e6-9e43-f45c8992f3c7] stopped, uptime 0.272 seconds
```
### Ordering rules
This is actually a Junit feature, but this could be helpful on pre-determining the order on which rules are executed.
```java
ActiveMQDynamicProducerResource producer = new ActiveMQDynamicProducerResource(server.getVmURL());
@Rule
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server).around(producer);
```
### Available Rules
Name | Description
:--- | :---
EmbeddedActiveMQResource | It will run a Server, without the JMS manager
EmbeddedJMSResource | It will run a Server, including the JMS Manager
ActiveMQConsumerResource | It will automate the creation of a consumer
ActiveMQProducerResource | It will automate the creation of a producer
ThreadLeakCheckRule | It will check that all threads have been finished after the test is finished

View File

@ -40,6 +40,7 @@
<module>artemis-selector</module>
<module>artemis-core-client</module>
<module>artemis-server</module>
<module>artemis-junit</module>
<module>artemis-jms-client</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>