Implemented v2 of the ProxyConnection protocol

This commit is contained in:
Greg Wilkins 2015-10-29 16:45:50 +11:00
parent ab7b1b7f10
commit 9fe7332413
2 changed files with 441 additions and 50 deletions

View File

@ -48,6 +48,7 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.TypeUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -80,13 +81,25 @@ public class ProxyProtocolTest
}
@Test
public void test_PROXY_GET() throws Exception
public void test_PROXY_GET_v1() throws Exception
{
startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
Assert.assertEquals("1.2.3.4",request.getRemoteAddr());
Assert.assertEquals(1111,request.getRemotePort());
Assert.assertEquals("5.6.7.8",request.getLocalAddr());
Assert.assertEquals(2222,request.getLocalPort());
}
catch(Throwable th)
{
th.printStackTrace();
response.setStatus(500);
}
baseRequest.setHandled(true);
}
});
@ -118,4 +131,56 @@ public class ProxyProtocolTest
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_PROXY_GET_v2() throws Exception
{
startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
Assert.assertEquals("10.0.0.4",request.getRemoteAddr());
Assert.assertEquals(33824,request.getRemotePort());
Assert.assertEquals("10.0.0.4",request.getLocalAddr());
Assert.assertEquals(8888,request.getLocalPort());
}
catch(Throwable th)
{
th.printStackTrace();
response.setStatus(500);
}
baseRequest.setHandled(true);
}
});
String request1 = "0D0A0D0A000D0A515549540A211100140A0000040A000004842022B82000050000000000";
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", connector.getLocalPort()));
channel.write(ByteBuffer.wrap(TypeUtil.fromHexString(request1)));
FuturePromise<Session> promise = new FuturePromise<>();
client.accept(null, channel, new Session.Listener.Adapter(), promise);
Session session = promise.get(5, TimeUnit.SECONDS);
HttpFields fields = new HttpFields();
String uri = "http://localhost:" + connector.getLocalPort() + "/";
MetaData.Request metaData = new MetaData.Request("GET", new HttpURI(uri), HttpVersion.HTTP_2, fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
if (frame.isEndStream())
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -19,6 +19,9 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
@ -30,15 +33,17 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
/* ------------------------------------------------------------ */
/**
* ConnectionFactory for the PROXY Protocol.
* <p>This factory can be placed in front of any other connection factory
* to process the proxy line before the normal protocol handling</p>
* to process the proxy v1 or v2 line before the normal protocol handling</p>
*
* @see <a href="http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt">http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt</a>
*/
@ -46,6 +51,7 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
{
private static final Logger LOG = Log.getLogger(ProxyConnectionFactory.class);
private final String _next;
private int _maxProxyHeader=1024;
/* ------------------------------------------------------------ */
/** Proxy Connection Factory that uses the next ConnectionFactory
@ -63,6 +69,16 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
_next=nextProtocol;
}
public int getMaxProxyHeader()
{
return _maxProxyHeader;
}
public void setMaxProxyHeader(int maxProxyHeader)
{
_maxProxyHeader = maxProxyHeader;
}
@Override
public Connection newConnection(Connector connector, EndPoint endp)
{
@ -80,24 +96,16 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
}
}
return new ProxyConnection(endp,connector,next);
return new ProxyProtocolV1orV2Connection(endp,connector,next);
}
public static class ProxyConnection extends AbstractConnection
public class ProxyProtocolV1orV2Connection extends AbstractConnection
{
// 0 1 2 3 4 5 6
// 98765432109876543210987654321
// PROXY P R.R.R.R L.L.L.L R Lrn
private final int[] __size = {29,23,21,13,5,3,1};
private final Connector _connector;
private final String _next;
private final StringBuilder _builder=new StringBuilder();
private final String[] _field=new String[6];
private int _fields;
private int _length;
protected ProxyConnection(EndPoint endp, Connector connector, String next)
private ByteBuffer _buffer = BufferUtil.allocate(16);
protected ProxyProtocolV1orV2Connection(EndPoint endp, Connector connector, String next)
{
super(endp,connector.getExecutor());
_connector=connector;
@ -111,15 +119,138 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
fillInterested();
}
@Override
public void onFillable()
{
try
{
while(BufferUtil.space(_buffer)>0)
{
// Read data
int fill=getEndPoint().fill(_buffer);
if (fill<0)
{
getEndPoint().shutdownOutput();
return;
}
if (fill==0)
{
fillInterested();
return;
}
}
// Is it a V1?
switch(_buffer.get(0))
{
case 'P':
{
ProxyProtocolV1Connection v1 = new ProxyProtocolV1Connection(getEndPoint(),_connector,_next,_buffer);
getEndPoint().upgrade(v1);
return;
}
case 0x0D:
{
ProxyProtocolV2Connection v2 = new ProxyProtocolV2Connection(getEndPoint(),_connector,_next,_buffer);
getEndPoint().upgrade(v2);
return;
}
default:
LOG.warn("Not PROXY protocol for {}",getEndPoint());
close();
}
}
catch (Throwable x)
{
LOG.warn("PROXY error for "+getEndPoint(),x);
close();
}
}
}
public static class ProxyProtocolV1Connection extends AbstractConnection
{
// 0 1 2 3 4 5 6
// 98765432109876543210987654321
// PROXY P R.R.R.R L.L.L.L R Lrn
private final int[] __size = {29,23,21,13,5,3,1};
private final Connector _connector;
private final String _next;
private final StringBuilder _builder=new StringBuilder();
private final String[] _field=new String[6];
private int _fields;
private int _length;
protected ProxyProtocolV1Connection(EndPoint endp, Connector connector, String next,ByteBuffer buffer)
{
super(endp,connector.getExecutor());
_connector=connector;
_next=next;
_length=buffer.remaining();
parse(buffer);
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
private boolean parse(ByteBuffer buffer)
{
// parse fields
while (buffer.hasRemaining())
{
byte b = buffer.get();
if (_fields<6)
{
if (b==' ' || b=='\r' && _fields==5)
{
_field[_fields++]=_builder.toString();
_builder.setLength(0);
}
else if (b<' ')
{
LOG.warn("Bad character {} for {}",b&0xFF,getEndPoint());
close();
return false;
}
else
{
_builder.append((char)b);
}
}
else
{
if (b=='\n')
{
_fields=7;
return true;
}
LOG.warn("Bad CRLF for {}",getEndPoint());
close();
return false;
}
}
return true;
}
@Override
public void onFillable()
{
try
{
ByteBuffer buffer=null;
loop: while(true)
while(_fields<7)
{
// Create a buffer that will not read too much data
// since once read it is impossible to push back for the
// real connection to read it.
int size=Math.max(1,__size[_fields]-_builder.length());
if (buffer==null || buffer.capacity()!=size)
buffer=BufferUtil.allocate(size);
@ -147,38 +278,8 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
return;
}
// parse fields
while (buffer.hasRemaining())
{
byte b = buffer.get();
if (_fields<6)
{
if (b==' ' || b=='\r' && _fields==5)
{
_field[_fields++]=_builder.toString();
_builder.setLength(0);
}
else if (b<' ')
{
LOG.warn("Bad character {} for {}",b&0xFF,getEndPoint());
close();
return;
}
else
{
_builder.append((char)b);
}
}
else
{
if (b=='\n')
break loop;
LOG.warn("Bad CRLF for {}",getEndPoint());
close();
return;
}
}
if (!parse(buffer))
return;
}
// Check proxy
@ -197,10 +298,13 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
ConnectionFactory connectionFactory = _connector.getConnectionFactory(_next);
if (connectionFactory == null)
{
LOG.info("Next protocol '{}' for {}",_next,getEndPoint());
LOG.warn("No Next protocol '{}' for {}",_next,getEndPoint());
close();
return;
}
if (LOG.isDebugEnabled())
LOG.warn("Next protocol '{}' for {} r={} l={}",_next,getEndPoint(),remote,local);
EndPoint endPoint = new ProxyEndPoint(getEndPoint(),remote,local);
Connection newConnection = connectionFactory.newConnection(_connector, endPoint);
@ -213,6 +317,228 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
}
}
}
enum Family { UNSPEC, INET, INET6, UNIX };
enum Transport { UNSPEC, STREAM, DGRAM };
private static final byte[] MAGIC = new byte[]{0x0D,0x0A,0x0D,0x0A,0x00,0x0D,0x0A,0x51,0x55,0x49,0x54,0x0A};
public class ProxyProtocolV2Connection extends AbstractConnection
{
private final Connector _connector;
private final String _next;
private final boolean _local;
private final Family _family;
private final Transport _transport;
private final int _length;
private final ByteBuffer _buffer;
protected ProxyProtocolV2Connection(EndPoint endp, Connector connector, String next,ByteBuffer buffer)
throws IOException
{
super(endp,connector.getExecutor());
_connector=connector;
_next=next;
if (buffer.remaining()!=16)
throw new IllegalStateException();
if (LOG.isDebugEnabled())
LOG.debug("PROXYv2 header {} for {}",BufferUtil.toHexSummary(buffer),this);
// struct proxy_hdr_v2 {
// uint8_t sig[12]; /* hex 0D 0A 0D 0A 00 0D 0A 51 55 49 54 0A */
// uint8_t ver_cmd; /* protocol version and command */
// uint8_t fam; /* protocol family and address */
// uint16_t len; /* number of following bytes part of the header */
// };
for (int i=0;i<MAGIC.length;i++)
if (buffer.get()!=MAGIC[i])
throw new IOException("Bad PROXY protocol v2 signature");
int versionAndCommand = 0xff & buffer.get();
if ((versionAndCommand&0xf0) != 0x20)
throw new IOException("Bad PROXY protocol v2 version");
_local=(versionAndCommand&0xf)==0x00;
int transportAndFamily = 0xff & buffer.get();
switch(transportAndFamily>>4)
{
case 0: _family=Family.UNSPEC; break;
case 1: _family=Family.INET; break;
case 2: _family=Family.INET6; break;
case 3: _family=Family.UNIX; break;
default:
throw new IOException("Bad PROXY protocol v2 family");
}
switch(0xf&transportAndFamily)
{
case 0: _transport=Transport.UNSPEC; break;
case 1: _transport=Transport.STREAM; break;
case 2: _transport=Transport.DGRAM; break;
default:
throw new IOException("Bad PROXY protocol v2 family");
}
_length = buffer.getChar();
if (!_local && (_family==Family.UNSPEC || _family==Family.UNIX || _transport!=Transport.STREAM))
throw new IOException(String.format("Unsupported PROXY protocol v2 mode 0x%x,0x%x",versionAndCommand,transportAndFamily));
if (_length>_maxProxyHeader)
throw new IOException(String.format("Unsupported PROXY protocol v2 mode 0x%x,0x%x,0x%x",versionAndCommand,transportAndFamily,_length));
_buffer = _length>0?BufferUtil.allocate(_length):BufferUtil.EMPTY_BUFFER;
}
@Override
public void onOpen()
{
super.onOpen();
if (_buffer.remaining()==_length)
next();
else
fillInterested();
}
@Override
public void onFillable()
{
try
{
while(_buffer.remaining()<_length)
{
// Read data
int fill=getEndPoint().fill(_buffer);
if (fill<0)
{
getEndPoint().shutdownOutput();
return;
}
if (fill==0)
{
fillInterested();
return;
}
}
}
catch (Throwable x)
{
LOG.warn("PROXY error for "+getEndPoint(),x);
close();
return;
}
next();
}
private void next()
{
if (LOG.isDebugEnabled())
LOG.debug("PROXYv2 next {} from {} for {}",_next,BufferUtil.toHexSummary(_buffer),this);
// Create the next protocol
ConnectionFactory connectionFactory = _connector.getConnectionFactory(_next);
if (connectionFactory == null)
{
LOG.info("Next protocol '{}' for {}",_next,getEndPoint());
close();
return;
}
// Do we need to wrap the endpoint?
EndPoint endPoint=getEndPoint();
if (!_local)
{
try
{
InetAddress src;
InetAddress dst;
int sp;
int dp;
switch(_family)
{
case INET:
{
byte[] addr=new byte[4];
_buffer.get(addr);
src = Inet4Address.getByAddress(addr);
_buffer.get(addr);
dst = Inet4Address.getByAddress(addr);
sp = _buffer.getChar();
dp = _buffer.getChar();
break;
}
case INET6:
{
byte[] addr=new byte[16];
_buffer.get(addr);
src = Inet6Address.getByAddress(addr);
_buffer.get(addr);
dst = Inet6Address.getByAddress(addr);
sp = _buffer.getChar();
dp = _buffer.getChar();
break;
}
default:
throw new IllegalStateException();
}
// Any additional info?
while(_buffer.hasRemaining())
{
int type = 0xff & _buffer.get();
int length = _buffer.getShort();
byte[] value = new byte[length];
_buffer.get(value);
if (LOG.isDebugEnabled())
LOG.debug(String.format("T=%x L=%d V=%s for {}%n",type,length,TypeUtil.toHexString(value),this));
// TODO interpret these values
switch(type)
{
case 0x01: // PP2_TYPE_ALPN
break;
case 0x02: // PP2_TYPE_AUTHORITY
break;
case 0x20: // PP2_TYPE_SSL
break;
case 0x21: // PP2_TYPE_SSL_VERSION
break;
case 0x22: // PP2_TYPE_SSL_CN
break;
case 0x30: // PP2_TYPE_NETNS
break;
default:
break;
}
}
// Extract Addresses
InetSocketAddress remote=new InetSocketAddress(src,sp);
InetSocketAddress local =new InetSocketAddress(dst,dp);
endPoint = new ProxyEndPoint(endPoint,remote,local);
}
catch(Exception e)
{
LOG.warn(e);
}
}
Connection newConnection = connectionFactory.newConnection(_connector, endPoint);
endPoint.upgrade(newConnection);
}
}
public static class ProxyEndPoint implements EndPoint
{