Fixed bug #288401: HttpExchange.cancel() Method Unimplemented.

It is now implemented.
Improved Javadocs and made all members private.

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@881 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Simone Bordet 2009-09-11 14:24:28 +00:00
parent ee1a75a077
commit 84c922dd4c
9 changed files with 1202 additions and 564 deletions

View File

@ -1,6 +1,7 @@
jetty-7.0.1-SNAPSHOT
+ Promoted Jetty WebApp Verifier from Sandbox
+ Promoted Jetty Centralized Logging from Sandbox
+ 288401 HttpExchange.cancel() Method Unimplemented
jetty-7.0.0.RC6-SNAPSHOT
+ 288055 jetty-client fails to resolve failed resolution attempts correctly

View File

@ -4,11 +4,11 @@
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
@ -18,51 +18,47 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.Buffer;
/**
* An exchange that caches response status and fields for later use.
*
*
*
* An exchange that retains response status and response headers for later use.
*/
public class CachedExchange extends HttpExchange
{
int _responseStatus;
HttpFields _responseFields;
private final HttpFields _responseFields;
private volatile int _responseStatus;
public CachedExchange(boolean cacheFields)
/**
* Creates a new CachedExchange.
*
* @param cacheHeaders true to cache response headers, false to not cache them
*/
public CachedExchange(boolean cacheHeaders)
{
if (cacheFields)
_responseFields = new HttpFields();
_responseFields = cacheHeaders ? new HttpFields() : null;
}
/* ------------------------------------------------------------ */
public int getResponseStatus()
{
if (_status < HttpExchange.STATUS_PARSING_HEADERS)
throw new IllegalStateException("Response not received");
if (getStatus() < HttpExchange.STATUS_PARSING_HEADERS)
throw new IllegalStateException("Response not received yet");
return _responseStatus;
}
/* ------------------------------------------------------------ */
public HttpFields getResponseFields()
{
if (_status < HttpExchange.STATUS_PARSING_CONTENT)
throw new IllegalStateException("Headers not complete");
if (getStatus() < HttpExchange.STATUS_PARSING_CONTENT)
throw new IllegalStateException("Headers not completely received yet");
return _responseFields;
}
/* ------------------------------------------------------------ */
protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
{
_responseStatus = status;
super.onResponseStatus(version,status,reason);
super.onResponseStatus(version, status, reason);
}
/* ------------------------------------------------------------ */
protected void onResponseHeader(Buffer name, Buffer value) throws IOException
{
if (_responseFields != null)
_responseFields.add(name,value);
super.onResponseHeader(name,value);
_responseFields.add(name, value);
super.onResponseHeader(name, value);
}
}

View File

@ -4,11 +4,11 @@
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
@ -26,29 +26,25 @@ import org.eclipse.jetty.io.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
/**
* A CachedExchange that retains all content for later use.
*
* A exchange that retains response content for later use.
*/
public class ContentExchange extends CachedExchange
{
int _contentLength = 1024;
String _encoding = "utf-8";
protected ByteArrayOutputStream _responseContent;
File _fileForUpload;
private int _contentLength = 1024;
private String _encoding = "utf-8";
private ByteArrayOutputStream _responseContent;
private File _fileForUpload;
public ContentExchange()
{
super(false);
}
/* ------------------------------------------------------------ */
public ContentExchange(boolean cacheFields)
{
super(cacheFields);
}
/* ------------------------------------------------------------ */
}
public String getResponseContent() throws UnsupportedEncodingException
{
if (_responseContent != null)
@ -58,11 +54,10 @@ public class ContentExchange extends CachedExchange
return null;
}
/* ------------------------------------------------------------ */
@Override
protected void onResponseHeader(Buffer name, Buffer value) throws IOException
{
super.onResponseHeader(name,value);
super.onResponseHeader(name, value);
int header = HttpHeaders.CACHE.getOrdinal(name);
switch (header)
{
@ -81,7 +76,7 @@ public class ContentExchange extends CachedExchange
@Override
protected void onResponseContent(Buffer content) throws IOException
{
super.onResponseContent( content );
super.onResponseContent(content);
if (_responseContent == null)
_responseContent = new ByteArrayOutputStream(_contentLength);
content.writeTo(_responseContent);
@ -92,19 +87,23 @@ public class ContentExchange extends CachedExchange
{
if (_fileForUpload != null)
{
_requestContent = null;
_requestContentSource = getInputStream();
setRequestContent(null);
setRequestContentSource(getInputStream());
}
else if (_requestContentSource != null)
else
{
if (_requestContentSource.markSupported())
InputStream requestContentStream = getRequestContentSource();
if (requestContentStream != null)
{
_requestContent = null;
_requestContentSource.reset();
}
else
{
throw new IOException("Unsupported retry attempt");
if (requestContentStream.markSupported())
{
setRequestContent(null);
requestContentStream.reset();
}
else
{
throw new IOException("Unsupported retry attempt");
}
}
}
super.onRetry();
@ -112,7 +111,7 @@ public class ContentExchange extends CachedExchange
private InputStream getInputStream() throws IOException
{
return new FileInputStream( _fileForUpload );
return new FileInputStream(_fileForUpload);
}
public File getFileForUpload()
@ -123,6 +122,6 @@ public class ContentExchange extends CachedExchange
public void setFileForUpload(File fileForUpload) throws IOException
{
this._fileForUpload = fileForUpload;
_requestContentSource = getInputStream();
setRequestContentSource(getInputStream());
}
}

View File

@ -4,11 +4,11 @@
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
@ -18,6 +18,7 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import org.eclipse.jetty.client.security.Authorization;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeaderValues;
import org.eclipse.jetty.http.HttpHeaders;
@ -34,30 +35,26 @@ import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.Timeout;
/**
*
*
*
*
* @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
*/
public class HttpConnection implements Connection
{
HttpDestination _destination;
EndPoint _endp;
HttpGenerator _generator;
HttpParser _parser;
boolean _http11 = true;
Buffer _connectionHeader;
Buffer _requestContentChunk;
long _last;
boolean _requestComplete;
public String _message;
public Throwable _throwable;
public boolean _reserved;
/* The current exchange waiting for a response */
volatile HttpExchange _exchange;
HttpExchange _pipeline;
private HttpDestination _destination;
private EndPoint _endp;
private HttpGenerator _generator;
private HttpParser _parser;
private boolean _http11 = true;
private Buffer _connectionHeader;
private Buffer _requestContentChunk;
private long _last;
private boolean _requestComplete;
private boolean _reserved;
// The current exchange waiting for a response
private volatile HttpExchange _exchange;
private HttpExchange _pipeline;
private final Timeout.Task _timeout = new TimeoutTask();
public void dump() throws IOException
{
@ -69,45 +66,6 @@ public class HttpConnection implements Connection
((SslSelectChannelEndPoint)_endp).dump();
}
Timeout.Task _timeout = new Timeout.Task()
{
public void expired()
{
HttpExchange ex = null;
try
{
synchronized (HttpConnection.this)
{
ex = _exchange;
_exchange = null;
if (ex != null)
_destination.returnConnection(HttpConnection.this,true);
}
}
catch (Exception e)
{
Log.debug(e);
}
finally
{
try
{
_endp.close();
}
catch (IOException e)
{
Log.ignore(e);
}
if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
{
ex.setStatus(HttpExchange.STATUS_EXPIRED);
}
}
}
};
/* ------------------------------------------------------------ */
HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
{
_endp = endp;
@ -119,31 +77,24 @@ public class HttpConnection implements Connection
{
_reserved = reserved;
}
public boolean isReserved()
{
return _reserved;
}
/* ------------------------------------------------------------ */
public HttpDestination getDestination()
{
return _destination;
}
/* ------------------------------------------------------------ */
public void setDestination(HttpDestination destination)
{
_destination = destination;
}
/* ------------------------------------------------------------ */
public boolean send(HttpExchange ex) throws IOException
{
// _message =
// Thread.currentThread().getName()+": Generator instance="+_generator
// .hashCode()+" state= "+_generator.getState()+" _exchange="+_exchange;
_throwable = new Throwable();
synchronized (this)
{
if (_exchange != null)
@ -157,167 +108,80 @@ public class HttpConnection implements Connection
if (!_endp.isOpen())
return false;
ex.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
_exchange = ex;
_exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
if (_endp.isBlocking())
{
this.notify();
}
else
{
SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
scep.scheduleWrite();
}
if (!_endp.isBlocking())
_destination.getHttpClient().schedule(_timeout);
}
return true;
}
}
/* ------------------------------------------------------------ */
public void handle() throws IOException
{
int no_progress = 0;
long flushed = 0;
if (_exchange != null)
_exchange.associate(this);
boolean failed = false;
while (_endp.isBufferingInput() || _endp.isOpen())
try
{
synchronized (this)
int no_progress = 0;
long flushed = 0;
boolean failed = false;
while (_endp.isBufferingInput() || _endp.isOpen())
{
while (_exchange == null)
synchronized (this)
{
if (_endp.isBlocking())
while (_exchange == null)
{
try
if (_endp.isBlocking())
{
this.wait();
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
else
{
// Hopefully just space?
_parser.fill();
_parser.skipCRLF();
if (_parser.isMoreInBuffer())
{
Log.warn("unexpected data");
_endp.close();
}
return;
}
}
}
if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
{
no_progress = 0;
commitRequest();
}
try
{
long io = 0;
_endp.flush();
if (_generator.isComplete())
{
if (!_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
}
else
{
// Write as much of the request as possible
synchronized (this)
{
if (_exchange == null)
continue;
flushed = _generator.flushBuffer();
io += flushed;
}
if (!_generator.isComplete())
{
InputStream in = _exchange.getRequestContentSource();
if (in != null)
{
if (_requestContentChunk == null || _requestContentChunk.length() == 0)
try
{
_requestContentChunk = _exchange.getRequestContentChunk();
if (_requestContentChunk != null)
_generator.addContent(_requestContentChunk,false);
else
_generator.complete();
io += _generator.flushBuffer();
this.wait();
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
else
_generator.complete();
{
// Hopefully just space?
_parser.fill();
_parser.skipCRLF();
if (_parser.isMoreInBuffer())
{
Log.warn("Unexpected data received but no request sent");
close();
}
return;
}
}
if (!_exchange.isAssociated())
_exchange.associate(this);
}
// If we are not ended then parse available
if (!_parser.isComplete() && _generator.isCommitted())
if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
{
long filled = _parser.parseAvailable();
io += filled;
}
if (io > 0)
no_progress = 0;
else if (no_progress++ >= 2 && !_endp.isBlocking())
{
// SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
{
if (_generator.flushBuffer()>0)
continue;
}
return;
commitRequest();
}
}
catch (Throwable e)
{
if (e instanceof ThreadDeath)
throw (ThreadDeath)e;
synchronized (this)
try
{
if (_exchange != null)
{
_exchange.getEventListener().onException(e);
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
}
}
Log.warn("IOE on "+_exchange);
failed = true;
if (e instanceof IOException)
throw (IOException)e;
if (e instanceof Error)
throw (Error)e;
if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw new RuntimeException(e);
}
finally
{
boolean complete = false;
boolean close = failed; // always close the connection on error
if (!failed)
{
// are we complete?
long io = 0;
_endp.flush();
if (_generator.isComplete())
{
if (!_requestComplete)
@ -325,62 +189,173 @@ public class HttpConnection implements Connection
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
// we need to return the HttpConnection to a state that
// it can be reused or closed out
if (_parser.isComplete())
{
_destination.getHttpClient().cancel(_timeout);
complete = true;
}
}
}
if (complete || failed)
{
synchronized (this)
else
{
if (!close)
close = shouldClose();
reset(true);
no_progress = 0;
flushed = -1;
if (_exchange != null)
// Write as much of the request as possible
synchronized (this)
{
_exchange = null;
if (_exchange == null)
continue;
flushed = _generator.flushBuffer();
io += flushed;
}
if (_pipeline == null)
if (!_generator.isComplete())
{
InputStream in = _exchange.getRequestContentSource();
if (in != null)
{
if (!isReserved())
_destination.returnConnection(this,close);
if (close)
return;
if (_requestContentChunk == null || _requestContentChunk.length() == 0)
{
_requestContentChunk = _exchange.getRequestContentChunk();
if (_requestContentChunk != null)
_generator.addContent(_requestContentChunk,false);
else
_generator.complete();
io += _generator.flushBuffer();
}
}
else
_generator.complete();
}
}
if (_generator.isComplete() && !_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
// If we are not ended then parse available
if (!_parser.isComplete() && _generator.isCommitted())
{
long filled = _parser.parseAvailable();
io += filled;
}
if (io > 0)
no_progress = 0;
else if (no_progress++ >= 2 && !_endp.isBlocking())
{
// SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
{
if (_generator.flushBuffer()>0)
continue;
}
return;
}
}
catch (Throwable e)
{
Log.debug("Failure on " + _exchange, e);
if (e instanceof ThreadDeath)
throw (ThreadDeath)e;
synchronized (this)
{
if (_exchange != null)
{
// Cancelling the exchange causes an exception as we close the connection,
// but we don't report it as it is normal cancelling operation
if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING)
{
if (close)
_exchange.getEventListener().onException(e);
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
}
}
}
failed = true;
if (e instanceof IOException)
throw (IOException)e;
if (e instanceof Error)
throw (Error)e;
if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw new RuntimeException(e);
}
finally
{
boolean complete = false;
boolean close = failed; // always close the connection on error
if (!failed)
{
// are we complete?
if (_generator.isComplete())
{
if (!_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
// we need to return the HttpConnection to a state that
// it can be reused or closed out
if (_parser.isComplete())
{
_destination.getHttpClient().cancel(_timeout);
complete = true;
}
}
}
if (complete || failed)
{
synchronized (this)
{
if (!close)
close = shouldClose();
reset(true);
no_progress = 0;
if (_exchange != null)
{
_exchange.disassociate(this);
_exchange = null;
if (_pipeline == null)
{
if (!isReserved())
_destination.returnConnection(this,close);
_destination.send(_pipeline);
_pipeline = null;
return;
_destination.returnConnection(this, close);
}
else
{
if (close)
{
if (!isReserved())
_destination.returnConnection(this,close);
HttpExchange ex = _pipeline;
_pipeline = null;
send(ex);
HttpExchange exchange = _pipeline;
_pipeline = null;
_destination.send(exchange);
}
else
{
HttpExchange exchange = _pipeline;
_pipeline = null;
send(exchange);
}
}
}
}
}
}
}
}
finally
{
if (_exchange != null && _exchange.isAssociated())
_exchange.disassociate(this);
}
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{
synchronized (this)
@ -388,8 +363,7 @@ public class HttpConnection implements Connection
return _exchange == null;
}
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.Connection#isSuspended()
*/
@ -398,13 +372,11 @@ public class HttpConnection implements Connection
return false;
}
/* ------------------------------------------------------------ */
public EndPoint getEndPoint()
{
return _endp;
}
/* ------------------------------------------------------------ */
private void commitRequest() throws IOException
{
synchronized (this)
@ -413,9 +385,9 @@ public class HttpConnection implements Connection
throw new IllegalStateException();
_exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
_generator.setVersion(_exchange._version);
_generator.setVersion(_exchange.getVersion());
String uri = _exchange._uri;
String uri = _exchange.getURI();
if (_destination.isProxied() && uri.startsWith("/"))
{
// TODO suppress port 80 or 443
@ -426,50 +398,49 @@ public class HttpConnection implements Connection
auth.setCredentials(_exchange);
}
_generator.setRequest(_exchange._method,uri);
_generator.setRequest(_exchange.getMethod(), uri);
if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
HttpFields requestHeaders = _exchange.getRequestFields();
if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
{
if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
_exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
}
if (_exchange._requestContent != null)
Buffer requestContent = _exchange.getRequestContent();
if (requestContent != null)
{
_exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
_generator.completeHeader(_exchange._requestFields,false);
_generator.addContent(_exchange._requestContent,true);
}
else if (_exchange._requestContentSource != null)
{
_generator.completeHeader(_exchange._requestFields,false);
int available = _exchange._requestContentSource.available();
if (available > 0)
{
// TODO deal with any known content length
// TODO reuse this buffer!
byte[] buf = new byte[available];
int length = _exchange._requestContentSource.read(buf);
_generator.addContent(new ByteArrayBuffer(buf,0,length),false);
}
requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
_generator.completeHeader(requestHeaders,false);
_generator.addContent(requestContent,true);
}
else
{
_exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO
// :
// should
// not
// be
// needed
_generator.completeHeader(_exchange._requestFields,true);
InputStream requestContentStream = _exchange.getRequestContentSource();
if (requestContentStream != null)
{
_generator.completeHeader(requestHeaders, false);
int available = requestContentStream.available();
if (available > 0)
{
// TODO deal with any known content length
// TODO reuse this buffer!
byte[] buf = new byte[available];
int length = requestContentStream.read(buf);
_generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
}
}
else
{
requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
_generator.completeHeader(requestHeaders, true);
}
}
_exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
}
}
/* ------------------------------------------------------------ */
protected void reset(boolean returnBuffers) throws IOException
{
_requestComplete = false;
@ -479,7 +450,6 @@ public class HttpConnection implements Connection
_http11 = true;
}
/* ------------------------------------------------------------ */
private boolean shouldClose()
{
if (_connectionHeader!=null)
@ -492,7 +462,6 @@ public class HttpConnection implements Connection
return !_http11;
}
/* ------------------------------------------------------------ */
private class Handler extends HttpParser.EventHandler
{
@Override
@ -556,19 +525,16 @@ public class HttpConnection implements Connection
}
}
/* ------------------------------------------------------------ */
public String toString()
{
return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
}
/* ------------------------------------------------------------ */
public String toDetailString()
{
return toString() + " ex=" + _exchange + " " + _timeout.getAge();
}
/* ------------------------------------------------------------ */
/**
* @return the last
*/
@ -577,7 +543,6 @@ public class HttpConnection implements Connection
return _last;
}
/* ------------------------------------------------------------ */
/**
* @param last
* the last to set
@ -587,10 +552,49 @@ public class HttpConnection implements Connection
_last = last;
}
/* ------------------------------------------------------------ */
public void close() throws IOException
{
_endp.close();
}
private class TimeoutTask extends Timeout.Task
{
public void expired()
{
HttpExchange ex = null;
try
{
synchronized (HttpConnection.this)
{
ex = _exchange;
_exchange = null;
if (ex != null)
{
ex.disassociate(HttpConnection.this);
_destination.returnConnection(HttpConnection.this, true);
}
}
}
catch (Exception e)
{
Log.debug(e);
}
finally
{
try
{
close();
}
catch (IOException e)
{
Log.ignore(e);
}
if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
{
ex.setStatus(HttpExchange.STATUS_EXPIRED);
}
}
}
}
}

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.log.Log;
/**
*
*
* @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
*/
public class HttpDestination
{
@ -50,7 +50,6 @@ public class HttpDestination
private PathMap _authorizations;
private List<HttpCookie> _cookies;
/* ------------------------------------------------------------ */
public void dump() throws IOException
{
synchronized (this)
@ -70,7 +69,6 @@ public class HttpDestination
/* The queue of exchanged for this destination if connections are limited */
private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
/* ------------------------------------------------------------ */
HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
{
_client = client;
@ -82,31 +80,26 @@ public class HttpDestination
_hostHeader = new ByteArrayBuffer(addressString);
}
/* ------------------------------------------------------------ */
public Address getAddress()
{
return _address;
}
/* ------------------------------------------------------------ */
public Buffer getHostHeader()
{
return _hostHeader;
}
/* ------------------------------------------------------------ */
public HttpClient getHttpClient()
{
return _client;
}
/* ------------------------------------------------------------ */
public boolean isSecure()
{
return _ssl;
}
/* ------------------------------------------------------------ */
public void addAuthorization(String pathSpec, Authorization authorization)
{
synchronized (this)
@ -119,7 +112,6 @@ public class HttpDestination
// TODO query and remove methods
}
/* ------------------------------------------------------------------------------- */
public void addCookie(HttpCookie cookie)
{
synchronized (this)
@ -132,7 +124,6 @@ public class HttpDestination
// TODO query, remove and age methods
}
/* ------------------------------------------------------------------------------- */
/**
* Get a connection. We either get an idle connection if one is available, or
* we make a new connection, if we have not yet reached maxConnections. If we
@ -193,7 +184,6 @@ public class HttpDestination
return connection;
}
/* ------------------------------------------------------------------------------- */
public HttpConnection reserveConnection(long timeout) throws IOException
{
HttpConnection connection = getConnection(timeout);
@ -202,7 +192,6 @@ public class HttpDestination
return connection;
}
/* ------------------------------------------------------------------------------- */
public HttpConnection getIdleConnection() throws IOException
{
long now = System.currentTimeMillis();
@ -232,7 +221,6 @@ public class HttpDestination
}
}
/* ------------------------------------------------------------------------------- */
protected void startNewConnection()
{
try
@ -245,12 +233,11 @@ public class HttpDestination
}
catch (Exception e)
{
e.printStackTrace();
Log.debug(e);
onConnectionFailed(e);
}
}
/* ------------------------------------------------------------------------------- */
public void onConnectionFailed(Throwable throwable)
{
Throwable connect_failure = null;
@ -284,7 +271,6 @@ public class HttpDestination
}
}
/* ------------------------------------------------------------------------------- */
public void onException(Throwable throwable)
{
synchronized (this)
@ -299,7 +285,6 @@ public class HttpDestination
}
}
/* ------------------------------------------------------------------------------- */
public void onNewConnection(HttpConnection connection) throws IOException
{
HttpConnection q_connection = null;
@ -338,7 +323,6 @@ public class HttpDestination
}
}
/* ------------------------------------------------------------------------------- */
public void returnConnection(HttpConnection connection, boolean close) throws IOException
{
if (connection.isReserved())
@ -387,7 +371,6 @@ public class HttpDestination
}
}
/* ------------------------------------------------------------ */
public void send(HttpExchange ex) throws IOException
{
LinkedList<String> listeners = _client.getRegisteredListeners();
@ -423,7 +406,6 @@ public class HttpDestination
doSend(ex);
}
/* ------------------------------------------------------------ */
public void resend(HttpExchange ex) throws IOException
{
ex.getEventListener().onRetry();
@ -431,7 +413,6 @@ public class HttpDestination
doSend(ex);
}
/* ------------------------------------------------------------ */
protected void doSend(HttpExchange ex) throws IOException
{
// add cookies
@ -481,13 +462,11 @@ public class HttpDestination
}
}
/* ------------------------------------------------------------ */
public synchronized String toString()
{
return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
}
/* ------------------------------------------------------------ */
public synchronized String toDetailString()
{
StringBuilder b = new StringBuilder();
@ -497,13 +476,10 @@ public class HttpDestination
{
for (HttpConnection connection : _connections)
{
if (connection._exchange != null)
{
b.append(connection.toDetailString());
if (_idle.contains(connection))
b.append(" IDLE");
b.append('\n');
}
b.append(connection.toDetailString());
if (_idle.contains(connection))
b.append(" IDLE");
b.append('\n');
}
}
b.append("--");
@ -512,37 +488,31 @@ public class HttpDestination
return b.toString();
}
/* ------------------------------------------------------------ */
public void setProxy(Address proxy)
{
_proxy = proxy;
}
/* ------------------------------------------------------------ */
public Address getProxy()
{
return _proxy;
}
/* ------------------------------------------------------------ */
public Authorization getProxyAuthentication()
{
return _proxyAuthentication;
}
/* ------------------------------------------------------------ */
public void setProxyAuthentication(Authorization authentication)
{
_proxyAuthentication = authentication;
}
/* ------------------------------------------------------------ */
public boolean isProxied()
{
return _proxy != null;
}
/* ------------------------------------------------------------ */
public void close() throws IOException
{
synchronized (this)
@ -553,5 +523,4 @@ public class HttpDestination
}
}
}
}

View File

@ -4,18 +4,17 @@
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeaders;
@ -24,42 +23,42 @@ import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersions;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.BufferCache.CachedBuffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.util.log.Log;
/**
* An HTTP client API that encapsulates Exchange with a HTTP server.
* <p>An HTTP client API that encapsulates an exchange (a request and its response) with a HTTP server.</p>
*
* This object encapsulates:<ul>
* <li>The HTTP server. (see {@link #setAddress(InetSocketAddress)} or {@link #setURL(String)})
* This object encapsulates:
* <ul>
* <li>The HTTP server address, see {@link #setAddress(Address)} or {@link #setURL(String)})
* <li>The HTTP request method, URI and HTTP version (see {@link #setMethod(String)}, {@link #setURI(String)}, and {@link #setVersion(int)}
* <li>The Request headers (see {@link #addRequestHeader(String, String)} or {@link #setRequestHeader(String, String)})
* <li>The Request content (see {@link #setRequestContent(Buffer)} or {@link #setRequestContentSource(InputStream)})
* <li>The request headers (see {@link #addRequestHeader(String, String)} or {@link #setRequestHeader(String, String)})
* <li>The request content (see {@link #setRequestContent(Buffer)} or {@link #setRequestContentSource(InputStream)})
* <li>The status of the exchange (see {@link #getStatus()})
* <li>Callbacks to handle state changes (see the onXxx methods such as {@link #onRequestComplete()} or {@link #onResponseComplete()})
* <li>The ability to intercept callbacks (see {@link #setEventListener(HttpEventListener)}
* </ul>
*
* The HttpExchange class is intended to be used by a developer wishing to have close asynchronous
* interaction with the the exchange. Typically a developer will extend the HttpExchange class with a derived
* class that implements some or all of the onXxx callbacks. There are also some predefined HttpExchange subtypes
* that can be used as a basis (see {@link ContentExchange} and {@link CachedExchange}.
* <p>The HttpExchange class is intended to be used by a developer wishing to have close asynchronous
* interaction with the the exchange.<br />
* Typically a developer will extend the HttpExchange class with a derived
* class that overrides some or all of the onXxx callbacks. <br />
* There are also some predefined HttpExchange subtypes that can be used as a basis,
* see {@link org.eclipse.jetty.client.ContentExchange} and {@link org.eclipse.jetty.client.CachedExchange}.</p>
*
* <p>Typically the HttpExchange is passed to a the {@link HttpClient#send(HttpExchange)} method, which in
* turn selects a {@link HttpDestination} and calls it's {@link HttpDestination#send(HttpExchange), which
* <p>Typically the HttpExchange is passed to the {@link HttpClient#send(HttpExchange)} method, which in
* turn selects a {@link HttpDestination} and calls its {@link HttpDestination#send(HttpExchange), which
* then creates or selects a {@link HttpConnection} and calls its {@link HttpConnection#send(HttpExchange).
* A developer may wish to directly call send on the destination or connection if they wish to bypass
* some handling provided (eg Cookie handling in the HttpDestination).
* some handling provided (eg Cookie handling in the HttpDestination).</p>
*
* <p>In some circumstances, the HttpClient or HttpDestination may wish to retry a HttpExchange (eg. failed
* pipeline request, authentication retry or redirection). In such cases, the HttpClient and/or HttpDestination
* may insert their own HttpExchangeListener to intercept and filter the call backs intended for the
* HttpExchange.
*
*
*
* HttpExchange.</p>
*/
public class HttpExchange
{
@ -73,43 +72,39 @@ public class HttpExchange
public static final int STATUS_COMPLETED = 7;
public static final int STATUS_EXPIRED = 8;
public static final int STATUS_EXCEPTED = 9;
public static final int STATUS_CANCELLING = 10;
public static final int STATUS_CANCELLED = 11;
Address _address;
String _method = HttpMethods.GET;
Buffer _scheme = HttpSchemes.HTTP_BUFFER;
int _version = HttpVersions.HTTP_1_1_ORDINAL;
String _uri;
int _status = STATUS_START;
HttpFields _requestFields = new HttpFields();
Buffer _requestContent;
InputStream _requestContentSource;
Buffer _requestContentChunk;
boolean _retryStatus = false;
/**
* boolean controlling if the exchange will have listeners autoconfigured by
* the destination
*/
boolean _configureListeners = true;
// HTTP protocol fields
private String _method = HttpMethods.GET;
private Buffer _scheme = HttpSchemes.HTTP_BUFFER;
private String _uri;
private int _version = HttpVersions.HTTP_1_1_ORDINAL;
private Address _address;
private final HttpFields _requestFields = new HttpFields();
private Buffer _requestContent;
private InputStream _requestContentSource;
private int _status = STATUS_START;
private Buffer _requestContentChunk;
private boolean _retryStatus = false;
// controls if the exchange will have listeners autoconfigured by the destination
private boolean _configureListeners = true;
private HttpEventListener _listener = new Listener();
private volatile HttpConnection connection;
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
// methods to build request
/* ------------------------------------------------------------ */
public int getStatus()
{
return _status;
synchronized (this)
{
return _status;
}
}
/* ------------------------------------------------------------ */
/**
* @deprecated
* @param status the status to wait for
* @throws InterruptedException if the waiting thread is interrupted
* @deprecated Use {@link #waitForDone()} instead
*/
public void waitForStatus(int status) throws InterruptedException
{
@ -122,94 +117,213 @@ public class HttpExchange
}
}
public int waitForDone () throws InterruptedException
public int waitForDone() throws InterruptedException
{
synchronized (this)
{
while (!isDone(_status))
this.wait();
return _status;
}
return _status;
}
/* ------------------------------------------------------------ */
public void reset()
{
setStatus(STATUS_START);
}
/* ------------------------------------------------------------ */
void setStatus(int status)
{
synchronized (this)
try
{
_status = status;
this.notifyAll();
try
synchronized (this)
{
switch (status)
// Wakeup any waiter, no matter if the status is really changed
notifyAll();
// State machine: from which old status you can go into which new status
switch (_status)
{
case STATUS_START:
switch (status)
{
case STATUS_WAITING_FOR_CONNECTION:
case STATUS_WAITING_FOR_COMMIT:
case STATUS_CANCELLING:
_status = status;
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case STATUS_WAITING_FOR_CONNECTION:
switch (status)
{
case STATUS_WAITING_FOR_COMMIT:
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
_status = status;
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case STATUS_WAITING_FOR_COMMIT:
switch (status)
{
case STATUS_SENDING_REQUEST:
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
_status = status;
break;
case STATUS_EXPIRED:
_status = status;
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case STATUS_SENDING_REQUEST:
switch (status)
{
case STATUS_WAITING_FOR_RESPONSE:
_status = status;
getEventListener().onRequestCommitted();
break;
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
_status = status;
break;
case STATUS_EXPIRED:
_status = status;
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case HttpExchange.STATUS_WAITING_FOR_RESPONSE:
getEventListener().onRequestCommitted();
case STATUS_WAITING_FOR_RESPONSE:
switch (status)
{
case STATUS_PARSING_HEADERS:
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
_status = status;
break;
case STATUS_EXPIRED:
_status = status;
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case STATUS_PARSING_HEADERS:
switch (status)
{
case STATUS_PARSING_CONTENT:
_status = status;
getEventListener().onResponseHeaderComplete();
break;
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
_status = status;
break;
case STATUS_EXPIRED:
_status = status;
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case STATUS_PARSING_CONTENT:
getEventListener().onResponseHeaderComplete();
switch (status)
{
case STATUS_COMPLETED:
_status = status;
getEventListener().onResponseComplete();
break;
case STATUS_CANCELLING:
case STATUS_EXCEPTED:
_status = status;
break;
case STATUS_EXPIRED:
_status = status;
getEventListener().onExpire();
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case STATUS_COMPLETED:
getEventListener().onResponseComplete();
switch (status)
{
case STATUS_START:
_status = status;
break;
case STATUS_CANCELLING:
case STATUS_EXPIRED:
// Don't change the status, it's too late
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
case STATUS_CANCELLING:
switch (status)
{
case STATUS_CANCELLED:
_status = status;
break;
default:
// Ignore other statuses, we're cancelling
break;
}
break;
case STATUS_EXCEPTED:
case STATUS_EXPIRED:
getEventListener().onExpire();
case STATUS_CANCELLED:
switch (status)
{
case STATUS_START:
_status = status;
break;
default:
throw new IllegalStateException(_status + " => " + status);
}
break;
default:
// Here means I allowed to set a state that I don't recognize
throw new AssertionError(_status + " => " + status);
}
}
catch (IOException e)
{
Log.warn(e);
}
}
catch (IOException x)
{
Log.warn(x);
}
}
/* ------------------------------------------------------------ */
public boolean isDone (int status)
{
return ((status == STATUS_COMPLETED) || (status == STATUS_EXPIRED) || (status == STATUS_EXCEPTED));
return status == STATUS_COMPLETED ||
status == STATUS_EXPIRED ||
status == STATUS_EXCEPTED ||
status == STATUS_CANCELLED;
}
/* ------------------------------------------------------------ */
public HttpEventListener getEventListener()
{
return _listener;
}
/* ------------------------------------------------------------ */
public void setEventListener(HttpEventListener listener)
{
_listener=listener;
}
/* ------------------------------------------------------------ */
/**
* @param url Including protocol, host and port
*/
@ -236,56 +350,53 @@ public class HttpExchange
String completePath = uri.getCompletePath();
if (completePath == null)
completePath = "/";
setURI(completePath);
}
/* ------------------------------------------------------------ */
/**
* @param address
* @param address the address of the server
*/
public void setAddress(Address address)
{
_address = address;
}
/* ------------------------------------------------------------ */
/**
* @return
* @return the address of the server
*/
public Address getAddress()
{
return _address;
}
/* ------------------------------------------------------------ */
/**
* @param scheme
* @param scheme the scheme of the URL (for example 'http')
*/
public void setScheme(Buffer scheme)
{
_scheme = scheme;
}
/* ------------------------------------------------------------ */
/**
* @return
* @return the scheme of the URL
*/
public Buffer getScheme()
{
return _scheme;
}
/* ------------------------------------------------------------ */
/**
* @param version as integer, 9, 10 or 11 for 0.9, 1.0 or 1.1
* @param version the HTTP protocol version as integer, 9, 10 or 11 for 0.9, 1.0 or 1.1
*/
public void setVersion(int version)
{
_version = version;
}
/* ------------------------------------------------------------ */
/**
* @param version the HTTP protocol version as string
*/
public void setVersion(String version)
{
CachedBuffer v = HttpVersions.CACHE.get(version);
@ -295,139 +406,127 @@ public class HttpExchange
_version = v.getOrdinal();
}
/* ------------------------------------------------------------ */
/**
* @return
* @return the HTTP protocol version as integer
* @see #setVersion(int)
*/
public int getVersion()
{
return _version;
}
/* ------------------------------------------------------------ */
/**
* @param method
* @param method the HTTP method (for example 'GET')
*/
public void setMethod(String method)
{
_method = method;
}
/* ------------------------------------------------------------ */
/**
* @return
* @return the HTTP method
*/
public String getMethod()
{
return _method;
}
/* ------------------------------------------------------------ */
/**
* @return
* @return the path of the URL
*/
public String getURI()
{
return _uri;
}
/* ------------------------------------------------------------ */
/**
* @param uri
* @param uri the path of the URL (for example '/foo/bar?a=1')
*/
public void setURI(String uri)
{
_uri = uri;
}
/* ------------------------------------------------------------ */
/**
* @param name
* @param value
* Adds the specified request header
* @param name the header name
* @param value the header value
*/
public void addRequestHeader(String name, String value)
{
getRequestFields().add(name,value);
}
/* ------------------------------------------------------------ */
/**
* @param name
* @param value
* Adds the specified request header
* @param name the header name
* @param value the header value
*/
public void addRequestHeader(Buffer name, Buffer value)
{
getRequestFields().add(name,value);
}
/* ------------------------------------------------------------ */
/**
* @param name
* @param value
* Sets the specified request header
* @param name the header name
* @param value the header value
*/
public void setRequestHeader(String name, String value)
{
getRequestFields().put(name,value);
}
/* ------------------------------------------------------------ */
/**
* @param name
* @param value
* Sets the specified request header
* @param name the header name
* @param value the header value
*/
public void setRequestHeader(Buffer name, Buffer value)
{
getRequestFields().put(name,value);
}
/* ------------------------------------------------------------ */
/**
* @param value
* @param value the content type of the request
*/
public void setRequestContentType(String value)
{
getRequestFields().put(HttpHeaders.CONTENT_TYPE_BUFFER,value);
}
/* ------------------------------------------------------------ */
/**
* @return
* @return the request headers
*/
public HttpFields getRequestFields()
{
return _requestFields;
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
// methods to commit and/or send the request
/* ------------------------------------------------------------ */
/**
* @param requestContent
* @param requestContent the request content
*/
public void setRequestContent(Buffer requestContent)
{
_requestContent = requestContent;
}
/* ------------------------------------------------------------ */
/**
* @param in
* @param stream the request content as a stream
*/
public void setRequestContentSource(InputStream in)
public void setRequestContentSource(InputStream stream)
{
_requestContentSource = in;
_requestContentSource = stream;
}
/* ------------------------------------------------------------ */
/**
* @return the request content as a stream
*/
public InputStream getRequestContentSource()
{
return _requestContentSource;
}
/* ------------------------------------------------------------ */
public Buffer getRequestContentChunk() throws IOException
{
synchronized (this)
@ -452,126 +551,182 @@ public class HttpExchange
}
}
/* ------------------------------------------------------------ */
/**
* @return the request content
*/
public Buffer getRequestContent()
{
return _requestContent;
}
/**
* @return whether a retry will be attempted or not
*/
public boolean getRetryStatus()
{
return _retryStatus;
}
public void setRetryStatus( boolean retryStatus )
/**
* @param retryStatus whether a retry will be attempted or not
*/
public void setRetryStatus(boolean retryStatus)
{
_retryStatus = retryStatus;
}
/* ------------------------------------------------------------ */
/** Cancel this exchange
* Currently this implementation does nothing.
/**
* Initiates the cancelling of this exchange.
* The status of the exchange is set to {@link #STATUS_CANCELLING}.
* Cancelling the exchange is an asynchronous operation with respect to the request/response,
* and as such checking the request/response status of a cancelled exchange may return undefined results
* (for example it may have only some of the response headers being sent by the server).
* The cancelling of the exchange is completed when the exchange status (see {@link #getStatus()}) is
* {@link #STATUS_CANCELLED}, and this can be waited using {@link #waitForDone()}.
*/
public void cancel()
{
setStatus(STATUS_CANCELLING);
abort();
}
private void abort()
{
HttpConnection httpConnection = this.connection;
if (httpConnection != null)
{
try
{
// Closing the connection here will cause the connection
// to be returned in HttpConnection.handle()
httpConnection.close();
}
catch (IOException x)
{
Log.debug(x);
}
}
}
void associate(HttpConnection connection)
{
this.connection = connection;
if (getStatus() == STATUS_CANCELLING)
abort();
}
boolean isAssociated()
{
return this.connection != null;
}
HttpConnection disassociate(HttpConnection connection)
{
HttpConnection result = this.connection;
this.connection = null;
if (getStatus() == STATUS_CANCELLING)
setStatus(STATUS_CANCELLED);
return result;
}
/* ------------------------------------------------------------ */
public String toString()
{
return "HttpExchange@" + hashCode() + "=" + _method + "//" + _address.getHost() + ":" + _address.getPort() + _uri + "#" + _status;
return getClass().getSimpleName() + "@" + hashCode() + "=" + _method + "//" + _address + _uri + "#" + getStatus();
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
// methods to handle response
/**
* Called when the request headers has been sent
* @throws IOException
* Callback called when the request headers have been sent to the server.
* This implementation does nothing.
* @throws IOException allowed to be thrown by overriding code
*/
protected void onRequestCommitted() throws IOException
{
}
/**
* Called when the request and it's body have been sent.
* @throws IOException
* Callback called when the request and its body have been sent to the server.
* This implementation does nothing.
* @throws IOException allowed to be thrown by overriding code
*/
protected void onRequestComplete() throws IOException
{
}
/**
* Called when a response status line has been received.
* @param version HTTP version
* @param status HTTP status code
* @param reason HTTP status code reason string
* @throws IOException
* Callback called when a response status line has been received from the server.
* This implementation does nothing.
* @param version the HTTP version
* @param status the HTTP status code
* @param reason the HTTP status reason string
* @throws IOException allowed to be thrown by overriding code
*/
protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
{
}
/**
* Called for each response header received
* @param name header name
* @param value header value
* @throws IOException
* Callback called for each response header received from the server.
* This implementation does nothing.
* @param name the header name
* @param value the header value
* @throws IOException allowed to be thrown by overriding code
*/
protected void onResponseHeader(Buffer name, Buffer value) throws IOException
{
}
/**
* Called when the response header has been completely received.
* @throws IOException
* Callback called when the response headers have been completely received from the server.
* This implementation does nothing.
* @throws IOException allowed to be thrown by overriding code
*/
protected void onResponseHeaderComplete() throws IOException
{
}
/**
* Called for each chunk of the response content received.
* @param content
* @throws IOException
* Callback called for each chunk of the response content received from the server.
* This implementation does nothing.
* @param content the buffer holding the content chunk
* @throws IOException allowed to be thrown by overriding code
*/
protected void onResponseContent(Buffer content) throws IOException
{
}
/**
* Called when the entire response has been received
* @throws IOException
* Callback called when the entire response has been received from the server
* This implementation does nothing.
* @throws IOException allowed to be thrown by overriding code
*/
protected void onResponseComplete() throws IOException
{
}
/**
* Called when an exception was thrown during an attempt to open a connection
* @param ex
* Callback called when an exception was thrown during an attempt to establish the connection
* with the server (for example the server is not listening).
* This implementation logs a warning.
* @param x the exception thrown attempting to establish the connection with the server
*/
protected void onConnectionFailed(Throwable ex)
protected void onConnectionFailed(Throwable x)
{
Log.warn("CONNECTION FAILED on " + this,ex);
Log.warn("CONNECTION FAILED " + this,x);
}
/**
* Called when any other exception occurs during handling for the exchange
* @param ex
* Callback called when any other exception occurs during the handling of this exchange.
* This implementation logs a warning.
* @param x the exception thrown during the handling of this exchange
*/
protected void onException(Throwable ex)
protected void onException(Throwable x)
{
Log.warn("EXCEPTION on " + this,ex);
Log.warn("EXCEPTION " + this,x);
}
/**
* Called when no response has been received within the timeout.
* Callback called when no response has been received within the timeout.
* This implementation logs a warning.
*/
protected void onExpire()
{
@ -579,27 +734,29 @@ public class HttpExchange
}
/**
* Called when the request is retried (due to failures or authentication).
* Implementations may need to reset any consumable content that needs to
* be sent.
* @throws IOException
* Callback called when the request is retried (due to failures or authentication).
* Implementations may need to reset any consumable content that needs to be sent.
* This implementation does nothing.
* @throws IOException allowed to be thrown by overriding code
*/
protected void onRetry() throws IOException
{}
{
}
/**
* true of the exchange should have listeners configured for it by the destination
*
* @return true if the exchange should have listeners configured for it by the destination,
* false if this is being managed elsewhere
*
* @return
* @see #setConfigureListeners(boolean)
*/
public boolean configureListeners()
{
return _configureListeners;
}
public void setConfigureListeners(boolean autoConfigure )
/**
* @param autoConfigure whether the listeners are configured by the destination or elsewhere
*/
public void setConfigureListeners(boolean autoConfigure)
{
this._configureListeners = autoConfigure;
}
@ -665,14 +822,13 @@ public class HttpExchange
}
catch (IOException e)
{
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
Log.debug(e);
}
}
}
/**
* @deprecated use {@link org.eclipse.jetty.client.CachedExchange}
*
* @deprecated use {@link org.eclipse.jetty.client.CachedExchange} instead
*/
public static class CachedExchange extends org.eclipse.jetty.client.CachedExchange
{
@ -683,14 +839,9 @@ public class HttpExchange
}
/**
* @deprecated use {@link org.eclipse.jetty.client.ContentExchange}
*
* @deprecated use {@link org.eclipse.jetty.client.ContentExchange} instead
*/
public static class ContentExchange extends org.eclipse.jetty.client.ContentExchange
{
}
}

View File

@ -0,0 +1,388 @@
/*
* Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package org.eclipse.jetty.client;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import junit.framework.TestCase;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
/**
* @version $Revision$ $Date$
*/
public abstract class AbstractHttpExchangeCancelTest extends TestCase
{
private Server server;
private Connector connector;
@Override
protected void setUp() throws Exception
{
server = new Server();
connector = new SelectChannelConnector();
server.addConnector(connector);
server.setHandler(new EmptyHandler());
server.start();
}
@Override
protected void tearDown() throws Exception
{
server.stop();
server.join();
}
public void testHttpExchangeCancelOnSend1() throws Exception
{
// One of the first things that HttpClient.send() does
// is to change the status of the exchange
// We exploit that to be sure the exchange is canceled
// without race conditions
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
void setStatus(int status)
{
// Cancel before setting the new status
if (getStatus() == HttpExchange.STATUS_START &&
status == STATUS_WAITING_FOR_CONNECTION)
cancel();
super.setStatus(status);
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
// Cancelling here is wrong and makes the test fail spuriously
// due to a race condition with send(): the send() can complete
// before the exchange is canceled so it will be in STATUS_COMPLETE
// which will fail the test.
// exchange.cancel();
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnSend2() throws Exception
{
// One of the first things that HttpClient.send() does
// is to change the status of the exchange
// We exploit that to be sure the exchange is canceled
// without race conditions
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
void setStatus(int status)
{
// Cancel after setting the new status
int oldStatus = getStatus();
super.setStatus(status);
if (oldStatus == STATUS_START &&
getStatus() == HttpExchange.STATUS_WAITING_FOR_CONNECTION)
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
// Cancelling here is wrong and makes the test fail spuriously
// due to a race condition with send(): the send() can complete
// before the exchange is canceled so it will be in STATUS_COMPLETE
// which will fail the test.
// exchange.cancel();
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnRequestCommitted() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
protected void onRequestCommitted() throws IOException
{
super.onRequestCommitted();
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnRequestComplete() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
protected void onRequestComplete() throws IOException
{
super.onRequestComplete();
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnResponseStatus() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
{
super.onResponseStatus(version, status, reason);
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnResponseHeader() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
protected void onResponseHeader(Buffer name, Buffer value) throws IOException
{
super.onResponseHeader(name, value);
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnResponseHeadersComplete() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
protected void onResponseHeaderComplete() throws IOException
{
super.onResponseHeaderComplete();
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnResponseContent() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
protected void onResponseContent(Buffer content) throws IOException
{
super.onResponseContent(content);
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/?action=body");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_CANCELLED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeCancelOnResponseComplete() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange()
{
@Override
protected void onResponseComplete() throws IOException
{
super.onResponseComplete();
cancel();
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_COMPLETED, status);
assertTrue(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
public void testHttpExchangeOnServerException() throws Exception
{
TestHttpExchange exchange = new TestHttpExchange();
exchange.setAddress(newAddress());
exchange.setURI("/?action=throw");
getHttpClient().send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_COMPLETED, status);
assertTrue(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertFalse(exchange.isAssociated());
}
protected abstract HttpClient getHttpClient();
protected Address newAddress()
{
return new Address("localhost", connector.getLocalPort());
}
private static class EmptyHandler extends AbstractHandler
{
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
{
request.setHandled(true);
String action = httpRequest.getParameter("action");
if (action != null)
{
if ("body".equals(action))
{
ServletOutputStream output = httpResponse.getOutputStream();
output.write("body".getBytes("UTF-8"));
// output.flush();
}
else if ("throw".equals(action))
{
throw new ServletException();
}
else if (action.startsWith("wait"))
{
long sleep = Long.valueOf(action.substring("wait".length()));
try
{
Thread.sleep(sleep);
}
catch (InterruptedException x)
{
Thread.currentThread().interrupt();
}
}
}
}
}
protected static class TestHttpExchange extends ContentExchange
{
private volatile boolean responseCompleted;
private volatile boolean failed;
private volatile boolean expired;
protected TestHttpExchange()
{
super(true);
}
@Override
protected void onResponseComplete() throws IOException
{
this.responseCompleted = true;
}
public boolean isResponseCompleted()
{
return responseCompleted;
}
@Override
protected void onException(Throwable ex)
{
this.failed = true;
}
public boolean isFailed()
{
return failed;
}
@Override
protected void onExpire()
{
this.expired = true;
}
public boolean isExpired()
{
return expired;
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package org.eclipse.jetty.client;
/**
* @version $Revision$ $Date$
*/
public class BlockingHttpExchangeCancelTest extends AbstractHttpExchangeCancelTest
{
private HttpClient httpClient;
@Override
protected void setUp() throws Exception
{
super.setUp();
httpClient = new HttpClient();
httpClient.setConnectorType(HttpClient.CONNECTOR_SOCKET);
httpClient.start();
}
@Override
protected void tearDown() throws Exception
{
httpClient.stop();
super.tearDown();
}
protected HttpClient getHttpClient()
{
return httpClient;
}
public void DONTtestHttpExchangeOnExpire() throws Exception
{
HttpClient httpClient = getHttpClient();
httpClient.stop();
httpClient.setSoTimeout(2000);
httpClient.start();
TestHttpExchange exchange = new TestHttpExchange();
exchange.setAddress(newAddress());
exchange.setURI("/?action=wait5000");
httpClient.send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_EXPIRED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertTrue(exchange.isExpired());
assertFalse(exchange.isAssociated());
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package org.eclipse.jetty.client;
/**
* @version $Revision$ $Date$
*/
public class NonBlockingHttpExchangeCancelTest extends AbstractHttpExchangeCancelTest
{
private HttpClient httpClient;
@Override
protected void setUp() throws Exception
{
super.setUp();
httpClient = new HttpClient();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
httpClient.start();
}
@Override
protected void tearDown() throws Exception
{
httpClient.stop();
super.tearDown();
}
protected HttpClient getHttpClient()
{
return httpClient;
}
public void testHttpExchangeOnExpire() throws Exception
{
HttpClient httpClient = getHttpClient();
httpClient.stop();
httpClient.setTimeout(2000);
httpClient.start();
TestHttpExchange exchange = new TestHttpExchange();
exchange.setAddress(newAddress());
exchange.setURI("/?action=wait5000");
httpClient.send(exchange);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_EXPIRED, status);
assertFalse(exchange.isResponseCompleted());
assertFalse(exchange.isFailed());
assertTrue(exchange.isExpired());
assertFalse(exchange.isAssociated());
}
}