292546 Proactively enforce HttpClient idle timeout

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1013 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-10-26 06:20:09 +00:00
parent 583fa4dcde
commit a24f48c113
9 changed files with 305 additions and 134 deletions

View File

@ -11,6 +11,7 @@ jetty-7.0.1-SNAPSHOT
+ 291589 Update jetty-rewrite demo
+ 292642 Fix errors in embedded Jetty examples
+ 292825 Continuations ISE rather than ignore bad transitions
+ 292546 Proactively enforce HttpClient idle timeout
+ JETTY-937 More JVM bug work arounds. Insert pause if all else fails
+ JETTY-983 Send content-length with multipart ranges
+ JETTY-1114 unsynchronised WebAppClassloader.getResource(String)

View File

@ -94,6 +94,7 @@ public class HttpClient extends HttpBuffers implements Attributes
private long _idleTimeout = 20000;
private long _timeout = 320000;
private Timeout _timeoutQ = new Timeout();
private Timeout _idleTimeoutQ = new Timeout();
private Address _proxy;
private Authorization _proxyAuthentication;
private Set<String> _noProxy;
@ -246,6 +247,12 @@ public class HttpClient extends HttpBuffers implements Attributes
_timeoutQ.schedule(task);
}
/* ------------------------------------------------------------ */
public void scheduleIdle(Timeout.Task task)
{
_idleTimeoutQ.schedule(task);
}
/* ------------------------------------------------------------ */
public void cancel(Timeout.Task task)
{
@ -420,8 +427,10 @@ public class HttpClient extends HttpBuffers implements Attributes
{
super.doStart();
_timeoutQ.setNow();
_timeoutQ.setDuration(_timeout);
_timeoutQ.setNow();
_idleTimeoutQ.setDuration(_idleTimeout);
_idleTimeoutQ.setNow();
if (_threadPool == null)
{
@ -455,11 +464,11 @@ public class HttpClient extends HttpBuffers implements Attributes
{
while (isRunning())
{
_timeoutQ.setNow();
_timeoutQ.tick();
_timeoutQ.tick(System.currentTimeMillis());
_idleTimeoutQ.tick(_timeoutQ.getNow());
try
{
Thread.sleep(1000);
Thread.sleep(200);
}
catch (InterruptedException e)
{
@ -470,6 +479,12 @@ public class HttpClient extends HttpBuffers implements Attributes
}
/* ------------------------------------------------------------ */
long getNow()
{
return _timeoutQ.getNow();
}
/* ------------------------------------------------------------ */
protected void doStop() throws Exception
{
@ -485,6 +500,7 @@ public class HttpClient extends HttpBuffers implements Attributes
}
_timeoutQ.cancelAll();
_idleTimeoutQ.cancelAll();
super.doStop();
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.security.Authorization;
import org.eclipse.jetty.http.HttpFields;
@ -55,6 +56,7 @@ public class HttpConnection implements Connection
private volatile HttpExchange _exchange;
private HttpExchange _pipeline;
private final Timeout.Task _timeout = new TimeoutTask();
private AtomicBoolean _idle = new AtomicBoolean(false);
public void dump() throws IOException
{
@ -535,30 +537,39 @@ public class HttpConnection implements Connection
return toString() + " ex=" + _exchange + " " + _timeout.getAge();
}
/**
* @return the last
*/
public long getLast()
{
return _last;
}
/**
* @param last
* the last to set
*/
public void setLast(long last)
{
_last = last;
}
public void close() throws IOException
{
_endp.close();
}
public void setIdleTimeout(long expire)
{
synchronized (this)
{
if (_idle.compareAndSet(false,true))
_destination.getHttpClient().scheduleIdle(_timeout);
else
throw new IllegalStateException();
}
}
public boolean cancelIdleTimeout()
{
synchronized (this)
{
if (_idle.compareAndSet(true,false))
{
_destination.getHttpClient().cancel(_timeout);
return true;
}
}
return false;
}
private class TimeoutTask extends Timeout.Task
{
@Override
public void expired()
{
HttpExchange ex = null;
@ -573,6 +584,10 @@ public class HttpConnection implements Connection
ex.disassociate(HttpConnection.this);
_destination.returnConnection(HttpConnection.this, true);
}
else if (_idle.compareAndSet(true,false))
{
_destination.returnIdleConnection(HttpConnection.this);
}
}
}
catch (Exception e)
@ -597,4 +612,5 @@ public class HttpConnection implements Connection
}
}
}
}

View File

@ -100,6 +100,22 @@ public class HttpDestination
return _ssl;
}
public int getConnections()
{
synchronized (this)
{
return _connections.size();
}
}
public int getIdleConnections()
{
synchronized (this)
{
return _idle.size();
}
}
public void addAuthorization(String pathSpec, Authorization authorization)
{
synchronized (this)
@ -194,7 +210,7 @@ public class HttpDestination
public HttpConnection getIdleConnection() throws IOException
{
long now = System.currentTimeMillis();
long now = _client.getNow();
long idleTimeout=_client.getIdleTimeout();
HttpConnection connection = null;
while (true)
@ -214,8 +230,7 @@ public class HttpDestination
if (connection==null)
return null;
long last = connection.getLast();
if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
if (connection.cancelIdleTimeout() )
return connection;
}
@ -349,7 +364,7 @@ public class HttpDestination
{
if (_queue.size() == 0)
{
connection.setLast(System.currentTimeMillis());
connection.setIdleTimeout(_client.getNow()+_client.getIdleTimeout());
_idle.add(connection);
}
else
@ -371,6 +386,27 @@ public class HttpDestination
}
}
public void returnIdleConnection(HttpConnection connection) throws IOException
{
try
{
connection.close();
}
catch (IOException e)
{
Log.ignore(e);
}
synchronized (this)
{
_idle.remove(connection);
_connections.remove(connection);
if (!_queue.isEmpty() && _client.isStarted())
startNewConnection();
}
}
public void send(HttpExchange ex) throws IOException
{
LinkedList<String> listeners = _client.getRegisteredListeners();

View File

@ -1,76 +0,0 @@
/*
* Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package org.eclipse.jetty.client;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
/**
* @version $Revision$ $Date$
*/
public class ConnectionFailedTest extends TestCase
{
public void testConnectionFailed() throws Exception
{
ServerSocket socket = new ServerSocket();
socket.bind(null);
int port=socket.getLocalPort();
socket.close();
HttpClient httpClient = new HttpClient();
httpClient.start();
CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = new ConnectionFailedExchange(latch);
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
httpClient.send(exchange);
boolean passed = latch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(passed);
long wait = 100;
long maxWait = 10 * wait;
long curWait = wait;
while (curWait < maxWait && !exchange.isDone())
{
Thread.sleep(wait);
curWait += wait;
}
assertEquals(HttpExchange.STATUS_EXCEPTED, exchange.getStatus());
}
private class ConnectionFailedExchange extends HttpExchange
{
private final CountDownLatch latch;
private ConnectionFailedExchange(CountDownLatch latch)
{
this.latch = latch;
}
@Override
protected void onConnectionFailed(Throwable ex)
{
latch.countDown();
}
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package org.eclipse.jetty.client;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
/**
* @version $Revision$ $Date$
*/
public class ConnectionTest extends TestCase
{
public void testConnectionFailed() throws Exception
{
ServerSocket socket = new ServerSocket();
socket.bind(null);
int port=socket.getLocalPort();
socket.close();
HttpClient httpClient = new HttpClient();
httpClient.start();
CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = new ConnectionExchange(latch);
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
httpClient.send(exchange);
boolean passed = latch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(passed);
long wait = 100;
long maxWait = 10 * wait;
long curWait = wait;
while (curWait < maxWait && !exchange.isDone())
{
Thread.sleep(wait);
curWait += wait;
}
assertEquals(HttpExchange.STATUS_EXCEPTED, exchange.getStatus());
}
public void testIdleConnection() throws Exception
{
ServerSocket socket = new ServerSocket();
socket.bind(null);
int port=socket.getLocalPort();
HttpClient httpClient = new HttpClient();
httpClient.setIdleTimeout(700);
httpClient.start();
HttpExchange exchange = new ConnectionExchange();
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
HttpDestination dest = httpClient.getDestination(new Address("localhost", port),false);
httpClient.send(exchange);
Socket s = socket.accept();
byte[] buf = new byte[4096];
s.getInputStream().read(buf);
assertEquals(1,dest.getConnections());
assertEquals(0,dest.getIdleConnections());
s.getOutputStream().write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".getBytes());
Thread.sleep(300);
assertEquals(1,dest.getConnections());
assertEquals(1,dest.getIdleConnections());
exchange = new ConnectionExchange();
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
httpClient.send(exchange);
s.getInputStream().read(buf);
assertEquals(1,dest.getConnections());
assertEquals(0,dest.getIdleConnections());
s.getOutputStream().write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".getBytes());
Thread.sleep(500);
assertEquals(1,dest.getConnections());
assertEquals(1,dest.getIdleConnections());
Thread.sleep(500);
assertEquals(0,dest.getConnections());
assertEquals(0,dest.getIdleConnections());
socket.close();
}
private class ConnectionExchange extends HttpExchange
{
private final CountDownLatch latch;
private ConnectionExchange()
{
this.latch = null;
}
private ConnectionExchange(CountDownLatch latch)
{
this.latch = latch;
}
@Override
protected void onConnectionFailed(Throwable ex)
{
if (latch!=null)
latch.countDown();
}
}
}

View File

@ -2,23 +2,68 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http.HttpMethods;
import org.eclipse.jetty.io.Buffer;
/* ------------------------------------------------------------ */
/** A simple test http client like curl.
* <p>
* Usage is java -cp $CLASSPATH org.eclipse.jetty.client.Curl [ option | URL ] ...
* Options supported are: <ul>
* <li>--async : The following URLs are fetched in parallel (default)
* <li>--sync : The following URLs are fetched in sequence
* <li>--dump : The content is dumped to stdout
* <li>--nodump : The content is suppressed (default)
* </ul>
*/
public class Curl
{
public static void main(String[] args)
throws Exception
{
if (args.length==0)
args=new String[]
{ "--sync", "http://www.sun.com/robots.txt", "http://www.sun.com/favicon.ico" , "--dump", "http://www.sun.com/robots.txt"};
HttpClient client = new HttpClient();
client.setIdleTimeout(2000);
client.start();
boolean async=true;
boolean dump= false;
final CountDownLatch latch = new CountDownLatch(args.length);
for (String arg : args)
{
if ("--sync".equals(arg))
{
async=false;
continue;
}
if ("--async".equals(arg))
{
async=true;
continue;
}
if ("--dump".equals(arg))
{
dump=true;
continue;
}
if ("--nodump".equals(arg))
{
dump=false;
continue;
}
final boolean d = dump;
HttpExchange ex = new HttpExchange()
{
AtomicBoolean counted=new AtomicBoolean(false);
@ -59,6 +104,8 @@ public class Curl
protected void onResponseContent(Buffer content) throws IOException
{
super.onResponseContent(content);
if (d)
System.out.print(content.toString());
System.err.println("got "+content.length());
}
@ -98,8 +145,17 @@ public class Curl
ex.setMethod(HttpMethods.GET);
ex.setURL(arg);
System.err.println("\nSending "+ex);
client.send(ex);
if (!async)
{
System.err.println("waiting...");
ex.waitForDone();
System.err.println("Done");
}
}
latch.await();

View File

@ -32,7 +32,7 @@ public class Timeout
{
private Object _lock;
private long _duration;
private long _now=System.currentTimeMillis();
private volatile long _now=System.currentTimeMillis();
private Task _head=new Task();
/* ------------------------------------------------------------ */
@ -70,29 +70,19 @@ public class Timeout
/* ------------------------------------------------------------ */
public long setNow()
{
synchronized (_lock)
{
_now=System.currentTimeMillis();
return _now;
}
return _now=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
public long getNow()
{
synchronized (_lock)
{
return _now;
}
return _now;
}
/* ------------------------------------------------------------ */
public void setNow(long now)
{
synchronized (_lock)
{
_now=now;
}
_now=now;
}
/* ------------------------------------------------------------ */
@ -124,9 +114,9 @@ public class Timeout
}
/* ------------------------------------------------------------ */
public void tick(long now)
public void tick()
{
long _expiry = -1;
final long expiry = _now-_duration;
Task task=null;
while (true)
@ -135,15 +125,8 @@ public class Timeout
{
synchronized (_lock)
{
if (_expiry==-1)
{
if (now!=-1)
_now=now;
_expiry = _now-_duration;
}
task= _head._next;
if (task==_head || task._timestamp>_expiry)
if (task==_head || task._timestamp>expiry)
break;
task.unlink();
task._expired=true;
@ -160,9 +143,10 @@ public class Timeout
}
/* ------------------------------------------------------------ */
public void tick()
public void tick(long now)
{
tick(-1);
_now=now;
tick();
}
/* ------------------------------------------------------------ */
@ -285,9 +269,13 @@ public class Timeout
/* ------------------------------------------------------------ */
public long getAge()
{
Timeout t = _timeout;
if (t!=null && t._now!=0 && _timestamp!=0)
return t._now-_timestamp;
final Timeout t = _timeout;
if (t!=null)
{
final long now=t._now;
if (now!=0 && _timestamp!=0)
return now-_timestamp;
}
return 0;
}

View File

@ -133,7 +133,7 @@ public class TimeoutTest extends TestCase
/* ------------------------------------------------------------ */
public void testStress() throws Exception
{
final int LOOP=500;
final int LOOP=250;
final boolean[] running = {true};
final AtomicIntegerArray count = new AtomicIntegerArray( 3 );
@ -240,9 +240,9 @@ public class TimeoutTest extends TestCase
running[0]=false;
}
// give some time for test to stop
Thread.sleep(2000);
timeout.tick(System.currentTimeMillis());
Thread.sleep(1000);
timeout.tick(System.currentTimeMillis());
Thread.sleep(500);
// check the counts
assertEquals("count threads", LOOP,count.get( 0 ));