357240 fixed client connection recycle

This commit is contained in:
Greg Wilkins 2011-09-29 15:50:09 +10:00
parent f14bbbfc0a
commit d0a2557527
10 changed files with 60 additions and 47 deletions

View File

@ -353,6 +353,20 @@ public class HttpConnection extends AbstractConnection implements Dumpable
complete = true;
}
}
// if the endpoint is closed, but the parser incomplete
if (!_endp.isOpen() && !(_parser.isComplete()||_parser.isIdle()))
{
// we wont be called again so let the parser see the close
complete=true;
_parser.parseAvailable();
if (!(_parser.isComplete()||_parser.isIdle()))
{
LOG.warn("Incomplete {} {}",_parser,_endp);
if (_exchange!=null)
_exchange.cancel();
}
}
}
/* TODO - is this needed ?
@ -435,7 +449,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
_parser.returnBuffers();
// Do we have more stuff to write?
if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp.isOpen() && _endp instanceof AsyncEndPoint)
{
// Assume we are write blocked!
((AsyncEndPoint)_endp).scheduleWrite();

View File

@ -584,7 +584,7 @@ public class HttpDestination implements Dumpable
@Override
public synchronized String toString()
{
return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize);
}
public synchronized String toDetailString()

View File

@ -32,17 +32,12 @@ public class AsyncSslHttpExchangeTest extends SslHttpExchangeTest
_port = _server.getConnectors()[0].getLocalPort();
}
@Override
public void testPerf() throws Exception
{
super.testPerf();
}
@Test
public void testPerf1() throws Exception
{
sender(1,true);
sender(10,true);
}
}

View File

@ -51,7 +51,7 @@ import org.junit.Test;
*/
public class HttpExchangeTest
{
final static boolean verbose=true;
final static boolean verbose=false;
protected static int _maxConnectionsPerAddress = 2;
protected static String _scheme = "http";
protected static Server _server;
@ -105,6 +105,8 @@ public class HttpExchangeTest
{
sender(1,false);
sender(1,true);
sender(10,false);
sender(10,true);
if (Stress.isEnabled())
{
@ -113,11 +115,6 @@ public class HttpExchangeTest
sender(10000,false);
sender(10000,true);
}
else
{
sender(10,false);
sender(10,true);
}
}
/* ------------------------------------------------------------ */
@ -130,7 +127,7 @@ public class HttpExchangeTest
{
_count.set(0);
final CountDownLatch complete = new CountDownLatch(nb);
final CountDownLatch latch = new CountDownLatch(nb);
final AtomicInteger allcontent = new AtomicInteger(nb);
HttpExchange[] httpExchange = new HttpExchange[nb];
long start = System.currentTimeMillis();
for (int i = 0; i < nb; i++)
@ -201,14 +198,12 @@ public class HttpExchangeTest
protected void onResponseComplete()
{
if (verbose)
System.err.println("] ==");
System.err.println("] == "+len+" "+complete.getCount()+"/"+nb);
result = "complete";
if (len == 2009)
latch.countDown();
allcontent.decrementAndGet();
else
{
System.err.println(n + " ONLY " + len+ "/2009");
}
complete.countDown();
}
@ -263,22 +258,13 @@ public class HttpExchangeTest
_httpClient.send(httpExchange[n]);
}
Thread.sleep(2000);
System.err.println(_httpClient.dump());
assertTrue(complete.await(10,TimeUnit.SECONDS));
long elapsed=System.currentTimeMillis()-start;
if (!complete.await(2,TimeUnit.SECONDS))
System.err.println(_httpClient.dump());
// make windows-friendly ... System.currentTimeMillis() on windows is dope!
/*
if(elapsed>0)
System.err.println(nb+"/"+_count+" c="+close+" rate="+(nb*1000/elapsed));
*/
assertTrue(complete.await(20,TimeUnit.SECONDS));
assertEquals("nb="+nb+" close="+close,0,latch.getCount());
assertEquals("nb="+nb+" close="+close,0,allcontent.get());
}
/* ------------------------------------------------------------ */

View File

@ -266,7 +266,17 @@ public class HttpParser implements Parser
// Fill buffer if we can
if (length == 0)
{
long filled=fill();
long filled=-1;
IOException ex=null;
try
{
filled=fill();
}
catch(IOException e)
{
LOG.debug(this.toString(),e);
ex=e;
}
if (filled < 0 || _endp.isInputShutdown())
{
@ -291,6 +301,8 @@ public class HttpParser implements Parser
return 1;
}
if (ex!=null)
throw ex;
return -1;
}
length=_buffer.length();
@ -927,15 +939,7 @@ public class HttpParser implements Parser
if (_buffer.space() == 0)
throw new HttpException(HttpStatus.REQUEST_ENTITY_TOO_LARGE_413, "FULL "+(_buffer==_body?"body":"head"));
try
{
return _endp.fill(_buffer);
}
catch(IOException e)
{
LOG.debug(e);
throw (e instanceof EofException) ? e:new EofException(e);
}
return _endp.fill(_buffer);
}
return -1;

View File

@ -539,9 +539,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
cancelIdle();
if (_open)
{
_selectSet.destroyEndPoint(this);
}
_open=false;
_key = null;
}

View File

@ -845,6 +845,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
/* ------------------------------------------------------------ */
public void destroyEndPoint(SelectChannelEndPoint endp)
{
LOG.debug("destroyEndPoint {}",endp);
_endPoints.remove(endp);
endPointClosed(endp);
}

View File

@ -76,9 +76,8 @@ public class BlockingHttpConnection extends HttpConnection
LOG.debug(e);
}
_generator.sendError(e.getStatus(), e.getReason(), null, true);
_parser.reset();
_endp.close();
_endp.shutdownOutput();
}
finally
{

View File

@ -23,9 +23,12 @@ import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class LocalConnector extends AbstractConnector
{
private static final Logger LOG = Log.getLogger(LocalConnector.class);
private final BlockingQueue<Request> _requests = new LinkedBlockingQueue<Request>();
public LocalConnector()
@ -135,8 +138,14 @@ public class LocalConnector extends AbstractConnector
}
}
}
catch (IOException x)
{
LOG.debug(x);
leaveOpen = false;
}
catch (Exception x)
{
LOG.warn(x);
leaveOpen = false;
}
finally

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@ -251,6 +252,12 @@ public class SocketConnector extends AbstractConnector
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (SocketException e)
{
LOG.debug("EOF", e);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (HttpException e)
{
LOG.debug("BAD", e);