470311 - Introduce a proxy-protocol module.

Support for the PROXY protocol is now enabled via 2 new modules:
proxy-protocol and proxy-protocol-ssl, respectively for the HTTP
connector and the SSL connector.
This commit is contained in:
Simone Bordet 2015-08-11 12:17:24 +02:00
parent 8837291393
commit aa684a5dcc
13 changed files with 326 additions and 99 deletions

View File

@ -261,7 +261,7 @@ public class HTTP2Client extends ContainerLifeCycle
public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise) public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
{ {
connect(sslContextFactory, address, listener, promise, new HashMap<String, Object>()); connect(sslContextFactory, address, listener, promise, null);
} }
public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context) public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
@ -271,15 +271,7 @@ public class HTTP2Client extends ContainerLifeCycle
SocketChannel channel = SocketChannel.open(); SocketChannel channel = SocketChannel.open();
configure(channel); configure(channel);
channel.configureBlocking(false); channel.configureBlocking(false);
context = contextFrom(sslContextFactory, address, listener, promise, context);
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
if (sslContextFactory != null)
context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
if (channel.connect(address)) if (channel.connect(address))
selector.accept(channel, context); selector.accept(channel, context);
else else
@ -291,6 +283,36 @@ public class HTTP2Client extends ContainerLifeCycle
} }
} }
public void accept(SslContextFactory sslContextFactory, SocketChannel channel, Session.Listener listener, Promise<Session> promise)
{
try
{
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
channel.configureBlocking(false);
Map<String, Object> context = contextFrom(sslContextFactory, (InetSocketAddress)channel.getRemoteAddress(), listener, promise, null);
selector.accept(channel, context);
}
catch (Throwable x)
{
promise.failed(x);
}
}
private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
if (context == null)
context = new HashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
if (sslContextFactory != null)
context.put(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY, sslContextFactory);
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, address.getHostString());
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, address.getPort());
return context;
}
protected void configure(SocketChannel channel) throws IOException protected void configure(SocketChannel channel) throws IOException
{ {
channel.socket().setTcpNoDelay(true); channel.socket().setTcpNoDelay(true);

View File

@ -0,0 +1,121 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class ProxyProtocolTest
{
private Server server;
private ServerConnector connector;
private HTTP2Client client;
public void startServer(Handler handler) throws Exception
{
server = new Server();
HttpConfiguration configuration = new HttpConfiguration();
connector = new ServerConnector(server, new ProxyConnectionFactory(), new HTTP2CServerConnectionFactory(configuration));
server.addConnector(connector);
server.setHandler(handler);
client = new HTTP2Client();
server.addBean(client, true);
server.start();
}
@After
public void dispose() throws Exception
{
if (server != null)
server.stop();
}
@Test
public void test_PROXY_GET() throws Exception
{
startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
});
String request1 = "PROXY TCP4 1.2.3.4 5.6.7.8 1111 2222\r\n";
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", connector.getLocalPort()));
channel.write(ByteBuffer.wrap(request1.getBytes(StandardCharsets.UTF_8)));
FuturePromise<Session> promise = new FuturePromise<>();
client.accept(null, channel, new Session.Listener.Adapter(), promise);
Session session = promise.get(5, TimeUnit.SECONDS);
HttpFields fields = new HttpFields();
String uri = "http://localhost:" + connector.getLocalPort() + "/";
MetaData.Request metaData = new MetaData.Request("GET", new HttpURI(uri), HttpVersion.HTTP_2, fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
if (frame.isEndStream())
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -38,7 +38,7 @@ import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractConnection implements Connection public abstract class AbstractConnection implements Connection
{ {
private static final Logger LOG = Log.getLogger(AbstractConnection.class); private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final List<Listener> listeners = new CopyOnWriteArrayList<>(); private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final long _created=System.currentTimeMillis(); private final long _created=System.currentTimeMillis();
private final EndPoint _endPoint; private final EndPoint _endPoint;
@ -109,7 +109,7 @@ public abstract class AbstractConnection implements Connection
callback.failed(x); callback.failed(x);
} }
} }
/** /**
* <p>Utility method to be called to register read interest.</p> * <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
@ -122,12 +122,12 @@ public abstract class AbstractConnection implements Connection
LOG.debug("fillInterested {}",this); LOG.debug("fillInterested {}",this);
getEndPoint().fillInterested(_readCallback); getEndPoint().fillInterested(_readCallback);
} }
public boolean isFillInterested() public boolean isFillInterested()
{ {
return ((AbstractEndPoint)getEndPoint()).getFillInterest().isInterested(); return getEndPoint().isFillInterested();
} }
/** /**
* <p>Callback method invoked when the endpoint is ready to be read.</p> * <p>Callback method invoked when the endpoint is ready to be read.</p>
* @see #fillInterested() * @see #fillInterested()
@ -154,10 +154,10 @@ public abstract class AbstractConnection implements Connection
else else
{ {
_endPoint.shutdownOutput(); _endPoint.shutdownOutput();
fillInterested(); fillInterested();
} }
} }
} }
} }
/** /**
@ -236,9 +236,9 @@ public abstract class AbstractConnection implements Connection
{ {
return String.format("%s@%x", getClass().getSimpleName(), hashCode()); return String.format("%s@%x", getClass().getSimpleName(), hashCode());
} }
private class ReadCallback implements Callback private class ReadCallback implements Callback
{ {
@Override @Override
public void succeeded() public void succeeded()
{ {
@ -250,7 +250,7 @@ public abstract class AbstractConnection implements Connection
{ {
onFillInterestedFailed(x); onFillInterestedFailed(x);
} }
@Override @Override
public String toString() public String toString()
{ {

View File

@ -45,7 +45,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
AbstractEndPoint.this.needsFillInterest(); AbstractEndPoint.this.needsFillInterest();
} }
}; };
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
@ -79,7 +79,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
{ {
return _remote; return _remote;
} }
@Override @Override
public Connection getConnection() public Connection getConnection()
{ {
@ -115,7 +115,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
_writeFlusher.onClose(); _writeFlusher.onClose();
_fillInterest.onClose(); _fillInterest.onClose();
} }
@Override @Override
public void close() public void close()
{ {
@ -129,6 +129,12 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
_fillInterest.register(callback); _fillInterest.register(callback);
} }
@Override
public boolean isFillInterested()
{
return _fillInterest.isInterested();
}
@Override @Override
public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException
{ {
@ -156,17 +162,17 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
boolean input_shutdown=isInputShutdown(); boolean input_shutdown=isInputShutdown();
boolean fillFailed = _fillInterest.onFail(timeout); boolean fillFailed = _fillInterest.onFail(timeout);
boolean writeFailed = _writeFlusher.onFail(timeout); boolean writeFailed = _writeFlusher.onFail(timeout);
// If the endpoint is half closed and there was no fill/write handling, then close here. // If the endpoint is half closed and there was no fill/write handling, then close here.
// This handles the situation where the connection has completed its close handling // This handles the situation where the connection has completed its close handling
// and the endpoint is half closed, but the other party does not complete the close. // and the endpoint is half closed, but the other party does not complete the close.
// This perhaps should not check for half closed, however the servlet spec case allows // This perhaps should not check for half closed, however the servlet spec case allows
// for a dispatched servlet or suspended request to extend beyond the connections idle // for a dispatched servlet or suspended request to extend beyond the connections idle
// time. So if this test would always close an idle endpoint that is not handled, then // time. So if this test would always close an idle endpoint that is not handled, then
// we would need a mode to ignore timeouts for some HTTP states // we would need a mode to ignore timeouts for some HTTP states
if (isOpen() && (output_shutdown || input_shutdown) && !(fillFailed || writeFailed)) if (isOpen() && (output_shutdown || input_shutdown) && !(fillFailed || writeFailed))
close(); close();
else else
LOG.debug("Ignored idle endpoint {}",this); LOG.debug("Ignored idle endpoint {}",this);
} }
@ -177,7 +183,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection); LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection);
ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom) ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom)
?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null; ?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null;
old_connection.onClose(); old_connection.onClose();
@ -190,7 +196,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
newConnection.onOpen(); newConnection.onOpen();
} }
@Override @Override
public String toString() public String toString()
{ {
@ -201,7 +207,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
c=c.getSuperclass(); c=c.getSuperclass();
name=c.getSimpleName(); name=c.getSimpleName();
} }
return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}", return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}",
name, name,
hashCode(), hashCode(),

View File

@ -32,9 +32,9 @@ import org.eclipse.jetty.util.IteratingCallback;
/** /**
* *
* A transport EndPoint * A transport EndPoint
* *
* <h3>Asynchronous Methods</h3> * <h3>Asynchronous Methods</h3>
* <p>The asynchronous scheduling methods of {@link EndPoint} * <p>The asynchronous scheduling methods of {@link EndPoint}
* has been influenced by NIO.2 Futures and Completion * has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because they have * handlers, but does not use those actual interfaces because they have
* some inefficiencies.</p> * some inefficiencies.</p>
@ -170,7 +170,7 @@ public interface EndPoint extends Closeable
* are taken from the header/buffer position up until the buffer limit. The header/buffers position * are taken from the header/buffer position up until the buffer limit. The header/buffers position
* is updated to indicate how many bytes have been consumed. * is updated to indicate how many bytes have been consumed.
* @param buffer the buffers to flush * @param buffer the buffers to flush
* @return True IFF all the buffers have been consumed and the endpoint has flushed the data to its * @return True IFF all the buffers have been consumed and the endpoint has flushed the data to its
* destination (ie is not buffering any data). * destination (ie is not buffering any data).
* @throws IOException If the endpoint is closed or output is shutdown. * @throws IOException If the endpoint is closed or output is shutdown.
*/ */
@ -205,6 +205,12 @@ public interface EndPoint extends Closeable
*/ */
void fillInterested(Callback callback) throws ReadPendingException; void fillInterested(Callback callback) throws ReadPendingException;
/**
* @return whether {@link #fillInterested(Callback)} has been called, but {@link #fill(ByteBuffer)} has not yet
* been called
*/
boolean isFillInterested();
/** /**
* <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either * <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* all the data has been flushed or an error occurs.</p> * all the data has been flushed or an error occurs.</p>

View File

@ -26,10 +26,6 @@
<Arg name="selectors" type="int"><Property name="jetty.http.selectors" deprecated="http.selectors" default="-1"/></Arg> <Arg name="selectors" type="int"><Property name="jetty.http.selectors" deprecated="http.selectors" default="-1"/></Arg>
<Arg name="factories"> <Arg name="factories">
<Array type="org.eclipse.jetty.server.ConnectionFactory"> <Array type="org.eclipse.jetty.server.ConnectionFactory">
<!-- uncomment to support proxy protocol
<Item>
<New class="org.eclipse.jetty.server.ProxyConnectionFactory"/>
</Item>-->
<Item> <Item>
<New class="org.eclipse.jetty.server.HttpConnectionFactory"> <New class="org.eclipse.jetty.server.HttpConnectionFactory">
<Arg name="config"><Ref refid="httpConfig" /></Arg> <Arg name="config"><Ref refid="httpConfig" /></Arg>

View File

@ -0,0 +1,10 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="sslConnector" class="org.eclipse.jetty.server.ServerConnector">
<Call name="addFirstConnectionFactory">
<Arg>
<New class="org.eclipse.jetty.server.ProxyConnectionFactory"/>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,10 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="httpConnector" class="org.eclipse.jetty.server.ServerConnector">
<Call name="addFirstConnectionFactory">
<Arg>
<New class="org.eclipse.jetty.server.ProxyConnectionFactory"/>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,9 @@
#
# PROXY Protocol Module - SSL
#
[depend]
ssl
[xml]
etc/jetty-proxy-protocol-ssl.xml

View File

@ -0,0 +1,9 @@
#
# PROXY Protocol Module - HTTP
#
[depend]
http
[xml]
etc/jetty-proxy-protocol.xml

View File

@ -144,7 +144,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
private final Scheduler _scheduler; private final Scheduler _scheduler;
private final ByteBufferPool _byteBufferPool; private final ByteBufferPool _byteBufferPool;
private final Thread[] _acceptors; private final Thread[] _acceptors;
private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<EndPoint, Boolean>()); private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints); private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
private volatile CountDownLatch _stopping; private volatile CountDownLatch _stopping;
private long _idleTimeout = 30000; private long _idleTimeout = 30000;
@ -191,7 +191,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
int cores = Runtime.getRuntime().availableProcessors(); int cores = Runtime.getRuntime().availableProcessors();
if (acceptors < 0) if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8)); acceptors=Math.max(1, Math.min(4,cores/8));
if (acceptors > cores) if (acceptors > cores)
LOG.warn("Acceptors should be <= availableProcessors: " + this); LOG.warn("Acceptors should be <= availableProcessors: " + this);
_acceptors = new Thread[acceptors]; _acceptors = new Thread[acceptors];
@ -303,7 +303,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
_stopping=null; _stopping=null;
super.doStop(); super.doStop();
for (Acceptor a : getBeans(Acceptor.class)) for (Acceptor a : getBeans(Acceptor.class))
removeBean(a); removeBean(a);
@ -362,7 +362,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
{ {
synchronized (_factories) synchronized (_factories)
{ {
Set<ConnectionFactory> to_remove = new HashSet<ConnectionFactory>(); Set<ConnectionFactory> to_remove = new HashSet<>();
for (String key:factory.getProtocols()) for (String key:factory.getProtocols())
{ {
key=StringUtil.asciiToLowerCase(key); key=StringUtil.asciiToLowerCase(key);
@ -375,11 +375,11 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
} }
_factories.put(key, factory); _factories.put(key, factory);
} }
// keep factories still referenced // keep factories still referenced
for (ConnectionFactory f : _factories.values()) for (ConnectionFactory f : _factories.values())
to_remove.remove(f); to_remove.remove(f);
// remove old factories // remove old factories
for (ConnectionFactory old: to_remove) for (ConnectionFactory old: to_remove)
{ {
@ -396,7 +396,20 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
LOG.debug("{} added {}", this, factory); LOG.debug("{} added {}", this, factory);
} }
} }
public void addFirstConnectionFactory(ConnectionFactory factory)
{
synchronized (_factories)
{
List<ConnectionFactory> existings = new ArrayList<>(_factories.values());
_factories.clear();
addConnectionFactory(factory);
for (ConnectionFactory existing : existings)
addConnectionFactory(existing);
_defaultProtocol = factory.getProtocol();
}
}
public void addIfAbsentConnectionFactory(ConnectionFactory factory) public void addIfAbsentConnectionFactory(ConnectionFactory factory)
{ {
synchronized (_factories) synchronized (_factories)
@ -460,8 +473,8 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Set the acceptor thread priority delta. /** Set the acceptor thread priority delta.
* <p>This allows the acceptor thread to run at a different priority. * <p>This allows the acceptor thread to run at a different priority.
* Typically this would be used to lower the priority to give preference * Typically this would be used to lower the priority to give preference
* to handling previously accepted connections rather than accepting * to handling previously accepted connections rather than accepting
* new connections</p> * new connections</p>
* @param acceptorPriorityDelta the acceptor priority delta * @param acceptorPriorityDelta the acceptor priority delta
*/ */
@ -532,7 +545,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
String name=thread.getName(); String name=thread.getName();
_name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString()); _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
thread.setName(_name); thread.setName(_name);
int priority=thread.getPriority(); int priority=thread.getPriority();
if (_acceptorPriorityDelta!=0) if (_acceptorPriorityDelta!=0)
thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta))); thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));
@ -574,7 +587,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
stopping.countDown(); stopping.countDown();
} }
} }
@Override @Override
public String toString() public String toString()
{ {
@ -583,7 +596,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
return String.format("acceptor-%d@%x", _id, hashCode()); return String.format("acceptor-%d@%x", _id, hashCode());
return name; return name;
} }
} }
@ -636,7 +649,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
{ {
return _name; return _name;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Set a connector name. A context may be configured with * Set a connector name. A context may be configured with
@ -648,7 +661,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
{ {
_name=name; _name=name;
} }
@Override @Override
public String toString() public String toString()
{ {

View File

@ -35,11 +35,11 @@ import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* ConnectionFactory for the PROXY Protocol. * ConnectionFactory for the PROXY Protocol.
* <p>This factory can be placed in front of any other connection factory * <p>This factory can be placed in front of any other connection factory
* to process the proxy line before the normal protocol handling</p> * to process the proxy line before the normal protocol handling</p>
* *
* @see <a href="http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt">http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt</a> * @see <a href="http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt">http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt</a>
*/ */
public class ProxyConnectionFactory extends AbstractConnectionFactory public class ProxyConnectionFactory extends AbstractConnectionFactory
@ -48,7 +48,7 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
private final String _next; private final String _next;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Proxy Connection Factory that uses the next ConnectionFactory /** Proxy Connection Factory that uses the next ConnectionFactory
* on the connector as the next protocol * on the connector as the next protocol
*/ */
public ProxyConnectionFactory() public ProxyConnectionFactory()
@ -56,13 +56,13 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
super("proxy"); super("proxy");
_next=null; _next=null;
} }
public ProxyConnectionFactory(String nextProtocol) public ProxyConnectionFactory(String nextProtocol)
{ {
super("proxy"); super("proxy");
_next=nextProtocol; _next=nextProtocol;
} }
@Override @Override
public Connection newConnection(Connector connector, EndPoint endp) public Connection newConnection(Connector connector, EndPoint endp)
{ {
@ -71,7 +71,7 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
{ {
for (Iterator<String> i = connector.getProtocols().iterator();i.hasNext();) for (Iterator<String> i = connector.getProtocols().iterator();i.hasNext();)
{ {
String p=i.next(); String p=i.next();
if (getProtocol().equalsIgnoreCase(p)) if (getProtocol().equalsIgnoreCase(p))
{ {
next=i.next(); next=i.next();
@ -79,16 +79,16 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
} }
} }
} }
return new ProxyConnection(endp,connector,next); return new ProxyConnection(endp,connector,next);
} }
public static class ProxyConnection extends AbstractConnection public static class ProxyConnection extends AbstractConnection
{ {
// 0 1 2 3 4 5 6 // 0 1 2 3 4 5 6
// 98765432109876543210987654321 // 98765432109876543210987654321
// PROXY P R.R.R.R L.L.L.L R Lrn // PROXY P R.R.R.R L.L.L.L R Lrn
private final int[] __size = {29,23,21,13,5,3,1}; private final int[] __size = {29,23,21,13,5,3,1};
private final Connector _connector; private final Connector _connector;
private final String _next; private final String _next;
@ -96,7 +96,7 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
private final String[] _field=new String[6]; private final String[] _field=new String[6];
private int _fields; private int _fields;
private int _length; private int _length;
protected ProxyConnection(EndPoint endp, Connector connector, String next) protected ProxyConnection(EndPoint endp, Connector connector, String next)
{ {
super(endp,connector.getExecutor()); super(endp,connector.getExecutor());
@ -110,9 +110,9 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
super.onOpen(); super.onOpen();
fillInterested(); fillInterested();
} }
@Override @Override
public void onFillable() public void onFillable()
{ {
try try
{ {
@ -125,7 +125,7 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
buffer=BufferUtil.allocate(size); buffer=BufferUtil.allocate(size);
else else
BufferUtil.clear(buffer); BufferUtil.clear(buffer);
// Read data // Read data
int fill=getEndPoint().fill(buffer); int fill=getEndPoint().fill(buffer);
if (fill<0) if (fill<0)
@ -138,15 +138,15 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
fillInterested(); fillInterested();
return; return;
} }
_length+=fill; _length+=fill;
if (_length>=108) if (_length>=108)
{ {
LOG.warn("PROXY line too long {}",getEndPoint()); LOG.warn("PROXY line too long {} for {}",_length,getEndPoint());
close(); close();
return; return;
} }
// parse fields // parse fields
while (buffer.hasRemaining()) while (buffer.hasRemaining())
{ {
@ -160,61 +160,60 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
} }
else if (b<' ') else if (b<' ')
{ {
LOG.warn("Bad char {}",getEndPoint()); LOG.warn("Bad character {} for {}",b&0xFF,getEndPoint());
close(); close();
return; return;
} }
else else
{
_builder.append((char)b); _builder.append((char)b);
}
} }
else else
{ {
if (b=='\n') if (b=='\n')
break loop; break loop;
LOG.warn("Bad CRLF {}",getEndPoint()); LOG.warn("Bad CRLF for {}",getEndPoint());
close(); close();
return; return;
} }
} }
} }
// Check proxy // Check proxy
if (!"PROXY".equals(_field[0])) if (!"PROXY".equals(_field[0]))
{ {
LOG.warn("Bad PROXY {}",getEndPoint()); LOG.warn("Not PROXY protocol for {}",getEndPoint());
close(); close();
return; return;
} }
// Extract Addresses // Extract Addresses
InetSocketAddress remote=new InetSocketAddress(_field[2],Integer.parseInt(_field[4])); InetSocketAddress remote=new InetSocketAddress(_field[2],Integer.parseInt(_field[4]));
InetSocketAddress local =new InetSocketAddress(_field[3],Integer.parseInt(_field[5])); InetSocketAddress local =new InetSocketAddress(_field[3],Integer.parseInt(_field[5]));
// Create the next protocol // Create the next protocol
ConnectionFactory connectionFactory = _connector.getConnectionFactory(_next); ConnectionFactory connectionFactory = _connector.getConnectionFactory(_next);
if (connectionFactory == null) if (connectionFactory == null)
{ {
LOG.info("{} next protocol '{}'",getEndPoint(), _next); LOG.info("Next protocol '{}' for {}",_next,getEndPoint());
close(); close();
return; return;
} }
EndPoint endPoint = new ProxyEndPoint(getEndPoint(),remote,local); EndPoint endPoint = new ProxyEndPoint(getEndPoint(),remote,local);
Connection newConnection = connectionFactory.newConnection(_connector, endPoint); Connection newConnection = connectionFactory.newConnection(_connector, endPoint);
endPoint.upgrade(newConnection); endPoint.upgrade(newConnection);
} }
catch (Throwable e) catch (Throwable x)
{ {
LOG.warn("Bad PROXY {} {}",e.toString(),getEndPoint()); LOG.warn("PROXY error for "+getEndPoint(),x);
LOG.debug(e);
close(); close();
} }
} }
} }
public static class ProxyEndPoint implements EndPoint public static class ProxyEndPoint implements EndPoint
{ {
private final EndPoint _endp; private final EndPoint _endp;
@ -233,7 +232,7 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
{ {
return _endp.isOptimizedForDirectBuffers(); return _endp.isOptimizedForDirectBuffers();
} }
public InetSocketAddress getLocalAddress() public InetSocketAddress getLocalAddress()
{ {
return _local; return _local;
@ -304,6 +303,12 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
_endp.fillInterested(callback); _endp.fillInterested(callback);
} }
@Override
public boolean isFillInterested()
{
return _endp.isFillInterested();
}
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{ {
_endp.write(callback,buffers); _endp.write(callback,buffers);

View File

@ -18,23 +18,16 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.Socket; import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collection;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -49,6 +42,14 @@ import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
public class ServerConnectorTest public class ServerConnectorTest
{ {
public static class ReuseInfoHandler extends AbstractHandler public static class ReuseInfoHandler extends AbstractHandler
@ -79,9 +80,9 @@ public class ServerConnectorTest
{ {
t.printStackTrace(out); t.printStackTrace(out);
} }
out.printf("socket.getReuseAddress() = %b%n",socket.getReuseAddress()); out.printf("socket.getReuseAddress() = %b%n",socket.getReuseAddress());
baseRequest.setHandled(true); baseRequest.setHandled(true);
} }
} }
@ -97,7 +98,7 @@ public class ServerConnectorTest
return new URI(String.format("http://%s:%d/",host,port)); return new URI(String.format("http://%s:%d/",host,port));
} }
private String getResponse(URI uri) throws MalformedURLException, IOException private String getResponse(URI uri) throws IOException
{ {
HttpURLConnection http = (HttpURLConnection)uri.toURL().openConnection(); HttpURLConnection http = (HttpURLConnection)uri.toURL().openConnection();
assertThat("Valid Response Code",http.getResponseCode(),anyOf(is(200),is(404))); assertThat("Valid Response Code",http.getResponseCode(),anyOf(is(200),is(404)));
@ -130,7 +131,7 @@ public class ServerConnectorTest
String response = getResponse(uri); String response = getResponse(uri);
assertThat("Response",response,containsString("connector.getReuseAddress() = true")); assertThat("Response",response,containsString("connector.getReuseAddress() = true"));
assertThat("Response",response,containsString("connector._reuseAddress() = true")); assertThat("Response",response,containsString("connector._reuseAddress() = true"));
// Java on Windows is incapable of propagating reuse-address this to the opened socket. // Java on Windows is incapable of propagating reuse-address this to the opened socket.
if (!OS.IS_WINDOWS) if (!OS.IS_WINDOWS)
{ {
@ -166,7 +167,7 @@ public class ServerConnectorTest
String response = getResponse(uri); String response = getResponse(uri);
assertThat("Response",response,containsString("connector.getReuseAddress() = true")); assertThat("Response",response,containsString("connector.getReuseAddress() = true"));
assertThat("Response",response,containsString("connector._reuseAddress() = true")); assertThat("Response",response,containsString("connector._reuseAddress() = true"));
// Java on Windows is incapable of propagating reuse-address this to the opened socket. // Java on Windows is incapable of propagating reuse-address this to the opened socket.
if (!OS.IS_WINDOWS) if (!OS.IS_WINDOWS)
{ {
@ -202,7 +203,7 @@ public class ServerConnectorTest
String response = getResponse(uri); String response = getResponse(uri);
assertThat("Response",response,containsString("connector.getReuseAddress() = false")); assertThat("Response",response,containsString("connector.getReuseAddress() = false"));
assertThat("Response",response,containsString("connector._reuseAddress() = false")); assertThat("Response",response,containsString("connector._reuseAddress() = false"));
// Java on Windows is incapable of propagating reuse-address this to the opened socket. // Java on Windows is incapable of propagating reuse-address this to the opened socket.
if (!OS.IS_WINDOWS) if (!OS.IS_WINDOWS)
{ {
@ -214,4 +215,23 @@ public class ServerConnectorTest
server.stop(); server.stop();
} }
} }
@Test
public void testAddFirstConnectionFactory() throws Exception
{
Server server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
HttpConnectionFactory http = new HttpConnectionFactory();
connector.addConnectionFactory(http);
ProxyConnectionFactory proxy = new ProxyConnectionFactory();
connector.addFirstConnectionFactory(proxy);
Collection<ConnectionFactory> factories = connector.getConnectionFactories();
assertEquals(2, factories.size());
assertSame(proxy, factories.iterator().next());
assertEquals(2, connector.getBeans(ConnectionFactory.class).size());
assertEquals(proxy.getProtocol(), connector.getDefaultProtocol());
}
} }