work in progress

This commit is contained in:
Greg Wilkins 2011-10-26 10:14:03 +11:00
parent 61664d3c0f
commit 010328fe2b
3 changed files with 410 additions and 50 deletions

View File

@ -25,6 +25,7 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
@ -49,23 +50,34 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
private final ThreadLocal<NIOBuffer> __outBuffer = new ThreadLocal<NIOBuffer>(); private final ThreadLocal<NIOBuffer> __outBuffer = new ThreadLocal<NIOBuffer>();
private final SSLEngine _engine; private final SSLEngine _engine;
private final SSLSession _session; private final SSLSession _session;
private AsyncConnection _delegate; private AsyncConnection _connection;
private int _allocations; private int _allocations;
private NIOBuffer _inbound; private NIOBuffer _inbound;
private NIOBuffer _unwrapBuf; private NIOBuffer _unwrapBuf;
private NIOBuffer _outbound; private NIOBuffer _outbound;
private AsyncEndPoint _aEndp;
public SslConnection(SSLEngine engine,AsyncConnection connection,EndPoint endp) public SslConnection(SSLEngine engine,EndPoint endp)
{ {
this(engine,connection,endp,System.currentTimeMillis()); this(engine,endp,System.currentTimeMillis());
} }
public SslConnection(SSLEngine engine,AsyncConnection connection,EndPoint endp, long timeStamp) public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp)
{ {
super(endp,timeStamp); super(endp,timeStamp);
_delegate=connection;
_engine=engine; _engine=engine;
_session=_engine.getSession(); _session=_engine.getSession();
_aEndp=(AsyncEndPoint)endp;
}
public synchronized void setConnection(AsyncConnection connection)
{
_connection=connection;
}
public synchronized AsyncConnection getConnection()
{
return _connection;
} }
private void allocateBuffers() private void allocateBuffers()
@ -139,20 +151,24 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
LOG.debug("{} filled={} flushed={}",_session,filled,flushed); LOG.debug("{} filled={} flushed={}",_session,filled,flushed);
// If we are handshook let the delegate connection // If we are handshook let the delegate connection
if (_engine.getHandshakeStatus()==HandshakeStatus.NOT_HANDSHAKING) if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
process(null,null);
else
{ {
// handle the delegate connection // handle the delegate connection
AsyncConnection next = (AsyncConnection)_delegate.handle(); AsyncConnection next = (AsyncConnection)_connection.handle();
if (next!=_delegate && next==null) if (next!=_connection && next==null)
{ {
_delegate=next; _connection=next;
progress=true; progress=true;
} }
} }
else
{ // pass on ishut/oshut state
process(null,null); if (!_inbound.hasContent() && _endp.isInputShutdown())
} _engine.closeInbound();
if (!_outbound.hasContent() && _engine.isOutboundDone())
_endp.shutdownOutput();
} }
} }
finally finally
@ -184,7 +200,7 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private synchronized int process(NIOBuffer toFill, NIOBuffer toFlush) throws IOException private synchronized boolean process(Buffer toFill, Buffer toFlush) throws IOException
{ {
if (toFill==null) if (toFill==null)
{ {
@ -195,16 +211,14 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
else if (_unwrapBuf!=null && _unwrapBuf.hasContent()) else if (_unwrapBuf!=null && _unwrapBuf.hasContent())
{ {
_unwrapBuf.skip(toFill.put(_unwrapBuf)); _unwrapBuf.skip(toFill.put(_unwrapBuf));
return 1; return true;
} }
if (toFlush==null) if (toFlush==null)
toFlush=__ZERO_BUFFER; toFlush=__ZERO_BUFFER;
HandshakeStatus initialStatus = _engine.getHandshakeStatus(); HandshakeStatus initialStatus = _engine.getHandshakeStatus();
boolean progress=true; boolean progress=true;
int received=0; boolean some_progress=false;
int sent=0;
try try
{ {
allocateBuffers(); allocateBuffers();
@ -245,14 +259,15 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
// Detect SUN JVM Bug!!! // Detect SUN JVM Bug!!!
if(initialStatus==HandshakeStatus.NOT_HANDSHAKING && if(initialStatus==HandshakeStatus.NOT_HANDSHAKING &&
_engine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP && sent==0) _engine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP /* && sent==0 */ )
{ {
// This should be NEED_WRAP // This should be NEED_WRAP
// The fix simply detects the signature of the bug and then close the connection (fail-fast) so that ff3 will delegate to using SSL instead of TLS. // The fix simply detects the signature of the bug and then close the connection (fail-fast) so that ff3 will delegate to using SSL instead of TLS.
// This is a jvm bug on java1.6 where the SSLEngine expects more data from the initial handshake when the client(ff3-tls) already had given it. // This is a jvm bug on java1.6 where the SSLEngine expects more data from the initial handshake when the client(ff3-tls) already had given it.
// See http://jira.codehaus.org/browse/JETTY-567 for more details // See http://jira.codehaus.org/browse/JETTY-567 for more details
LOG.warn("{} JETTY-567",_session); LOG.warn("{} JETTY-567",_session);
return -1; _endp.close();
return false;
} }
} }
break; break;
@ -274,21 +289,20 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
break; break;
} }
LOG.debug("{} progress {}",_session,progress); LOG.debug("{} progress={}",_session,progress);
some_progress|=progress;
} }
LOG.debug("{} received {} sent {}",_session,received,sent);
} }
finally finally
{ {
releaseBuffers(); releaseBuffers();
} }
return (received<0||sent<0)?-1:(received+sent); return some_progress;
} }
private synchronized boolean wrap(final NIOBuffer buffer) throws IOException private synchronized boolean wrap(final Buffer buffer) throws IOException
{ {
ByteBuffer bbuf=buffer.getByteBuffer(); ByteBuffer bbuf=extractByteBuffer(buffer);
final SSLEngineResult result; final SSLEngineResult result;
synchronized(bbuf) synchronized(bbuf)
@ -355,13 +369,13 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
return result.bytesConsumed()>0 || result.bytesProduced()>0; return result.bytesConsumed()>0 || result.bytesProduced()>0;
} }
private synchronized boolean unwrap(final NIOBuffer buffer) throws IOException private synchronized boolean unwrap(final Buffer buffer) throws IOException
{ {
if (!_inbound.hasContent()) if (!_inbound.hasContent())
return false; return false;
buffer.compact(); buffer.compact();
ByteBuffer bbuf=buffer.getByteBuffer(); ByteBuffer bbuf=extractByteBuffer(buffer);
final SSLEngineResult result; final SSLEngineResult result;
synchronized(bbuf) synchronized(bbuf)
@ -432,4 +446,197 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
return result.bytesConsumed()>0 || result.bytesProduced()>0; return result.bytesConsumed()>0 || result.bytesProduced()>0;
} }
/* ------------------------------------------------------------ */
private ByteBuffer extractByteBuffer(Buffer buffer)
{
if (buffer.buffer() instanceof NIOBuffer)
return ((NIOBuffer)buffer.buffer()).getByteBuffer();
return ByteBuffer.wrap(buffer.array());
}
/* ------------------------------------------------------------ */
public AsyncEndPoint getSslEndPoint()
{
return new EP();
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
public class EP implements AsyncEndPoint
{
public void shutdownOutput() throws IOException
{
_engine.closeOutbound();
}
public boolean isOutputShutdown()
{
return _engine.isOutboundDone();
}
public void shutdownInput() throws IOException
{
_engine.closeInbound();
}
public boolean isInputShutdown()
{
return _engine.isInboundDone();
}
public void close() throws IOException
{
_endp.close();
}
public int fill(Buffer buffer) throws IOException
{
int size=buffer.length();
process(buffer,null);
int filled=buffer.length()-size;
if (filled==0 && isInputShutdown())
return -1;
return filled;
}
public int flush(Buffer buffer) throws IOException
{
int size=buffer.length();
process(null,buffer);
int flushed=size-buffer.length();
return flushed;
}
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
if (header!=null && header.hasContent())
return flush(header);
if (buffer!=null && buffer.hasContent())
return flush(buffer);
if (trailer!=null && trailer.hasContent())
return flush(trailer);
return 0;
}
public boolean blockReadable(long millisecs) throws IOException
{
return false;
}
public boolean blockWritable(long millisecs) throws IOException
{
return false;
}
public boolean isOpen()
{
return false;
}
public Object getTransport()
{
return null;
}
public boolean isBufferingInput()
{
return false;
}
public boolean isBufferingOutput()
{
return false;
}
public void flush() throws IOException
{
}
public void asyncDispatch()
{
_aEndp.asyncDispatch();
}
public void scheduleWrite()
{
_aEndp.scheduleWrite();
}
public void scheduleIdle()
{
_aEndp.scheduleIdle();
}
public void cancelIdle()
{
_aEndp.cancelIdle();
}
public boolean isWritable()
{
return _aEndp.isWritable();
}
public boolean hasProgressed()
{
return _aEndp.hasProgressed();
}
public String getLocalAddr()
{
return _aEndp.getLocalAddr();
}
public String getLocalHost()
{
return _aEndp.getLocalHost();
}
public int getLocalPort()
{
return _aEndp.getLocalPort();
}
public String getRemoteAddr()
{
return _aEndp.getRemoteAddr();
}
public String getRemoteHost()
{
return _aEndp.getRemoteHost();
}
public int getRemotePort()
{
return _aEndp.getRemotePort();
}
public boolean isBlocking()
{
return _aEndp.isBlocking();
}
public boolean isBufferred()
{
return _aEndp.isBufferred();
}
public int getMaxIdleTime()
{
return _aEndp.getMaxIdleTime();
}
public void setMaxIdleTime(int timeMs) throws IOException
{
_aEndp.setMaxIdleTime(timeMs);
}
}
} }

View File

@ -60,6 +60,9 @@ public class SelectChannelEndPointTest
} }
}; };
boolean _echo=true;
boolean _block=true;
@Before @Before
public void startManager() throws Exception public void startManager() throws Exception
{ {
@ -82,16 +85,15 @@ public class SelectChannelEndPointTest
return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort()); return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort());
} }
protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) protected AsyncConnection newConnection(SocketChannel channel, EndPoint endpoint)
{ {
return new TestConnection(endpoint); return new TestConnection(endpoint);
} }
public static class TestConnection extends AbstractConnection implements AsyncConnection public class TestConnection extends AbstractConnection implements AsyncConnection
{ {
NIOBuffer _in = new IndirectNIOBuffer(32*1024); NIOBuffer _in = new IndirectNIOBuffer(32*1024);
NIOBuffer _out = new IndirectNIOBuffer(32*1024); NIOBuffer _out = new IndirectNIOBuffer(32*1024);
boolean _echo=true;
public TestConnection(EndPoint endp) public TestConnection(EndPoint endp)
{ {
@ -178,12 +180,63 @@ public class SelectChannelEndPointTest
} }
// write then shutdown // write then shutdown
client.getOutputStream().write("Goodbye".getBytes("UTF-8")); client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8"));
// Verify echo server to client
for (char c : "Goodbye Cruel TLS".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b>0);
assertEquals(c,(char)b);
}
client.close();
}
@Test
public void testShutdown() throws Exception
{
Socket client = newClient();
client.setSoTimeout(500);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.register(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b>0);
assertEquals(c,(char)b);
}
// wait for read timeout
long start=System.currentTimeMillis();
try
{
client.getInputStream().read();
Assert.fail();
}
catch(SocketTimeoutException e)
{
assertTrue(System.currentTimeMillis()-start>=400);
}
// write then shutdown
client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8"));
client.shutdownOutput(); client.shutdownOutput();
// Verify echo server to client // Verify echo server to client
for (char c : "Goodbye".toCharArray()) for (char c : "Goodbye Cruel TLS".toCharArray())
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertTrue(b>0); assertTrue(b>0);

View File

@ -3,16 +3,19 @@ package org.eclipse.jetty.io.nio;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException; import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test;
public class SslSelectChannelEndPointTest extends SelectChannelEndPointTest public class SslSelectChannelEndPointTest extends SelectChannelEndPointTest
@ -26,7 +29,6 @@ public class SslSelectChannelEndPointTest extends SelectChannelEndPointTest
__sslCtxFactory.setKeyStorePath(keystore.getAbsolutePath()); __sslCtxFactory.setKeyStorePath(keystore.getAbsolutePath());
__sslCtxFactory.setKeyStorePassword("storepwd"); __sslCtxFactory.setKeyStorePassword("storepwd");
__sslCtxFactory.setKeyManagerPassword("keypwd"); __sslCtxFactory.setKeyManagerPassword("keypwd");
__sslCtxFactory.setTrustAll(true);
__sslCtxFactory.start(); __sslCtxFactory.start();
} }
@ -39,22 +41,120 @@ public class SslSelectChannelEndPointTest extends SelectChannelEndPointTest
} }
@Override @Override
protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) protected AsyncConnection newConnection(SocketChannel channel, EndPoint endpoint)
{ {
try
{
AsyncConnection delegate = super.newConnection(channel,endpoint);
SSLEngine engine = __sslCtxFactory.newSslEngine(); SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(false); engine.setUseClientMode(false);
engine.beginHandshake(); SslConnection connection = new SslConnection(engine,endpoint);
return new SslConnection(engine,delegate,endpoint);
AsyncConnection delegate = super.newConnection(channel,connection.getSslEndPoint());
connection.setConnection(delegate);
return connection;
} }
catch(SSLException e)
@Test
@Override
public void testShutdown() throws Exception
{ {
throw new RuntimeException(e);
SocketChannel client = SocketChannel.open(_connector.socket().getLocalSocketAddress());
client.socket().setSoTimeout(500);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.register(server);
SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(true);
engine.beginHandshake();
ByteBuffer appOut = ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
ByteBuffer sslOut = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
ByteBuffer appIn = ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
ByteBuffer sslIn = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
appOut.put("HelloWorld".getBytes("UTF-8"));
appOut.flip();
System.err.println(engine.getHandshakeStatus());
while (engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
{
if (engine.getHandshakeStatus()==HandshakeStatus.NEED_WRAP)
{
SSLEngineResult result =engine.wrap(appOut,sslOut);
System.err.println(result);
sslOut.flip();
int flushed=client.write(sslOut);
System.err.println("out="+flushed);
sslOut.clear();
} }
if (engine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP)
{
int filled=client.read(sslIn);
System.err.println("in="+filled);
sslIn.flip();
SSLEngineResult result =engine.unwrap(sslIn,appIn);
sslIn.flip();
sslIn.compact();
System.err.println(result);
} }
if (engine.getHandshakeStatus()==HandshakeStatus.NEED_TASK)
{
Runnable task;
while ((task=engine.getDelegatedTask())!=null)
task.run();
System.err.println(engine.getHandshakeStatus());
}
}
Thread.sleep(2000);
/*
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b>0);
assertEquals(c,(char)b);
}
// wait for read timeout
long start=System.currentTimeMillis();
try
{
client.getInputStream().read();
Assert.fail();
}
catch(SocketTimeoutException e)
{
assertTrue(System.currentTimeMillis()-start>=400);
}
// write then shutdown
client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8"));
client.shutdownOutput();
// Verify echo server to client
for (char c : "Goodbye Cruel TLS".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b>0);
assertEquals(c,(char)b);
}
// Read close
assertEquals(-1,client.getInputStream().read());
*/
}
} }