Removed wake() call from within pollContent
Instead the EOF status is evaluated :
 - when setReadListener is called
 - when read returns -1
 - when run() is called before calling either onDataAvailable or onAllDataRead

Squashed commit of the following:

commit 6a345356998331a90e60c7ee8ee590920464c72f
Merge: 92bc0e9 60d9001
Author: Greg Wilkins <gregw@webtide.com>
Date:   Mon Nov 7 09:46:23 2016 +1100

    Merge branch 'jetty-9.4.x' into jetty-9.4.x-issue-1074

commit 92bc0e9f3aafdce2f4aa7b2fde31affc942be042
Author: Greg Wilkins <gregw@webtide.com>
Date:   Sat Nov 5 18:24:00 2016 +1100

    Issue #1074

    Do not do async IO callbacks if completed

commit ee220a12d1e6c5f6e39b4597a209c5043aa775cf
Author: Greg Wilkins <gregw@webtide.com>
Date:   Sat Nov 5 16:22:55 2016 +1100

    Issue #1074

    Turned off debug
    improved proxy test to be able to run with debug on

commit e2fb0b9ef1ec422a2c82cb388820581e359234ba
Author: Greg Wilkins <gregw@webtide.com>
Date:   Sat Nov 5 15:37:27 2016 +1100

    Issue #1074

    Improved test cases
    Handle early EOF

commit 3c47c022fe7e48f82e41d9a208073b64cfeb5af7
Author: Greg Wilkins <gregw@webtide.com>
Date:   Sat Nov 5 12:28:15 2016 +1100

    provisional implementation
This commit is contained in:
Greg Wilkins 2016-11-07 15:01:39 +11:00
parent 60d90010c2
commit 0495bb896e
7 changed files with 888 additions and 54 deletions

View File

@ -360,13 +360,18 @@ public class ProxyServletTest
resp.addHeader(PROXIED_HEADER, "true");
InputStream input = req.getInputStream();
int index = 0;
byte[] buffer = new byte[16*1024];
while (true)
{
int value = input.read();
int value = input.read(buffer);
if (value < 0)
break;
Assert.assertEquals("Content mismatch at index=" + index, content[index] & 0xFF, value);
++index;
for (int i=0;i<value;i++)
{
Assert.assertEquals("Content mismatch at index=" + index, content[index] & 0xFF, buffer[i] & 0xFF);
++index;
}
}
}
});

View File

@ -2,3 +2,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.proxy.LEVEL=DEBUG
#org.eclipse.jetty.server.HttpInput.LEVEL=DEBUG

View File

@ -1008,6 +1008,14 @@ public class HttpChannelState
}
}
public boolean isAsyncComplete()
{
try(Locker.Lock lock= _locker.lock())
{
return _async==Async.COMPLETE;
}
}
public boolean isAsync()
{
try(Locker.Lock lock= _locker.lock())
@ -1168,6 +1176,31 @@ public class HttpChannelState
}
return woken;
}
/* ------------------------------------------------------------ */
/** Called to signal that a read has read -1.
* Will wake if the read was called while in ASYNC_WAIT state
* @return true if woken
*/
public boolean onReadEof()
{
boolean woken=false;
try(Locker.Lock lock= _locker.lock())
{
if(DEBUG)
LOG.debug("onReadEof {}",toStringLocked());
if (_state==State.ASYNC_WAIT)
{
_state=State.ASYNC_WOKEN;
_asyncReadUnready=true;
_asyncReadPossible=true;
woken=true;
}
}
return woken;
}
public boolean isReadPossible()
{

View File

@ -119,8 +119,8 @@ public class HttpInput extends ServletInputStream implements Runnable
}
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final static Content EOF_CONTENT = new EofContent("EOF");
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
final static Content EOF_CONTENT = new EofContent("EOF");
final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
private final byte[] _oneByteBuffer = new byte[1];
private Content _content;
@ -232,7 +232,7 @@ public class HttpInput extends ServletInputStream implements Runnable
return available;
}
private void wake()
protected void wake()
{
HttpChannel channel = _channelState.getHttpChannel();
Executor executor = channel.getConnector().getServer().getThreadPool();
@ -256,6 +256,8 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public int read(byte[] b, int off, int len) throws IOException
{
boolean wake = false;
int l;
synchronized (_inputQ)
{
if (!isAsync())
@ -285,20 +287,29 @@ public class HttpInput extends ServletInputStream implements Runnable
Content item = nextContent();
if (item != null)
{
int l = get(item,b,off,len);
l = get(item,b,off,len);
if (LOG.isDebugEnabled())
LOG.debug("{} read {} from {}",this,l,item);
// Consume any following poison pills
pollReadableContent();
return l;
if (item.isEmpty())
pollReadableContent();
break;
}
if (!_state.blockForContent(this))
return _state.noContent();
{
l = _state.noContent();
if (l<0)
wake = _channelState.onReadEof();
break;
}
}
}
if (wake)
wake();
return l;
}
/**
@ -345,19 +356,14 @@ public class HttpInput extends ServletInputStream implements Runnable
// If it is EOF, consume it here
if (content instanceof SentinelContent)
{
if (content == EARLY_EOF_CONTENT)
_state = EARLY_EOF;
else if (content instanceof EofContent)
if (content instanceof EofContent)
{
if (_listener == null)
if (content == EARLY_EOF_CONTENT)
_state = EARLY_EOF;
else if (_listener == null)
_state = EOF;
else
{
_state = AEOF;
boolean woken = _channelState.onReadReady(); // force callback?
if (woken)
wake();
}
}
// Consume the EOF content, either if it was original content
@ -732,17 +738,25 @@ public class HttpInput extends ServletInputStream implements Runnable
{
if (_listener != null)
throw new IllegalStateException("ReadListener already set");
if (_state != STREAM)
throw new IllegalStateException("State " + STREAM + " != " + _state);
_state = ASYNC;
_listener = readListener;
boolean content = nextContent() != null;
if (content)
Content content = nextReadable();
if (content!=null)
{
_state = ASYNC;
woken = _channelState.onReadReady();
else
}
else if (_state == EOF)
{
_state = AEOF;
woken = _channelState.onReadReady();
}
else
{
_state = ASYNC;
_channelState.onReadUnready();
}
}
}
catch (IOException e)
@ -780,23 +794,54 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public void run()
{
final Throwable error;
final ReadListener listener;
Throwable error;
boolean aeof = false;
synchronized (_inputQ)
{
listener = _listener;
if (_channelState.isAsyncComplete())
return;
if (_state == EOF)
return;
if (_state == AEOF)
if (_state==AEOF)
{
_state = EOF;
aeof = true;
}
error = _state.getError();
if (!aeof && error==null)
{
Content content = pollReadableContent();
listener = _listener;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
// Consume EOF
if (content instanceof EofContent)
{
content.succeeded();
if (_content==content)
_content = null;
if (content == EARLY_EOF_CONTENT)
{
_state = EARLY_EOF;
error = _state.getError();
}
else
{
_state = EOF;
aeof = true;
}
}
else if (content==null)
{
LOG.warn("spurious run!");
return;
}
}
}
try
@ -813,6 +858,16 @@ public class HttpInput extends ServletInputStream implements Runnable
else
{
listener.onDataAvailable();
synchronized (_inputQ)
{
if (_state == AEOF)
{
_state = EOF;
aeof = !_channelState.isAsyncComplete();
}
}
if (aeof)
listener.onAllDataRead();
}
}
catch (Throwable e)
@ -956,6 +1011,11 @@ public class HttpInput extends ServletInputStream implements Runnable
{
return -1;
}
public Throwable getError()
{
return null;
}
}
protected static class EOFState extends State
@ -1027,7 +1087,7 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public int noContent() throws IOException
{
throw new EofException("Early EOF");
throw getError();
}
@Override
@ -1035,6 +1095,11 @@ public class HttpInput extends ServletInputStream implements Runnable
{
return "EARLY_EOF";
}
public IOException getError()
{
return new EofException("Early EOF");
}
};
protected static final State EOF = new EOFState()

View File

@ -0,0 +1,750 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT;
import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.ReadListener;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* this tests HttpInput and its interaction with HttpChannelState
*/
public class HttpInputAsyncStateTest
{
private static final Queue<String> __history = new LinkedBlockingQueue<>();
private ByteBuffer _expected = BufferUtil.allocate(16*1024);
private boolean _eof;
private boolean _noReadInDataAvailable;
private boolean _completeInOnDataAvailable;
private final ReadListener _listener = new ReadListener()
{
@Override
public void onError(Throwable t)
{
__history.add("onError:" + t);
}
@Override
public void onDataAvailable() throws IOException
{
__history.add("onDataAvailable");
if (!_noReadInDataAvailable && readAvailable() && _completeInOnDataAvailable)
{
__history.add("complete");
_state.complete();
}
}
@Override
public void onAllDataRead() throws IOException
{
__history.add("onAllDataRead");
}
};
private HttpInput _in;
HttpChannelState _state;
public static class TContent extends HttpInput.Content
{
public TContent(String content)
{
super(BufferUtil.toBuffer(content));
}
}
@Before
public void before()
{
_noReadInDataAvailable = false;
_in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null)
{
@Override
public void asyncReadFillInterested()
{
__history.add("asyncReadFillInterested");
}
@Override
public Scheduler getScheduler()
{
return null;
}
})
{
@Override
public void onReadUnready()
{
super.onReadUnready();
__history.add("onReadUnready");
}
@Override
public boolean onReadPossible()
{
boolean wake = super.onReadPossible();
__history.add("onReadPossible "+wake);
return wake;
}
@Override
public boolean onReadReady()
{
boolean wake = super.onReadReady();
__history.add("onReadReady "+wake);
return wake;
}
})
{
@Override
public void wake()
{
__history.add("wake");
}
};
_state = _in.getHttpChannelState();
__history.clear();
}
private void check(String... history)
{
if (history==null || history.length==0)
assertThat(__history,empty());
else
assertThat(__history.toArray(new String[__history.size()]),Matchers.arrayContaining(history));
__history.clear();
}
private void wake()
{
handle(null);
}
private void handle()
{
handle(null);
}
private void handle(Runnable run)
{
Action action = _state.handling();
loop: while(true)
{
switch(action)
{
case DISPATCH:
if (run==null)
Assert.fail();
run.run();
break;
case READ_CALLBACK:
_in.run();
break;
case TERMINATED:
case WAIT:
break loop;
case COMPLETE:
__history.add("COMPLETE");
break;
default:
Assert.fail();
}
action = _state.unhandle();
}
}
private void deliver(Content... content)
{
if (content!=null)
{
for (Content c: content)
{
if (c==EOF_CONTENT)
{
_in.eof();
_eof = true;
}
else if (c==HttpInput.EARLY_EOF_CONTENT)
{
_in.earlyEOF();
_eof = true;
}
else
{
_in.addContent(c);
BufferUtil.append(_expected,c.getByteBuffer().slice());
}
}
}
}
boolean readAvailable() throws IOException
{
int len=0;
try
{
while(_in.isReady())
{
int b = _in.read();
if (b<0)
{
if (len>0)
__history.add("read "+len);
__history.add("read -1");
assertTrue(BufferUtil.isEmpty(_expected));
assertTrue(_eof);
return true;
}
else
{
len++;
assertFalse(BufferUtil.isEmpty(_expected));
int a = 0xff & _expected.get();
assertThat(b,equalTo(a));
}
}
__history.add("read "+len);
assertTrue(BufferUtil.isEmpty(_expected));
}
catch(IOException e)
{
if (len>0)
__history.add("read "+len);
__history.add("read "+e);
throw e;
}
return false;
}
@After
public void after()
{
Assert.assertThat(__history.poll(), Matchers.nullValue());
}
@Test
public void testInitialEmptyListenInHandle() throws Exception
{
deliver(EOF_CONTENT);
check();
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check("onAllDataRead");
}
@Test
public void testInitialEmptyListenAfterHandle() throws Exception
{
deliver(EOF_CONTENT);
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check("onAllDataRead");
}
@Test
public void testListenInHandleEmpty() throws Exception
{
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("asyncReadFillInterested");
deliver(EOF_CONTENT);
check("onReadPossible true");
handle();
check("onAllDataRead");
}
@Test
public void testEmptyListenAfterHandle() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
deliver(EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check("onAllDataRead");
}
@Test
public void testListenAfterHandleEmpty() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready");
deliver(EOF_CONTENT);
check("onReadPossible true");
handle();
check("onAllDataRead");
}
@Test
public void testInitialEarlyEOFListenInHandle() throws Exception
{
deliver(EARLY_EOF_CONTENT);
check();
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testInitialEarlyEOFListenAfterHandle() throws Exception
{
deliver(EARLY_EOF_CONTENT);
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenInHandleEarlyEOF() throws Exception
{
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("asyncReadFillInterested");
deliver(EARLY_EOF_CONTENT);
check("onReadPossible true");
handle();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testEarlyEOFListenAfterHandle() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
deliver(EARLY_EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenAfterHandleEarlyEOF() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready");
deliver(EARLY_EOF_CONTENT);
check("onReadPossible true");
handle();
check("onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testInitialAllContentListenInHandle() throws Exception
{
deliver(new TContent("Hello"),EOF_CONTENT);
check();
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check("onDataAvailable","read 5","read -1","onAllDataRead");
}
@Test
public void testInitialAllContentListenAfterHandle() throws Exception
{
deliver(new TContent("Hello"),EOF_CONTENT);
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check("onDataAvailable","read 5","read -1","onAllDataRead");
}
@Test
public void testListenInHandleAllContent() throws Exception
{
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("asyncReadFillInterested");
deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false");
handle();
check("onDataAvailable","read 5","read -1","onAllDataRead");
}
@Test
public void testAllContentListenAfterHandle() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
deliver(new TContent("Hello"),EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check("onDataAvailable","read 5","read -1","onAllDataRead");
}
@Test
public void testListenAfterHandleAllContent() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready");
deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false");
handle();
check("onDataAvailable","read 5","read -1","onAllDataRead");
}
@Test
public void testInitialIncompleteContentListenInHandle() throws Exception
{
deliver(new TContent("Hello"),EARLY_EOF_CONTENT);
check();
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadReady false");
});
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testInitialPartialContentListenAfterHandle() throws Exception
{
deliver(new TContent("Hello"),EARLY_EOF_CONTENT);
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenInHandlePartialContent() throws Exception
{
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("asyncReadFillInterested");
deliver(new TContent("Hello"),EARLY_EOF_CONTENT);
check("onReadPossible true","onReadPossible false");
handle();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testPartialContentListenAfterHandle() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
deliver(new TContent("Hello"),EARLY_EOF_CONTENT);
check();
_in.setReadListener(_listener);
check("onReadReady true","wake");
wake();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testListenAfterHandlePartialContent() throws Exception
{
handle(()->
{
_state.startAsync(null);
check();
});
_in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready");
deliver(new TContent("Hello"),EARLY_EOF_CONTENT);
check("onReadPossible true","onReadPossible false");
handle();
check(
"onDataAvailable",
"read 5",
"read org.eclipse.jetty.io.EofException: Early EOF",
"onError:org.eclipse.jetty.io.EofException: Early EOF");
}
@Test
public void testReadAfterOnDataAvailable() throws Exception
{
_noReadInDataAvailable = true;
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("asyncReadFillInterested");
deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false");
handle();
check("onDataAvailable");
readAvailable();
check("wake","read 5","read -1");
wake();
check("onAllDataRead");
}
@Test
public void testReadOnlyExpectedAfterOnDataAvailable() throws Exception
{
_noReadInDataAvailable = true;
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("asyncReadFillInterested");
deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false");
handle();
check("onDataAvailable");
byte[] buffer = new byte[_expected.remaining()];
assertThat(_in.read(buffer),equalTo(buffer.length));
assertThat(new String(buffer),equalTo(BufferUtil.toString(_expected)));
BufferUtil.clear(_expected);
check();
assertTrue(_in.isReady());
check();
assertThat(_in.read(),equalTo(-1));
check("wake");
wake();
check("onAllDataRead");
}
@Test
public void testReadAndCompleteInOnDataAvailable() throws Exception
{
_completeInOnDataAvailable = true;
handle(()->
{
_state.startAsync(null);
_in.setReadListener(_listener);
check("onReadUnready");
});
check("asyncReadFillInterested");
deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false");
handle(()->{__history.add(_state.getState().toString());});
System.err.println(__history);
check(
"onDataAvailable",
"read 5",
"read -1",
"complete",
"COMPLETE"
);
}
}

View File

@ -390,10 +390,6 @@ public class HttpInputTest
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run();
Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
@ -413,10 +409,6 @@ public class HttpInputTest
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run();
Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
@ -466,10 +458,6 @@ public class HttpInputTest
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run();
Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.eof();
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
@ -478,7 +466,6 @@ public class HttpInputTest
Assert.assertThat(_in.read(), Matchers.equalTo(-1));
Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
Assert.assertThat(_history.poll(), Matchers.equalTo("ready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
}
@ -490,10 +477,6 @@ public class HttpInputTest
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run();
Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
@ -527,7 +510,6 @@ public class HttpInputTest
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
Assert.assertThat(_in.read(), Matchers.equalTo(-1));
Assert.assertThat(_in.isFinished(), Matchers.equalTo(true));
Assert.assertThat(_history.poll(), Matchers.equalTo("ready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
@ -541,9 +523,6 @@ public class HttpInputTest
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));
Assert.assertThat(_history.poll(), Matchers.equalTo("unready"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
_in.run();
Assert.assertThat(_history.poll(), Matchers.equalTo("onDataAvailable"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.isReady(), Matchers.equalTo(false));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 0"));

View File

@ -1001,6 +1001,7 @@ public class AsyncIOServletTest extends AbstractTest
@Override
public void onAllDataRead() throws IOException
{
asyncContext.complete();
}
@Override