ensure onAllDataRead always called

This commit is contained in:
Greg Wilkins 2015-01-02 19:17:26 +01:00
parent 5bed6323c1
commit 3d66b1d207
4 changed files with 57 additions and 46 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import javax.servlet.ReadListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@ -130,12 +131,15 @@ public class AsyncProxyServlet extends ProxyServlet
@Override
public void onDataAvailable() throws IOException
{
System.err.println(Thread.currentThread()+" ODA");
iterate();
System.err.println(Thread.currentThread()+" !ODA");
}
@Override
public void onAllDataRead() throws IOException
{
System.err.println(Thread.currentThread()+" ON ALL DATA READ!!!!");
if (_log.isDebugEnabled())
_log.debug("{} proxying content to upstream completed", getRequestId(request));
provider.close();
@ -150,14 +154,17 @@ public class AsyncProxyServlet extends ProxyServlet
@Override
protected Action process() throws Exception
{
System.err.println(Thread.currentThread()+" Process");
int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
ServletInputStream input = request.getInputStream();
// First check for isReady() because it has
// side effects, and then for isFinished().
System.err.printf(Thread.currentThread()+" process isFinished=%b%n",input.isFinished());
while (input.isReady() && !input.isFinished())
{
int read = input.read(buffer);
System.err.printf(Thread.currentThread()+" read=%d%n",read);
if (_log.isDebugEnabled())
_log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
if (read > 0)
@ -169,6 +176,8 @@ public class AsyncProxyServlet extends ProxyServlet
}
}
System.err.printf(Thread.currentThread()+" processed isFinished=%b%n",input.isFinished());
if (input.isFinished())
{
if (_log.isDebugEnabled())

View File

@ -27,7 +27,6 @@ import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -187,7 +186,15 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
LOG.debug("{} consumed {}", this, content);
if (content==EOF_CONTENT)
_state=EOF;
{
if (_listener==null)
_state=EOF;
else
{
_state=AEOF;
onReadPossible();
}
}
else if (content==EARLY_EOF_CONTENT)
_state=EARLY_EOF;
@ -376,7 +383,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
return _state.isEOF();
return _state instanceof EOFState;
}
}
@ -384,14 +391,13 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
@Override
public boolean isReady()
{
boolean finished=false;
synchronized (_inputQ)
{
if (_listener == null )
return true;
if (_unready)
return false;
if (_state.isEOF())
if (_state instanceof EOFState)
return true;
if (_inputQ.isEmpty())
@ -460,18 +466,20 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
final Throwable error;
final ReadListener listener;
final boolean eof;
boolean aeof=false;
synchronized (_inputQ)
{
if (!_unready || _listener == null)
if (_state==AEOF)
{
_state=EOF;
aeof=true;
}
else if (!_unready)
return;
listener = _listener;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
eof = isFinished();
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
_unready=false;
}
@ -479,12 +487,20 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
if (error != null)
listener.onError(error);
else if (eof)
else if (aeof)
listener.onAllDataRead();
else
{
listener.onDataAvailable();
if (isFinished())
synchronized (_inputQ)
{
if (_state==AEOF)
{
_state=EOF;
aeof=true;
}
}
if (aeof)
listener.onAllDataRead();
}
}
@ -549,14 +565,13 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
return -1;
}
}
public boolean isEOF()
{
return false;
}
protected static class EOFState extends State
{
}
protected static class ErrorState extends State
protected static class ErrorState extends EOFState
{
final Throwable _error;
ErrorState(Throwable error)
@ -577,12 +592,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
throw new IOException(_error);
}
@Override
public boolean isEOF()
{
return true;
}
@Override
public String toString()
{
@ -620,8 +629,8 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
return "ASYNC";
}
};
protected static final State EARLY_EOF = new State()
protected static final State EARLY_EOF = new EOFState()
{
@Override
public int noContent() throws IOException
@ -629,12 +638,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
throw new EofException("Early EOF");
}
@Override
public boolean isEOF()
{
return true;
}
@Override
public String toString()
{
@ -642,18 +645,21 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
};
protected static final State EOF = new State()
protected static final State EOF = new EOFState()
{
@Override
public boolean isEOF()
{
return true;
}
@Override
public String toString()
{
return "EOF";
}
};
protected static final State AEOF = new EOFState()
{
@Override
public String toString()
{
return "AEOF";
}
};
}

View File

@ -20,13 +20,8 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpInputOverHTTP extends HttpInput
{
private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class);
private final HttpConnection _httpConnection;
public HttpInputOverHTTP(HttpConnection httpConnection)

View File

@ -415,7 +415,7 @@ public class HttpInputTest
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
}
@ -462,8 +462,9 @@ public class HttpInputTest
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),nullValue());