Updated the Transport interface so that you can pass in a ResponseCallback object that will be called when the response for a request arrives.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@392288 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-07 13:31:59 +00:00
parent 064880b08a
commit 84fd773f27
17 changed files with 158 additions and 79 deletions

View File

@ -45,6 +45,8 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
@ -376,26 +378,47 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
try{
if(command.isMessageDispatch()){
waitStarted();
MessageDispatch md=(MessageDispatch) command;
final MessageDispatch md=(MessageDispatch) command;
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
if(sub!=null){
Message message= configureMessage(md);
if(trace)
log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
if(!message.isPersistent()||!sub.getRemoteInfo().isDurable()){
if( !message.isResponseRequired() ) {
// If the message was originally sent using async send, we will preserve that QOS
// by bridging it using an async send (small chance of message loss).
remoteBroker.oneway(message);
}else{
Response response=remoteBroker.request(message);
if(response.isException()){
ExceptionResponse er=(ExceptionResponse) response;
serviceLocalException(er.getException());
}
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
} else {
// The message was not sent using async send, so we should only ack the local
// broker when we get confirmation that the remote broker has received the message.
ResponseCallback callback = new ResponseCallback() {
public void onCompletion(FutureResponse future) {
try {
Response response = future.getResult();
if(response.isException()){
ExceptionResponse er=(ExceptionResponse) response;
serviceLocalException(er.getException());
} else {
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
}
} catch (IOException e) {
serviceLocalException(e);
}
}
};
remoteBroker.asyncRequest(message, callback);
}
// Ack on every message since we don't know if the broker is blocked due to memory
// usage and is waiting for an Ack to un-block him.
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
// Acking a range is more efficient, but also more prone to locking up a server
// Perhaps doing something like the following should be policy based.
// int dispatched = sub.incrementDispatched();

View File

@ -24,13 +24,17 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
@ -199,14 +203,14 @@ public class ForwardingBridge implements Bridge {
}
}
protected void serviceLocalException(IOException error) {
protected void serviceLocalException(Throwable error) {
System.out.println("Unexpected local exception: "+error);
error.printStackTrace();
}
protected void serviceLocalCommand(Command command) {
try {
if( command.isMessageDispatch() ) {
MessageDispatch md = (MessageDispatch) command;
final MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
message.setProducerId(producerInfo.getProducerId());
message.setDestination( md.getDestination() );
@ -216,11 +220,40 @@ public class ForwardingBridge implements Bridge {
message.setTransactionId(null);
message.evictMarshlledForm();
remoteBroker.oneway( message );
if( !message.isResponseRequired() ) {
// If the message was originally sent using async send, we will preserve that QOS
// by bridging it using an async send (small chance of message loss).
remoteBroker.oneway(message);
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
} else {
// The message was not sent using async send, so we should only ack the local
// broker when we get confirmation that the remote broker has received the message.
ResponseCallback callback = new ResponseCallback() {
public void onCompletion(FutureResponse future) {
try {
Response response = future.getResult();
if(response.isException()){
ExceptionResponse er=(ExceptionResponse) response;
serviceLocalException(er.getException());
} else {
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
}
} catch (IOException e) {
serviceLocalException(e);
}
}
};
remoteBroker.asyncRequest(message, callback);
}
// Ack on every message since we don't know if the broker is blocked due to memory
// usage and is waiting for an Ack to un-block him.
localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
// Acking a range is more efficient, but also more prone to locking up a server
// Perhaps doing something like the following should be policy based.

View File

@ -16,62 +16,47 @@
*/
package org.apache.activemq.transport;
import edu.emory.mathcs.backport.java.util.concurrent.Callable;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
import org.apache.activemq.command.Response;
import org.apache.activemq.util.IOExceptionSupport;
import java.io.IOException;
import java.io.InterruptedIOException;
public class FutureResponse extends FutureTask {
import org.apache.activemq.command.Response;
import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
public class FutureResponse {
private final ResponseCallback responseCallback;
private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
private static final Callable EMPTY_CALLABLE = new Callable() {
public Object call() throws Exception {
return null;
}};
public FutureResponse() {
super(EMPTY_CALLABLE);
public FutureResponse(ResponseCallback responseCallback) {
this.responseCallback = responseCallback;
}
public synchronized Response getResult() throws IOException {
public Response getResult() throws IOException {
try {
return (Response) super.get();
return (Response) responseSlot.take();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
} catch (ExecutionException e) {
Throwable target = e.getCause();
if( target instanceof IOException ) {
throw (IOException)target;
} else {
throw IOExceptionSupport.create(target);
}
}
}
public synchronized Response getResult(int timeout) throws IOException {
public Response getResult(int timeout) throws IOException {
try {
return (Response) super.get(timeout,TimeUnit.MILLISECONDS);
return (Response) responseSlot.poll(timeout,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
} catch (ExecutionException e) {
Throwable target = e.getCause();
if( target instanceof IOException ) {
throw (IOException)target;
} else {
throw IOExceptionSupport.create(target);
}
}catch(TimeoutException e){
return null;
}
}
public synchronized void set(Object result) {
super.set(result);
public void set(Response result) throws InterruptedIOException {
try {
responseSlot.put(result);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
}
if( responseCallback !=null ) {
responseCallback.onCompletion(this);
}
}
}

View File

@ -33,9 +33,9 @@ public class MutexTransport extends TransportFilter {
super(next);
}
public FutureResponse asyncRequest(Command command) throws IOException {
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
synchronized(writeMutex) {
return next.asyncRequest(command);
return next.asyncRequest(command, null);
}
}

View File

@ -0,0 +1,24 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport;
/**
* @version $Revision$
*/
public interface ResponseCallback {
void onCompletion(FutureResponse resp);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
@ -55,34 +56,38 @@ public class ResponseCorrelator extends TransportFilter {
next.oneway(command);
}
public FutureResponse asyncRequest(Command command) throws IOException {
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse();
FutureResponse future = new FutureResponse(responseCallback);
requestMap.put(new Integer(command.getCommandId()), future);
next.oneway(command);
return future;
}
public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command);
FutureResponse response = asyncRequest(command, null);
return response.getResult();
}
public Response request(Command command,int timeout) throws IOException {
FutureResponse response = asyncRequest(command);
FutureResponse response = asyncRequest(command, null);
return response.getResult(timeout);
}
public void onCommand(Command command) {
boolean debug = log.isDebugEnabled();
if( command.isResponse() ) {
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
if( future!=null ) {
future.set(response);
} else {
if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
try {
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
if( future!=null ) {
future.set(response);
} else {
if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
}
} catch (InterruptedIOException e) {
onException(e);
}
} else {
getTransportListener().onCommand(command);

View File

@ -39,12 +39,15 @@ public interface Transport extends Service {
/**
* An asynchronous request response where the Receipt will be returned
* in the future
* in the future. If responseCallback is not null, then it will be called
* when the response has been completed.
*
* @param command
* @param responseCallback TODO
* @return the FutureResponse
* @throws IOException
*/
public FutureResponse asyncRequest(Command command) throws IOException;
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException;
/**
* A synchronous request response

View File

@ -86,8 +86,8 @@ public class TransportFilter extends DefaultTransportListener implements Transpo
next.oneway(command);
}
public FutureResponse asyncRequest(Command command) throws IOException {
return next.asyncRequest(command);
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
return next.asyncRequest(command, null);
}
public Response request(Command command) throws IOException {

View File

@ -66,7 +66,7 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
return null;
}
public FutureResponse asyncRequest(Command command) throws IOException {
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@ -96,7 +97,7 @@ public class ActiveIOTransport implements Transport {
commandChannel.writeCommand(command);
}
public FutureResponse asyncRequest(Command command) throws IOException {
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@ -387,7 +388,7 @@ public class FailoverTransport implements CompositeTransport {
}
}
public FutureResponse asyncRequest(Command command) throws IOException {
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@ -411,7 +412,7 @@ public class FanoutTransport implements CompositeTransport {
return true;
}
public FutureResponse asyncRequest(Command command) throws IOException {
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
@ -95,8 +96,8 @@ public class MockTransport extends DefaultTransportListener implements Transport
next.oneway(command);
}
synchronized public FutureResponse asyncRequest(Command command) throws IOException {
return next.asyncRequest(command);
synchronized public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
return next.asyncRequest(command, null);
}
synchronized public Response request(Command command) throws IOException {

View File

@ -78,7 +78,7 @@ public class ReliableTransport extends ResponseCorrelator {
}
public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command);
FutureResponse response = asyncRequest(command, null);
while (true) {
Response result = response.getResult(requestTimeout);
if (result != null) {
@ -89,7 +89,7 @@ public class ReliableTransport extends ResponseCorrelator {
}
public Response request(Command command, int timeout) throws IOException {
FutureResponse response = asyncRequest(command);
FutureResponse response = asyncRequest(command, null);
while (timeout > 0) {
int time = timeout;
if (timeout > requestTimeout) {

View File

@ -26,6 +26,7 @@ import java.util.List;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
@ -79,7 +80,7 @@ public class VMTransport implements Transport{
}
}
public FutureResponse asyncRequest(Command command) throws IOException{
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException{
throw new AssertionError("Unsupported Method");
}

View File

@ -110,6 +110,7 @@ public class VMTransportFactory extends TransportFactory{
if(server==null){
server=(VMTransportServer) bind(location,true);
TransportConnector connector=new TransportConnector(broker.getBroker(),server);
connector.setUri(location);
connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
connector.start();
connectors.put(host,connector);

View File

@ -37,7 +37,7 @@ public class BadConnectionTest extends TestCase {
public void testConnectingToUnavailableServer() throws Exception {
try {
transport.asyncRequest(new ActiveMQMessage());
transport.asyncRequest(new ActiveMQMessage(), null);
fail("This should never succeed");
}
catch (IOException e) {