452246 Fixed SSL hang on last chunk

Fixed WriteFlusher to distinguish between a flush that consumes all content and returns false, from one
that consumes all content and returns true.
If false is returned, the flusher needs to remain in pending so encrypted buffers can be flushed.
This commit is contained in:
Greg Wilkins 2014-11-27 11:45:08 +11:00
parent 16737804c8
commit a39dcd6fe5
3 changed files with 97 additions and 69 deletions

View File

@ -133,7 +133,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
{
if (LOG.isDebugEnabled())
LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
LOG.debug("Local interests updating {} -> {} for {}", oldInterestOps, newInterestOps, this);
_selector.updateKey(_updateTask);
}
else
@ -155,9 +155,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private void setKeyInterests(int oldInterestOps, int newInterestOps)
{
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
_key.interestOps(newInterestOps);
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
@Override

View File

@ -46,7 +46,8 @@ abstract public class WriteFlusher
{
private static final Logger LOG = Log.getLogger(WriteFlusher.class);
private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0];
private static final ByteBuffer[] NO_BUFFERS = new ByteBuffer[0];
private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER};
private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
private static final State __IDLE = new IdleState();
private static final State __WRITING = new WritingState();
@ -245,7 +246,7 @@ abstract public class WriteFlusher
private PendingState(ByteBuffer[] buffers, Callback callback)
{
super(StateType.PENDING);
_buffers = compact(buffers);
_buffers = buffers;
_callback = callback;
}
@ -269,41 +270,6 @@ abstract public class WriteFlusher
if (_callback!=null)
_callback.succeeded();
}
/**
* Compacting the buffers is needed because the semantic of WriteFlusher is
* to write the buffers and if the caller sees that the buffer is consumed,
* then it can recycle it.
* If we do not compact, then it is possible that we store a consumed buffer,
* which is then recycled and refilled; when the WriteFlusher is invoked to
* complete the write, it will write the refilled bytes, garbling the content.
*
* @param buffers the buffers to compact
* @return the compacted buffers
*/
private ByteBuffer[] compact(ByteBuffer[] buffers)
{
int length = buffers.length;
// Just one element, no need to compact
if (length < 2)
return buffers;
// How many still have content ?
int consumed = 0;
while (consumed < length && BufferUtil.isEmpty(buffers[consumed]))
++consumed;
// All of them still have content, no need to compact
if (consumed == 0)
return buffers;
// None has content, return empty
if (consumed == length)
return EMPTY_BUFFERS;
return Arrays.copyOfRange(buffers,consumed,length);
}
}
/**
@ -436,36 +402,36 @@ abstract public class WriteFlusher
*/
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
// try the simple direct flush first, which also ensures that any null buffer
// flushes are passed through (for commits etc.)
if (_endPoint.flush(buffers))
return null;
// We were not fully flushed, so let's try again iteratively while we can make
// some progress
boolean progress=true;
while(true)
while(progress && buffers!=null)
{
// Compact buffers array?
int not_empty=0;
while(not_empty<buffers.length && BufferUtil.isEmpty(buffers[not_empty]))
not_empty++;
if (not_empty==buffers.length)
return null;
if (not_empty>0)
buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
int before=buffers[0].remaining();
if (!progress)
break;
// try to flush the remainder
int r=buffers[0].remaining();
if (_endPoint.flush(buffers))
return null;
progress=r!=buffers[0].remaining();
}
int r=buffers[0].remaining();
progress=before!=r;
int not_empty=0;
while(r==0)
{
if (++not_empty==buffers.length)
{
buffers=null;
not_empty=0;
break;
}
r=buffers[not_empty].remaining();
}
if (not_empty>0)
buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
}
return buffers;
// If buffers is null, then flush has returned false but has consumed all the data!
// This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS
// and that will keep this WriteFlusher pending.
return buffers==null?EMPTY_BUFFERS:buffers;
}
/* ------------------------------------------------------------ */

View File

@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocket;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
@ -88,14 +89,45 @@ public class SslConnectionTest
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, getScheduler(), 60000);
SelectChannelEndPoint endp = new TestEP(channel,selectSet, selectionKey, getScheduler(), 60000);
_lastEndp=endp;
return endp;
}
};
// Must be volatile or the test may fail spuriously
protected volatile int _blockAt=0;
static final AtomicInteger __startBlocking = new AtomicInteger();
static final AtomicInteger __blockFor = new AtomicInteger();
private static class TestEP extends SelectChannelEndPoint
{
public TestEP(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{
super(channel,selector,key,scheduler,idleTimeout);
}
@Override
protected void onIncompleteFlush()
{
super.onIncompleteFlush();
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
if (__startBlocking.get()==0 || __startBlocking.decrementAndGet()==0)
{
if (__blockFor.get()>0 && __blockFor.getAndDecrement()>0)
{
return false;
}
}
String s=BufferUtil.toDetailString(buffers[0]);
boolean flushed=super.flush(buffers);
return flushed;
}
}
@BeforeClass
public static void initSslEngine() throws Exception
@ -138,7 +170,7 @@ public class SslConnectionTest
public TestConnection(EndPoint endp)
{
super(endp, _threadPool,false);
super(endp, _threadPool,true);
}
@Override
@ -245,7 +277,7 @@ public class SslConnectionTest
len=5;
while(len>0)
len-=client.getInputStream().read(buffer);
Assert.assertEquals(0, _dispatches.get());
Assert.assertEquals(1, _dispatches.get());
client.close();
}
@ -270,6 +302,36 @@ public class SslConnectionTest
Assert.assertEquals(null,_writeCallback.get(100,TimeUnit.MILLISECONDS));
client.close();
}
@Test
public void testBlockedWrite() throws Exception
{
Socket client = newClient();
client.setSoTimeout(60000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
__startBlocking.set(5);
__blockFor.set(3);
client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8));
byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8));
_dispatches.set(0);
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
len=5;
while(len>0)
len-=client.getInputStream().read(buffer);
Assert.assertEquals(0, len);
client.close();
}
@Test
public void testManyLines() throws Exception