432901 ensure a single onError callback only in pending and unready states
This commit is contained in:
parent
b4542a031b
commit
87c5b30d1c
|
@ -74,7 +74,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
write completed - - - ASYNC READY->owp -
|
||||
|
||||
*/
|
||||
enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
|
||||
enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
|
||||
private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
|
||||
|
||||
public HttpOutput(HttpChannel<?> channel)
|
||||
|
@ -146,7 +146,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
break loop;
|
||||
|
||||
case UNREADY:
|
||||
throw new WritePendingException(); // TODO ?
|
||||
if (_state.compareAndSet(state,OutputState.ERROR))
|
||||
_writeListener.onError(_onError==null?new EofException("Async close"):_onError);
|
||||
continue;
|
||||
|
||||
default:
|
||||
if (_state.compareAndSet(state,OutputState.CLOSED))
|
||||
|
@ -179,7 +181,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
break loop;
|
||||
|
||||
case UNREADY:
|
||||
throw new WritePendingException(); // TODO ?
|
||||
if (_state.compareAndSet(state,OutputState.ERROR))
|
||||
_writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
|
||||
continue;
|
||||
|
||||
default:
|
||||
if (_state.compareAndSet(state,OutputState.CLOSED))
|
||||
|
@ -238,6 +242,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
case UNREADY:
|
||||
throw new WritePendingException();
|
||||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
case CLOSED:
|
||||
return;
|
||||
}
|
||||
|
@ -298,6 +305,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
case UNREADY:
|
||||
throw new WritePendingException();
|
||||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
case CLOSED:
|
||||
throw new EofException("Closed");
|
||||
}
|
||||
|
@ -396,6 +406,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
case UNREADY:
|
||||
throw new WritePendingException();
|
||||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
case CLOSED:
|
||||
throw new EofException("Closed");
|
||||
}
|
||||
|
@ -476,6 +489,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
case UNREADY:
|
||||
throw new WritePendingException();
|
||||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
case CLOSED:
|
||||
throw new EofException("Closed");
|
||||
}
|
||||
|
@ -615,6 +631,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
|
||||
continue;
|
||||
break;
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
case CLOSED:
|
||||
throw new EofException("Closed");
|
||||
default:
|
||||
|
@ -706,6 +724,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
return false;
|
||||
case UNREADY:
|
||||
return false;
|
||||
|
||||
case ERROR:
|
||||
return true;
|
||||
|
||||
case CLOSED:
|
||||
return true;
|
||||
|
@ -716,45 +737,54 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
if(_onError!=null)
|
||||
loop: while (true)
|
||||
{
|
||||
Throwable th=_onError;
|
||||
_onError=null;
|
||||
_writeListener.onError(new IOException(th));
|
||||
close();
|
||||
}
|
||||
|
||||
switch(_state.get())
|
||||
{
|
||||
case READY:
|
||||
try
|
||||
OutputState state = _state.get();
|
||||
|
||||
if(_onError!=null)
|
||||
{
|
||||
switch(state)
|
||||
{
|
||||
_writeListener.onWritePossible();
|
||||
case CLOSED:
|
||||
case ERROR:
|
||||
_onError=null;
|
||||
break loop;
|
||||
|
||||
default:
|
||||
if (_state.compareAndSet(state, OutputState.ERROR))
|
||||
{
|
||||
Throwable th=_onError;
|
||||
_onError=null;
|
||||
_writeListener.onError(new IOException(th));
|
||||
close();
|
||||
|
||||
break loop;
|
||||
}
|
||||
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
_writeListener.onError(e);
|
||||
close();
|
||||
}
|
||||
break;
|
||||
|
||||
case CLOSED:
|
||||
try
|
||||
{
|
||||
new Throwable().printStackTrace();
|
||||
continue loop;
|
||||
}
|
||||
|
||||
switch(_state.get())
|
||||
{
|
||||
case READY:
|
||||
case CLOSED:
|
||||
// even though a write is not possible, because a close has
|
||||
// occurred, we need to call onWritePossible to tell async
|
||||
// producer that the last write completed.
|
||||
_writeListener.onWritePossible();
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
_writeListener.onError(e);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
||||
try
|
||||
{
|
||||
_writeListener.onWritePossible();
|
||||
break loop;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
_onError=e;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -769,37 +799,29 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
@Override
|
||||
protected void completed()
|
||||
{
|
||||
try
|
||||
while(true)
|
||||
{
|
||||
while(true)
|
||||
OutputState last=_state.get();
|
||||
switch(last)
|
||||
{
|
||||
HttpOutput.OutputState last=_state.get();
|
||||
switch(last)
|
||||
{
|
||||
case PENDING:
|
||||
if (!_state.compareAndSet(HttpOutput.OutputState.PENDING, HttpOutput.OutputState.ASYNC))
|
||||
continue;
|
||||
break;
|
||||
case PENDING:
|
||||
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
|
||||
continue;
|
||||
break;
|
||||
|
||||
case UNREADY:
|
||||
if (!_state.compareAndSet(HttpOutput.OutputState.UNREADY, HttpOutput.OutputState.READY))
|
||||
continue;
|
||||
_channel.getState().onWritePossible();
|
||||
break;
|
||||
case UNREADY:
|
||||
if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
|
||||
continue;
|
||||
_channel.getState().onWritePossible();
|
||||
break;
|
||||
|
||||
case CLOSED:
|
||||
break;
|
||||
case CLOSED:
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_onError=e;
|
||||
_channel.getState().onWritePossible();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,11 +18,16 @@
|
|||
|
||||
package org.eclipse.jetty.servlet;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -30,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ReadListener;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.WriteListener;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -39,6 +45,7 @@ import org.eclipse.jetty.server.Server;
|
|||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser;
|
||||
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -56,7 +63,7 @@ public class AsyncIOServletTest
|
|||
connector = new ServerConnector(server);
|
||||
server.addConnector(connector);
|
||||
|
||||
context = new ServletContextHandler(server, "", false, false);
|
||||
context = new ServletContextHandler(server, "/", false, false);
|
||||
ServletHolder holder = new ServletHolder(servlet);
|
||||
holder.setAsyncSupported(true);
|
||||
context.addServlet(holder, path);
|
||||
|
@ -257,4 +264,81 @@ public class AsyncIOServletTest
|
|||
Assert.assertEquals("500", response.getCode());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAsyncWriteClosed() throws Exception
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n";
|
||||
for (int i=0;i<10;i++)
|
||||
text=text+text;
|
||||
final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1);
|
||||
|
||||
startServer(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
response.flushBuffer();
|
||||
|
||||
final AsyncContext async = request.startAsync();
|
||||
final ServletOutputStream out = response.getOutputStream();
|
||||
out.setWriteListener(new WriteListener()
|
||||
{
|
||||
@Override
|
||||
public void onWritePossible() throws IOException
|
||||
{
|
||||
while (out.isReady())
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(100);
|
||||
out.write(data);
|
||||
}
|
||||
catch(IOException e)
|
||||
{
|
||||
throw e;
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
async.complete();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
String request = "GET " + path + " HTTP/1.1\r\n" +
|
||||
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
|
||||
"\r\n";
|
||||
|
||||
try (Socket client = new Socket("localhost", connector.getLocalPort()))
|
||||
{
|
||||
OutputStream output = client.getOutputStream();
|
||||
output.write(request.getBytes("UTF-8"));
|
||||
output.flush();
|
||||
|
||||
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
|
||||
String line=in.readLine();
|
||||
assertThat(line, containsString("200 OK"));
|
||||
while (line.length()>0)
|
||||
line=in.readLine();
|
||||
line=in.readLine();
|
||||
assertThat(line, not(containsString(" ")));
|
||||
line=in.readLine();
|
||||
assertThat(line, containsString("discontent. How Now Brown Cow. The "));
|
||||
}
|
||||
|
||||
if (!latch.await(5, TimeUnit.SECONDS))
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue