Merged branch 'jetty-9.2.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-04-23 09:09:48 +02:00
commit a2566bedd8
4 changed files with 248 additions and 16 deletions

View File

@ -149,13 +149,16 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
*/
private boolean parse()
{
// Must parse even if the buffer is fully consumed, to allow the
// parser to advance from asynchronous content to response complete.
boolean handle = parser.parseNext(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
return handle;
while (true)
{
// Must parse even if the buffer is fully consumed, to allow the
// parser to advance from asynchronous content to response complete.
boolean handle = parser.parseNext(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
if (handle || !buffer.hasRemaining())
return handle;
}
}
protected void fillInterested()

View File

@ -0,0 +1,216 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class HttpClientChunkedContentTest
{
@Rule
public final TestTracker tracker = new TestTracker();
private HttpClient client;
private void startClient() throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient();
client.setExecutor(clientThreads);
client.start();
}
@After
public void dispose() throws Exception
{
if (client != null)
client.stop();
}
@Test
public void test_Server_HeadersPauseTerminal_Client_Response() throws Exception
{
startClient();
try (ServerSocket server = new ServerSocket())
{
server.bind(new InetSocketAddress("localhost", 0));
final AtomicReference<Result> resultRef = new AtomicReference<>();
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", server.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
resultRef.set(result);
completeLatch.countDown();
}
});
try (Socket socket = server.accept())
{
consumeRequestHeaders(socket);
OutputStream output = socket.getOutputStream();
String headers = "" +
"HTTP/1.1 200 OK\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n";
output.write(headers.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(1000);
String terminal = "" +
"0\r\n" +
"\r\n";
output.write(terminal.getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Result result = resultRef.get();
assertTrue(result.isSucceeded());
Response response = result.getResponse();
Assert.assertEquals(200, response.getStatus());
}
}
}
@Test
public void test_Server_ContentTerminal_Client_ContentDelay() throws Exception
{
startClient();
try (ServerSocket server = new ServerSocket())
{
server.bind(new InetSocketAddress("localhost", 0));
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
final CountDownLatch firstContentLatch = new CountDownLatch(1);
final AtomicReference<Result> resultRef = new AtomicReference<>();
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", server.getLocalPort())
.onResponseContentAsync(new Response.AsyncContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
if (callbackRef.compareAndSet(null, callback))
firstContentLatch.countDown();
else
callback.succeeded();
}
})
.timeout(5, TimeUnit.SECONDS)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
resultRef.set(result);
completeLatch.countDown();
}
});
try (Socket socket = server.accept())
{
consumeRequestHeaders(socket);
OutputStream output = socket.getOutputStream();
String response = "" +
"HTTP/1.1 200 OK\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n" +
"8\r\n" +
"01234567\r\n" +
"0\r\n" +
"\r\n";
output.write(response.getBytes(StandardCharsets.UTF_8));
output.flush();
// Simulate a delay in consuming the content.
assertTrue(firstContentLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(1000);
callbackRef.get().succeeded();
// Wait for the client to read 0 and become idle.
Thread.sleep(1000);
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Result result = resultRef.get();
assertTrue(result.isSucceeded());
Assert.assertEquals(200, result.getResponse().getStatus());
// Issue another request to be sure the connection is sane.
Request request = client.newRequest("localhost", server.getLocalPort())
.timeout(5, TimeUnit.SECONDS);
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
consumeRequestHeaders(socket);
output.write(response.getBytes(StandardCharsets.UTF_8));
output.flush();
Assert.assertEquals(200, listener.get(5, TimeUnit.SECONDS).getStatus());
}
}
}
private void consumeRequestHeaders(Socket socket) throws IOException
{
InputStream input = socket.getInputStream();
int crlfs = 0;
while (true)
{
int read = input.read();
if (read == '\r' || read == '\n')
++crlfs;
else
crlfs = 0;
if (crlfs == 4)
break;
}
}
}

View File

@ -124,6 +124,7 @@ public class HttpParser
CHUNK_SIZE,
CHUNK_PARAMS,
CHUNK,
CHUNK_END,
END,
CLOSE, // The associated stream/endpoint should be closed
CLOSED // The associated stream/endpoint is at EOF
@ -1409,10 +1410,7 @@ public class HttpParser
if (ch == HttpTokens.LINE_FEED)
{
if (_chunkLength == 0)
{
setState(State.END);
return _handler.messageComplete();
}
setState(State.CHUNK_END);
else
setState(State.CHUNK);
}
@ -1429,10 +1427,7 @@ public class HttpParser
if (ch == HttpTokens.LINE_FEED)
{
if (_chunkLength == 0)
{
setState(State.END);
return _handler.messageComplete();
}
setState(State.CHUNK_END);
else
setState(State.CHUNK);
}
@ -1463,6 +1458,20 @@ public class HttpParser
break;
}
case CHUNK_END:
{
// TODO handle chunk trailer
ch=next(buffer);
if (ch==0)
break;
if (ch == HttpTokens.LINE_FEED)
{
setState(State.END);
return _handler.messageComplete();
}
throw new IllegalCharacterException(_state,ch,buffer);
}
case CLOSED:
{
BufferUtil.clear(buffer);

View File

@ -670,7 +670,8 @@ public class HttpParserTest
+ "0123456789\015\012"
+ "1a\015\012"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ\015\012"
+ "0\015\012");
+ "0\015\012"
+ "\015\012");
HttpParser.RequestHandler handler = new Handler();
HttpParser parser= new HttpParser(handler);
parseAll(parser,buffer);
@ -682,6 +683,9 @@ public class HttpParserTest
assertEquals("Header1", _hdr[0]);
assertEquals("value1", _val[0]);
assertEquals("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", _content);
assertTrue(_headerCompleted);
assertTrue(_messageCompleted);
}
@Test