Hiram R. Chirino 2006-06-30 17:24:01 +00:00
parent 1ca2d6f57a
commit 5143a2ad05
2 changed files with 22 additions and 27 deletions

View File

@ -20,11 +20,14 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.activemq.command.Response;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
public class FutureResponse {
private static final Log log = LogFactory.getLog(FutureResponse.class);
private final ResponseCallback responseCallback;
private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
@ -36,7 +39,12 @@ public class FutureResponse {
public Response getResult() throws IOException {
try {
return (Response) responseSlot.take();
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (log.isDebugEnabled()) {
log.debug("Operation interupted: " + e, e);
}
throw new InterruptedIOException("Interrupted.");
}
}
@ -49,14 +57,11 @@ public class FutureResponse {
}
}
public void set(Response result) throws InterruptedIOException {
try {
responseSlot.put(result);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
public void set(Response result) {
if( responseSlot.offer(result) ) {
if( responseCallback !=null ) {
responseCallback.onCompletion(this);
}
}
if( responseCallback !=null ) {
responseCallback.onCompletion(this);
}
}
}

View File

@ -17,9 +17,7 @@
package org.apache.activemq.transport;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.activemq.command.Command;
@ -82,16 +80,12 @@ public class ResponseCorrelator extends TransportFilter {
public void onCommand(Command command) {
boolean debug = log.isDebugEnabled();
if( command.isResponse() ) {
try {
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
if( future!=null ) {
future.set(response);
} else {
if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
}
} catch (InterruptedIOException e) {
onException(e);
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
if( future!=null ) {
future.set(response);
} else {
if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
}
} else {
getTransportListener().onCommand(command);
@ -109,12 +103,8 @@ public class ResponseCorrelator extends TransportFilter {
requestMap.clear();
for (Iterator iter = requests.iterator(); iter.hasNext();) {
try {
FutureResponse fr = (FutureResponse) iter.next();
fr.set(new ExceptionResponse(error));
} catch (InterruptedIOException e) {
Thread.currentThread().interrupt();
}
FutureResponse fr = (FutureResponse) iter.next();
fr.set(new ExceptionResponse(error));
}
super.onException(error);