Ensure that connections using VM transport are closed.
This commit is contained in:
Timothy Bish 2016-06-09 18:37:09 -04:00
parent c49db029ab
commit 9f5fff795d
5 changed files with 48 additions and 49 deletions

View File

@ -35,7 +35,7 @@ import org.apache.ftpserver.usermanager.impl.WritePermission;
import org.jmock.Mockery;
public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
protected static final String ftpServerListenerName = "default";
protected Connection connection;
protected FtpServer server;
@ -44,11 +44,12 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
Mockery context = null;
String ftpUrl;
int ftpPort;
final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest");
@Override
protected void setUp() throws Exception {
if (ftpHomeDirFile.getParentFile().exists()) {
IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
}
@ -65,22 +66,22 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
user.setName("activemq");
user.setPassword("activemq");
user.setHomeDirectory(ftpHomeDirFile.getParent());
// authorize user
List<Authority> auths = new ArrayList<Authority>();
Authority auth = new WritePermission();
auths.add(auth);
user.setAuthorities(auths);
userManager.save(user);
BaseUser guest = new BaseUser();
guest.setName("guest");
guest.setPassword("guest");
guest.setHomeDirectory(ftpHomeDirFile.getParent());
userManager.save(guest);
serverFactory.setUserManager(userManager);
factory.setPort(0);
serverFactory.addListener(ftpServerListenerName, factory
@ -91,7 +92,7 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
.getPort();
super.setUp();
}
public void setConnection() throws Exception {
ftpUrl = "ftp://"
+ userNamePass
@ -101,16 +102,17 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
+ ftpPort
+ "/ftptest/";
bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + ftpUrl;
connectionFactory = createConnectionFactory();
connection = createConnection();
connection.start();
connection.start();
}
@Override
protected void tearDown() throws Exception {
if (connection != null) {
connection.stop();
connection.close();
}
super.tearDown();
if (server != null) {
@ -119,6 +121,6 @@ public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
}
}

View File

@ -95,7 +95,7 @@ public class FilesystemBlobTest extends EmbeddedBrokerTestSupport {
@Override
protected void tearDown() throws Exception {
if (connection != null) {
connection.stop();
connection.close();
}
super.tearDown();

View File

@ -38,7 +38,6 @@ import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -173,5 +172,7 @@ public class DestinationGCTest {
producer.close();
assertFalse(brokerService.getDestination(q).canGC());
connection.close();
}
}

View File

@ -29,8 +29,6 @@ import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
@ -47,6 +45,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import junit.framework.TestCase;
/**
* Unit test for virtual topics and DLQ messaging. See individual test for more
* detail
@ -74,6 +74,7 @@ public class VirtualTopicDLQTest extends TestCase {
// Number of messages
private static final int numberMessages = 6;
@Override
@Before
public void setUp() throws Exception {
try {
@ -86,6 +87,7 @@ public class VirtualTopicDLQTest extends TestCase {
}
}
@Override
@After
public void tearDown() throws Exception {
try {
@ -233,6 +235,7 @@ public class VirtualTopicDLQTest extends TestCase {
return latch;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory = null;
ActiveMQConnection connection = null;
@ -259,7 +262,7 @@ public class VirtualTopicDLQTest extends TestCase {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < numberMessages; i++) {
TextMessage message = (TextMessage) session.createTextMessage("I am a message :: " + String.valueOf(i));
TextMessage message = session.createTextMessage("I am a message :: " + String.valueOf(i));
try {
producer.send(message);
@ -275,18 +278,13 @@ public class VirtualTopicDLQTest extends TestCase {
} catch (Exception e) {
LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e);
e.printStackTrace();
} finally {
try {
// Clean up
if (session != null)
session.close();
if (connection != null)
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
}
}
@ -318,6 +316,7 @@ public class VirtualTopicDLQTest extends TestCase {
return latch;
}
@Override
public void run() {
try {
@ -354,24 +353,18 @@ public class VirtualTopicDLQTest extends TestCase {
} catch (Exception e) {
LOG.error("Consumer (" + destinationName + ") Caught: " + e);
e.printStackTrace();
} finally {
try {
// Clean up
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
}
}
}
@Override
public synchronized void onException(JMSException ex) {
ex.printStackTrace();
LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occured. Shutting down client.");
@ -381,6 +374,7 @@ public class VirtualTopicDLQTest extends TestCase {
this.bStop = bStop;
}
@Override
public synchronized void onMessage(Message message) {
receivedMessageCounter++;
latch.countDown();
@ -401,7 +395,6 @@ public class VirtualTopicDLQTest extends TestCase {
}
} catch (JMSException ex) {
ex.printStackTrace();
LOG.error("Error reading JMS Message from destination " + destinationName + ".");
}
}

View File

@ -20,6 +20,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
@ -73,16 +74,20 @@ public class VirtualTopicFanoutPerfTest {
@Test
@Ignore("comparison test - concurrentSend=true virtual topic, use transaction")
public void testFanoutDuration() throws Exception {
public void testFanoutDuration() throws Exception {
Connection connection1 = connectionFactory.createConnection();
connection1.start();
Session session = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i=0; i<numConsumers; i++) {
session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"));
}
// create topic producer
Session producerSession = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
Connection connection2 = connectionFactory.createConnection();
connection2.start();
Session producerSession = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(new ActiveMQTopic("VirtualTopic.TEST"));
long start = System.currentTimeMillis();
@ -92,13 +97,11 @@ public class VirtualTopicFanoutPerfTest {
}
LOG.info("Done producer, duration: " + (System.currentTimeMillis() - start) );
try {
connection1.close();
} catch (Exception ex) {}
try {
connection2.close();
} catch (Exception ex) {}
}
private Connection createStartAndTrackConnection() throws Exception {
Connection connection = connectionFactory.createConnection();
connection.start();
return connection;
}
}