diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index cf2241cfdf3..685a95084b1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -21,10 +21,8 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import javax.servlet.DispatcherType; import javax.servlet.RequestDispatcher; @@ -90,7 +88,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable private final HttpURI _uri; private final HttpChannelState _state; private final Request _request; - private final Response _response; + private final Response _response; private final BlockingCallback _writeblock=new BlockingCallback(); private HttpVersion _version = HttpVersion.HTTP_1_1; private boolean _expect = false; @@ -124,7 +122,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { return _writeblock; } - + /** * @return the number of requests handled by this connection */ @@ -183,7 +181,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { return _configuration.getHeaderCacheSize(); } - + /** * If the associated response has the Expect header set to 100 Continue, * then accessing the input stream indicates that the handler/servlet @@ -229,7 +227,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { handle(); } - + /* ------------------------------------------------------------ */ /** * @return True if the channel is ready to continue handling (ie it is not suspended) @@ -287,7 +285,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { if ("ContinuationThrowable".equals(e.getClass().getSimpleName())) LOG.ignore(e); - else + else throw e; } catch (Exception e) @@ -342,7 +340,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable _transport.completed(); } - LOG.debug("{} handle exit", this); + LOG.debug("{} handle exit, result {}", this, next); return next!=Next.WAIT; } @@ -508,10 +506,10 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable if (charset != null) _request.setCharacterEncodingUnchecked(charset); break; - default: + default: } } - + if (field.getName()!=null) _request.getHttpFields().add(field); return false; @@ -566,7 +564,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable @SuppressWarnings("unchecked") HttpInput input = (HttpInput)_request.getHttpInput(); input.content(item); - + return false; } @@ -599,13 +597,13 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable LOG.warn(e); } finally - { + { if (_state.unhandle()==Next.COMPLETE) _state.completed(); } } - - protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete, final Callback callback) + + protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete, final Callback callback) { // TODO check that complete only set true once by changing _committed to AtomicRef boolean committing = _committed.compareAndSet(false, true); @@ -614,13 +612,13 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable // We need an info to commit if (info==null) info = _response.newResponseInfo(); - + // wrap callback to process 100 or 500 responses final int status=info.getStatus(); final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback); // committing write - _transport.send(info, content, complete, committed); + _transport.send(info, content, complete, committed); } else if (info==null) { @@ -635,7 +633,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable } protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException - { + { boolean committing=sendResponse(info,content,complete,_writeblock); _writeblock.block(); return committing; @@ -655,10 +653,10 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable */ protected void write(ByteBuffer content, boolean complete) throws IOException { - sendResponse(null,content,complete,_writeblock); + sendResponse(null,content,complete,_writeblock); _writeblock.block(); } - + /** *

Non-Blocking write, committing the response if needed.

* @@ -681,7 +679,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable { return _connector.getScheduler(); } - + /* ------------------------------------------------------------ */ /** * @return true if the HttpChannel can efficiently use direct buffer (typically this means it is not over SSL or a multiplexed protocol) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index f2ce11d61db..e5186cd9783 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -19,11 +19,9 @@ package org.eclipse.jetty.server; import java.io.IOException; -import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeoutException; import org.eclipse.jetty.http.HttpGenerator; import org.eclipse.jetty.http.HttpGenerator.ResponseInfo; @@ -331,7 +329,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http throw e; } } - + @Override public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) { @@ -352,7 +350,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http { new ContentCallback(content,lastContent,callback).iterate(); } - + @Override public void completed() { @@ -379,7 +377,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http reset(); // if we are not called from the onfillable thread, schedule completion - if (getCurrentConnection()==null) + if (getCurrentConnection()!=this) { if (_parser.isStart()) { @@ -431,8 +429,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http while (!_parser.isComplete()) { // Can the parser progress (even with an empty buffer) - boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer); - + boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer); + // If there is more content to parse, loop so we can queue all content from this buffer now without the // need to call blockForContent again while (!event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState()) @@ -441,7 +439,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http // If we have content, return if (_parser.isComplete() || available()>0) return; - + // Do we have content ready to parse? if (BufferUtil.isEmpty(_requestBuffer)) { @@ -588,7 +586,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http final boolean _lastContent; final ResponseInfo _info; ByteBuffer _header; - + CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback) { super(callback); @@ -596,7 +594,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _content=content; _lastContent=last; } - + @Override public boolean process() throws Exception { @@ -707,14 +705,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http { final ByteBuffer _content; final boolean _lastContent; - + ContentCallback(ByteBuffer content, boolean last, Callback callback) { super(callback); _content=content; _lastContent=last; } - + @Override public boolean process() throws Exception { diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletLongPollTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletLongPollTest.java new file mode 100644 index 00000000000..2b6174037c3 --- /dev/null +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletLongPollTest.java @@ -0,0 +1,164 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.servlet; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser; +import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class AsyncServletLongPollTest +{ + @Rule + public TestTracker tracker = new TestTracker(); + private Server server; + private ServerConnector connector; + private ServletContextHandler context; + private String uri; + + protected void prepare(HttpServlet servlet) throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + server.addConnector(connector); + String contextPath = "/context"; + context = new ServletContextHandler(server, contextPath, ServletContextHandler.NO_SESSIONS); + ServletHolder servletHolder = new ServletHolder(servlet); + String servletPath = "/path"; + context.addServlet(servletHolder, servletPath); + uri = contextPath + servletPath; + server.start(); + } + + @After + public void destroy() throws Exception + { + server.stop(); + } + + @Test + public void testSuspendedRequestCompletedByAnotherRequest() throws Exception + { + final CountDownLatch asyncLatch = new CountDownLatch(1); + prepare(new HttpServlet() + { + private volatile AsyncContext asyncContext; + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + int suspend = 0; + String param = request.getParameter("suspend"); + if (param != null) + suspend = Integer.parseInt(param); + + if (suspend > 0) + { + asyncContext = request.startAsync(); + asyncLatch.countDown(); + } + } + + @Override + protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + int error = 0; + String param = request.getParameter("error"); + if (param != null) + error = Integer.parseInt(param); + + final AsyncContext asyncContext = this.asyncContext; + if (asyncContext != null) + { + HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse(); + asyncResponse.sendError(error); + asyncContext.complete(); + } + else + { + response.sendError(404); + } + } + }); + + try (Socket socket1 = new Socket("localhost", connector.getLocalPort())) + { + int wait = 1000; + String request1 = "GET " + uri + "?suspend=" + wait + " HTTP/1.1\r\n" + + "Host: localhost:" + connector.getLocalPort() + "\r\n" + + "\r\n"; + OutputStream output1 = socket1.getOutputStream(); + output1.write(request1.getBytes("UTF-8")); + output1.flush(); + + Assert.assertTrue(asyncLatch.await(5, TimeUnit.SECONDS)); + + String error = "408"; + try (Socket socket2 = new Socket("localhost", connector.getLocalPort())) + { + String request2 = "DELETE " + uri + "?error=" + error + " HTTP/1.1\r\n" + + "Host: localhost:" + connector.getLocalPort() + "\r\n" + + "\r\n"; + OutputStream output2 = socket2.getOutputStream(); + output2.write(request2.getBytes("UTF-8")); + output2.flush(); + + SimpleHttpParser parser2 = new SimpleHttpParser(); + BufferedReader input2 = new BufferedReader(new InputStreamReader(socket2.getInputStream(), "UTF-8")); + SimpleHttpResponse response2 = parser2.readResponse(input2); + Assert.assertEquals("200", response2.getCode()); + } + + socket1.setSoTimeout(2 * wait); + SimpleHttpParser parser1 = new SimpleHttpParser(); + BufferedReader input1 = new BufferedReader(new InputStreamReader(socket1.getInputStream(), "UTF-8")); + SimpleHttpResponse response1 = parser1.readResponse(input1); + Assert.assertEquals(error, response1.getCode()); + + // Now try to make another request on the first connection + // to verify that we set correctly the read interest (#409842) + String request3 = "GET " + uri + " HTTP/1.1\r\n" + + "Host: localhost:" + connector.getLocalPort() + "\r\n" + + "\r\n"; + output1.write(request3.getBytes("UTF-8")); + output1.flush(); + + SimpleHttpResponse response3 = parser1.readResponse(input1); + Assert.assertEquals("200", response3.getCode()); + } + } +} diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletTest.java index c2703fe7799..42da56aa74b 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletTest.java @@ -18,14 +18,11 @@ package org.eclipse.jetty.servlet; -import static org.junit.Assert.assertEquals; - import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.util.Timer; import java.util.TimerTask; - import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; @@ -45,9 +42,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; -public class AsyncServletTest -{ + +public class AsyncServletTest +{ protected AsyncServlet _servlet=new AsyncServlet(); protected int _port; @@ -60,7 +59,7 @@ public class AsyncServletTest { _connector = new ServerConnector(_server); _server.setConnectors(new Connector[]{ _connector }); - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY|ServletContextHandler.NO_SESSIONS); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); context.setContextPath("/ctx"); _server.setHandler(context); _servletHandler=context.getServletHandler(); @@ -76,7 +75,7 @@ public class AsyncServletTest { _server.stop(); } - + @Test public void testNormal() throws Exception { @@ -116,10 +115,10 @@ public class AsyncServletTest "history: ERROR\r\n"+ "history: !initial\r\n"+ "history: onComplete\r\n",response); - + assertContains("ERROR: /ctx/path/info",response); } - + @Test public void testSuspendOnTimeoutDispatch() throws Exception { @@ -134,10 +133,10 @@ public class AsyncServletTest "history: ASYNC\r\n"+ "history: !initial\r\n"+ "history: onComplete\r\n",response); - + assertContains("DISPATCHED",response); } - + @Test public void testSuspendOnTimeoutComplete() throws Exception { @@ -150,7 +149,7 @@ public class AsyncServletTest "history: onTimeout\r\n"+ "history: complete\r\n"+ "history: onComplete\r\n",response); - + assertContains("COMPLETED",response); } @@ -333,18 +332,6 @@ public class AsyncServletTest assertContains("ERROR: /ctx/path/info",response); } - - protected void assertContains(String content,String response) - { - Assert.assertThat(response,Matchers.containsString(content)); - } - - protected void assertNotContains(String content,String response) - { - Assert.assertThat(response,Matchers.not(Matchers.containsString(content))); - } - - @Test public void testAsyncRead() throws Exception { @@ -358,7 +345,7 @@ public class AsyncServletTest "Connection: close\r\n"+ "\r\n"; - try (Socket socket = new Socket("localhost",_port);) + try (Socket socket = new Socket("localhost",_port)) { socket.setSoTimeout(10000); socket.getOutputStream().write(header.getBytes("ISO-8859-1")); @@ -367,7 +354,7 @@ public class AsyncServletTest Thread.sleep(500); socket.getOutputStream().write(body.getBytes("ISO-8859-1"),2,8); socket.getOutputStream().write(close.getBytes("ISO-8859-1")); - + String response = IO.toString(socket.getInputStream()); assertEquals("HTTP/1.1 200 OK",response.substring(0,15)); assertContains( @@ -381,12 +368,11 @@ public class AsyncServletTest "history: onComplete\r\n",response); } } - - + public synchronized String process(String query,String content) throws Exception { String request = "GET /ctx/path/info"; - + if (query!=null) request+="?"+query; request+=" HTTP/1.1\r\n"+ @@ -399,44 +385,43 @@ public class AsyncServletTest request+="Content-Length: "+content.length()+"\r\n"; request+="\r\n" + content; } - + int port=_port; - String response=null; - try (Socket socket = new Socket("localhost",port);) + try (Socket socket = new Socket("localhost",port)) { socket.setSoTimeout(1000000); socket.getOutputStream().write(request.getBytes("UTF-8")); - - response = IO.toString(socket.getInputStream()); + return IO.toString(socket.getInputStream()); } catch(Exception e) { System.err.println("failed on port "+port); e.printStackTrace(); throw e; - } - - return response; + } } + protected void assertContains(String content,String response) + { + Assert.assertThat(response,Matchers.containsString(content)); + } + + protected void assertNotContains(String content,String response) + { + Assert.assertThat(response,Matchers.not(Matchers.containsString(content))); + } - - private static class AsyncServlet extends HttpServlet { private static final long serialVersionUID = -8161977157098646562L; - private Timer _timer=new Timer(); - - public AsyncServlet() - {} - - /* ------------------------------------------------------------ */ + private final Timer _timer=new Timer(); + @Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { // System.err.println(request.getDispatcherType()+" "+request.getRequestURI()); response.addHeader("history",request.getDispatcherType().toString()); - + int read_before=0; long sleep_for=-1; long suspend_for=-1; @@ -445,7 +430,7 @@ public class AsyncServletTest long resume2_after=-1; long complete_after=-1; long complete2_after=-1; - + if (request.getParameter("read")!=null) read_before=Integer.parseInt(request.getParameter("read")); if (request.getParameter("sleep")!=null) @@ -462,7 +447,7 @@ public class AsyncServletTest complete_after=Integer.parseInt(request.getParameter("complete")); if (request.getParameter("complete2")!=null) complete2_after=Integer.parseInt(request.getParameter("complete2")); - + if (request.getDispatcherType()==DispatcherType.REQUEST) { response.addHeader("history","initial"); @@ -510,7 +495,7 @@ public class AsyncServletTest async.setTimeout(suspend_for); async.addListener(__listener); response.addHeader("history","suspend"); - + if (complete_after>0) { TimerTask complete = new TimerTask() @@ -564,7 +549,7 @@ public class AsyncServletTest ((HttpServletResponse)async.getResponse()).addHeader("history","resume"); async.dispatch(); } - + } else if (sleep_for>=0) { @@ -668,15 +653,15 @@ public class AsyncServletTest } } } - - + + private static AsyncListener __listener = new AsyncListener() { @Override public void onTimeout(AsyncEvent event) throws IOException - { + { ((HttpServletResponse)event.getSuppliedResponse()).addHeader("history","onTimeout"); - String action=((HttpServletRequest)event.getSuppliedRequest()).getParameter("timeout"); + String action=event.getSuppliedRequest().getParameter("timeout"); if (action!=null) { ((HttpServletResponse)event.getSuppliedResponse()).addHeader("history",action); @@ -684,27 +669,26 @@ public class AsyncServletTest event.getAsyncContext().dispatch(); if ("complete".equals(action)) { - ((HttpServletResponse)event.getSuppliedResponse()).getOutputStream().println("COMPLETED\n"); + event.getSuppliedResponse().getOutputStream().println("COMPLETED\n"); event.getAsyncContext().complete(); } } } - + @Override public void onStartAsync(AsyncEvent event) throws IOException { } - + @Override public void onError(AsyncEvent event) throws IOException { } - + @Override public void onComplete(AsyncEvent event) throws IOException { ((HttpServletResponse)event.getSuppliedResponse()).addHeader("history","onComplete"); } }; - } diff --git a/jetty-servlet/src/test/resources/jetty-logging.properties b/jetty-servlet/src/test/resources/jetty-logging.properties new file mode 100644 index 00000000000..50680ef1dcf --- /dev/null +++ b/jetty-servlet/src/test/resources/jetty-logging.properties @@ -0,0 +1,3 @@ +org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog +#org.eclipse.jetty.LEVEL=DEBUG +#org.eclipse.jetty.servlet.LEVEL=DEBUG