If a connection is close() make sure we error out any outstanding requests it may be doing so that it can properly shutdown.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@639111 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-20 01:59:39 +00:00
parent ff7033fa14
commit 5d485843b6
1 changed files with 28 additions and 7 deletions

View File

@ -42,6 +42,7 @@ public class ResponseCorrelator extends TransportFilter {
private final Map<Integer, FutureResponse> requestMap = new HashMap<Integer, FutureResponse>();
private IntSequenceGenerator sequenceGenerator;
private final boolean debug = LOG.isDebugEnabled();
private IOException error;
public ResponseCorrelator(Transport next) {
this(next, new IntSequenceGenerator());
@ -65,6 +66,9 @@ public class ResponseCorrelator extends TransportFilter {
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback);
synchronized (requestMap) {
if( this.error !=null ) {
throw error;
}
requestMap.put(new Integer(command.getCommandId()), future);
}
next.oneway(command);
@ -106,15 +110,32 @@ public class ResponseCorrelator extends TransportFilter {
* any of current requests. Lets let them know of the problem.
*/
public void onException(IOException error) {
// Copy and Clear the request Map
ArrayList<FutureResponse> requests = new ArrayList<FutureResponse>(requestMap.values());
requestMap.clear();
for (Iterator<FutureResponse> iter = requests.iterator(); iter.hasNext();) {
FutureResponse fr = iter.next();
fr.set(new ExceptionResponse(error));
}
dispose(error);
super.onException(error);
}
@Override
public void stop() throws Exception {
dispose(new IOException("Stopped."));
super.stop();
}
private void dispose(IOException error) {
ArrayList<FutureResponse> requests=null;
synchronized(requestMap) {
if( this.error==null) {
this.error = error;
requests = new ArrayList<FutureResponse>(requestMap.values());
requestMap.clear();
}
}
if( requests!=null ) {
for (Iterator<FutureResponse> iter = requests.iterator(); iter.hasNext();) {
FutureResponse fr = iter.next();
fr.set(new ExceptionResponse(error));
}
}
}
public IntSequenceGenerator getSequenceGenerator() {
return sequenceGenerator;