Merge branch 'master' into jetty-9.4.x-Feature

This commit is contained in:
Greg Wilkins 2015-12-04 10:11:09 +11:00
commit 46ce2aef3a
1 changed files with 53 additions and 49 deletions

View File

@ -614,6 +614,9 @@ public class AsyncServletIOTest
@Test
public void testCompleteWhilePending() throws Exception
{
_servlet4.onDA.set(0);
_servlet4.onWP.set(0);
StringBuilder request = new StringBuilder(512);
request.append("POST /ctx/path4/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
@ -674,7 +677,7 @@ public class AsyncServletIOTest
}
catch(IOException e)
{
// ignored
}
LOG.debug("last: "+last);
@ -686,8 +689,8 @@ public class AsyncServletIOTest
assertTrue(_servlet4.completed.await(5, TimeUnit.SECONDS));
Thread.sleep(100);
assertEquals(2,_servlet4.onDA.get());
assertEquals(2,_servlet4.onWP.get());
assertEquals(0,_servlet4.onDA.get());
assertEquals(0,_servlet4.onWP.get());
}
@ -708,7 +711,6 @@ public class AsyncServletIOTest
in.setReadListener(new ReadListener()
{
@Override
public void onError(Throwable t)
{
@ -719,59 +721,61 @@ public class AsyncServletIOTest
public void onDataAvailable() throws IOException
{
onDA.incrementAndGet();
if (onDA.get()>2)
return;
boolean readF=false;
// Read all available content
while(in.isReady())
if (in.read()<0)
throw new IllegalStateException();
if (onDA.get()==1)
return;
final byte[] buffer = new byte[64*1024];
Arrays.fill(buffer,(byte)'X');
for (int i=199;i<buffer.length;i+=200)
buffer[i]=(byte)'\n';
// Once we read block, let's make ourselves write blocked
out.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
int c = in.read();
if (c<0)
throw new IllegalStateException();
if (c=='F')
readF=true;
}
if (readF)
{
onDA.set(0);
final byte[] buffer = new byte[64*1024];
Arrays.fill(buffer,(byte)'X');
for (int i=199;i<buffer.length;i+=200)
buffer[i]=(byte)'\n';
// Once we read block, let's make ourselves write blocked
out.setWriteListener(new WriteListener()
{
onWP.incrementAndGet();
if (onWP.get()>2)
return;
while (out.isReady())
out.write(buffer);
if (onWP.get()==1)
return;
try
@Override
public void onWritePossible() throws IOException
{
// As soon as we are write blocked, complete
async.complete();
onWP.incrementAndGet();
while (out.isReady())
out.write(buffer);
try
{
// As soon as we are write blocked, complete
onWP.set(0);
async.complete();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
completed.countDown();
}
}
catch(Exception e)
@Override
public void onError(Throwable t)
{
e.printStackTrace();
t.printStackTrace();
}
finally
{
completed.countDown();
}
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
}
});
});
}
}
@Override