474634 - AsyncListener.onError() handling.

Refactored continuation tests.
This commit is contained in:
Simone Bordet 2015-08-14 11:14:30 +02:00 committed by Joakim Erdfelt
parent 2f4f4a2247
commit b819273c7b
5 changed files with 565 additions and 756 deletions

View File

@ -288,7 +288,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
case TERMINATED:
case WAIT:
break loop;
case DISPATCH:
{
if (!_request.hasMetaData())
@ -319,11 +319,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
case ERROR_DISPATCH:
{
Throwable ex = _state.getAsyncContextEvent().getThrowable();
// Check for error dispatch loops
Integer loop_detect = (Integer)_request.getAttribute("org.eclipse.jetty.server.ERROR_DISPATCH");
if (loop_detect==null)
loop_detect=new Integer(1);
loop_detect=1;
else
loop_detect=loop_detect+1;
_request.setAttribute("org.eclipse.jetty.server.ERROR_DISPATCH",loop_detect);
@ -340,13 +340,13 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
}
break loop;
}
_request.setHandled(false);
_response.resetBuffer();
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ERROR);
String reason=null;
String reason;
if (ex == null || ex instanceof TimeoutException)
{
reason = "Async Timeout";
@ -404,21 +404,21 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
case COMPLETE:
{
// TODO do onComplete here for continuations to work
_state.onComplete();
// _state.onComplete();
if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(404);
else
_response.closeOutput();
// TODO do onComplete here to detect errors in final flush
// _state.onComplete();
_state.onComplete();
// TODO: verify this code is needed and whether
// TODO: it's needed for onError() case too.
_request.setHandled(true);
onCompleted();
break loop;
}
@ -455,7 +455,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
handleException(e);
}
}
action = _state.unhandle();
}
@ -488,16 +488,16 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_state.error(x);
return;
}
// TODO Can this happen? Should this just be ISE???
// We've already processed an error before!
root.addSuppressed(x);
root.addSuppressed(x);
LOG.warn("Error while handling async error: ",root);
abort(x);
_state.errorComplete();
_state.errorComplete();
return;
}
// Handle error normally
_request.setHandled(true);
_request.setAttribute(RequestDispatcher.ERROR_EXCEPTION,x);

View File

@ -0,0 +1,526 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.continuation;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.util.IO;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public abstract class AbstractContinuationTest
{
protected List<String> _history = new ArrayList<>();
protected ContinuationListener _listener = new Listener(_history);
protected SuspendServlet _servlet = new SuspendServlet(_history, _listener);
protected int _port;
protected void testNormal(String type) throws Exception
{
String response = process(null, null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("NORMAL"));
assertThat(_history, hasItem(type));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, not(hasItem("onComplete")));
}
@Test
public void testSleep() throws Exception
{
String response = process("sleep=200", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("SLEPT"));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, not(hasItem("onComplete")));
}
@Test
public void testSuspend() throws Exception
{
String response = process("suspend=200", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("TIMEOUT"));
assertThat(_history, hasItem("onTimeout"));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendWaitResume() throws Exception
{
String response = process("suspend=200&resume=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("RESUMED"));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendResume() throws Exception
{
String response = process("suspend=200&resume=0", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("RESUMED"));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendWaitComplete() throws Exception
{
String response = process("suspend=200&complete=50", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("COMPLETED"));
assertThat(_history, hasItem("initial"));
assertThat(_history, not(hasItem("!initial")));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendComplete() throws Exception
{
String response = process("suspend=200&complete=0", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("COMPLETED"));
assertThat(_history, hasItem("initial"));
assertThat(_history, not(hasItem("!initial")));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendWaitResumeSuspendWaitResume() throws Exception
{
String response = process("suspend=1000&resume=10&suspend2=1000&resume2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("RESUMED"));
assertEquals(2, count(_history, "suspend"));
assertEquals(2, count(_history, "resume"));
assertEquals(0, count(_history, "onTimeout"));
assertEquals(1, count(_history, "onComplete"));
}
@Test
public void testSuspendWaitResumeSuspendComplete() throws Exception
{
String response = process("suspend=1000&resume=10&suspend2=1000&complete2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("COMPLETED"));
assertEquals(2, count(_history, "suspend"));
assertEquals(1, count(_history, "resume"));
assertEquals(0, count(_history, "onTimeout"));
assertEquals(1, count(_history, "onComplete"));
}
@Test
public void testSuspendWaitResumeSuspend() throws Exception
{
String response = process("suspend=1000&resume=10&suspend2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("TIMEOUT"));
assertEquals(2, count(_history, "suspend"));
assertEquals(1, count(_history, "resume"));
assertEquals(1, count(_history, "onTimeout"));
assertEquals(1, count(_history, "onComplete"));
}
@Test
public void testSuspendTimeoutSuspendResume() throws Exception
{
String response = process("suspend=10&suspend2=1000&resume2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("RESUMED"));
assertEquals(2, count(_history, "suspend"));
assertEquals(1, count(_history, "resume"));
assertEquals(1, count(_history, "onTimeout"));
assertEquals(1, count(_history, "onComplete"));
}
@Test
public void testSuspendTimeoutSuspendComplete() throws Exception
{
String response = process("suspend=10&suspend2=1000&complete2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("COMPLETED"));
assertEquals(2, count(_history, "suspend"));
assertEquals(0, count(_history, "resume"));
assertEquals(1, count(_history, "onTimeout"));
assertEquals(1, count(_history, "onComplete"));
}
@Test
public void testSuspendTimeoutSuspend() throws Exception
{
String response = process("suspend=10&suspend2=10", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("TIMEOUT"));
assertEquals(2, count(_history, "suspend"));
assertEquals(0, count(_history, "resume"));
assertEquals(2, count(_history, "onTimeout"));
assertEquals(1, count(_history, "onComplete"));
}
@Test
public void testSuspendThrowResume() throws Exception
{
String response = process("suspend=200&resume=10&undispatch=true", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("RESUMED"));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendResumeThrow() throws Exception
{
String response = process("suspend=200&resume=0&undispatch=true", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("RESUMED"));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendThrowComplete() throws Exception
{
String response = process("suspend=200&complete=10&undispatch=true", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("COMPLETED"));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
@Test
public void testSuspendCompleteThrow() throws Exception
{
String response = process("suspend=200&complete=0&undispatch=true", null);
assertThat(response, startsWith("HTTP/1.1 200 OK"));
assertThat(response, containsString("COMPLETED"));
assertThat(_history, not(hasItem("onTimeout")));
assertThat(_history, hasItem("onComplete"));
}
private long count(List<String> history, String value)
{
return history.stream()
.filter(value::equals)
.count();
}
private String process(String query, String content) throws Exception
{
StringBuilder request = new StringBuilder("GET /");
if (query != null)
request.append("?").append(query);
request.append(" HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Connection: close\r\n");
if (content == null)
{
request.append("\r\n");
}
else
{
request.append("Content-Length: ").append(content.length()).append("\r\n");
request.append("\r\n").append(content);
}
try (Socket socket = new Socket("localhost", _port))
{
socket.setSoTimeout(10000);
socket.getOutputStream().write(request.toString().getBytes(StandardCharsets.UTF_8));
socket.getOutputStream().flush();
return toString(socket.getInputStream());
}
}
protected String toString(InputStream in) throws IOException
{
return IO.toString(in);
}
private static class SuspendServlet extends HttpServlet
{
private final Timer _timer = new Timer();
private final List<String> _history;
private final ContinuationListener _listener;
public SuspendServlet(List<String> history, ContinuationListener listener)
{
_history = history;
_listener = listener;
}
@Override
protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final Continuation continuation = ContinuationSupport.getContinuation(request);
_history.add(continuation.getClass().getName());
int read_before = 0;
long sleep_for = -1;
long suspend_for = -1;
long suspend2_for = -1;
long resume_after = -1;
long resume2_after = -1;
long complete_after = -1;
long complete2_after = -1;
boolean undispatch = false;
if (request.getParameter("read") != null)
read_before = Integer.parseInt(request.getParameter("read"));
if (request.getParameter("sleep") != null)
sleep_for = Integer.parseInt(request.getParameter("sleep"));
if (request.getParameter("suspend") != null)
suspend_for = Integer.parseInt(request.getParameter("suspend"));
if (request.getParameter("suspend2") != null)
suspend2_for = Integer.parseInt(request.getParameter("suspend2"));
if (request.getParameter("resume") != null)
resume_after = Integer.parseInt(request.getParameter("resume"));
if (request.getParameter("resume2") != null)
resume2_after = Integer.parseInt(request.getParameter("resume2"));
if (request.getParameter("complete") != null)
complete_after = Integer.parseInt(request.getParameter("complete"));
if (request.getParameter("complete2") != null)
complete2_after = Integer.parseInt(request.getParameter("complete2"));
if (request.getParameter("undispatch") != null)
undispatch = Boolean.parseBoolean(request.getParameter("undispatch"));
if (continuation.isInitial())
{
_history.add("initial");
if (read_before > 0)
{
byte[] buf = new byte[read_before];
request.getInputStream().read(buf);
}
else if (read_before < 0)
{
InputStream in = request.getInputStream();
int b = in.read();
while (b != -1)
b = in.read();
}
if (suspend_for >= 0)
{
if (suspend_for > 0)
continuation.setTimeout(suspend_for);
continuation.addContinuationListener(_listener);
_history.add("suspend");
continuation.suspend(response);
if (complete_after > 0)
{
TimerTask complete = new TimerTask()
{
@Override
public void run()
{
try
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED");
continuation.complete();
}
catch (Exception e)
{
e.printStackTrace();
}
}
};
_timer.schedule(complete, complete_after);
}
else if (complete_after == 0)
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED");
continuation.complete();
}
else if (resume_after > 0)
{
TimerTask resume = new TimerTask()
{
@Override
public void run()
{
_history.add("resume");
continuation.resume();
}
};
_timer.schedule(resume, resume_after);
}
else if (resume_after == 0)
{
_history.add("resume");
continuation.resume();
}
if (undispatch)
{
continuation.undispatch();
}
}
else if (sleep_for >= 0)
{
try
{
Thread.sleep(sleep_for);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
response.setStatus(200);
response.getOutputStream().println("SLEPT");
}
else
{
response.setStatus(200);
response.getOutputStream().println("NORMAL");
}
}
else
{
_history.add("!initial");
if (suspend2_for >= 0 && request.getAttribute("2nd") == null)
{
request.setAttribute("2nd", "cycle");
if (suspend2_for > 0)
continuation.setTimeout(suspend2_for);
_history.add("suspend");
continuation.suspend(response);
if (complete2_after > 0)
{
TimerTask complete = new TimerTask()
{
@Override
public void run()
{
try
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED");
continuation.complete();
}
catch (Exception e)
{
e.printStackTrace();
}
}
};
_timer.schedule(complete, complete2_after);
}
else if (complete2_after == 0)
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED");
continuation.complete();
}
else if (resume2_after > 0)
{
TimerTask resume = new TimerTask()
{
@Override
public void run()
{
_history.add("resume");
continuation.resume();
}
};
_timer.schedule(resume, resume2_after);
}
else if (resume2_after == 0)
{
_history.add("resume");
continuation.resume();
}
if (undispatch)
{
continuation.undispatch();
}
}
else if (continuation.isExpired())
{
response.setStatus(200);
response.getOutputStream().println("TIMEOUT");
}
else if (continuation.isResumed())
{
response.setStatus(200);
response.getOutputStream().println("RESUMED");
}
else
{
response.setStatus(200);
response.getOutputStream().println("UNKNOWN");
}
}
}
}
private static class Listener implements ContinuationListener
{
private final List<String> _history;
public Listener(List<String> history)
{
_history = history;
}
@Override
public void onComplete(Continuation continuation)
{
_history.add("onComplete");
}
@Override
public void onTimeout(Continuation continuation)
{
_history.add("onTimeout");
}
}
}

View File

@ -1,521 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.continuation;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public abstract class ContinuationBase
{
protected SuspendServlet _servlet=new SuspendServlet();
protected int _port;
protected void doNormal(String type) throws Exception
{
String response=process(null,null);
assertContains(type,response);
assertContains("NORMAL",response);
assertNotContains("history: onTimeout",response);
assertNotContains("history: onComplete",response);
}
protected void doSleep() throws Exception
{
String response=process("sleep=200",null);
assertContains("SLEPT",response);
assertNotContains("history: onTimeout",response);
assertNotContains("history: onComplete",response);
}
protected void doSuspend() throws Exception
{
String response=process("suspend=200",null);
assertContains("TIMEOUT",response);
assertContains("history: onTimeout",response);
assertContains("history: onComplete",response);
}
protected void doSuspendWaitResume() throws Exception
{
String response=process("suspend=200&resume=10",null);
assertContains("RESUMED",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
}
protected void doSuspendResume() throws Exception
{
String response=process("suspend=200&resume=0",null);
assertContains("RESUMED",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
}
protected void doSuspendWaitComplete() throws Exception
{
String response=process("suspend=200&complete=50",null);
assertContains("COMPLETED",response);
assertContains("history: initial",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
assertNotContains("history: !initial",response);
}
protected void doSuspendComplete() throws Exception
{
String response=process("suspend=200&complete=0",null);
assertContains("COMPLETED",response);
assertContains("history: initial",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
assertNotContains("history: !initial",response);
}
protected void doSuspendWaitResumeSuspendWaitResume() throws Exception
{
String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
assertEquals(2,count(response,"history: suspend"));
assertEquals(2,count(response,"history: resume"));
assertEquals(0,count(response,"history: onTimeout"));
assertEquals(1,count(response,"history: onComplete"));
assertContains("RESUMED",response);
}
protected void doSuspendWaitResumeSuspendComplete() throws Exception
{
String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
assertEquals(2,count(response,"history: suspend"));
assertEquals(1,count(response,"history: resume"));
assertEquals(0,count(response,"history: onTimeout"));
assertEquals(1,count(response,"history: onComplete"));
assertContains("COMPLETED",response);
}
protected void doSuspendWaitResumeSuspend() throws Exception
{
String response=process("suspend=1000&resume=10&suspend2=10",null);
assertEquals(2,count(response,"history: suspend"));
assertEquals(1,count(response,"history: resume"));
assertEquals(1,count(response,"history: onTimeout"));
assertEquals(1,count(response,"history: onComplete"));
assertContains("TIMEOUT",response);
}
protected void doSuspendTimeoutSuspendResume() throws Exception
{
String response=process("suspend=10&suspend2=1000&resume2=10",null);
assertEquals(2,count(response,"history: suspend"));
assertEquals(1,count(response,"history: resume"));
assertEquals(1,count(response,"history: onTimeout"));
assertEquals(1,count(response,"history: onComplete"));
assertContains("RESUMED",response);
}
protected void doSuspendTimeoutSuspendComplete() throws Exception
{
String response=process("suspend=10&suspend2=1000&complete2=10",null);
assertEquals(2,count(response,"history: suspend"));
assertEquals(0,count(response,"history: resume"));
assertEquals(1,count(response,"history: onTimeout"));
assertEquals(1,count(response,"history: onComplete"));
assertContains("COMPLETED",response);
}
protected void doSuspendTimeoutSuspend() throws Exception
{
String response=process("suspend=10&suspend2=10",null);
assertEquals(2,count(response,"history: suspend"));
assertEquals(0,count(response,"history: resume"));
assertEquals(2,count(response,"history: onTimeout"));
assertEquals(1,count(response,"history: onComplete"));
assertContains("TIMEOUT",response);
}
protected void doSuspendThrowResume() throws Exception
{
String response=process("suspend=200&resume=10&undispatch=true",null);
assertContains("RESUMED",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
}
protected void doSuspendResumeThrow() throws Exception
{
String response=process("suspend=200&resume=0&undispatch=true",null);
assertContains("RESUMED",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
}
protected void doSuspendThrowComplete() throws Exception
{
String response=process("suspend=200&complete=10&undispatch=true",null);
assertContains("COMPLETED",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
}
protected void doSuspendCompleteThrow() throws Exception
{
String response=process("suspend=200&complete=0&undispatch=true",null);
assertContains("COMPLETED",response);
assertNotContains("history: onTimeout",response);
assertContains("history: onComplete",response);
}
private int count(String responses,String substring)
{
int count=0;
int i=responses.indexOf(substring);
while (i>=0)
{
count++;
i=responses.indexOf(substring,i+substring.length());
}
return count;
}
protected void assertContains(String content,String response)
{
assertThat(response,startsWith("HTTP/1.1 200 OK"));
assertThat(response,containsString(content));
}
protected void assertNotContains(String content,String response)
{
assertThat(response,startsWith("HTTP/1.1 200 OK"));
assertThat(response,not(containsString(content)));
}
public synchronized String process(String query,String content) throws Exception
{
String request = "GET /";
if (query!=null)
request+="?"+query;
request+=" HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Connection: close\r\n";
if (content==null)
request+="\r\n";
else
{
request+="Content-Length: "+content.length()+"\r\n";
request+="\r\n" + content;
}
int port=_port;
String response=null;
try (Socket socket = new Socket("localhost",port);)
{
socket.setSoTimeout(10000);
socket.getOutputStream().write(request.getBytes(StandardCharsets.UTF_8));
socket.getOutputStream().flush();
response = toString(socket.getInputStream());
}
catch(Exception e)
{
System.err.println("failed on port "+port);
e.printStackTrace();
throw e;
}
return response;
}
protected abstract String toString(InputStream in) throws IOException;
public static void addHistory(HttpServletResponse response, String event)
{
try
{
response.getOutputStream().print("history: " + event + "\n");
}
catch (IOException e)
{
throw new RuntimeException("Unable to write history: " + event, e);
}
}
private static class SuspendServlet extends HttpServlet
{
private Timer _timer=new Timer();
public SuspendServlet()
{}
@Override
protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final Continuation continuation = ContinuationSupport.getContinuation(request);
addHistory(response, continuation.getClass().toString());
int read_before=0;
long sleep_for=-1;
long suspend_for=-1;
long suspend2_for=-1;
long resume_after=-1;
long resume2_after=-1;
long complete_after=-1;
long complete2_after=-1;
boolean undispatch=false;
if (request.getParameter("read")!=null)
read_before=Integer.parseInt(request.getParameter("read"));
if (request.getParameter("sleep")!=null)
sleep_for=Integer.parseInt(request.getParameter("sleep"));
if (request.getParameter("suspend")!=null)
suspend_for=Integer.parseInt(request.getParameter("suspend"));
if (request.getParameter("suspend2")!=null)
suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
if (request.getParameter("resume")!=null)
resume_after=Integer.parseInt(request.getParameter("resume"));
if (request.getParameter("resume2")!=null)
resume2_after=Integer.parseInt(request.getParameter("resume2"));
if (request.getParameter("complete")!=null)
complete_after=Integer.parseInt(request.getParameter("complete"));
if (request.getParameter("complete2")!=null)
complete2_after=Integer.parseInt(request.getParameter("complete2"));
if (request.getParameter("undispatch")!=null)
undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));
if (continuation.isInitial())
{
addHistory(response, "initial");
if (read_before>0)
{
byte[] buf=new byte[read_before];
request.getInputStream().read(buf);
}
else if (read_before<0)
{
InputStream in = request.getInputStream();
int b=in.read();
while(b!=-1)
b=in.read();
}
if (suspend_for>=0)
{
if (suspend_for>0)
continuation.setTimeout(suspend_for);
continuation.addContinuationListener(__listener);
addHistory(response, "suspend");
continuation.suspend(response);
if (complete_after>0)
{
TimerTask complete = new TimerTask()
{
@Override
public void run()
{
try
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED\n");
continuation.complete();
}
catch(Exception e)
{
e.printStackTrace();
}
}
};
synchronized (_timer)
{
_timer.schedule(complete,complete_after);
}
}
else if (complete_after==0)
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED\n");
continuation.complete();
}
else if (resume_after>0)
{
TimerTask resume = new TimerTask()
{
@Override
public void run()
{
addHistory(((HttpServletResponse)continuation.getServletResponse()), "resume");
continuation.resume();
}
};
synchronized (_timer)
{
_timer.schedule(resume,resume_after);
}
}
else if (resume_after==0)
{
addHistory(((HttpServletResponse)continuation.getServletResponse()), "resume");
continuation.resume();
}
if (undispatch)
continuation.undispatch();
}
else if (sleep_for>=0)
{
try
{
Thread.sleep(sleep_for);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
response.setStatus(200);
response.getOutputStream().println("SLEPT\n");
}
else
{
response.setStatus(200);
response.getOutputStream().println("NORMAL\n");
}
}
else
{
response.addHeader("history","!initial");
if (suspend2_for>=0 && request.getAttribute("2nd")==null)
{
request.setAttribute("2nd","cycle");
if (suspend2_for>0)
continuation.setTimeout(suspend2_for);
// continuation.addContinuationListener(__listener);
addHistory(response, "suspend");
continuation.suspend(response);
if (complete2_after>0)
{
TimerTask complete = new TimerTask()
{
@Override
public void run()
{
try
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED\n");
continuation.complete();
}
catch(Exception e)
{
e.printStackTrace();
}
}
};
synchronized (_timer)
{
_timer.schedule(complete,complete2_after);
}
}
else if (complete2_after==0)
{
response.setStatus(200);
response.getOutputStream().println("COMPLETED\n");
continuation.complete();
}
else if (resume2_after>0)
{
TimerTask resume = new TimerTask()
{
@Override
public void run()
{
addHistory(response, "resume");
continuation.resume();
}
};
synchronized (_timer)
{
_timer.schedule(resume,resume2_after);
}
}
else if (resume2_after==0)
{
addHistory(response, "resume");
continuation.resume();
}
if (undispatch)
continuation.undispatch();
return;
}
else if (continuation.isExpired())
{
response.setStatus(200);
response.getOutputStream().println("TIMEOUT\n");
}
else if (continuation.isResumed())
{
response.setStatus(200);
response.getOutputStream().println("RESUMED\n");
}
else
{
response.setStatus(200);
response.getOutputStream().println("unknown???\n");
}
}
}
}
private static ContinuationListener __listener = new ContinuationListener()
{
@Override
public void onComplete(Continuation continuation)
{
// FIXME: Servlet3Continuation's calls this from AsyncListener.onComplete() which is
// not allowed to modify the servlet's response at that point.
addHistory(((HttpServletResponse)continuation.getServletResponse()),"onComplete");
}
@Override
public void onTimeout(Continuation continuation)
{
addHistory(((HttpServletResponse)continuation.getServletResponse()),"onTimeout");
}
};
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.continuation;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@ -30,26 +28,21 @@ import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ContinuationTest extends ContinuationBase
public class ContinuationTest extends AbstractContinuationTest
{
protected Server _server = new Server();
protected ServletHandler _servletHandler;
protected ServerConnector _connector;
FilterHolder _filter;
protected List<String> _log = new ArrayList<String>();
protected List<String> _log = new ArrayList<>();
@Before
public void setUp() throws Exception
@ -61,15 +54,15 @@ public class ContinuationTest extends ContinuationBase
RequestLogHandler requestLogHandler = new RequestLogHandler();
requestLogHandler.setRequestLog(new Log());
_server.setHandler(requestLogHandler);
ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.NO_SECURITY|ServletContextHandler.NO_SESSIONS);
ServletContextHandler servletContext = new ServletContextHandler();
requestLogHandler.setHandler(servletContext);
_servletHandler=servletContext.getServletHandler();
ServletHolder holder=new ServletHolder(_servlet);
holder.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder,"/");
_servletHandler.addServletWithMapping(holder, "/");
_server.start();
_port=_connector.getLocalPort();
}
@ -82,115 +75,13 @@ public class ContinuationTest extends ContinuationBase
Assert.assertTrue(_log.get(0).startsWith("200 "));
Assert.assertTrue(_log.get(0).endsWith(" /"));
}
@Test
public void testContinuation() throws Exception
{
doNormal("Servlet3Continuation");
testNormal(Servlet3Continuation.class.getName());
}
@Test
public void testSleep() throws Exception
{
doSleep();
}
@Test
public void testSuspend() throws Exception
{
doSuspend();
}
@Test
public void testSuspendWaitResume() throws Exception
{
doSuspendWaitResume();
}
@Test
public void testSuspendResume() throws Exception
{
doSuspendResume();
}
@Test
public void testSuspendWaitComplete() throws Exception
{
doSuspendWaitComplete();
}
@Test
public void testSuspendComplete() throws Exception
{
doSuspendComplete();
}
@Test
public void testSuspendWaitResumeSuspendWaitResume() throws Exception
{
doSuspendWaitResumeSuspendWaitResume();
}
@Test
public void testSuspendWaitResumeSuspendComplete() throws Exception
{
doSuspendWaitResumeSuspendComplete();
}
@Test
public void testSuspendWaitResumeSuspend() throws Exception
{
doSuspendWaitResumeSuspend();
}
@Test
public void testSuspendTimeoutSuspendResume() throws Exception
{
doSuspendTimeoutSuspendResume();
}
@Test
public void testSuspendTimeoutSuspendComplete() throws Exception
{
doSuspendTimeoutSuspendComplete();
}
@Test
public void testSuspendTimeoutSuspend() throws Exception
{
doSuspendTimeoutSuspend();
}
@Test
public void testSuspendThrowResume() throws Exception
{
doSuspendThrowResume();
}
@Test
public void testSuspendResumeThrow() throws Exception
{
doSuspendResumeThrow();
}
@Test
public void testSuspendThrowComplete() throws Exception
{
doSuspendThrowComplete();
}
@Test
public void testSuspendCompleteThrow() throws Exception
{
doSuspendCompleteThrow();
}
@Override
protected String toString(InputStream in) throws IOException
{
return IO.toString(in);
}
class Log extends AbstractLifeCycle implements RequestLog
{
@Override
@ -200,6 +91,5 @@ public class ContinuationTest extends ContinuationBase
long written = response.getHttpChannel().getBytesWritten();
_log.add(status+" "+written+" "+request.getRequestURI());
}
}
}

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.continuation;
import java.io.IOException;
import java.io.InputStream;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -28,129 +25,46 @@ import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.IO;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class FauxContinuationTest extends ContinuationBase
@Ignore
public class FauxContinuationTest extends AbstractContinuationTest
{
protected Server _server = new Server();
protected ServletHandler _servletHandler;
protected ServerConnector _connector;
FilterHolder _filter;
protected void setUp() throws Exception
@Before
public void setUp() throws Exception
{
_connector = new ServerConnector(_server);
_server.setConnectors(new Connector[]{ _connector });
ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.NO_SECURITY|ServletContextHandler.NO_SESSIONS);
ServletContextHandler servletContext = new ServletContextHandler();
_server.setHandler(servletContext);
_servletHandler=servletContext.getServletHandler();
ServletHolder holder=new ServletHolder(_servlet);
_servletHandler.addServletWithMapping(holder,"/");
_filter=_servletHandler.addFilterWithMapping(ContinuationFilter.class,"/*",null);
_filter.setInitParameter("debug","true");
_filter.setInitParameter("faux","true");
_server.start();
_port=_connector.getLocalPort();
}
protected void tearDown() throws Exception
@After
public void tearDown() throws Exception
{
_server.stop();
}
@Test
public void testContinuation() throws Exception
{
doNormal("FauxContinuation");
testNormal("FauxContinuation");
}
public void testSleep() throws Exception
{
doSleep();
}
public void testSuspend() throws Exception
{
doSuspend();
}
public void testSuspendWaitResume() throws Exception
{
doSuspendWaitResume();
}
public void testSuspendResume() throws Exception
{
doSuspendResume();
}
public void testSuspendWaitComplete() throws Exception
{
doSuspendWaitComplete();
}
public void testSuspendComplete() throws Exception
{
doSuspendComplete();
}
public void testSuspendWaitResumeSuspendWaitResume() throws Exception
{
doSuspendWaitResumeSuspendWaitResume();
}
public void testSuspendWaitResumeSuspendComplete() throws Exception
{
doSuspendWaitResumeSuspendComplete();
}
public void testSuspendWaitResumeSuspend() throws Exception
{
doSuspendWaitResumeSuspend();
}
public void testSuspendTimeoutSuspendResume() throws Exception
{
doSuspendTimeoutSuspendResume();
}
public void testSuspendTimeoutSuspendComplete() throws Exception
{
doSuspendTimeoutSuspendComplete();
}
public void testSuspendTimeoutSuspend() throws Exception
{
doSuspendTimeoutSuspend();
}
public void testSuspendThrowResume() throws Exception
{
doSuspendThrowResume();
}
public void testSuspendResumeThrow() throws Exception
{
doSuspendResumeThrow();
}
public void testSuspendThrowComplete() throws Exception
{
doSuspendThrowComplete();
}
public void testSuspendCompleteThrow() throws Exception
{
doSuspendCompleteThrow();
}
protected String toString(InputStream in) throws IOException
{
return IO.toString(in);
}
}