Merged branch 'jetty-9.3.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-11-04 18:47:32 +01:00
commit 3e2658a41b
3 changed files with 133 additions and 86 deletions

View File

@ -182,7 +182,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
Connection old_connection = getConnection();
if (LOG.isDebugEnabled())
LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection);
LOG.debug("{} upgrading from {} to {}", this, old_connection, newConnection);
ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom)
?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null;

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.proxy;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -51,6 +52,7 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -160,21 +162,17 @@ public class ConnectHandler extends HandlerWrapper
protected void doStart() throws Exception
{
if (executor == null)
{
setExecutor(getServer().getThreadPool());
}
executor = getServer().getThreadPool();
if (scheduler == null)
{
setScheduler(new ScheduledExecutorScheduler());
addBean(getScheduler());
}
addBean(scheduler = new ScheduledExecutorScheduler());
if (bufferPool == null)
{
setByteBufferPool(new MappedByteBufferPool());
addBean(getByteBufferPool());
}
addBean(bufferPool = new MappedByteBufferPool());
addBean(selector = newSelectorManager());
selector.setConnectTimeout(getConnectTimeout());
super.doStart();
}
@ -191,16 +189,8 @@ public class ConnectHandler extends HandlerWrapper
String serverAddress = request.getRequestURI();
if (LOG.isDebugEnabled())
LOG.debug("CONNECT request for {}", serverAddress);
try
{
handleConnect(baseRequest, request, response, serverAddress);
}
catch (Exception x)
{
// TODO
LOG.warn("ConnectHandler " + baseRequest.getHttpURI() + " " + x);
LOG.debug(x);
}
handleConnect(baseRequest, request, response, serverAddress);
}
else
{
@ -249,32 +239,40 @@ public class ConnectHandler extends HandlerWrapper
return;
}
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
HttpTransport transport = baseRequest.getHttpChannel().getHttpTransport();
// TODO Handle CONNECT over HTTP2!
if (!(transport instanceof HttpConnection))
{
if (LOG.isDebugEnabled())
LOG.debug("CONNECT forbidden for {}", transport);
LOG.debug("CONNECT not supported for {}", transport);
sendConnectResponse(request, response, HttpServletResponse.SC_FORBIDDEN);
return;
}
InetSocketAddress address = newConnectAddress(host, port);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
if (LOG.isDebugEnabled())
LOG.debug("Connecting to {}", address);
ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport);
if (channel.connect(address))
selector.accept(channel, connectContext);
else
selector.connect(channel, connectContext);
LOG.debug("Connecting to {}:{}", host, port);
connectToServer(request, host, port, new Promise<SocketChannel>()
{
@Override
public void succeeded(SocketChannel channel)
{
ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport);
if (channel.isConnected())
selector.accept(channel, connectContext);
else
selector.connect(channel, connectContext);
}
@Override
public void failed(Throwable x)
{
onConnectFailure(request, response, asyncContext, x);
}
});
}
catch (Exception x)
{
@ -282,37 +280,59 @@ public class ConnectHandler extends HandlerWrapper
}
}
/* ------------------------------------------------------------ */
/** Create the address the connect channel will connect to.
* @param host The host from the connect request
* @param port The port from the connect request
protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
InetSocketAddress address = newConnectAddress(host, port);
channel.connect(address);
promise.succeeded(channel);
}
catch (Throwable x)
{
close(channel);
promise.failed(x);
}
}
private void close(Closeable closeable)
{
try
{
if (closeable != null)
closeable.close();
}
catch (Throwable x)
{
LOG.ignore(x);
}
}
/**
* Creates the server address to connect to.
*
* @param host The host from the CONNECT request
* @param port The port from the CONNECT request
* @return The InetSocketAddress to connect to.
*/
protected InetSocketAddress newConnectAddress(String host, int port)
{
return new InetSocketAddress(host, port);
}
protected void onConnectSuccess(ConnectContext connectContext, UpstreamConnection upstreamConnection)
{
HttpConnection httpConnection = connectContext.getHttpConnection();
ByteBuffer requestBuffer = httpConnection.getRequestBuffer();
ByteBuffer buffer = BufferUtil.EMPTY_BUFFER;
int remaining = requestBuffer.remaining();
if (remaining > 0)
{
buffer = bufferPool.acquire(remaining, requestBuffer.isDirect());
BufferUtil.flipToFill(buffer);
buffer.put(requestBuffer);
buffer.flip();
}
ConcurrentMap<String, Object> context = connectContext.getContext();
HttpServletRequest request = connectContext.getRequest();
prepareContext(request, context);
HttpConnection httpConnection = connectContext.getHttpConnection();
EndPoint downstreamEndPoint = httpConnection.getEndPoint();
DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context, buffer);
DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context);
downstreamConnection.setInputBufferSize(getBufferSize());
upstreamConnection.setConnection(downstreamConnection);
@ -324,6 +344,7 @@ public class ConnectHandler extends HandlerWrapper
sendConnectResponse(request, response, HttpServletResponse.SC_OK);
upgradeConnection(request, response, downstreamConnection);
connectContext.getAsyncContext().complete();
}
@ -349,7 +370,8 @@ public class ConnectHandler extends HandlerWrapper
}
catch (IOException x)
{
// TODO: nothing we can do, close the connection
if (LOG.isDebugEnabled())
LOG.debug("Could not send CONNECT response", x);
}
}
@ -367,9 +389,9 @@ public class ConnectHandler extends HandlerWrapper
return true;
}
protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context, ByteBuffer buffer)
protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context)
{
return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context, buffer);
return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context);
}
protected UpstreamConnection newUpstreamConnection(EndPoint endPoint, ConnectContext connectContext)
@ -396,13 +418,17 @@ public class ConnectHandler extends HandlerWrapper
*
* @param endPoint the endPoint to read from
* @param buffer the buffer to read data into
* @param context the context information related to the connection
* @return the number of bytes read (possibly 0 since the read is non-blocking)
* or -1 if the channel has been closed remotely
* @throws IOException if the endPoint cannot be read
*/
protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException
{
return endPoint.fill(buffer);
int read = endPoint.fill(buffer);
if (LOG.isDebugEnabled())
LOG.debug("{} read {} bytes", this, read);
return read;
}
/**
@ -411,8 +437,9 @@ public class ConnectHandler extends HandlerWrapper
* @param endPoint the endPoint to write to
* @param buffer the buffer to write
* @param callback the completion callback to invoke
* @param context the context information related to the connection
*/
protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context)
{
if (LOG.isDebugEnabled())
LOG.debug("{} writing {} bytes", this, buffer.remaining());
@ -494,14 +521,9 @@ public class ConnectHandler extends HandlerWrapper
@Override
protected void connectionFailed(SocketChannel channel, final Throwable ex, final Object attachment)
{
getExecutor().execute(new Runnable()
{
public void run()
{
ConnectContext connectContext = (ConnectContext)attachment;
onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex);
}
});
close(channel);
ConnectContext connectContext = (ConnectContext)attachment;
onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex);
}
}
@ -561,37 +583,36 @@ public class ConnectHandler extends HandlerWrapper
public void onOpen()
{
super.onOpen();
getExecutor().execute(new Runnable()
{
public void run()
{
onConnectSuccess(connectContext, UpstreamConnection.this);
fillInterested();
}
});
onConnectSuccess(connectContext, UpstreamConnection.this);
fillInterested();
}
@Override
protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
{
return ConnectHandler.this.read(endPoint, buffer);
return ConnectHandler.this.read(endPoint, buffer, getContext());
}
@Override
protected void write(EndPoint endPoint, ByteBuffer buffer,Callback callback)
{
ConnectHandler.this.write(endPoint, buffer, callback);
ConnectHandler.this.write(endPoint, buffer, callback, getContext());
}
}
public class DownstreamConnection extends ProxyConnection
public class DownstreamConnection extends ProxyConnection implements Connection.UpgradeTo
{
private final ByteBuffer buffer;
private ByteBuffer buffer;
public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context, ByteBuffer buffer)
public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
{
super(endPoint, executor, bufferPool, context);
this.buffer = buffer;
}
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
this.buffer = buffer == null ? BufferUtil.EMPTY_BUFFER : buffer;
}
@Override
@ -623,13 +644,13 @@ public class ConnectHandler extends HandlerWrapper
@Override
protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
{
return ConnectHandler.this.read(endPoint, buffer);
return ConnectHandler.this.read(endPoint, buffer, getContext());
}
@Override
protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
{
ConnectHandler.this.write(endPoint, buffer, callback);
ConnectHandler.this.write(endPoint, buffer, callback, getContext());
}
}
}

View File

@ -27,6 +27,8 @@ import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.ConcurrentMap;
@ -36,12 +38,15 @@ import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -630,6 +635,13 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
return super.handleAuthentication(request, response, address);
}
@Override
protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise)
{
Assert.assertEquals(contextValue, request.getAttribute(contextKey));
super.connectToServer(request, host, port, promise);
}
@Override
protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
{
@ -637,6 +649,20 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
Assert.assertEquals(contextValue, request.getAttribute(contextKey));
context.put(contextKey, request.getAttribute(contextKey));
}
@Override
protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException
{
Assert.assertEquals(contextValue, context.get(contextKey));
return super.read(endPoint, buffer, context);
}
@Override
protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context)
{
Assert.assertEquals(contextValue, context.get(contextKey));
super.write(endPoint, buffer, callback, context);
}
});
proxy.start();