Fixes #343567 (HttpClient does not limit the destination's exchange queue).
git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@3028 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
parent
e38ed57324
commit
057eb0a102
|
@ -2,6 +2,7 @@ jetty-7.4.1-SNAPSHOT
|
|||
+ 343083 Set nested dispatch type and connection
|
||||
+ 343277 add support for a context white list
|
||||
+ 343352 make sure that jetty.osgi.boot is activated when a WAB is registered
|
||||
+ 343567 HttpClient does not limit the destination's exchange queue
|
||||
|
||||
jetty-7.4.0.v20110414
|
||||
+ 342504 Scanner Listener
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.eclipse.jetty.client.security.Authentication;
|
||||
|
@ -74,6 +73,7 @@ public class HttpClient extends HttpBuffers implements Attributes
|
|||
private boolean _useDirectBuffers = true;
|
||||
private boolean _connectBlocking = true;
|
||||
private int _maxConnectionsPerAddress = Integer.MAX_VALUE;
|
||||
private int _maxQueueSizePerAddress = Integer.MAX_VALUE;
|
||||
private ConcurrentMap<Address, HttpDestination> _destinations = new ConcurrentHashMap<Address, HttpDestination>();
|
||||
ThreadPool _threadPool;
|
||||
Connector _connector;
|
||||
|
@ -240,7 +240,7 @@ public class HttpClient extends HttpBuffers implements Attributes
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------------------------- */
|
||||
public HttpDestination getDestination(Address remote, boolean ssl) throws UnknownHostException, IOException
|
||||
public HttpDestination getDestination(Address remote, boolean ssl) throws IOException
|
||||
{
|
||||
if (remote == null)
|
||||
throw new UnknownHostException("Remote socket address cannot be null.");
|
||||
|
@ -248,7 +248,7 @@ public class HttpClient extends HttpBuffers implements Attributes
|
|||
HttpDestination destination = _destinations.get(remote);
|
||||
if (destination == null)
|
||||
{
|
||||
destination = new HttpDestination(this, remote, ssl, _maxConnectionsPerAddress);
|
||||
destination = new HttpDestination(this, remote, ssl);
|
||||
if (_proxy != null && (_noProxy == null || !_noProxy.contains(remote.getHost())))
|
||||
{
|
||||
destination.setProxy(_proxy);
|
||||
|
@ -394,6 +394,16 @@ public class HttpClient extends HttpBuffers implements Attributes
|
|||
_maxConnectionsPerAddress = maxConnectionsPerAddress;
|
||||
}
|
||||
|
||||
public int getMaxQueueSizePerAddress()
|
||||
{
|
||||
return _maxQueueSizePerAddress;
|
||||
}
|
||||
|
||||
public void setMaxQueueSizePerAddress(int maxQueueSizePerAddress)
|
||||
{
|
||||
this._maxQueueSizePerAddress = maxQueueSizePerAddress;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
|
@ -451,7 +461,6 @@ public class HttpClient extends HttpBuffers implements Attributes
|
|||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
|
|
@ -20,6 +20,8 @@ import java.util.ArrayList;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
import org.eclipse.jetty.client.HttpClient.Connector;
|
||||
import org.eclipse.jetty.client.security.Authentication;
|
||||
|
@ -40,17 +42,19 @@ import org.eclipse.jetty.util.log.Log;
|
|||
*/
|
||||
public class HttpDestination
|
||||
{
|
||||
private final ByteArrayBuffer _hostHeader;
|
||||
private final Address _address;
|
||||
private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
|
||||
private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
|
||||
private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
|
||||
private final List<HttpConnection> _connections = new LinkedList<HttpConnection>();
|
||||
private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
|
||||
private final List<HttpConnection> _idle = new ArrayList<HttpConnection>();
|
||||
private final HttpClient _client;
|
||||
private final Address _address;
|
||||
private final boolean _ssl;
|
||||
private final int _maxConnections;
|
||||
private final ByteArrayBuffer _hostHeader;
|
||||
private volatile int _maxConnections;
|
||||
private volatile int _maxQueueSize;
|
||||
private int _pendingConnections = 0;
|
||||
private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
|
||||
private int _newConnection = 0;
|
||||
private Address _proxy;
|
||||
private volatile Address _proxy;
|
||||
private Authentication _proxyAuthentication;
|
||||
private PathMap _authorizations;
|
||||
private List<HttpCookie> _cookies;
|
||||
|
@ -71,41 +75,59 @@ public class HttpDestination
|
|||
}
|
||||
}
|
||||
|
||||
/* The queue of exchanged for this destination if connections are limited */
|
||||
private LinkedList<HttpExchange> _queue = new LinkedList<HttpExchange>();
|
||||
|
||||
HttpDestination(HttpClient client, Address address, boolean ssl, int maxConnections)
|
||||
HttpDestination(HttpClient client, Address address, boolean ssl)
|
||||
{
|
||||
_client = client;
|
||||
_address = address;
|
||||
_ssl = ssl;
|
||||
_maxConnections = maxConnections;
|
||||
_maxConnections = _client.getMaxConnectionsPerAddress();
|
||||
_maxQueueSize = _client.getMaxQueueSizePerAddress();
|
||||
String addressString = address.getHost();
|
||||
if (address.getPort() != (_ssl ? 443 : 80))
|
||||
addressString += ":" + address.getPort();
|
||||
_hostHeader = new ByteArrayBuffer(addressString);
|
||||
}
|
||||
|
||||
public Address getAddress()
|
||||
{
|
||||
return _address;
|
||||
}
|
||||
|
||||
public Buffer getHostHeader()
|
||||
{
|
||||
return _hostHeader;
|
||||
}
|
||||
|
||||
public HttpClient getHttpClient()
|
||||
{
|
||||
return _client;
|
||||
}
|
||||
|
||||
public Address getAddress()
|
||||
{
|
||||
return _address;
|
||||
}
|
||||
|
||||
public boolean isSecure()
|
||||
{
|
||||
return _ssl;
|
||||
}
|
||||
|
||||
public Buffer getHostHeader()
|
||||
{
|
||||
return _hostHeader;
|
||||
}
|
||||
|
||||
public int getMaxConnections()
|
||||
{
|
||||
return _maxConnections;
|
||||
}
|
||||
|
||||
public void setMaxConnections(int maxConnections)
|
||||
{
|
||||
this._maxConnections = maxConnections;
|
||||
}
|
||||
|
||||
public int getMaxQueueSize()
|
||||
{
|
||||
return _maxQueueSize;
|
||||
}
|
||||
|
||||
public void setMaxQueueSize(int maxQueueSize)
|
||||
{
|
||||
this._maxQueueSize = maxQueueSize;
|
||||
}
|
||||
|
||||
public int getConnections()
|
||||
{
|
||||
synchronized (this)
|
||||
|
@ -276,7 +298,7 @@ public class HttpDestination
|
|||
}
|
||||
else if (_queue.size() > 0)
|
||||
{
|
||||
HttpExchange ex = _queue.removeFirst();
|
||||
HttpExchange ex = _queue.remove(0);
|
||||
ex.setStatus(HttpExchange.STATUS_EXCEPTED);
|
||||
ex.getEventListener().onConnectionFailed(throwable);
|
||||
|
||||
|
@ -310,7 +332,7 @@ public class HttpDestination
|
|||
_pendingConnections--;
|
||||
if (_queue.size() > 0)
|
||||
{
|
||||
HttpExchange ex = _queue.removeFirst();
|
||||
HttpExchange ex = _queue.remove(0);
|
||||
ex.setStatus(HttpExchange.STATUS_EXCEPTED);
|
||||
ex.getEventListener().onException(throwable);
|
||||
}
|
||||
|
@ -349,7 +371,7 @@ public class HttpDestination
|
|||
}
|
||||
else
|
||||
{
|
||||
HttpExchange exchange = _queue.removeFirst();
|
||||
HttpExchange exchange = _queue.remove(0);
|
||||
send(connection, exchange);
|
||||
}
|
||||
}
|
||||
|
@ -399,7 +421,7 @@ public class HttpDestination
|
|||
}
|
||||
else
|
||||
{
|
||||
HttpExchange ex = _queue.removeFirst();
|
||||
HttpExchange ex = _queue.remove(0);
|
||||
send(connection, ex);
|
||||
}
|
||||
this.notifyAll();
|
||||
|
@ -526,6 +548,9 @@ public class HttpDestination
|
|||
boolean startConnection = false;
|
||||
synchronized (this)
|
||||
{
|
||||
if (_queue.size() == _maxQueueSize)
|
||||
throw new RejectedExecutionException("Queue full for address " + _address);
|
||||
|
||||
_queue.add(ex);
|
||||
if (_connections.size() + _pendingConnections < _maxConnections)
|
||||
startConnection = true;
|
||||
|
@ -544,7 +569,7 @@ public class HttpDestination
|
|||
// to the exchange queue and recycle the connection
|
||||
if (!connection.send(exchange))
|
||||
{
|
||||
_queue.addFirst(exchange);
|
||||
_queue.add(0, exchange);
|
||||
returnIdleConnection(connection);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class HttpDestinationQueueTest
|
||||
{
|
||||
@Test
|
||||
public void testDestinationMaxQueueSize() throws Exception
|
||||
{
|
||||
HttpClient client = new HttpClient();
|
||||
client.setMaxConnectionsPerAddress(1);
|
||||
client.setMaxQueueSizePerAddress(1);
|
||||
client.start();
|
||||
|
||||
ServerSocket server = new ServerSocket(0);
|
||||
|
||||
// This will keep the connection busy
|
||||
HttpExchange exchange1 = new HttpExchange();
|
||||
exchange1.setMethod("GET");
|
||||
exchange1.setURL("http://localhost:" + server.getLocalPort() + "/exchange1");
|
||||
client.send(exchange1);
|
||||
|
||||
// Read request so we are sure that this exchange is out of the queue
|
||||
Socket socket = server.accept();
|
||||
byte[] buffer = new byte[1024];
|
||||
StringBuilder request = new StringBuilder();
|
||||
while (true)
|
||||
{
|
||||
int read = socket.getInputStream().read(buffer);
|
||||
request.append(new String(buffer, 0, read, "UTF-8"));
|
||||
if (request.toString().endsWith("\r\n\r\n"))
|
||||
break;
|
||||
}
|
||||
Assert.assertTrue(request.toString().contains("exchange1"));
|
||||
|
||||
// This will be queued
|
||||
HttpExchange exchange2 = new HttpExchange();
|
||||
exchange2.setMethod("GET");
|
||||
exchange2.setURL("http://localhost:" + server.getLocalPort() + "/exchange2");
|
||||
client.send(exchange2);
|
||||
|
||||
// This will be rejected, since the connection is busy and the queue is full
|
||||
HttpExchange exchange3 = new HttpExchange();
|
||||
exchange3.setMethod("GET");
|
||||
exchange3.setURL("http://localhost:" + server.getLocalPort() + "/exchange3");
|
||||
try
|
||||
{
|
||||
client.send(exchange3);
|
||||
Assert.fail();
|
||||
}
|
||||
catch (RejectedExecutionException x)
|
||||
{
|
||||
// Expected
|
||||
}
|
||||
|
||||
// Send the response to avoid exceptions in the console
|
||||
socket.getOutputStream().write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".getBytes("UTF-8"));
|
||||
Assert.assertEquals(HttpExchange.STATUS_COMPLETED, exchange1.waitForDone());
|
||||
|
||||
// Be sure that the second exchange can be sent
|
||||
request.setLength(0);
|
||||
while (true)
|
||||
{
|
||||
int read = socket.getInputStream().read(buffer);
|
||||
request.append(new String(buffer, 0, read, "UTF-8"));
|
||||
if (request.toString().endsWith("\r\n\r\n"))
|
||||
break;
|
||||
}
|
||||
Assert.assertTrue(request.toString().contains("exchange2"));
|
||||
|
||||
socket.getOutputStream().write("HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n".getBytes("UTF-8"));
|
||||
socket.close();
|
||||
Assert.assertEquals(HttpExchange.STATUS_COMPLETED, exchange2.waitForDone());
|
||||
|
||||
server.close();
|
||||
|
||||
client.stop();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue