290761 HttpExchange isDone handles intercepted events

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@968 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-10-05 14:22:32 +00:00
parent 5d022a627b
commit 14a1f24a0f
3 changed files with 136 additions and 79 deletions

View File

@ -14,6 +14,7 @@ jetty-7.0.1-SNAPSHOT
jetty-7.0.0
+ 289958 StatisticsServlet incorrectly adds StatisticsHandler
+ 290081 Eager consume LF after CR
+ 290761 HttpExchange isDone handles intercepted events.
jetty-7.0.0.RC6 September 18th 2009
+ JETTY-719 Document state machine of jetty http client

View File

@ -15,7 +15,9 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.security.SecurityListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeaders;
import org.eclipse.jetty.http.HttpMethods;
@ -85,7 +87,7 @@ public class HttpExchange
private Buffer _requestContent;
private InputStream _requestContentSource;
private int _status = STATUS_START;
private AtomicInteger _status = new AtomicInteger(STATUS_START);
private Buffer _requestContentChunk;
private boolean _retryStatus = false;
// controls if the exchange will have listeners autoconfigured by the destination
@ -93,12 +95,14 @@ public class HttpExchange
private HttpEventListener _listener = new Listener();
private volatile HttpConnection connection;
boolean _onRequestCompleteDone;
boolean _onResponseCompleteDone;
boolean _onDone; // == onConnectionFail || onException || onExpired || onCancelled || onResponseCompleted && onRequestCompleted
public int getStatus()
{
synchronized (this)
{
return _status;
}
return _status.get();
}
/**
@ -108,36 +112,54 @@ public class HttpExchange
*/
public void waitForStatus(int status) throws InterruptedException
{
synchronized (this)
{
while (_status < status)
{
this.wait();
}
}
throw new UnsupportedOperationException();
}
public int waitForDone() throws InterruptedException
/**
* Wait until the exchange is "done".
* Done is defined as when a final state has been passed to the
* HttpExchange via the associated onXxx call. Note that an
* exchange can transit a final state when being used as part
* of a dialog (eg {@link SecurityListener}. Done status
* is thus defined as:<pre>
* done == onConnectionFailed
* || onException
* || onExpire
* || onRequestComplete && onResponseComplete
* </pre>
* @return
* @throws InterruptedException
*/
public int waitForDone () throws InterruptedException
{
synchronized (this)
{
while (!isDone(_status))
while (!isDone())
this.wait();
return _status;
return _status.get();
}
}
public void reset()
{
setStatus(STATUS_START);
// TODO - this should do a cancel and wakeup everybody that was waiting.
// might need a version number concept
synchronized(this)
{
_onRequestCompleteDone=false;
_onResponseCompleteDone=false;
_onDone=false;
setStatus(STATUS_START);
}
}
void setStatus(int newStatus)
{
try
{
int oldStatus = getStatus();
int oldStatus = _status.get();
boolean set = false;
// State machine: from which old status you can go into which new status
switch (oldStatus)
{
@ -147,10 +169,8 @@ public class HttpExchange
case STATUS_WAITING_FOR_CONNECTION:
case STATUS_WAITING_FOR_COMMIT:
case STATUS_CANCELLING:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_WAITING_FOR_CONNECTION:
@ -159,10 +179,8 @@ public class HttpExchange
case STATUS_WAITING_FOR_COMMIT:
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_WAITING_FOR_COMMIT:
@ -171,33 +189,29 @@ public class HttpExchange
case STATUS_SENDING_REQUEST:
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
case STATUS_EXPIRED:
updateStatus(newStatus);
getEventListener().onExpire();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_SENDING_REQUEST:
switch (newStatus)
{
case STATUS_WAITING_FOR_RESPONSE:
updateStatus(newStatus);
getEventListener().onRequestCommitted();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onRequestCommitted();
break;
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
case STATUS_EXPIRED:
updateStatus(newStatus);
getEventListener().onExpire();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_WAITING_FOR_RESPONSE:
@ -206,76 +220,71 @@ public class HttpExchange
case STATUS_PARSING_HEADERS:
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
case STATUS_EXPIRED:
updateStatus(newStatus);
getEventListener().onExpire();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_PARSING_HEADERS:
switch (newStatus)
{
case STATUS_PARSING_CONTENT:
updateStatus(newStatus);
getEventListener().onResponseHeaderComplete();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onResponseHeaderComplete();
break;
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
case STATUS_EXPIRED:
updateStatus(newStatus);
getEventListener().onExpire();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_PARSING_CONTENT:
switch (newStatus)
{
case STATUS_COMPLETED:
updateStatus(newStatus);
getEventListener().onResponseComplete();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onResponseComplete();
break;
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
case STATUS_EXPIRED:
updateStatus(newStatus);
getEventListener().onExpire();
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_COMPLETED:
switch (newStatus)
{
case STATUS_START:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
case STATUS_CANCELLING:
case STATUS_EXPIRED:
// Don't change the status, it's too late
set=true;
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
case STATUS_CANCELLING:
switch (newStatus)
{
case STATUS_CANCELLED:
updateStatus(newStatus);
if (set=_status.compareAndSet(oldStatus,newStatus))
done();
break;
default:
// Ignore other statuses, we're cancelling
set=true;
break;
}
break;
@ -285,16 +294,17 @@ public class HttpExchange
switch (newStatus)
{
case STATUS_START:
updateStatus(newStatus);
set=_status.compareAndSet(oldStatus,newStatus);
break;
default:
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
break;
default:
// Here means I allowed to set a state that I don't recognize
throw new AssertionError(oldStatus + " => " + newStatus);
}
if (!set)
throw new IllegalStateException(oldStatus + " => " + newStatus);
}
catch (IOException x)
{
@ -302,23 +312,14 @@ public class HttpExchange
}
}
private void updateStatus(int newStatus)
public boolean isDone()
{
synchronized (this)
{
_status = newStatus;
notifyAll();
return _onDone;
}
}
public boolean isDone (int status)
{
return status == STATUS_COMPLETED ||
status == STATUS_EXPIRED ||
status == STATUS_EXCEPTED ||
status == STATUS_CANCELLED;
}
public HttpEventListener getEventListener()
{
return _listener;
@ -595,6 +596,15 @@ public class HttpExchange
abort();
}
private void done()
{
synchronized(this)
{
_onDone=true;
notifyAll();
}
}
private void abort()
{
HttpConnection httpConnection = this.connection;
@ -768,19 +778,41 @@ public class HttpExchange
private class Listener implements HttpEventListener
{
public void onConnectionFailed(Throwable ex)
{
HttpExchange.this.onConnectionFailed(ex);
try
{
HttpExchange.this.onConnectionFailed(ex);
}
finally
{
done();
}
}
public void onException(Throwable ex)
{
HttpExchange.this.onException(ex);
try
{
HttpExchange.this.onException(ex);
}
finally
{
done();
}
}
public void onExpire()
{
HttpExchange.this.onExpire();
try
{
HttpExchange.this.onExpire();
}
finally
{
done();
}
}
public void onRequestCommitted() throws IOException
@ -790,12 +822,36 @@ public class HttpExchange
public void onRequestComplete() throws IOException
{
HttpExchange.this.onRequestComplete();
try
{
HttpExchange.this.onRequestComplete();
}
finally
{
synchronized(HttpExchange.this)
{
_onRequestCompleteDone=true;
_onDone=_onResponseCompleteDone;
HttpExchange.this.notifyAll();
}
}
}
public void onResponseComplete() throws IOException
{
HttpExchange.this.onResponseComplete();
try
{
HttpExchange.this.onResponseComplete();
}
finally
{
synchronized(HttpExchange.this)
{
_onResponseCompleteDone=true;
_onDone=_onRequestCompleteDone;
HttpExchange.this.notifyAll();
}
}
}
public void onResponseContent(Buffer content) throws IOException

View File

@ -49,7 +49,7 @@ public class ConnectionFailedTest extends TestCase
long wait = 100;
long maxWait = 10 * wait;
long curWait = wait;
while (curWait < maxWait && !exchange.isDone(exchange.getStatus()))
while (curWait < maxWait && !exchange.isDone())
{
Thread.sleep(wait);
curWait += wait;