AMQ-5400 - rework to remove static lock - impacted parallel delivery and hense performance. Fix and additional tet

This commit is contained in:
gtully 2015-07-16 12:38:52 +01:00
parent 2b5b890db9
commit c85c7c1472
2 changed files with 79 additions and 4 deletions

View File

@ -200,7 +200,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
private static final Object REDELIVERY_GUARD = new Object();
private final ThreadPoolExecutor connectionExecutor;
protected int acknowledgementMode;
@ -220,7 +219,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected boolean asyncDispatch;
protected boolean sessionAsyncDispatch;
protected final boolean debug;
protected Object sendMutex = new Object();
protected final Object sendMutex = new Object();
protected final Object redeliveryGuard = new Object();
private final AtomicBoolean clearInProgress = new AtomicBoolean();
private MessageListener messageListener;
@ -930,7 +931,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
* We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
* */
synchronized (REDELIVERY_GUARD) {
synchronized (redeliveryGuard) {
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
@ -1011,7 +1012,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
/*
* wait for the first delivery to be complete, i.e. after delivery has been called.
* */
synchronized (REDELIVERY_GUARD) {
synchronized (redeliveryGuard) {
/*
* If its non blocking then we can just dispatch in a new session.
* */

View File

@ -312,6 +312,80 @@ public class MDBTest {
adapter.stop();
}
@Test
public void testParallelMessageDelivery() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(10);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
@Override
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
}
@Override
public void afterDelivery() throws ResourceException {
}
public void onMessage(Message message) {
LOG.info("Message:" + message);
super.onMessage(message);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
messageDelivered.countDown();
};
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination("TEST");
activationSpec.setResourceAdapter(adapter);
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return false;
}
};
// Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec);
// Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
for (int i=0;i<10;i++) {
producer.send(session.createTextMessage(i+"-Hello!"));
}
connection.close();
// Wait for the message to be delivered.
assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
//https://issues.apache.org/jira/browse/AMQ-5811
@Test(timeout = 90000)
public void testAsyncStop() throws Exception {