Jetty9 - Intermediate commit.

This commit is contained in:
Simone Bordet 2012-07-19 18:31:34 +02:00
parent 03174b55a5
commit 179a5e9c18
3 changed files with 117 additions and 2 deletions

View File

@ -13,8 +13,12 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@ -22,12 +26,16 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Address;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Name;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
/**
@ -68,19 +76,49 @@ public class HTTPClient extends AggregateLifeCycle
private volatile boolean followRedirects;
private Executor executor;
private int maxConnectionsPerAddress = Integer.MAX_VALUE;
private int maxQueueSizePerAddress = Integer.MAX_VALUE;
private SelectorManager selectorManager;
private SocketAddress bindAddress;
@Override
protected void doStart() throws Exception
{
selectorManager = newSelectorManager();
addBean(selectorManager);
super.doStart();
}
protected SelectorManager newSelectorManager()
{
ClientSelectorManager result = new ClientSelectorManager();
result.setMaxIdleTime(getIdleTimeout());
return result;
}
@Override
protected void doStop() throws Exception
{
super.doStop();
}
/**
* @return the address to bind socket channels to
* @see #setBindAddress(SocketAddress)
*/
public SocketAddress getBindAddress()
{
return bindAddress;
}
/**
* @param bindAddress the address to bind socket channels to
* @see #getBindAddress()
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
public Future<Response> GET(String absoluteURL)
{
try
@ -190,6 +228,30 @@ public class HTTPClient extends AggregateLifeCycle
this.maxConnectionsPerAddress = maxConnectionsPerAddress;
}
public int getMaxQueueSizePerAddress()
{
return maxQueueSizePerAddress;
}
public void setMaxQueueSizePerAddress(int maxQueueSizePerAddress)
{
this.maxQueueSizePerAddress = maxQueueSizePerAddress;
}
protected Future<Connection> newConnection(Destination destination) throws IOException
{
SocketChannel channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
channel.socket().setTcpNoDelay(true);
channel.connect(destination.address().toSocketAddress());
FutureCallback<Connection> result = new FutureCallback<>();
selectorManager.connect(channel, result);
return result;
}
protected class ClientSelectorManager extends SelectorManager
{
public ClientSelectorManager()
@ -202,6 +264,22 @@ public class HTTPClient extends AggregateLifeCycle
super(selectors);
}
@Override
protected Selectable newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey sKey) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, sKey, getMaxIdleTime());
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{
return null;
}
@Override
protected void execute(Runnable task)
{
getExecutor().execute(task);
}
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.client;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Address;
@ -28,15 +29,23 @@ public class StandardDestination implements Destination
{
private final HTTPClient client;
private final Address address;
private final Queue<Response> requests;
private final Queue<Connection> connections;
public StandardDestination(HTTPClient client, Address address)
{
this.client = client;
this.address = address;
this.requests = new ArrayBlockingQueue<>(client.getMaxQueueSizePerAddress());
this.connections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
}
@Override
public Address address()
{
return address;
}
@Override
public Connection connect(long timeout, TimeUnit unit)
{
@ -48,7 +57,22 @@ public class StandardDestination implements Destination
{
if (!address.equals(request.address()))
throw new IllegalArgumentException("Invalid request address " + request.address() + " for destination " + this);
return getConnection().send(request, listener);
StandardResponse response = new StandardResponse(request, listener);
Connection connection = connections.poll();
if (connection == null)
newConnection();
if (!requests.offer(response))
throw new RejectedExecutionException("Max requests per address " + client.getMaxQueueSizePerAddress() + " exceeded");
return response;
}
protected Future<Connection> newConnection()
{
return client.newConnection(this);
}
protected Connection getConnection()
@ -60,4 +84,15 @@ public class StandardDestination implements Destination
}
return connection;
}
// TODO: 1. We must do queuing of requests in any case, because we cannot do blocking connect
// TODO: 2. We must be non-blocking connect, therefore we need to queue
// Connections should compete for the queue of requests in separate threads
// that poses a problem of thread pool size: if < maxConnections we're starving
//
/**
* I need a Future<Connection> connect(), and a void connect(Callback<Connection>)
*/
}

View File

@ -21,4 +21,6 @@ public interface Destination
Connection connect(long timeout, TimeUnit unit);
Future<Response> send(Request request, Response.Listener listener);
Address address();
}