Misc cleanups done whilst fixing 435322

This commit is contained in:
Greg Wilkins 2014-08-01 09:02:44 +10:00
parent 77b8077910
commit f970ffc0ac
5 changed files with 75 additions and 51 deletions

View File

@ -338,18 +338,15 @@ abstract public class WriteFlusher
if (DEBUG)
LOG.debug("flushed {}", flushed);
// Are we complete?
for (ByteBuffer b : buffers)
// if we are incomplete?
if (!flushed)
{
if (!flushed||BufferUtil.hasContent(b))
{
PendingState pending=new PendingState(buffers, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
else
fail(pending);
return;
}
PendingState pending=new PendingState(buffers, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
else
fail(pending);
return;
}
// If updateState didn't succeed, we don't care as our buffers have been written
@ -403,17 +400,14 @@ abstract public class WriteFlusher
if (DEBUG)
LOG.debug("flushed {}", flushed);
// Are we complete?
for (ByteBuffer b : buffers)
// if we are incomplete?
if (!flushed)
{
if (!flushed || BufferUtil.hasContent(b))
{
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
else
fail(pending);
return;
}
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
else
fail(pending);
return;
}
// If updateState didn't succeed, we don't care as our buffers have been written

View File

@ -49,6 +49,7 @@ import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.URIUtil;
@ -484,10 +485,11 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable, H
@Override
public String toString()
{
return String.format("%s@%x{r=%s,a=%s,uri=%s}",
return String.format("%s@%x{r=%s,c=%b,a=%s,uri=%s}",
getClass().getSimpleName(),
hashCode(),
_requests,
_committed.get(),
_state.getState(),
_state.getState()==HttpChannelState.State.IDLE?"-":_request.getRequestURI()
);
@ -724,7 +726,6 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable, H
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete, final Callback callback)
{
// TODO check that complete only set true once by changing _committed to AtomicRef<Enum>
boolean committing = _committed.compareAndSet(false, true);
if (committing)
{
@ -732,7 +733,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable, H
if (info==null)
info = _response.newResponseInfo();
// wrap callback to process 100 or 500 responses
// wrap callback to process 100 responses
final int status=info.getStatus();
final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback);
@ -864,8 +865,10 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable, H
@Override
public void succeeded()
{
_committed.set(false);
super.succeeded();
if (_committed.compareAndSet(true, false))
super.succeeded();
else
super.failed(new IllegalStateException());
}
}

View File

@ -444,15 +444,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// then we can't be persistent
_generator.setPersistent(false);
_sendCallback.reset(info,content,lastContent,callback);
_sendCallback.iterate();
if(_sendCallback.reset(info,content,lastContent,callback))
_sendCallback.iterate();
}
@Override
public void send(ByteBuffer content, boolean lastContent, Callback callback)
{
_sendCallback.reset(null,content,lastContent,callback);
_sendCallback.iterate();
if (!lastContent && BufferUtil.isEmpty(content))
callback.succeeded();
else if (_sendCallback.reset(null,content,lastContent,callback))
_sendCallback.iterate();
}
protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
@ -565,7 +567,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
super(true);
}
private void reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
{
if (reset())
{
@ -575,15 +577,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_callback = callback;
_header = null;
_shutdownOut = false;
return true;
}
else if (isClosed())
{
if (isClosed())
callback.failed(new EofException());
}
else
{
callback.failed(new WritePendingException());
}
return false;
}
@Override
@ -651,7 +652,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
getEndPoint().write(this, _content);
}
else
continue;
{
succeeded(); // nothing to write
}
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
@ -661,7 +664,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
case DONE:
{
releaseHeader();
return Action.SUCCEEDED;
}
case CONTINUE:

View File

@ -30,6 +30,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -41,6 +42,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -49,6 +51,7 @@ import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
@ -69,7 +72,7 @@ public class HttpConnectionTest
http.getHttpConfiguration().setResponseHeaderSize(1024);
connector = new LocalConnector(server,http,null);
connector.setIdleTimeout(500);
connector.setIdleTimeout(5000);
server.addConnector(connector);
server.setHandler(new DumpHandler());
ErrorHandler eh=new ErrorHandler();
@ -324,6 +327,31 @@ public class HttpConnectionTest
offset = checkContains(response,offset,"/R1");
offset = checkContains(response,offset,"12345");
}
@Test
public void testEmptyFlush() throws Exception
{
server.stop();
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
response.setStatus(200);
OutputStream out =response.getOutputStream();
out.flush();
out.flush();
}
});
server.start();
String response=connector.getResponses("GET / HTTP/1.1\n"+
"Host: localhost\n"+
"Connection: close\n"+
"\n");
assertThat(response, Matchers.containsString("200 OK"));
}
@Test
public void testCharset() throws Exception
@ -431,6 +459,7 @@ public class HttpConnectionTest
@Test
public void testUnconsumedTimeout() throws Exception
{
connector.setIdleTimeout(500);
String response=null;
String requests=null;
int offset=0;

View File

@ -186,16 +186,7 @@ public class SharedBlockingCallback
try
{
while (_state == null)
{
// TODO remove this debug timout!
// This is here to help debug 435322,
if (!_complete.await(10,TimeUnit.MINUTES))
{
IOException x = new IOException("DEBUG timeout");
LOG.warn("Blocked too long (please report!!!) "+this, x);
_state=x;
}
}
_complete.await();
if (_state == SUCCEEDED)
return;
@ -241,7 +232,11 @@ public class SharedBlockingCallback
if (_state == IDLE)
throw new IllegalStateException("IDLE");
if (_state == null)
LOG.debug("Blocker not complete",new Throwable());
{
LOG.warn("Blocker not complete {}",this);
if (LOG.isDebugEnabled())
LOG.debug(new Throwable());
}
}
finally
{
@ -249,6 +244,7 @@ public class SharedBlockingCallback
{
_state = IDLE;
_idle.signalAll();
_complete.signalAll();
}
finally
{
@ -263,7 +259,7 @@ public class SharedBlockingCallback
_lock.lock();
try
{
return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
return String.format("%s@%x{%s}",Blocker.class.getSimpleName(),hashCode(),_state);
}
finally
{