Merge remote-tracking branch 'origin/jetty-9.2.x'

Conflicts:
	jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
This commit is contained in:
Greg Wilkins 2014-09-18 16:36:51 +10:00
commit 2018098225
4 changed files with 196 additions and 31 deletions

View File

@ -1512,6 +1512,26 @@ public class HttpParserTest
} }
@Test
public void testFolded() throws Exception
{
ByteBuffer buffer= BufferUtil.toBuffer(
"GET / HTTP/1.0\015\012" +
"Host: localhost\015\012" +
"Connection: close\015\012" +
"Content-Type: application/soap+xml; charset=utf-8; \015\012"+
"\taction=\"xxx\" \015\012" +
"\015\012");
HttpParser.RequestHandler handler = new Handler();
HttpParser parser= new HttpParser(handler);
parseAll(parser,buffer);
assertFalse(_headerCompleted);
assertEquals(_bad, "Bad Continuation");
}
@Before @Before
public void init() public void init()
{ {

View File

@ -334,13 +334,13 @@ abstract public class WriteFlusher
try try
{ {
boolean flushed=_endPoint.flush(buffers); buffers=flush(buffers);
if (DEBUG)
LOG.debug("flushed {}", flushed);
// if we are incomplete? // if we are incomplete?
if (!flushed) if (buffers!=null)
{ {
if (DEBUG)
LOG.debug("flushed incomplete");
PendingState pending=new PendingState(buffers, callback); PendingState pending=new PendingState(buffers, callback);
if (updateState(__WRITING,pending)) if (updateState(__WRITING,pending))
onIncompleteFlushed(); onIncompleteFlushed();
@ -379,7 +379,7 @@ abstract public class WriteFlusher
* {@link #onFail(Throwable)} or {@link #onClose()} * {@link #onFail(Throwable)} or {@link #onClose()}
*/ */
public void completeWrite() public void completeWrite()
{ {
if (DEBUG) if (DEBUG)
LOG.debug("completeWrite: {}", this); LOG.debug("completeWrite: {}", this);
@ -396,13 +396,15 @@ abstract public class WriteFlusher
{ {
ByteBuffer[] buffers = pending.getBuffers(); ByteBuffer[] buffers = pending.getBuffers();
boolean flushed=_endPoint.flush(buffers); buffers=flush(buffers);
if (DEBUG)
LOG.debug("flushed {}", flushed);
// if we are incomplete? // if we are incomplete?
if (!flushed) if (buffers!=null)
{ {
if (DEBUG)
LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
if (buffers!=pending.getBuffers())
pending=new PendingState(buffers, pending._callback);
if (updateState(__COMPLETING,pending)) if (updateState(__COMPLETING,pending))
onIncompleteFlushed(); onIncompleteFlushed();
else else
@ -426,6 +428,46 @@ abstract public class WriteFlusher
} }
} }
/* ------------------------------------------------------------ */
/** Flush the buffers iteratively until no progress is made
* @param buffers The buffers to flush
* @return The unflushed buffers, or null if all flushed
* @throws IOException
*/
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
// try the simple direct flush first, which also ensures that any null buffer
// flushes are passed through (for commits etc.)
if (_endPoint.flush(buffers))
return null;
// We were not fully flushed, so let's try again iteratively while we can make
// some progress
boolean progress=true;
while(true)
{
// Compact buffers array?
int not_empty=0;
while(not_empty<buffers.length && BufferUtil.isEmpty(buffers[not_empty]))
not_empty++;
if (not_empty==buffers.length)
return null;
if (not_empty>0)
buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
if (!progress)
break;
// try to flush the remainder
int r=buffers[0].remaining();
if (_endPoint.flush(buffers))
return null;
progress=r!=buffers[0].remaining();
}
return buffers;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Notify the flusher of a failure /** Notify the flusher of a failure
* @param cause The cause of the failure * @param cause The cause of the failure

View File

@ -345,12 +345,12 @@ public class SslConnection extends AbstractConnection
@Override @Override
protected void onIncompleteFlush() protected void onIncompleteFlush()
{ {
// This means that the decrypted endpoint write method was called and not // This means that the decrypted endpoint write method was called and not
// all data could be wrapped. So either we need to write some encrypted data, // all data could be wrapped. So either we need to write some encrypted data,
// OR if we are handshaking we need to read some encrypted data OR // OR if we are handshaking we need to read some encrypted data OR
// if neither then we should just try the flush again. // if neither then we should just try the flush again.
boolean flush = false; boolean try_again = false;
synchronized (DecryptedEndPoint.this) synchronized (DecryptedEndPoint.this)
{ {
if (DEBUG) if (DEBUG)
@ -371,10 +371,17 @@ public class SslConnection extends AbstractConnection
} }
else else
{ {
flush = true; // We can get here because the WriteFlusher might not see progress
// when it has just flushed the encrypted data, but not consumed anymore
// of the application buffers. This is mostly avoided by another iteration
// within DecryptedEndPoint flush(), but I cannot convince myself that
// this is never ever the case.
try_again = true;
} }
} }
if (flush)
if (try_again)
{ {
// If the output is closed, // If the output is closed,
if (isOutputShutdown()) if (isOutputShutdown())
@ -386,7 +393,9 @@ public class SslConnection extends AbstractConnection
else else
{ {
// try to flush what is pending // try to flush what is pending
getWriteFlusher().completeWrite(); // because this is a special case (see above) we could probably
// avoid the dispatch, but best to be sure
getExecutor().execute(_runCompletWrite);
} }
} }
} }
@ -713,18 +722,12 @@ public class SslConnection extends AbstractConnection
if (DEBUG) if (DEBUG)
LOG.debug("{} wrap {}", SslConnection.this, wrapResult.toString().replace('\n',' ')); LOG.debug("{} wrap {}", SslConnection.this, wrapResult.toString().replace('\n',' '));
BufferUtil.flipToFlush(_encryptedOutput, pos); BufferUtil.flipToFlush(_encryptedOutput, pos);
boolean allConsumed=true;
// clear empty buffers to prevent position creeping up the buffer
for (ByteBuffer b : appOuts)
{
if (BufferUtil.isEmpty(b))
BufferUtil.clear(b);
else
allConsumed=false;
}
Status wrapResultStatus = wrapResult.getStatus(); Status wrapResultStatus = wrapResult.getStatus();
boolean allConsumed=true;
for (ByteBuffer b : appOuts)
if (BufferUtil.hasContent(b))
allConsumed=false;
// and deal with the results returned from the sslEngineWrap // and deal with the results returned from the sslEngineWrap
switch (wrapResultStatus) switch (wrapResultStatus)
@ -777,13 +780,20 @@ public class SslConnection extends AbstractConnection
// if we have net bytes, let's try to flush them // if we have net bytes, let's try to flush them
if (BufferUtil.hasContent(_encryptedOutput)) if (BufferUtil.hasContent(_encryptedOutput))
getEndPoint().flush(_encryptedOutput); if (!getEndPoint().flush(_encryptedOutput));
getEndPoint().flush(_encryptedOutput); // one retry
// But we also might have more to do for the handshaking state. // But we also might have more to do for the handshaking state.
switch (handshakeStatus) switch (handshakeStatus)
{ {
case NOT_HANDSHAKING: case NOT_HANDSHAKING:
// Return with the number of bytes consumed (which may be 0) // If we have not consumed all and had just finished handshaking, then we may
// have just flushed the last handshake in the encrypted buffers, so we should
// try again.
if (!allConsumed && wrapResult.getHandshakeStatus()==HandshakeStatus.FINISHED && BufferUtil.isEmpty(_encryptedOutput))
continue;
// Return true if we consumed all the bytes and encrypted are all flushed
return allConsumed && BufferUtil.isEmpty(_encryptedOutput); return allConsumed && BufferUtil.isEmpty(_encryptedOutput);
case NEED_TASK: case NEED_TASK:

View File

@ -30,10 +30,12 @@ import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -41,7 +43,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
@ -509,7 +513,7 @@ public class WriteFlusherTest
BufferUtil.flipToFill(byteBuffer); // pretend everything has been written BufferUtil.flipToFill(byteBuffer); // pretend everything has been written
writeCalledLatch.countDown(); writeCalledLatch.countDown();
failedCalledLatch.await(5, TimeUnit.SECONDS); failedCalledLatch.await(5, TimeUnit.SECONDS);
return null; return Boolean.TRUE;
} }
}); });
} }
@ -522,7 +526,7 @@ public class WriteFlusherTest
final CountDownLatch writeCalledLatch = new CountDownLatch(1); final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1); final CountDownLatch completeWrite = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch)) final WriteFlusher writeFlusher = new WriteFlusher(new EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(writeCalledLatch, failedCalledLatch))
{ {
@Override @Override
protected void onIncompleteFlushed() protected void onIncompleteFlushed()
@ -555,12 +559,13 @@ public class WriteFlusherTest
assertThat("callback complete has not been called", callback.isCompleted(), is(false)); assertThat("callback complete has not been called", callback.isCompleted(), is(false));
} }
private static class EndPointMock extends ByteArrayEndPoint private static class EndPointConcurrentAccessToIncompleteWriteAndOnFailMock extends ByteArrayEndPoint
{ {
private final CountDownLatch writeCalledLatch; private final CountDownLatch writeCalledLatch;
private final CountDownLatch failedCalledLatch; private final CountDownLatch failedCalledLatch;
private final AtomicBoolean stalled=new AtomicBoolean(false);
public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch) public EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
{ {
this.writeCalledLatch = writeCalledLatch; this.writeCalledLatch = writeCalledLatch;
this.failedCalledLatch = failedCalledLatch; this.failedCalledLatch = failedCalledLatch;
@ -574,6 +579,13 @@ public class WriteFlusherTest
int oldPos = byteBuffer.position(); int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2) if (byteBuffer.remaining() == 2)
{ {
// make sure we stall at least once
if (!stalled.get())
{
stalled.set(true);
return false;
}
// make sure failed is called before we go on // make sure failed is called before we go on
try try
{ {
@ -601,6 +613,87 @@ public class WriteFlusherTest
} }
} }
@Test
public void testIterationOnNonBlockedStall() throws Exception
{
final Exchanger<Integer> exchange = new Exchanger<>();
final AtomicInteger window = new AtomicInteger(10);
EndPointIterationOnNonBlockedStallMock endp=new EndPointIterationOnNonBlockedStallMock(window);
final WriteFlusher writeFlusher = new WriteFlusher(endp)
{
@Override
protected void onIncompleteFlushed()
{
executor.submit(new Runnable()
{
public void run()
{
try
{
while(window.get()==0)
window.addAndGet(exchange.exchange(0));
completeWrite();
}
catch(Throwable th)
{
th.printStackTrace();
}
}
});
}
};
BlockingCallback callback = new BlockingCallback();
writeFlusher.write(callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow."));
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("How now br"));
exchange.exchange(1);
exchange.exchange(0);
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("o"));
exchange.exchange(8);
callback.block();
Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("wn cow."));
}
private static class EndPointIterationOnNonBlockedStallMock extends ByteArrayEndPoint
{
final AtomicInteger _window;
public EndPointIterationOnNonBlockedStallMock(AtomicInteger window)
{
_window=window;
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
ByteBuffer byteBuffer = buffers[0];
if (_window.get()>0 && byteBuffer.hasRemaining())
{
// consume 1 byte
byte one = byteBuffer.get(byteBuffer.position());
if (super.flush(ByteBuffer.wrap(new byte[]{one})))
{
_window.decrementAndGet();
byteBuffer.position(byteBuffer.position()+1);
}
}
for (ByteBuffer b: buffers)
if (BufferUtil.hasContent(b))
return false;
return true;
}
}
private static class FailedCaller implements Callable<FutureCallback> private static class FailedCaller implements Callable<FutureCallback>
{ {
private final WriteFlusher writeFlusher; private final WriteFlusher writeFlusher;