This closes #112

This commit is contained in:
Timothy Bish 2015-06-19 11:13:55 -04:00
commit a35be76ff3
2 changed files with 81 additions and 5 deletions

View File

@ -147,9 +147,11 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
* @see javax.resource.spi.ResourceAdapter#stop()
*/
public void stop() {
while (endpointWorkers.size() > 0) {
ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
synchronized (endpointWorkers) {
while (endpointWorkers.size() > 0) {
ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
}
}
synchronized( this ) {
@ -205,10 +207,12 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
* javax.resource.spi.ActivationSpec)
*/
public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
if (activationSpec instanceof MessageActivationSpec) {
ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
ActiveMQEndpointWorker worker = endpointWorkers.remove(key);
ActiveMQEndpointWorker worker = null;
synchronized (endpointWorkers) {
worker = endpointWorkers.remove(key);
}
if (worker == null) {
// This is weird.. that endpoint was not activated.. oh well..
// this method

View File

@ -274,6 +274,78 @@ public class MDBTest extends TestCase {
}
//https://issues.apache.org/jira/browse/AMQ-5811
public void testAsyncStop() throws Exception {
for (int repeat = 0; repeat < 10; repeat++) {
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.setQueuePrefetch(1);
adapter.start(new StubBootstrapContext());
final int num = 20;
MessageEndpointFactory[] endpointFactories = new MessageEndpointFactory[num];
ActiveMQActivationSpec[] activationSpecs = new ActiveMQActivationSpec[num];
for (int i = 0; i < num; i++) {
final StubMessageEndpoint endpoint = new StubMessageEndpoint()
{
public void onMessage(Message message)
{
super.onMessage(message);
}
};
activationSpecs[i] = new ActiveMQActivationSpec();
activationSpecs[i].setDestinationType(Queue.class.getName());
activationSpecs[i].setDestination("TEST" + i);
activationSpecs[i].setResourceAdapter(adapter);
activationSpecs[i].validate();
endpointFactories[i] = new MessageEndpointFactory() {
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
};
// Activate an Endpoint
adapter.endpointActivation(endpointFactories[i], activationSpecs[i]);
}
//spawn num threads to deactivate
Thread[] threads = asyncDeactivate(adapter, endpointFactories, activationSpecs);
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
adapter.stop();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
}
}
private Thread[] asyncDeactivate(final ActiveMQResourceAdapter adapter,
final MessageEndpointFactory[] endpointFactories,
final ActiveMQActivationSpec[] activationSpecs) {
Thread[] threads = new Thread[endpointFactories.length];
for (int i = 0; i < threads.length; i++) {
final MessageEndpointFactory endpointFactory = endpointFactories[i];
final ActiveMQActivationSpec activationSpec = activationSpecs[i];
threads[i] = new Thread() {
public void run() {
adapter.endpointDeactivation(endpointFactory, activationSpec);
}
};
}
return threads;
}
public void testErrorOnNoMessageDeliveryBrokerZeroPrefetchConfig() throws Exception {
final BrokerService brokerService = new BrokerService();