fixed read while suspended issue with double dispatch
This commit is contained in:
parent
722f390800
commit
c9e08217d8
|
@ -148,7 +148,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are threads dispatched reading and writing
|
// If there are threads dispatched reading and writing
|
||||||
if (_readBlocked || _writeBlocked)
|
if (_readBlocked || _writeBlocked)
|
||||||
{
|
{
|
||||||
// assert _dispatched;
|
// assert _dispatched;
|
||||||
|
@ -161,8 +161,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
|
||||||
this.notifyAll();
|
this.notifyAll();
|
||||||
|
|
||||||
// we are not interested in further selecting
|
// we are not interested in further selecting
|
||||||
if (_dispatched)
|
_key.interestOps(0);
|
||||||
_key.interestOps(0);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.Exchanger;
|
import java.util.concurrent.Exchanger;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -98,6 +99,57 @@ public class AsyncRequestReadTest
|
||||||
assertEquals(content.length, total);
|
assertEquals(content.length, total);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void tests() throws Exception
|
||||||
|
{
|
||||||
|
runTest(64,4,4,20);
|
||||||
|
runTest(256,16,16,50);
|
||||||
|
runTest(256,1,128,10);
|
||||||
|
runTest(128*1024,1,64,10);
|
||||||
|
runTest(256*1024,5321,10,100);
|
||||||
|
runTest(512*1024,32*1024,10,10);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void runTest(int contentSize, int chunkSize, int chunks, int delayMS) throws Exception
|
||||||
|
{
|
||||||
|
String tst=contentSize+","+chunkSize+","+chunks+","+delayMS;
|
||||||
|
System.err.println(tst);
|
||||||
|
|
||||||
|
final Socket socket = new Socket("localhost",connector.getLocalPort());
|
||||||
|
|
||||||
|
byte[] content = new byte[contentSize];
|
||||||
|
Arrays.fill(content, (byte)120);
|
||||||
|
|
||||||
|
OutputStream out = socket.getOutputStream();
|
||||||
|
out.write("POST / HTTP/1.1\r\n".getBytes());
|
||||||
|
out.write("Host: localhost\r\n".getBytes());
|
||||||
|
out.write(("Content-Length: "+content.length+"\r\n").getBytes());
|
||||||
|
out.write("Content-Type: bytes\r\n".getBytes());
|
||||||
|
out.write("Connection: close\r\n".getBytes());
|
||||||
|
out.write("\r\n".getBytes());
|
||||||
|
out.flush();
|
||||||
|
|
||||||
|
int offset=0;
|
||||||
|
for (int i=0;i<chunks;i++)
|
||||||
|
{
|
||||||
|
out.write(content,offset,chunkSize);
|
||||||
|
offset+=chunkSize;
|
||||||
|
Thread.sleep(delayMS);
|
||||||
|
}
|
||||||
|
out.write(content,offset,content.length-offset);
|
||||||
|
|
||||||
|
out.flush();
|
||||||
|
|
||||||
|
InputStream in = socket.getInputStream();
|
||||||
|
String response = IO.toString(in);
|
||||||
|
assertTrue(tst,response.indexOf("200 OK")>0);
|
||||||
|
|
||||||
|
long total=__total.exchange(0L,30,TimeUnit.SECONDS);
|
||||||
|
assertEquals(tst,content.length, total);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class EmptyHandler extends AbstractHandler
|
private static class EmptyHandler extends AbstractHandler
|
||||||
{
|
{
|
||||||
public void handle(String path, final Request request, HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException, ServletException
|
public void handle(String path, final Request request, HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException, ServletException
|
||||||
|
|
Loading…
Reference in New Issue