Non SSL client working

This commit is contained in:
Greg Wilkins 2011-10-24 17:14:56 +11:00
parent a642f7b55c
commit 6a17c69bfb
15 changed files with 145 additions and 123 deletions

View File

@ -1,10 +1,8 @@
package org.eclipse.jetty.client;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import org.eclipse.jetty.http.AbstractGenerator;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
@ -13,7 +11,6 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -24,10 +21,13 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
private boolean _requestComplete;
private int _status;
private Buffer _requestContentChunk;
private final AsyncEndPoint _asyncEndp;
AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
{
super(requestBuffers,responseBuffers,endp);
_asyncEndp=(AsyncEndPoint)endp;
}
protected void reset() throws IOException
@ -45,14 +45,13 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
{
boolean failed = false;
int loops=1000; // TODO remove this safety net
int loops=10000; // TODO remove this safety net
// While the endpoint is open
// AND we have more characters to read OR we made some progress
while (_endp.isOpen() &&
(_parser.isMoreInBuffer() || _endp.isBufferingInput() || progress))
{
// System.err.println("loop");
if (loops--<0)
{
System.err.println("LOOPING!!!");
@ -76,40 +75,35 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
// Generate output
if (_generator.isCommitted() && !_generator.isComplete())
{
int flushed=_generator.flushBuffer();
if (flushed>0)
if (_generator.flushBuffer()>0)
progress=true;
// Is there more content to send or should we complete the generator
if (!_generator.isComplete() && _generator.isEmpty())
if (_generator.isState(AbstractGenerator.STATE_CONTENT))
{
if (exchange!=null)
// Look for more content to send.
if (_requestContentChunk==null)
_requestContentChunk = exchange.getRequestContentChunk(null);
if (_requestContentChunk==null)
{
Buffer chunk = _exchange.getRequestContentChunk();
if (chunk!=null)
_generator.addContent(chunk,false);
else
{
_generator.complete();
progress=true;
}
}
else
{
_generator.complete();
progress=true;
_generator.complete();
}
else if (_generator.isEmpty())
{
progress=true;
Buffer chunk=_requestContentChunk;
_requestContentChunk=exchange.getRequestContentChunk(null);
_generator.addContent(chunk,_requestContentChunk==null);
}
}
else
{
_generator.complete();
progress=true;
}
}
// Signal request completion
if (_generator.isComplete() && !_requestComplete)
{
progress=true;
_requestComplete = true;
exchange.getEventListener().onRequestComplete();
}
@ -121,6 +115,11 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
// Read any input that is available
if (!_parser.isComplete() && _parser.parseAvailable())
progress=true;
// Has any IO been done by the endpoint itself since last loop
if (_asyncEndp.hasProgressed())
progress=true;
}
catch (Throwable e)
{
@ -138,7 +137,8 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
// Cancelling the exchange causes an exception as we close the connection,
// but we don't report it as it is normal cancelling operation
if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
!exchange.isDone())
{
exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
exchange.getEventListener().onException(e);
@ -220,20 +220,6 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
{
_parser.returnBuffers();
_generator.returnBuffers();
// TODO why is this needed?
if (!_generator.isEmpty())
{
if (((SelectChannelEndPoint)_endp).isWritable())
{
System.err.println("early exit??? "+progress);
System.err.println(_endp);
System.err.println(_generator);
System.exit(1);
}
((SelectChannelEndPoint)_endp).scheduleWrite();
}
}
return connection;

View File

@ -98,7 +98,6 @@ public class HttpExchange
private InputStream _requestContentSource;
private AtomicInteger _status = new AtomicInteger(STATUS_START);
private Buffer _requestContentChunk;
private boolean _retryStatus = false;
// controls if the exchange will have listeners autoconfigured by the destination
private boolean _configureListeners = true;
@ -705,27 +704,21 @@ public class HttpExchange
return _requestContentSource;
}
public Buffer getRequestContentChunk() throws IOException
public Buffer getRequestContentChunk(Buffer buffer) throws IOException
{
synchronized (this)
{
if (_requestContentSource!=null)
{
if (_requestContentChunk == null)
_requestContentChunk = new ByteArrayBuffer(4096); // TODO configure
else
{
if (_requestContentChunk.hasContent())
throw new IllegalStateException();
_requestContentChunk.clear();
}
if (buffer == null)
buffer = new ByteArrayBuffer(8192); // TODO configure
int read = _requestContentChunk.capacity();
int length = _requestContentSource.read(_requestContentChunk.array(),0,read);
int space = buffer.space();
int length = _requestContentSource.read(buffer.array(),buffer.putIndex(),space);
if (length >= 0)
{
_requestContentChunk.setPutIndex(length);
return _requestContentChunk;
buffer.setPutIndex(buffer.putIndex()+length);
return buffer;
}
}
return null;

View File

@ -30,4 +30,11 @@ public class AsyncSslHttpExchangeTest extends SslHttpExchangeTest
_port = _server.getConnectors()[0].getLocalPort();
}
@Override
public void testPerf() throws Exception
{
sender(1,true);
}
}

View File

@ -38,6 +38,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.server.Server;
@ -53,7 +54,7 @@ import org.junit.Test;
*/
public class HttpExchangeTest
{
final static boolean verbose=false;
final static boolean verbose=true;
protected static int _maxConnectionsPerAddress = 2;
protected static String _scheme = "http";
protected static Server _server;
@ -147,7 +148,7 @@ public class HttpExchangeTest
protected void onRequestCommitted()
{
if (verbose)
System.err.println("[ ");
System.err.println(n+" [ "+this);
result = "committed";
}
@ -156,7 +157,7 @@ public class HttpExchangeTest
protected void onRequestComplete() throws IOException
{
if (verbose)
System.err.println("[ ==");
System.err.println(n+" [ ==");
result = "sent";
}
@ -165,7 +166,7 @@ public class HttpExchangeTest
protected void onResponseStatus(Buffer version, int status, Buffer reason)
{
if (verbose)
System.err.println("] "+version+" "+status+" "+reason);
System.err.println(n+" ] "+version+" "+status+" "+reason);
result = "status";
}
@ -174,7 +175,7 @@ public class HttpExchangeTest
protected void onResponseHeader(Buffer name, Buffer value)
{
if (verbose)
System.err.println("] "+name+": "+value);
System.err.println(n+" ] "+name+": "+value);
}
/* ------------------------------------------------------------ */
@ -182,7 +183,7 @@ public class HttpExchangeTest
protected void onResponseHeaderComplete() throws IOException
{
if (verbose)
System.err.println("] -");
System.err.println(n+" ] -");
result = "content";
super.onResponseHeaderComplete();
}
@ -193,7 +194,7 @@ public class HttpExchangeTest
{
len += content.length();
if (verbose)
System.err.println("] "+content.length()+" -> "+len);
System.err.println(n+" ] "+content.length()+" -> "+len);
}
/* ------------------------------------------------------------ */
@ -201,12 +202,12 @@ public class HttpExchangeTest
protected void onResponseComplete()
{
if (verbose)
System.err.println("] == "+len+" "+complete.getCount()+"/"+nb);
System.err.println(n+" ] == "+len+" "+complete.getCount()+"/"+nb);
result = "complete";
if (len == 2009)
allcontent.decrementAndGet();
else
System.err.println(n + " ONLY " + len+ "/2009");
System.err.println(n+ " ONLY " + len+ "/2009");
complete.countDown();
}
@ -215,10 +216,10 @@ public class HttpExchangeTest
protected void onConnectionFailed(Throwable ex)
{
if (verbose)
System.err.println("] "+ex);
System.err.println(n+" ] "+ex);
complete.countDown();
result = "failed";
System.err.println(n + " FAILED " + ex);
System.err.println(n+ " FAILED " + ex);
super.onConnectionFailed(ex);
}
@ -227,10 +228,10 @@ public class HttpExchangeTest
protected void onException(Throwable ex)
{
if (verbose)
System.err.println("] "+ex);
System.err.println(n+" ] "+ex);
complete.countDown();
result = "excepted";
System.err.println(n + " EXCEPTED " + ex);
System.err.println(n+ " EXCEPTED " + ex);
super.onException(ex);
}
@ -239,7 +240,7 @@ public class HttpExchangeTest
protected void onExpire()
{
if (verbose)
System.err.println("] expired");
System.err.println(n+" ] expired");
complete.countDown();
result = "expired";
System.err.println(n + " EXPIRED " + len);
@ -478,10 +479,7 @@ public class HttpExchangeTest
@Test
public void testSlowPost() throws Exception
{
ContentExchange httpExchange=new ContentExchange()
{
};
ContentExchange httpExchange=new ContentExchange();
httpExchange.setURI(getBaseURI());
httpExchange.setMethod(HttpMethods.POST);
@ -497,6 +495,15 @@ public class HttpExchangeTest
if (_index>=data.length())
return -1;
try
{
Thread.sleep(5);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return data.charAt(_index++);
}

View File

@ -72,9 +72,6 @@ public class SslHttpExchangeTest extends HttpExchangeTest
@Override
public void testPerf() throws Exception
{
// TODO needs to be further investigated
Assume.assumeTrue(!OS.IS_OSX || Stress.isEnabled());
// TODO Resolve problems on IBM JVM https://bugs.eclipse.org/bugs/show_bug.cgi?id=304532
IgnoreTestOnBuggyIBM();
super.testPerf();

View File

@ -17,6 +17,7 @@ package org.eclipse.jetty.client.helperClasses;
import org.eclipse.jetty.http.ssl.SslContextFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
import org.eclipse.jetty.server.ssl.SslSocketConnector;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.log.Log;
@ -34,7 +35,7 @@ public abstract class AbstractSslServerAndClientCreator implements ServerAndClie
public Server createServer() throws Exception
{
Server server = new Server();
//SslSelectChannelConnector connector = new SslSelectChannelConnector();
// SslSelectChannelConnector connector = new SslSelectChannelConnector();
SslSocketConnector connector = new SslSocketConnector();
String keystore = MavenTestingUtils.getTestResourceFile("keystore").getAbsolutePath();
@ -44,7 +45,6 @@ public abstract class AbstractSslServerAndClientCreator implements ServerAndClie
cf.setKeyStorePath(keystore);
cf.setKeyStorePassword("storepwd");
cf.setKeyManagerPassword("keypwd");
connector.setAllowRenegotiate(true);
server.setConnectors(new Connector[]{ connector });
server.setHandler(new GenericServerHandler());

View File

@ -16,9 +16,9 @@ public class AsyncSslServerAndClientCreator extends AbstractSslServerAndClientCr
httpClient.setMaxConnectionsPerAddress(2);
String keystore = MavenTestingUtils.getTestResourceFile("keystore").getAbsolutePath();
httpClient.setKeyStoreInputStream(new FileInputStream(keystore));
httpClient.setKeyStorePassword("storepwd");
httpClient.setKeyManagerPassword("keypwd");
httpClient.getSslContextFactory().setKeyStorePath(keystore);
httpClient.getSslContextFactory().setKeyStorePassword("storepwd");
httpClient.getSslContextFactory().setKeyManagerPassword("keypwd");
httpClient.start();
return httpClient;
}

View File

@ -1093,9 +1093,10 @@ public class HttpGenerator extends AbstractGenerator
@Override
public String toString()
{
return "HttpGenerator s="+_state+
" h="+(_header==null?"null":_header.length())+
" b="+(_buffer==null?"null":_buffer.length())+
" c="+(_content==null?"null":_content.length());
return "HttpGenerator{s="+_state+
",h="+(_header==null?"":_header.length())+
",b="+(_buffer==null?"":_buffer.length())+
",c="+(_content==null?"":_content.length())+
"}";
}
}

View File

@ -300,6 +300,8 @@ public class HttpParser implements Parser
if (filled < 0 || _endp.isInputShutdown())
{
System.err.println("CLOSING f="+filled+"/"+_buffer.length()+" "+_endp);
_persistent=false;
// do we have content to deliver?
@ -1139,7 +1141,7 @@ public class HttpParser implements Parser
@Override
public String toString()
{
return "state=" + _state + " length=" + _length + " len=" + _contentLength;
return "HttpParser{s=" + _state + ",l=" + _length + ",c=" + _contentLength+"}";
}
/* ------------------------------------------------------------ */

View File

@ -228,7 +228,7 @@ public class SslContextFactory extends AbstractLifeCycle
if (_trustAll)
{
LOG.info("No keystore or trust store configured. ACCEPTING UNTRUSTED CERTIFICATES!!!!!");
LOG.debug("No keystore or trust store configured. ACCEPTING UNTRUSTED CERTIFICATES!!!!!");
// Create a trust manager that does not validate certificate chains
TrustManager trustAllCerts = new X509TrustManager()
{

View File

@ -15,13 +15,13 @@ public abstract class AbstractConnection implements Connection
public AbstractConnection(EndPoint endp)
{
_endp=endp;
_endp=(EndPoint)endp;
_timeStamp = System.currentTimeMillis();
}
public AbstractConnection(EndPoint endp,long timestamp)
{
_endp=endp;
_endp=(EndPoint)endp;
_timeStamp = timestamp;
}

View File

@ -39,4 +39,12 @@ public interface AsyncEndPoint extends EndPoint
*/
public void cancelIdle();
/* ------------------------------------------------------------ */
public boolean isWritable();
/* ------------------------------------------------------------ */
/**
* @return True if IO has been successfully performed since the last call to {@link #hasProgressed()}
*/
public boolean hasProgressed();
}

View File

@ -78,6 +78,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private boolean _ishut;
private volatile boolean _progressed;
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
throws IOException
@ -273,17 +275,15 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
/* ------------------------------------------------------------ */
/**
* @return True if the endpoint has produced/consumed bytes itself (non application data).
*/
public boolean isProgressing()
@Override
public int fill(Buffer buffer) throws IOException
{
return false;
int length=super.fill(buffer);
_progressed|=(length>0);
return length;
}
/* ------------------------------------------------------------ */
/*
*/
@Override
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
@ -299,8 +299,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
updateKey();
}
}
else
else if (l>0)
{
_progressed=true;
_writable=true;
}
return l;
}
@ -322,8 +325,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
updateKey();
}
}
else
else if (l>0)
{
_progressed=true;
_writable=true;
}
return l;
}
@ -439,12 +445,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
updateKey();
}
// TODO remove
/* ------------------------------------------------------------ */
public boolean isWritable()
{
return _writable;
}
/* ------------------------------------------------------------ */
public boolean hasProgressed()
{
boolean progressed=_progressed;
_progressed=false;
return progressed;
}
/* ------------------------------------------------------------ */
/**
* Updates selection key. Adds operations types to the selection key as needed. No operations
@ -672,9 +686,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
synchronized(this)
{
return "SCEP@" + hashCode() + _channel+
"[o="+isOpen()+" d=" + _dispatched + ",io=" + _interestOps+
",w=" + _writable + ",rb=" + _readBlocked + ",wb=" + _writeBlocked + "]";
return "SCEP@" + hashCode() +
"{"+_socket.getRemoteSocketAddress()+"->"+_socket.getLocalSocketAddress()+
(_dispatched?",D":"") +
(isOpen()?",open":"") +
(isInputShutdown()?",ishut":"") +
(isOutputShutdown()?",oshut":"") +
(_readBlocked?"":",RB") +
(_writeBlocked?"":",WB") +
(_writable?"":",!W") +
",io="+_interestOps +
((_key==null || !_key.isValid())?"!":(
(_key.isReadable()?"R":"")+
(_key.isWritable()?"W":"")))+
"}";
}
}

View File

@ -150,17 +150,6 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
}
}
/* ------------------------------------------------------------ */
/**
* @return True if the endpoint has produced/consumed bytes itself (non application data).
*/
public boolean isProgressing()
{
SSLEngineResult result = _result;
_result=null;
return result!=null && (result.bytesConsumed()>0 || result.bytesProduced()>0);
}
/* ------------------------------------------------------------ */
/**
* @return True if SSL re-negotiation is allowed (default false)
@ -765,9 +754,10 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
{
final NIOBuffer i=_inNIOBuffer;
final NIOBuffer o=_outNIOBuffer;
return "SSL"+super.toString()+","+(_engine==null?"-":_engine.getHandshakeStatus())+", in/out="+
(i==null?0:i.length())+"/"+(o==null?0:o.length())+
" bi/o="+isBufferingInput()+"/"+isBufferingOutput()+
" "+_result;
return "SSL"+super.toString()+
","+(_engine==null?"-":_engine.getHandshakeStatus())+
((i!=null&&i.length()>0)?(",in="+i.length()):"")+
((o!=null&&o.length()>0)?(",out="+o.length()):"")+
(_result==null?"":(","+_result.getStatus()+",bp="+_result.bytesProduced()+",bc="+_result.bytesConsumed()));
}
}

View File

@ -4,6 +4,7 @@ import java.io.IOException;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
@ -18,10 +19,12 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
private int _total_no_progress;
private final AsyncEndPoint _asyncEndp;
public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server)
{
super(connector,endpoint,server);
_asyncEndp=(AsyncEndPoint)endpoint;
}
public Connection handle() throws IOException
@ -62,6 +65,10 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
if (_endp.isBufferingOutput())
_endp.flush();
// Has any IO been done by the endpoint itself since last loop
if (_asyncEndp.hasProgressed())
progress=true;
}
catch (HttpException e)
{
@ -100,7 +107,6 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
}
}
some_progress|=progress|((SelectChannelEndPoint)_endp).isProgressing();
}
}
}