Fixed bug in case of large files being downloaded, and refactored writing of data frames.

This commit is contained in:
Simone Bordet 2012-06-03 11:37:08 +02:00
parent b0156b69bc
commit 2cb703b0b5
2 changed files with 74 additions and 34 deletions

View File

@ -24,8 +24,10 @@ import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -47,6 +49,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.SPDYAsyncConnection; import org.eclipse.jetty.spdy.SPDYAsyncConnection;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler; import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.Headers;
@ -639,18 +642,13 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
} }
// We have to query the HttpGenerator and its buffers to know // We have to query the HttpGenerator and its buffers to know
// whether there is content buffered; if so, send the data frame // whether there is content buffered and update the generator state
Buffer content = getContentBuffer(); Buffer content = getContentBuffer();
reply(stream, new ReplyInfo(headers, content == null)); reply(stream, new ReplyInfo(headers, content == null));
if (content != null) if (content != null)
{ {
closed = allContentAdded || isAllContentWritten(); closed = allContentAdded || isAllContentWritten();
ByteBuffer buffer = ByteBuffer.wrap(content.asArray());
logger.debug("HTTP < {} bytes of content", buffer.remaining());
// Send the data frame
stream.data(new ByteBufferDataInfo(buffer, closed));
// Update HttpGenerator fields so that they remain consistent // Update HttpGenerator fields so that they remain consistent
content.clear();
_state = closed ? HttpGenerator.STATE_END : HttpGenerator.STATE_CONTENT; _state = closed ? HttpGenerator.STATE_END : HttpGenerator.STATE_CONTENT;
} }
else else
@ -665,7 +663,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
{ {
if (_buffer != null && _buffer.length() > 0) if (_buffer != null && _buffer.length() > 0)
return _buffer; return _buffer;
if (_bypass && _content != null && _content.length() > 0) if (_content != null && _content.length() > 0)
return _content; return _content;
return null; return null;
} }
@ -690,22 +688,46 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
@Override @Override
public void flush(long maxIdleTime) throws IOException public void flush(long maxIdleTime) throws IOException
{ {
while (_content != null && _content.length() > 0) try
{ {
_content.skip(_buffer.put(_content)); Buffer content = getContentBuffer();
ByteBuffer buffer = ByteBuffer.wrap(_buffer.asArray()); if (content != null)
logger.debug("HTTP < {} bytes of content", buffer.remaining());
_buffer.clear();
closed = _content.length() == 0 && _last;
stream.data(new ByteBufferDataInfo(buffer, closed));
boolean expired = !connection.getEndPoint().blockWritable(maxIdleTime);
if (expired)
{ {
stream.getSession().goAway(); DataInfo dataInfo = toDataInfo(content, closed);
throw new EOFException("write timeout"); logger.debug("HTTP < {} bytes of content", dataInfo.length());
stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
content.clear();
} }
} }
catch (TimeoutException x)
{
stream.getSession().goAway();
throw new EOFException("write timeout");
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
catch (ExecutionException x)
{
throw new IOException(x.getCause());
}
}
private DataInfo toDataInfo(Buffer buffer, boolean close)
{
if (buffer instanceof ByteArrayBuffer)
return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
if (buffer instanceof NIOBuffer)
{
ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
byteBuffer.limit(buffer.putIndex());
byteBuffer.position(buffer.getIndex());
return new ByteBufferDataInfo(byteBuffer, close);
}
return new BytesDataInfo(buffer.asArray(), close);
} }
@Override @Override
@ -732,19 +754,15 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
Buffer content = getContentBuffer(); Buffer content = getContentBuffer();
if (content != null) if (content != null)
{ {
ByteBuffer buffer = ByteBuffer.wrap(content.asArray()); closed = true;
logger.debug("HTTP < {} bytes of content", buffer.remaining());
// Update HttpGenerator fields so that they remain consistent
content.clear();
_state = STATE_END; _state = STATE_END;
// Send the data frame flush(getMaxIdleTime());
stream.data(new ByteBufferDataInfo(buffer, true));
} }
else if (!closed) else if (!closed)
{ {
closed = true; closed = true;
_state = STATE_END; _state = STATE_END;
// Send the data frame // Send the last, empty, data frame
stream.data(new ByteBufferDataInfo(ZERO_BYTES, true)); stream.data(new ByteBufferDataInfo(ZERO_BYTES, true));
} }
} }

View File

@ -16,6 +16,7 @@
package org.eclipse.jetty.spdy.http; package org.eclipse.jetty.spdy.http;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -939,9 +940,28 @@ public class ServerHTTPSPDYv2Test extends AbstractHTTPSPDYTest
} }
@Test @Test
public void testGETWithMediumContentByPassed() throws Exception public void testGETWithMediumContentAsInputStreamByPassed() throws Exception
{
byte[] data = new byte[2048];
testGETWithContentByPassed(new ByteArrayInputStream(data), data.length);
}
@Test
public void testGETWithBigContentAsInputStreamByPassed() throws Exception
{
byte[] data = new byte[128 * 1024];
testGETWithContentByPassed(new ByteArrayInputStream(data), data.length);
}
@Test
public void testGETWithMediumContentAsBufferByPassed() throws Exception
{
byte[] data = new byte[2048];
testGETWithContentByPassed(new ByteArrayBuffer(data), data.length);
}
private void testGETWithContentByPassed(final Object content, final int length) throws Exception
{ {
final byte[] data = new byte[2048];
final CountDownLatch handlerLatch = new CountDownLatch(1); final CountDownLatch handlerLatch = new CountDownLatch(1);
Session session = startClient(version(), startHTTPServer(version(), new AbstractHandler() Session session = startClient(version(), startHTTPServer(version(), new AbstractHandler()
{ {
@ -953,7 +973,7 @@ public class ServerHTTPSPDYv2Test extends AbstractHTTPSPDYTest
// We use this trick that's present in Jetty code: if we add a request attribute // We use this trick that's present in Jetty code: if we add a request attribute
// called "org.eclipse.jetty.server.sendContent", then it will trigger the // called "org.eclipse.jetty.server.sendContent", then it will trigger the
// content bypass that we want to test // content bypass that we want to test
request.setAttribute("org.eclipse.jetty.server.sendContent", new ByteArrayBuffer(data)); request.setAttribute("org.eclipse.jetty.server.sendContent", content);
handlerLatch.countDown(); handlerLatch.countDown();
} }
}), null); }), null);
@ -969,7 +989,7 @@ public class ServerHTTPSPDYv2Test extends AbstractHTTPSPDYTest
session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{ {
private final AtomicInteger replyFrames = new AtomicInteger(); private final AtomicInteger replyFrames = new AtomicInteger();
private final AtomicInteger dataFrames = new AtomicInteger(); private final AtomicInteger contentLength = new AtomicInteger();
@Override @Override
public void onReply(Stream stream, ReplyInfo replyInfo) public void onReply(Stream stream, ReplyInfo replyInfo)
@ -984,10 +1004,12 @@ public class ServerHTTPSPDYv2Test extends AbstractHTTPSPDYTest
@Override @Override
public void onData(Stream stream, DataInfo dataInfo) public void onData(Stream stream, DataInfo dataInfo)
{ {
Assert.assertEquals(1, dataFrames.incrementAndGet()); contentLength.addAndGet(dataInfo.asBytes(true).length);
Assert.assertTrue(dataInfo.isClose()); if (dataInfo.isClose())
Assert.assertArrayEquals(data, dataInfo.asBytes(true)); {
dataLatch.countDown(); Assert.assertEquals(length, contentLength.get());
dataLatch.countDown();
}
} }
}); });
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));