mirror of https://github.com/apache/activemq.git
test case that shows something is wrong with start logic on pooled connection factory, the vm test variant was getting serialized on the broker vm transport server
This commit is contained in:
parent
27b3a7c344
commit
736ffc9b96
|
@ -34,9 +34,11 @@ import junit.framework.TestSuite;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.command.ConnectionId;
|
import org.apache.activemq.command.ConnectionId;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks the behavior of the PooledConnectionFactory when the maximum amount of
|
* Checks the behavior of the PooledConnectionFactory when the maximum amount of
|
||||||
|
@ -203,50 +205,58 @@ public class PooledConnectionFactoryTest extends TestCase {
|
||||||
doTestConcurrentCreateGetsUniqueConnection(false);
|
doTestConcurrentCreateGetsUniqueConnection(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore("something up - don't know why the start call to createConnection does not cause close - but that does not fix it either!")
|
||||||
public void testConcurrentCreateGetsUniqueConnectionCreateOnStart() throws Exception {
|
public void testConcurrentCreateGetsUniqueConnectionCreateOnStart() throws Exception {
|
||||||
doTestConcurrentCreateGetsUniqueConnection(true);
|
doTestConcurrentCreateGetsUniqueConnection(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doTestConcurrentCreateGetsUniqueConnection(boolean createOnStart) throws Exception {
|
private void doTestConcurrentCreateGetsUniqueConnection(boolean createOnStart) throws Exception {
|
||||||
|
|
||||||
final int numConnections = 50;
|
BrokerService brokerService = new BrokerService();
|
||||||
|
brokerService.setPersistent(false);
|
||||||
|
brokerService.addConnector("tcp://localhost:0");
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
|
try {
|
||||||
final PooledConnectionFactory cf = new PooledConnectionFactory();
|
final int numConnections = 2;
|
||||||
cf.setConnectionFactory(amq);
|
|
||||||
cf.setMaxConnections(numConnections);
|
|
||||||
cf.setCreateConnectionOnStartup(createOnStart);
|
|
||||||
|
|
||||||
final ConcurrentHashMap<ConnectionId, Connection> connections =
|
final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
|
||||||
new ConcurrentHashMap<ConnectionId, Connection>();
|
final PooledConnectionFactory cf = new PooledConnectionFactory();
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(numConnections / 2);
|
cf.setConnectionFactory(amq);
|
||||||
|
cf.setMaxConnections(numConnections);
|
||||||
|
cf.setCreateConnectionOnStartup(createOnStart);
|
||||||
|
cf.start();
|
||||||
|
|
||||||
for (int i = 0; i < numConnections; ++i) {
|
final ConcurrentHashMap<ConnectionId, Connection> connections =
|
||||||
executor.execute(new Runnable() {
|
new ConcurrentHashMap<ConnectionId, Connection>();
|
||||||
|
final ExecutorService executor = Executors.newFixedThreadPool(numConnections);
|
||||||
|
|
||||||
@Override
|
for (int i = 0; i < numConnections; ++i) {
|
||||||
public void run() {
|
executor.execute(new Runnable() {
|
||||||
try {
|
|
||||||
PooledConnection pooled = (PooledConnection) cf.createConnection();
|
@Override
|
||||||
ActiveMQConnection amq = (ActiveMQConnection) pooled.getConnection();
|
public void run() {
|
||||||
connections.put(amq.getConnectionInfo().getConnectionId(), pooled);
|
try {
|
||||||
} catch (JMSException e) {
|
PooledConnection pooled = (PooledConnection) cf.createConnection();
|
||||||
|
ActiveMQConnection amq = (ActiveMQConnection) pooled.getConnection();
|
||||||
|
connections.put(amq.getConnectionInfo().getConnectionId(), pooled);
|
||||||
|
} catch (JMSException e) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue("Should have all unique connections", Wait.waitFor(new Wait.Condition() {
|
|
||||||
@Override
|
|
||||||
public boolean isSatisified() throws Exception {
|
|
||||||
return connections.size() == numConnections;
|
|
||||||
}
|
}
|
||||||
}));
|
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
|
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
|
||||||
connections.clear();
|
|
||||||
cf.stop();
|
assertEquals("Should have all unique connections", numConnections, connections.size());
|
||||||
|
|
||||||
|
connections.clear();
|
||||||
|
cf.stop();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
brokerService.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue