From a1062c273f8c02b6b6311397d2c0e4059054a1a8 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Fri, 12 Jun 2015 13:41:05 +0800 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5811 Added synchronization blocks around sentitive code to prevent concurrent modification of the HashMap. --- .../activemq/ra/ActiveMQResourceAdapter.java | 14 ++-- .../java/org/apache/activemq/ra/MDBTest.java | 72 +++++++++++++++++++ 2 files changed, 81 insertions(+), 5 deletions(-) diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index 9ae948a2d8..8c5d4ca0c0 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -147,9 +147,11 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement * @see javax.resource.spi.ResourceAdapter#stop() */ public void stop() { - while (endpointWorkers.size() > 0) { - ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next(); - endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); + synchronized (endpointWorkers) { + while (endpointWorkers.size() > 0) { + ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next(); + endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); + } } synchronized( this ) { @@ -205,10 +207,12 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement * javax.resource.spi.ActivationSpec) */ public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) { - if (activationSpec instanceof MessageActivationSpec) { ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec); - ActiveMQEndpointWorker worker = endpointWorkers.remove(key); + ActiveMQEndpointWorker worker = null; + synchronized (endpointWorkers) { + worker = endpointWorkers.remove(key); + } if (worker == null) { // This is weird.. that endpoint was not activated.. oh well.. // this method diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java index af36389fcf..89c80c039e 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java @@ -274,6 +274,78 @@ public class MDBTest extends TestCase { } + //https://issues.apache.org/jira/browse/AMQ-5811 + public void testAsyncStop() throws Exception { + for (int repeat = 0; repeat < 10; repeat++) { + ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); + adapter.setServerUrl("vm://localhost?broker.persistent=false"); + adapter.setQueuePrefetch(1); + adapter.start(new StubBootstrapContext()); + + final int num = 20; + MessageEndpointFactory[] endpointFactories = new MessageEndpointFactory[num]; + ActiveMQActivationSpec[] activationSpecs = new ActiveMQActivationSpec[num]; + + for (int i = 0; i < num; i++) { + + final StubMessageEndpoint endpoint = new StubMessageEndpoint() + { + public void onMessage(Message message) + { + super.onMessage(message); + } + }; + + activationSpecs[i] = new ActiveMQActivationSpec(); + activationSpecs[i].setDestinationType(Queue.class.getName()); + activationSpecs[i].setDestination("TEST" + i); + activationSpecs[i].setResourceAdapter(adapter); + activationSpecs[i].validate(); + + endpointFactories[i] = new MessageEndpointFactory() { + public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { + endpoint.xaresource = resource; + return endpoint; + } + + public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return true; + } + }; + + // Activate an Endpoint + adapter.endpointActivation(endpointFactories[i], activationSpecs[i]); + } + + //spawn num threads to deactivate + Thread[] threads = asyncDeactivate(adapter, endpointFactories, activationSpecs); + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + adapter.stop(); + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + } + } + + private Thread[] asyncDeactivate(final ActiveMQResourceAdapter adapter, + final MessageEndpointFactory[] endpointFactories, + final ActiveMQActivationSpec[] activationSpecs) { + Thread[] threads = new Thread[endpointFactories.length]; + for (int i = 0; i < threads.length; i++) { + final MessageEndpointFactory endpointFactory = endpointFactories[i]; + final ActiveMQActivationSpec activationSpec = activationSpecs[i]; + + threads[i] = new Thread() { + public void run() { + adapter.endpointDeactivation(endpointFactory, activationSpec); + } + }; + } + return threads; + } + public void testErrorOnNoMessageDeliveryBrokerZeroPrefetchConfig() throws Exception { final BrokerService brokerService = new BrokerService();