406390 Close if at END and content remaining

This commit is contained in:
Greg Wilkins 2013-04-29 14:01:11 +10:00
parent cbb76283c9
commit 7955548d10
4 changed files with 209 additions and 21 deletions

View File

@ -1388,6 +1388,7 @@ public class HttpParser
/* ------------------------------------------------------------------------------- */
private void setState(State state)
{
// LOG.debug("{} --> {}",_state,state);
_state=state;
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -112,13 +113,130 @@ public abstract class AbstractConnection implements Connection
if (_state.compareAndSet(State.FILLING,State.FILLING_INTERESTED))
break loop;
break;
case FILLING_BLOCKED:
if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING_BLOCKED_INTERESTED))
break loop;
break;
case BLOCKED:
if (_state.compareAndSet(State.BLOCKED,State.BLOCKED_INTERESTED))
break loop;
break;
case FILLING_BLOCKED_INTERESTED:
case FILLING_INTERESTED:
case BLOCKED_INTERESTED:
case INTERESTED:
break loop;
}
}
}
private void unblock()
{
LOG.debug("unblock {}",this);
loop:while(true)
{
switch(_state.get())
{
case FILLING_BLOCKED:
if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING))
break loop;
break;
case FILLING_BLOCKED_INTERESTED:
if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.FILLING_INTERESTED))
break loop;
break;
case BLOCKED_INTERESTED:
if (_state.compareAndSet(State.BLOCKED_INTERESTED,State.INTERESTED))
{
getEndPoint().fillInterested(_readCallback);
break loop;
}
break;
case BLOCKED:
if (_state.compareAndSet(State.BLOCKED,State.IDLE))
break loop;
break;
case FILLING:
case IDLE:
case FILLING_INTERESTED:
case INTERESTED:
break loop;
}
}
}
/**
*/
protected void block(final BlockingCallback callback)
{
LOG.debug("block {}",this);
final Callback blocked=new Callback()
{
@Override
public void succeeded()
{
unblock();
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
unblock();
callback.failed(x);
}
};
loop:while(true)
{
switch(_state.get())
{
case IDLE:
if (_state.compareAndSet(State.IDLE,State.BLOCKED))
{
getEndPoint().fillInterested(blocked);
break loop;
}
break;
case FILLING:
if (_state.compareAndSet(State.FILLING,State.FILLING_BLOCKED))
{
getEndPoint().fillInterested(blocked);
break loop;
}
break;
case FILLING_INTERESTED:
if (_state.compareAndSet(State.FILLING_INTERESTED,State.FILLING_BLOCKED_INTERESTED))
{
getEndPoint().fillInterested(blocked);
break loop;
}
break;
case BLOCKED:
case BLOCKED_INTERESTED:
case FILLING_BLOCKED:
case FILLING_BLOCKED_INTERESTED:
throw new IllegalStateException("Already Blocked");
case INTERESTED:
throw new IllegalStateException();
}
}
}
/**
* <p>Callback method invoked when the endpoint is ready to be read.</p>
@ -225,7 +343,7 @@ public abstract class AbstractConnection implements Connection
private enum State
{
IDLE, INTERESTED, FILLING, FILLING_INTERESTED
IDLE, INTERESTED, FILLING, FILLING_INTERESTED, FILLING_BLOCKED, BLOCKED, FILLING_BLOCKED_INTERESTED, BLOCKED_INTERESTED
}
private class ReadCallback implements Callback, Runnable
@ -247,12 +365,25 @@ public abstract class AbstractConnection implements Connection
{
case IDLE:
case INTERESTED:
throw new IllegalStateException();
case BLOCKED:
case BLOCKED_INTERESTED:
LOG.warn(new IllegalStateException());
return;
case FILLING:
if (_state.compareAndSet(State.FILLING,State.IDLE))
break loop;
break;
case FILLING_BLOCKED:
if (_state.compareAndSet(State.FILLING_BLOCKED,State.BLOCKED))
break loop;
break;
case FILLING_BLOCKED_INTERESTED:
if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.BLOCKED_INTERESTED))
break loop;
break;
case FILLING_INTERESTED:
if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
@ -266,7 +397,7 @@ public abstract class AbstractConnection implements Connection
}
}
else
LOG.warn(new Throwable());
LOG.warn(new IllegalStateException());
}
@Override

View File

@ -550,7 +550,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// need to call blockForContent again
while (event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
_parser.parseNext(_requestBuffer);
// If we have an event, return
if (event)
return;
@ -566,7 +566,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
// Wait until we can read
getEndPoint().fillInterested(_readBlocker);
block(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.servlet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
@ -345,6 +344,45 @@ public class AsyncServletTest
Assert.assertThat(response,Matchers.not(Matchers.containsString(content)));
}
@Test
public void testAsyncRead() throws Exception
{
String header="GET /ctx/path/info?suspend=2000&resume=1500 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: 10\r\n"+
"\r\n";
String body="12345678\r\n";
String close="GET /ctx/path/info HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Connection: close\r\n"+
"\r\n";
try (Socket socket = new Socket("localhost",_port);)
{
socket.setSoTimeout(10000);
socket.getOutputStream().write(header.getBytes("ISO-8859-1"));
Thread.sleep(500);
socket.getOutputStream().write(body.getBytes("ISO-8859-1"),0,2);
Thread.sleep(500);
socket.getOutputStream().write(body.getBytes("ISO-8859-1"),2,8);
socket.getOutputStream().write(close.getBytes("ISO-8859-1"));
String response = IO.toString(socket.getInputStream());
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
assertContains(
"history: REQUEST\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: async-read=10\r\n"+
"history: resume\r\n"+
"history: ASYNC\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
}
}
public synchronized String process(String query,String content) throws Exception
{
String request = "GET /ctx/path/info";
@ -364,9 +402,8 @@ public class AsyncServletTest
int port=_port;
String response=null;
try
try (Socket socket = new Socket("localhost",port);)
{
Socket socket = new Socket("localhost",port);
socket.setSoTimeout(1000000);
socket.getOutputStream().write(request.getBytes("UTF-8"));
@ -379,11 +416,10 @@ public class AsyncServletTest
throw e;
}
// System.err.println(response);
return response;
}
private static class AsyncServlet extends HttpServlet
@ -429,7 +465,7 @@ public class AsyncServletTest
if (request.getDispatcherType()==DispatcherType.REQUEST)
{
((HttpServletResponse)response).addHeader("history","initial");
response.addHeader("history","initial");
if (read_before>0)
{
byte[] buf=new byte[read_before];
@ -442,6 +478,30 @@ public class AsyncServletTest
while(b!=-1)
b=in.read();
}
else if (request.getContentLength()>0)
{
new Thread()
{
@Override
public void run()
{
int c=0;
try
{
InputStream in=request.getInputStream();
int b=0;
while(b!=-1)
if((b=in.read())>=0)
c++;
response.addHeader("history","async-read="+c);
}
catch(Exception e)
{
e.printStackTrace();
}
}
}.start();
}
if (suspend_for>=0)
{
@ -449,7 +509,7 @@ public class AsyncServletTest
if (suspend_for>0)
async.setTimeout(suspend_for);
async.addListener(__listener);
((HttpServletResponse)response).addHeader("history","suspend");
response.addHeader("history","suspend");
if (complete_after>0)
{
@ -527,7 +587,7 @@ public class AsyncServletTest
}
else
{
((HttpServletResponse)response).addHeader("history","!initial");
response.addHeader("history","!initial");
if (suspend2_for>=0 && request.getAttribute("2nd")==null)
{
@ -540,7 +600,7 @@ public class AsyncServletTest
async.setTimeout(suspend2_for);
}
// continuation.addContinuationListener(__listener);
((HttpServletResponse)response).addHeader("history","suspend");
response.addHeader("history","suspend");
if (complete2_after>0)
{
@ -581,7 +641,7 @@ public class AsyncServletTest
@Override
public void run()
{
((HttpServletResponse)response).addHeader("history","resume");
response.addHeader("history","resume");
async.dispatch();
}
};
@ -592,7 +652,7 @@ public class AsyncServletTest
}
else if (resume2_after==0)
{
((HttpServletResponse)response).addHeader("history","dispatch");
response.addHeader("history","dispatch");
async.dispatch();
}
}
@ -633,15 +693,11 @@ public class AsyncServletTest
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
// TODO Auto-generated method stub
}
@Override
public void onError(AsyncEvent event) throws IOException
{
// TODO Auto-generated method stub
}
@Override