Clean up, remove sleep and some unused code.
This commit is contained in:
Timothy Bish 2016-05-10 09:56:19 -04:00
parent d3ea5c4f9f
commit 809d5b9bc8
1 changed files with 39 additions and 27 deletions

View File

@ -16,15 +16,17 @@
*/ */
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
@ -33,17 +35,26 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class AMQ2616Test {
@Rule
public TestName test = new TestName();
public class AMQ2616Test extends TestCase {
private static final int NUMBER = 2000; private static final int NUMBER = 2000;
private BrokerService brokerService; private BrokerService brokerService;
private final ArrayList<Thread> threads = new ArrayList<Thread>();
private final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0"; private final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0";
private final AtomicBoolean shutdown = new AtomicBoolean();
private String connectionUri; private String connectionUri;
public void testQueueResourcesReleased() throws Exception{ @Test(timeout = 90000)
public void testQueueResourcesReleased() throws Exception {
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri); ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri);
Connection tempConnection = fac.createConnection(); Connection tempConnection = fac.createConnection();
tempConnection.start(); tempConnection.start();
@ -51,26 +62,32 @@ public class AMQ2616Test extends TestCase {
Queue tempQueue = tempSession.createTemporaryQueue(); Queue tempQueue = tempSession.createTemporaryQueue();
Connection testConnection = fac.createConnection(); Connection testConnection = fac.createConnection();
long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage(); final long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer testProducer = testSession.createProducer(tempQueue); MessageProducer testProducer = testSession.createProducer(tempQueue);
byte[] payload = new byte[1024*4]; byte[] payload = new byte[1024 * 4];
for (int i = 0; i < NUMBER; i++ ) {
for (int i = 0; i < NUMBER; i++) {
BytesMessage msg = testSession.createBytesMessage(); BytesMessage msg = testSession.createBytesMessage();
msg.writeBytes(payload); msg.writeBytes(payload);
testProducer.send(msg); testProducer.send(msg);
} }
long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
assertFalse(startUsage==endUsage);
tempConnection.close();
Thread.sleep(1000);
endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
assertEquals(startUsage,endUsage);
}
long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
assertFalse(startUsage == endUsage);
tempConnection.close();
assertTrue("Usage should return to original", Wait.waitFor(new Wait.Condition() {
@Override @Override
protected void setUp() throws Exception { public boolean isSatisified() throws Exception {
return brokerService.getSystemUsage().getMemoryUsage().getUsage() == startUsage;
}
}));
}
@Before
public void setUp() throws Exception {
// Start an embedded broker up. // Start an embedded broker up.
brokerService = new BrokerService(); brokerService = new BrokerService();
@ -90,6 +107,7 @@ public class AMQ2616Test extends TestCase {
pe.setExpireMessagesPeriod(1000); pe.setExpireMessagesPeriod(1000);
pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
policyMap.put(new ActiveMQQueue(">"), pe); policyMap.put(new ActiveMQQueue(">"), pe);
brokerService.setDestinationPolicy(policyMap); brokerService.setDestinationPolicy(policyMap);
brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024); brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024);
brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024); brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
@ -99,18 +117,12 @@ public class AMQ2616Test extends TestCase {
connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
new ActiveMQQueue(getName()); new ActiveMQQueue(test.getMethodName());
} }
@Override @After
protected void tearDown() throws Exception { public void tearDown() throws Exception {
// Stop any running threads.
shutdown.set(true);
for (Thread t : threads) {
t.interrupt();
t.join();
}
brokerService.stop(); brokerService.stop();
brokerService = null;
} }
} }