Fixes AMQ-3769: Support doing non-blocking sends that uses an async callback that gets notified when the send has been received by the broker

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1300727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-03-14 21:32:18 +00:00
parent a7e7bce0ae
commit d4cd7f9eed
5 changed files with 241 additions and 12 deletions

View File

@ -24,13 +24,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -91,6 +85,8 @@ import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
@ -1288,6 +1284,63 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @return
* @throws JMSException
*/
public void syncSendPacket(Command command, final AsyncCallback onComplete) throws JMSException {
if(onComplete==null) {
syncSendPacket(command);
} else {
if (isClosed()) {
throw new ConnectionClosedException();
}
try {
this.transport.asyncRequest(command, new ResponseCallback() {
@Override
public void onCompletion(FutureResponse resp) {
Response response;
Throwable exception = null;
try {
response = resp.getResult();
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
exception = er.getException();
}
} catch (Exception e) {
exception = e;
}
if(exception!=null) {
if ( exception instanceof JMSException) {
onComplete.onException((JMSException) exception);
} else {
if (isClosed()||closing.get()) {
LOG.debug("Received an exception but connection is closing");
}
JMSException jmsEx = null;
try {
jmsEx = JMSExceptionSupport.create(exception);
} catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
}
//dispose of transport for security exceptions
if (exception instanceof SecurityException){
Transport t = transport;
if (null != t){
ServiceSupport.dispose(t);
}
}
if (jmsEx !=null) {
onComplete.onException(jmsEx);
}
}
} else {
onComplete.onSuccess();
}
}
});
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}
public Response syncSendPacket(Command command) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();

View File

@ -209,6 +209,36 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
* @since 1.1
*/
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(destination, message, deliveryMode, priority, timeToLive, null);
}
public void send(Message message, AsyncCallback onComplete) throws JMSException {
this.send(this.getDestination(),
message,
this.defaultDeliveryMode,
this.defaultPriority,
this.defaultTimeToLive, onComplete);
}
public void send(Destination destination, Message message, AsyncCallback onComplete) throws JMSException {
this.send(destination,
message,
this.defaultDeliveryMode,
this.defaultPriority,
this.defaultTimeToLive,
onComplete);
}
public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
this.send(this.getDestination(),
message,
deliveryMode,
priority,
timeToLive,
onComplete);
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination == null) {
if (info.getDestination() == null) {
@ -244,7 +274,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
}
}
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
stats.onMessage();
}

View File

@ -1702,6 +1702,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
/**
* Sends the message for dispatch by the broker.
*
*
* @param producer - message producer.
* @param destination - message destination.
* @param message - message to be sent.
@ -1709,10 +1710,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @param priority - message priority.
* @param timeToLive - message expiration.
* @param producerWindow
* @param onComplete
* @throws JMSException
*/
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout) throws JMSException {
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
@ -1763,7 +1765,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
@ -1777,10 +1779,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0) {
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg);
this.connection.syncSendPacket(msg, onComplete);
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.ExceptionListener;
/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface AsyncCallback extends ExceptionListener {
public void onSuccess();
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.*;
import javax.jms.Message;
import java.util.concurrent.CountDownLatch;
/**
*
*/
public class JmsSendWithAsyncCallbackTest extends TestSupport {
private Connection connection;
protected void setUp() throws Exception {
super.setUp();
connection = createConnection();
}
/**
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
super.tearDown();
}
public void testAsyncCallbackIsFaster() throws JMSException, InterruptedException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getName());
// setup a consumer to drain messages..
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
// warmup...
for(int i=0; i < 10; i++) {
benchmarkNonCallbackRate();
benchmarkCallbackRate();
}
double callbackRate = benchmarkCallbackRate();
double nonCallbackRate = benchmarkNonCallbackRate();
System.out.println(String.format("AsyncCallback Send rate: %,.2f m/s", callbackRate));
System.out.println(String.format("NonAsyncCallback Send rate: %,.2f m/s", nonCallbackRate));
// The async style HAS to be faster than the non-async style..
assertTrue( callbackRate/nonCallbackRate > 1.5 );
}
private double benchmarkNonCallbackRate() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getName());
int count = 1000;
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
producer.send(session.createTextMessage("Hello"));
}
return 1000.0 * count / (System.currentTimeMillis() - start);
}
private double benchmarkCallbackRate() throws JMSException, InterruptedException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getName());
int count = 1000;
final CountDownLatch messagesSent = new CountDownLatch(count);
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
producer.send(session.createTextMessage("Hello"), new AsyncCallback() {
@Override
public void onSuccess() {
messagesSent.countDown();
}
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
}
messagesSent.await();
return 1000.0 * count / (System.currentTimeMillis() - start);
}
}