git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1464584 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-04-04 15:00:05 +00:00
parent 77fe814b9e
commit 1b38caacf4
14 changed files with 166 additions and 204 deletions

View File

@ -26,14 +26,12 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.command.ActiveMQBlobMessage;
public class HttpBlobTest extends HttpTestSupport {
public void testBlobFile() throws Exception {
// first create Message
File file = File.createTempFile("amq-data-file-", ".dat");
@ -54,7 +52,7 @@ public class HttpBlobTest extends HttpTestSupport {
// check message send
Message msg = consumer.receive(1000);
Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
assertTrue(msg instanceof ActiveMQBlobMessage);
InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
StringBuilder b = new StringBuilder();
@ -64,11 +62,11 @@ public class HttpBlobTest extends HttpTestSupport {
i = input.read();
}
input.close();
File uploaded = new File(homeDir, msg.getJMSMessageID().toString().replace(":", "_"));
Assert.assertEquals(content, b.toString());
File uploaded = new File(homeDir, msg.getJMSMessageID().toString().replace(":", "_"));
assertEquals(content, b.toString());
assertTrue(uploaded.exists());
((ActiveMQBlobMessage)msg).deleteFile();
assertFalse(uploaded.exists());
}
}

View File

@ -16,49 +16,45 @@
*/
package org.apache.activemq.pool;
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
/**
* Checks the behavior of the PooledConnectionFactory when the maximum amount
* of sessions is being reached (maximumActive).
* When using setBlockIfSessionPoolIsFull(true) on the ConnectionFactory,
* further requests for sessions should block.
* If it does not block, its a bug.
* Checks the behavior of the PooledConnectionFactory when the maximum amount of sessions is being reached
* (maximumActive). When using setBlockIfSessionPoolIsFull(true) on the ConnectionFactory, further requests for sessions
* should block. If it does not block, its a bug.
*
* @author: tmielke
*/
public class PooledConnectionFactoryMaximumActiveTest extends TestCase
{
public class PooledConnectionFactoryMaximumActiveTest extends TestCase {
public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryMaximumActiveTest.class);
public static Connection conn = null;
public static int sleepTimeout = 5000;
private static ConcurrentHashMap<Integer, Session> sessions = new ConcurrentHashMap<Integer,Session>();
private static ConcurrentHashMap<Integer, Session> sessions = new ConcurrentHashMap<Integer, Session>();
/**
* Create the test case
*
* @param testName name of the test case
* @param testName
* name of the test case
*/
public PooledConnectionFactoryMaximumActiveTest( String testName )
{
super( testName );
public PooledConnectionFactoryMaximumActiveTest(String testName) {
super(testName);
}
public static void addSession(Session s) {
@ -68,25 +64,18 @@ public class PooledConnectionFactoryMaximumActiveTest extends TestCase
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( PooledConnectionFactoryMaximumActiveTest.class );
public static Test suite() {
return new TestSuite(PooledConnectionFactoryMaximumActiveTest.class);
}
/**
* Tests the behavior of the sessionPool of the PooledConnectionFactory
* when maximum number of sessions are reached. This test uses
* maximumActive=1.
* When creating two threads that both
* try to create a JMS session from the same JMS connection,
* the thread that is second to call createSession()
* should block (as only 1 session is allowed) until the
* session is returned to pool.
* If it does not block, its a bug.
* Tests the behavior of the sessionPool of the PooledConnectionFactory when maximum number of sessions are reached.
* This test uses maximumActive=1. When creating two threads that both try to create a JMS session from the same JMS
* connection, the thread that is second to call createSession() should block (as only 1 session is allowed) until
* the session is returned to pool. If it does not block, its a bug.
*
*/
public void testApp() throws Exception
{
public void testApp() throws Exception {
// Initialize JMS connection
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
@ -101,22 +90,20 @@ public class PooledConnectionFactoryMaximumActiveTest extends TestCase
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(new TestRunner2());
// Thread.sleep(100);
Future<Boolean> result2 = (Future<Boolean>) executor.submit(new TestRunner2());
Future<Boolean> result2 = executor.submit(new TestRunner2());
// sleep to allow threads to run
Thread.sleep(sleepTimeout);
// second task should not have finished, instead wait on getting a
// JMS Session
Assert.assertEquals(false, result2.isDone());
assertEquals(false, result2.isDone());
//Only 1 session should have been created
Assert.assertEquals(1, sessions.size());
// Only 1 session should have been created
assertEquals(1, sessions.size());
// Take all threads down
executor.shutdownNow();
}
}
@ -127,6 +114,7 @@ class TestRunner2 implements Callable<Boolean> {
/**
* @return true if test succeeded, false otherwise
*/
@Override
public Boolean call() {
Session one = null;
@ -143,7 +131,6 @@ class TestRunner2 implements Callable<Boolean> {
LOG.info("Created new Session with id" + one);
PooledConnectionFactoryMaximumActiveTest.addSession(one);
Thread.sleep(2 * PooledConnectionFactoryMaximumActiveTest.sleepTimeout);
} catch (Exception ex) {
LOG.error(ex.getMessage());
return new Boolean(false);

View File

@ -27,7 +27,6 @@ import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
@ -176,7 +175,6 @@ public class PooledConnectionFactoryTest extends TestCase {
}
assertTrue("", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connections.size() == numConnections;
@ -203,17 +201,19 @@ public class PooledConnectionFactoryTest extends TestCase {
// start test runner thread
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
Future<Boolean> result = executor.submit(new TestRunner());
// test should not take > 5secs, so test fails i
Thread.sleep(5 * 1000);
if (!result.isDone() || !result.get().booleanValue()) {
PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" + " is blocking but should have returned an error instead.");
PooledConnectionFactoryTest.LOG.error("2nd call to createSession() " +
"is blocking but should have returned an error instead.");
executor.shutdownNow();
Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " + "limit is exceeded but should return an exception instead.");
fail("SessionPool inside PooledConnectionFactory is blocking if " +
"limit is exceeded but should return an exception instead.");
}
}
@ -224,6 +224,7 @@ public class PooledConnectionFactoryTest extends TestCase {
/**
* @return true if test succeeded, false otherwise
*/
@Override
public Boolean call() {
Connection conn = null;
@ -248,8 +249,9 @@ public class PooledConnectionFactoryTest extends TestCase {
two.close();
LOG.error("Expected JMSException wasn't thrown.");
Assert.fail("seconds call to Connection.createSession() was supposed" + "to raise an JMSException as internal session pool"
+ "is exhausted. This did not happen and indiates a problem");
fail("seconds call to Connection.createSession() was supposed" +
"to raise an JMSException as internal session pool" +
"is exhausted. This did not happen and indiates a problem");
return new Boolean(false);
} catch (JMSException ex) {
if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
@ -275,5 +277,3 @@ public class PooledConnectionFactoryTest extends TestCase {
}
}
}

View File

@ -16,100 +16,97 @@
*/
package org.apache.activemq.pool;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.IllegalStateException;
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A couple of tests against the PooledConnection class.
*
*/
public class PooledConnectionTest extends TestCase {
private Logger log = LoggerFactory.getLogger(PooledConnectionTest.class);
@Override
public void setUp() throws Exception {
log.debug("setUp() called.");
}
@Override
public void tearDown() throws Exception {
log.debug("tearDown() called.");
}
/**
* AMQ-3752:
* Tests how the ActiveMQConnection reacts to repeated calls to
* setClientID().
*
* @throws Exception
*/
public void testRepeatedSetClientIDCalls() throws Exception {
log.debug("running testRepeatedSetClientIDCalls()");
// 1st test: call setClientID("newID") twice
// this should be tolerated and not result in an exception
//
ConnectionFactory cf = createPooledConnectionFactory();
Connection conn = cf.createConnection();
conn.setClientID("newID");
try {
conn.setClientID("newID");
conn.start();
conn.close();
cf = null;
} catch (IllegalStateException ise) {
log.error("Repeated calls to ActiveMQConnection.setClientID(\"newID\") caused " + ise.getMessage());
Assert.fail("Repeated calls to ActiveMQConnection.setClientID(\"newID\") caused " + ise.getMessage());
}
// 2nd test: call setClientID() twice with different IDs
// this should result in an IllegalStateException
//
cf = createPooledConnectionFactory();
conn = cf.createConnection();
conn.setClientID("newID1");
try {
conn.setClientID("newID2");
Assert.fail("calling ActiveMQConnection.setClientID() twice with different clientID must raise an IllegalStateException");
} catch (IllegalStateException ise) {
log.debug("Correctly received " + ise);
private final Logger log = LoggerFactory.getLogger(PooledConnectionTest.class);
@Override
public void setUp() throws Exception {
log.debug("setUp() called.");
}
@Override
public void tearDown() throws Exception {
log.debug("tearDown() called.");
}
/**
* AMQ-3752:
* Tests how the ActiveMQConnection reacts to repeated calls to
* setClientID().
*
* @throws Exception
*/
public void testRepeatedSetClientIDCalls() throws Exception {
log.debug("running testRepeatedSetClientIDCalls()");
// 1st test: call setClientID("newID") twice
// this should be tolerated and not result in an exception
//
ConnectionFactory cf = createPooledConnectionFactory();
Connection conn = cf.createConnection();
conn.setClientID("newID");
try {
conn.setClientID("newID");
conn.start();
conn.close();
cf = null;
} catch (IllegalStateException ise) {
log.error("Repeated calls to ActiveMQConnection.setClientID(\"newID\") caused " + ise.getMessage());
fail("Repeated calls to ActiveMQConnection.setClientID(\"newID\") caused " + ise.getMessage());
}
// 2nd test: call setClientID() twice with different IDs
// this should result in an IllegalStateException
//
cf = createPooledConnectionFactory();
conn = cf.createConnection();
conn.setClientID("newID1");
try {
conn.setClientID("newID2");
fail("calling ActiveMQConnection.setClientID() twice with different clientID must raise an IllegalStateException");
} catch (IllegalStateException ise) {
log.debug("Correctly received " + ise);
} finally {
conn.close();
}
// 3rd test: try to call setClientID() after start()
// should result in an exception
cf = createPooledConnectionFactory();
conn = cf.createConnection();
try {
conn.start();
conn.setClientID("newID3");
Assert.fail("Calling setClientID() after start() mut raise a JMSException.");
} catch (IllegalStateException ise) {
log.debug("Correctly received " + ise);
// 3rd test: try to call setClientID() after start()
// should result in an exception
cf = createPooledConnectionFactory();
conn = cf.createConnection();
try {
conn.start();
conn.setClientID("newID3");
fail("Calling setClientID() after start() mut raise a JMSException.");
} catch (IllegalStateException ise) {
log.debug("Correctly received " + ise);
} finally {
conn.close();
}
log.debug("Test finished.");
}
protected ConnectionFactory createPooledConnectionFactory() {
ConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
((PooledConnectionFactory)cf).setMaxConnections(1);
log.debug("ConnectionFactory initialized.");
return cf;
}
log.debug("Test finished.");
}
protected ConnectionFactory createPooledConnectionFactory() {
ConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
((PooledConnectionFactory)cf).setMaxConnections(1);
log.debug("ConnectionFactory initialized.");
return cf;
}
}

View File

@ -23,15 +23,12 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class PooledSessionTest {
private Logger LOG = Logger.getLogger(getClass());
private BrokerService broker;
private ActiveMQConnectionFactory factory;
private PooledConnectionFactory pooledFactory;
@ -70,5 +67,4 @@ public class PooledSessionTest {
assertEquals(1, connection.getNumtIdleSessions());
assertEquals(1, connection.getNumSessions());
}
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.pool;
import java.util.Hashtable;
import java.util.Vector;
import javax.jms.BytesMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
@ -40,6 +40,7 @@ import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.ActiveMQXASession;

View File

@ -21,7 +21,6 @@ import java.io.File;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
@ -34,22 +33,16 @@ import org.springframework.core.io.Resource;
*/
public class LevelDBConfigTest extends TestCase {
protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/store/leveldb/";
private static final Logger LOG = LoggerFactory.getLogger(LevelDBConfigTest.class);
/*
* This tests configuring the different broker properties using
* xbeans-spring
*/
public void testBrokerConfig() throws Exception {
ActiveMQTopic dest;
BrokerService broker;
// Create broker from resource
// System.out.print("Creating broker... ");
broker = createBroker("org/apache/activemq/store/leveldb/leveldb.xml");
@ -95,8 +88,6 @@ public class LevelDBConfigTest extends TestCase {
}
}
/*
* TODO: Create additional tests for forwarding bridges
*/

View File

@ -34,8 +34,7 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -44,7 +43,6 @@ import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
@ -69,12 +67,14 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
private static final int NUM_MESSAGE_TO_SEND = 10;
private AtomicInteger messageCount = new AtomicInteger();
private final AtomicInteger messageCount = new AtomicInteger();
private CountDownLatch doneLatch;
@Override
public void setUp() throws Exception {
}
@Override
public void tearDown() throws Exception {
}
@ -111,18 +111,15 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
executor.shutdownNow();
Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
} finally {
container1.stop();
container1.destroy();
container1 = null;
brokerService1.stop();
brokerService1 = null;
}
}
// This should fail with incubator-activemq-fuse-4.1.0.5
@ -162,7 +159,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
executor.shutdownNow();
Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
} finally {
container1.stop();
@ -214,9 +211,8 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
executor.shutdownNow();
Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
} finally {
container1.stop();
container1.destroy();
container1 = null;
@ -268,7 +264,6 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
}
return brokerService;
}
public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, final MessageListener listener, final String queue) {
@ -300,26 +295,21 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
public TestMessageListener1(long waitTime) {
this.waitTime = waitTime;
}
@Override
public void onMessage(Message msg) {
try {
LOG.info("Listener1 Consumed message " + msg.getIntProperty("count"));
messageCount.incrementAndGet();
doneLatch.countDown();
Thread.sleep(waitTime);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@ -334,6 +324,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
this.queueName = queueName;
}
@Override
public void run() {
try {
@ -355,6 +346,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
final BytesMessage message = session.createBytesMessage();
@ -387,6 +379,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
this.queueName = queueName;
}
@Override
public void run() {
try {
@ -408,6 +401,7 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
final BytesMessage message = session.createBytesMessage();
@ -429,5 +423,4 @@ public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
}
}
}
}

View File

@ -34,9 +34,7 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.*;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -241,6 +239,7 @@ public class AMQDeadlockTestW4Brokers extends org.apache.activemq.test.TestSuppo
this.waitTime = waitTime;
}
@Override
public void onMessage(Message msg) {
try {
@ -276,6 +275,7 @@ public class AMQDeadlockTestW4Brokers extends org.apache.activemq.test.TestSuppo
this.producerName = producerName;
}
@Override
public void run() {
try {
@ -294,6 +294,7 @@ public class AMQDeadlockTestW4Brokers extends org.apache.activemq.test.TestSuppo
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
final int count = i;
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
final BytesMessage message = session.createBytesMessage();

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@ -32,10 +33,8 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.*;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -57,7 +56,7 @@ public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
private static final int NUM_MESSAGE_TO_SEND = 10000;
private static final int TOTAL_MESSAGES = MAX_PRODUCERS * NUM_MESSAGE_TO_SEND;
private static final boolean USE_FAILOVER = true;
private AtomicInteger messageCount = new AtomicInteger();
private final AtomicInteger messageCount = new AtomicInteger();
private CountDownLatch doneLatch;
@Override
@ -92,7 +91,7 @@ public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
assertTrue(doneLatch.await(45, TimeUnit.SECONDS));
executor.shutdown();
// Thread.sleep(30000);
Assert.assertEquals(TOTAL_MESSAGES, messageCount.get());
assertEquals(TOTAL_MESSAGES, messageCount.get());
} finally {
container1.stop();
container1.destroy();
@ -165,13 +164,13 @@ public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
this.waitTime = waitTime;
}
@Override
public void onMessage(Message msg) {
try {
messageCount.incrementAndGet();
doneLatch.countDown();
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@ -187,6 +186,7 @@ public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
this.queueName = queueName;
}
@Override
public void run() {
try {
final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
@ -203,6 +203,7 @@ public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
final BytesMessage message = session.createBytesMessage();
message.writeBytes(bytes);

View File

@ -16,11 +16,15 @@
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Vector;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.JaasDualAuthenticationPlugin;
@ -31,9 +35,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static junit.framework.Assert.assertTrue;
// https://issues.apache.org/jira/browse/AMQ-3393
public class ConnectTest {
private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class);
@ -64,6 +65,7 @@ public class ConnectTest {
Thread t1 = new Thread() {
StompConnection connection = new StompConnection();
@Override
public void run() {
try {
connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
@ -105,6 +107,7 @@ public class ConnectTest {
Thread t1 = new Thread() {
StompConnection connection = new StompConnection();
@Override
public void run() {
try {
connection.open("localhost", listenPort);
@ -146,6 +149,7 @@ public class ConnectTest {
Thread t1 = new Thread() {
StompConnection connection = new StompConnection();
@Override
public void run() {
try {
connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort());

View File

@ -17,16 +17,13 @@
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
@ -37,9 +34,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class StompPrefetchTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompPrefetchTest.class);

View File

@ -16,7 +16,9 @@
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
@ -28,8 +30,6 @@ import java.util.concurrent.CountDownLatch;
import javax.management.ObjectName;
import junit.framework.Assert;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
@ -130,7 +130,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
// check if consumer set failMsg, then let the test fail.
if (null != failMsg) {
LOG.error(failMsg);
Assert.fail(failMsg);
fail(failMsg);
}
}
@ -189,7 +189,7 @@ public class StompVirtualTopicTest extends StompTestSupport {
// another receive should not return any more msgs
try {
frame = stompConnection.receive(3000);
Assert.assertNull(frame);
assertNull(frame);
} catch (Exception e) {
LOG.info("Correctly received " + e + " while trying to consume an additional msg." +
" This is expected as the queue should be empty now.");

View File

@ -16,14 +16,13 @@
*/
package org.apache.activemq.network;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
import org.apache.activemq.util.Wait;
import org.junit.Test;
public class NetworkLoopBackTest {
@Test