diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 752674e5e2..853d3a78ca 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -45,6 +45,8 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.transport.DefaultTransportListener; +import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IdGenerator; @@ -376,26 +378,47 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { try{ if(command.isMessageDispatch()){ waitStarted(); - MessageDispatch md=(MessageDispatch) command; + final MessageDispatch md=(MessageDispatch) command; DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId()); if(sub!=null){ Message message= configureMessage(md); if(trace) log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message); - if(!message.isPersistent()||!sub.getRemoteInfo().isDurable()){ + + + if( !message.isResponseRequired() ) { + + // If the message was originally sent using async send, we will preserve that QOS + // by bridging it using an async send (small chance of message loss). remoteBroker.oneway(message); - }else{ - Response response=remoteBroker.request(message); - if(response.isException()){ - ExceptionResponse er=(ExceptionResponse) response; - serviceLocalException(er.getException()); - } + localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); + + } else { + + // The message was not sent using async send, so we should only ack the local + // broker when we get confirmation that the remote broker has received the message. + ResponseCallback callback = new ResponseCallback() { + public void onCompletion(FutureResponse future) { + try { + Response response = future.getResult(); + if(response.isException()){ + ExceptionResponse er=(ExceptionResponse) response; + serviceLocalException(er.getException()); + } else { + localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); + } + } catch (IOException e) { + serviceLocalException(e); + } + } + }; + + remoteBroker.asyncRequest(message, callback); } // Ack on every message since we don't know if the broker is blocked due to memory // usage and is waiting for an Ack to un-block him. - localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); - + // Acking a range is more efficient, but also more prone to locking up a server // Perhaps doing something like the following should be policy based. // int dispatched = sub.incrementDispatched(); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java index fce11eb27e..e0927d805d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java @@ -24,13 +24,17 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.transport.DefaultTransportListener; +import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; @@ -199,14 +203,14 @@ public class ForwardingBridge implements Bridge { } } - protected void serviceLocalException(IOException error) { + protected void serviceLocalException(Throwable error) { System.out.println("Unexpected local exception: "+error); error.printStackTrace(); } protected void serviceLocalCommand(Command command) { try { if( command.isMessageDispatch() ) { - MessageDispatch md = (MessageDispatch) command; + final MessageDispatch md = (MessageDispatch) command; Message message = md.getMessage(); message.setProducerId(producerInfo.getProducerId()); message.setDestination( md.getDestination() ); @@ -216,11 +220,40 @@ public class ForwardingBridge implements Bridge { message.setTransactionId(null); message.evictMarshlledForm(); - remoteBroker.oneway( message ); + if( !message.isResponseRequired() ) { + + // If the message was originally sent using async send, we will preserve that QOS + // by bridging it using an async send (small chance of message loss). + remoteBroker.oneway(message); + localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); + + } else { + + // The message was not sent using async send, so we should only ack the local + // broker when we get confirmation that the remote broker has received the message. + ResponseCallback callback = new ResponseCallback() { + public void onCompletion(FutureResponse future) { + try { + Response response = future.getResult(); + if(response.isException()){ + ExceptionResponse er=(ExceptionResponse) response; + serviceLocalException(er.getException()); + } else { + localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); + } + } catch (IOException e) { + serviceLocalException(e); + } + } + }; + + remoteBroker.asyncRequest(message, callback); + } + + // Ack on every message since we don't know if the broker is blocked due to memory // usage and is waiting for an Ack to un-block him. - localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); // Acking a range is more efficient, but also more prone to locking up a server // Perhaps doing something like the following should be policy based. diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java b/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java index dcd6c6fa20..c506417b2b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java @@ -16,62 +16,47 @@ */ package org.apache.activemq.transport; -import edu.emory.mathcs.backport.java.util.concurrent.Callable; -import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException; -import edu.emory.mathcs.backport.java.util.concurrent.FutureTask; -import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; -import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException; - -import org.apache.activemq.command.Response; -import org.apache.activemq.util.IOExceptionSupport; - import java.io.IOException; import java.io.InterruptedIOException; -public class FutureResponse extends FutureTask { +import org.apache.activemq.command.Response; + +import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; + +public class FutureResponse { + + private final ResponseCallback responseCallback; + private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1); - private static final Callable EMPTY_CALLABLE = new Callable() { - public Object call() throws Exception { - return null; - }}; - - public FutureResponse() { - super(EMPTY_CALLABLE); + public FutureResponse(ResponseCallback responseCallback) { + this.responseCallback = responseCallback; } - public synchronized Response getResult() throws IOException { + public Response getResult() throws IOException { try { - return (Response) super.get(); + return (Response) responseSlot.take(); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted."); - } catch (ExecutionException e) { - Throwable target = e.getCause(); - if( target instanceof IOException ) { - throw (IOException)target; - } else { - throw IOExceptionSupport.create(target); - } } } - public synchronized Response getResult(int timeout) throws IOException { + public Response getResult(int timeout) throws IOException { try { - return (Response) super.get(timeout,TimeUnit.MILLISECONDS); + return (Response) responseSlot.poll(timeout,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted."); - } catch (ExecutionException e) { - Throwable target = e.getCause(); - if( target instanceof IOException ) { - throw (IOException)target; - } else { - throw IOExceptionSupport.create(target); - } - }catch(TimeoutException e){ - return null; } } - public synchronized void set(Object result) { - super.set(result); + public void set(Response result) throws InterruptedIOException { + try { + responseSlot.put(result); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted."); + } + if( responseCallback !=null ) { + responseCallback.onCompletion(this); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java index dbfd2e92b6..ebdf72ae63 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java @@ -33,9 +33,9 @@ public class MutexTransport extends TransportFilter { super(next); } - public FutureResponse asyncRequest(Command command) throws IOException { + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { synchronized(writeMutex) { - return next.asyncRequest(command); + return next.asyncRequest(command, null); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java new file mode 100644 index 0000000000..52311b04d7 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java @@ -0,0 +1,24 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +/** + * @version $Revision$ + */ +public interface ResponseCallback { + void onCompletion(FutureResponse resp); +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java index 74c6e017c5..ec0ecda505 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport; import java.io.IOException; +import java.io.InterruptedIOException; import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; @@ -55,34 +56,38 @@ public class ResponseCorrelator extends TransportFilter { next.oneway(command); } - public FutureResponse asyncRequest(Command command) throws IOException { + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { command.setCommandId(sequenceGenerator.getNextSequenceId()); command.setResponseRequired(true); - FutureResponse future = new FutureResponse(); + FutureResponse future = new FutureResponse(responseCallback); requestMap.put(new Integer(command.getCommandId()), future); next.oneway(command); return future; } public Response request(Command command) throws IOException { - FutureResponse response = asyncRequest(command); + FutureResponse response = asyncRequest(command, null); return response.getResult(); } public Response request(Command command,int timeout) throws IOException { - FutureResponse response = asyncRequest(command); + FutureResponse response = asyncRequest(command, null); return response.getResult(timeout); } public void onCommand(Command command) { boolean debug = log.isDebugEnabled(); if( command.isResponse() ) { - Response response = (Response) command; - FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId())); - if( future!=null ) { - future.set(response); - } else { - if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId()); + try { + Response response = (Response) command; + FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId())); + if( future!=null ) { + future.set(response); + } else { + if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId()); + } + } catch (InterruptedIOException e) { + onException(e); } } else { getTransportListener().onCommand(command); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java index dc5b6b7725..eaf3054979 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java @@ -39,12 +39,15 @@ public interface Transport extends Service { /** * An asynchronous request response where the Receipt will be returned - * in the future + * in the future. If responseCallback is not null, then it will be called + * when the response has been completed. + * * @param command + * @param responseCallback TODO * @return the FutureResponse * @throws IOException */ - public FutureResponse asyncRequest(Command command) throws IOException; + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException; /** * A synchronous request response diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java index 585ac54358..a5d90195e9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java @@ -86,8 +86,8 @@ public class TransportFilter extends DefaultTransportListener implements Transpo next.oneway(command); } - public FutureResponse asyncRequest(Command command) throws IOException { - return next.asyncRequest(command); + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { + return next.asyncRequest(command, null); } public Response request(Command command) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java index afde390510..84713d96a4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java @@ -66,7 +66,7 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo return null; } - public FutureResponse asyncRequest(Command command) throws IOException { + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { throw new AssertionError("Unsupported Method"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java index 20f16f93c8..5245301ec3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java @@ -27,6 +27,7 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; @@ -96,7 +97,7 @@ public class ActiveIOTransport implements Transport { commandChannel.writeCommand(command); } - public FutureResponse asyncRequest(Command command) throws IOException { + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { throw new AssertionError("Unsupported Method"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index fb01d91d62..9353f6d5c1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -32,6 +32,7 @@ import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; @@ -387,7 +388,7 @@ public class FailoverTransport implements CompositeTransport { } } - public FutureResponse asyncRequest(Command command) throws IOException { + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { throw new AssertionError("Unsupported Method"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 32bd02bf5d..22ea2e1af1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -33,6 +33,7 @@ import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; @@ -411,7 +412,7 @@ public class FanoutTransport implements CompositeTransport { return true; } - public FutureResponse asyncRequest(Command command) throws IOException { + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { throw new AssertionError("Unsupported Method"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java index 982f6c71c3..b5f6cf3513 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java @@ -22,6 +22,7 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportListener; @@ -95,8 +96,8 @@ public class MockTransport extends DefaultTransportListener implements Transport next.oneway(command); } - synchronized public FutureResponse asyncRequest(Command command) throws IOException { - return next.asyncRequest(command); + synchronized public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException { + return next.asyncRequest(command, null); } synchronized public Response request(Command command) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java index 2aea527412..485ef856d0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java @@ -78,7 +78,7 @@ public class ReliableTransport extends ResponseCorrelator { } public Response request(Command command) throws IOException { - FutureResponse response = asyncRequest(command); + FutureResponse response = asyncRequest(command, null); while (true) { Response result = response.getResult(requestTimeout); if (result != null) { @@ -89,7 +89,7 @@ public class ReliableTransport extends ResponseCorrelator { } public Response request(Command command, int timeout) throws IOException { - FutureResponse response = asyncRequest(command); + FutureResponse response = asyncRequest(command, null); while (timeout > 0) { int time = timeout; if (timeout > requestTimeout) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index a20bf85664..56767ce0e5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; import org.apache.activemq.transport.FutureResponse; +import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; import org.apache.commons.logging.Log; @@ -79,7 +80,7 @@ public class VMTransport implements Transport{ } } - public FutureResponse asyncRequest(Command command) throws IOException{ + public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException{ throw new AssertionError("Unsupported Method"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index f267ffa4c3..30350565ca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -110,6 +110,7 @@ public class VMTransportFactory extends TransportFactory{ if(server==null){ server=(VMTransportServer) bind(location,true); TransportConnector connector=new TransportConnector(broker.getBroker(),server); + connector.setUri(location); connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() ); connector.start(); connectors.put(host,connector); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java index c738f2045f..8b2701035a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java @@ -37,7 +37,7 @@ public class BadConnectionTest extends TestCase { public void testConnectingToUnavailableServer() throws Exception { try { - transport.asyncRequest(new ActiveMQMessage()); + transport.asyncRequest(new ActiveMQMessage(), null); fail("This should never succeed"); } catch (IOException e) {