Test Harness for #596

Test harness to try to repeat problem with #596 of content-length added to a HEAD response.

In the process added a much better getResponse mechanism to the local connector that avoids
using the idle time.
This commit is contained in:
Greg Wilkins 2016-06-03 14:15:25 +10:00
parent ec7871718c
commit a2309057dc
7 changed files with 378 additions and 113 deletions

View File

@ -26,6 +26,8 @@ import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
@ -55,6 +57,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
};
private final Locker _locker = new Locker();
private final Condition _hasOutput = _locker.newCondition();
private final Queue<ByteBuffer> _inQ = new ArrayQueue<>();
private ByteBuffer _out;
private boolean _ishut;
@ -182,6 +185,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
_runFillable.run();
}
/* ------------------------------------------------------------ */
public void addInputAndExecute(ByteBuffer in)
{
boolean fillable=false;
@ -223,7 +227,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteBuffer getOutput()
{
return _out;
try(Locker.Lock lock = _locker.lock())
{
return _out;
}
}
/* ------------------------------------------------------------ */
@ -251,8 +258,36 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public ByteBuffer takeOutput()
{
ByteBuffer b=_out;
_out=BufferUtil.allocate(b.capacity());
ByteBuffer b;
try(Locker.Lock lock = _locker.lock())
{
b=_out;
_out=BufferUtil.allocate(b.capacity());
}
getWriteFlusher().completeWrite();
return b;
}
/* ------------------------------------------------------------ */
/** Wait for some output
* @param time Time to wait
* @param unit Units for time to wait
* @return The buffer of output
* @throws InterruptedException
*/
public ByteBuffer waitForOutput(long time,TimeUnit unit) throws InterruptedException
{
ByteBuffer b;
try(Locker.Lock lock = _locker.lock())
{
if (BufferUtil.isEmpty(_out))
_hasOutput.await(time,unit);
b=_out;
_out=BufferUtil.allocate(b.capacity());
}
getWriteFlusher().completeWrite();
return b;
}
@ -283,7 +318,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public void setOutput(ByteBuffer out)
{
_out = out;
try(Locker.Lock lock = _locker.lock())
{
_out = out;
}
getWriteFlusher().completeWrite();
}
@ -349,6 +387,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
try(Locker.Lock lock = _locker.lock())
{
_oshut=true;
_hasOutput.signalAll();
if (_ishut && !_closed)
close=_closed=true;
}
@ -368,6 +407,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{
if (!_closed)
close=_closed=_ishut=_oshut=true;
_hasOutput.signalAll();
}
if (close)
super.close();
@ -439,41 +480,47 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
if (_closed)
throw new IOException("CLOSED");
if (_oshut)
throw new IOException("OSHUT");
boolean flushed=true;
boolean idle=true;
for (ByteBuffer b : buffers)
try(Locker.Lock lock = _locker.lock())
{
if (BufferUtil.hasContent(b))
if (_closed)
throw new IOException("CLOSED");
if (_oshut)
throw new IOException("OSHUT");
boolean idle=true;
for (ByteBuffer b : buffers)
{
if (_growOutput && b.remaining()>BufferUtil.space(_out))
{
BufferUtil.compact(_out);
if (b.remaining()>BufferUtil.space(_out))
{
ByteBuffer n = BufferUtil.allocate(_out.capacity()+b.remaining()*2);
BufferUtil.append(n,_out);
_out=n;
}
}
if (BufferUtil.append(_out,b)>0)
idle=false;
if (BufferUtil.hasContent(b))
{
flushed=false;
break;
if (_growOutput && b.remaining()>BufferUtil.space(_out))
{
BufferUtil.compact(_out);
if (b.remaining()>BufferUtil.space(_out))
{
ByteBuffer n = BufferUtil.allocate(_out.capacity()+b.remaining()*2);
BufferUtil.append(n,_out);
_out=n;
}
}
if (BufferUtil.append(_out,b)>0)
idle=false;
if (BufferUtil.hasContent(b))
{
flushed=false;
break;
}
}
}
if (!idle)
{
notIdle();
_hasOutput.signalAll();
}
}
if (!idle)
notIdle();
return flushed;
}
@ -483,13 +530,16 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public void reset()
{
getFillInterest().onClose();
getWriteFlusher().onClose();
_ishut=false;
_oshut=false;
_closed=false;
_inQ.clear();
BufferUtil.clear(_out);
try(Locker.Lock lock = _locker.lock())
{
getFillInterest().onClose();
getWriteFlusher().onClose();
_ishut=false;
_oshut=false;
_closed=false;
_inQ.clear();
BufferUtil.clear(_out);
}
}
/* ------------------------------------------------------------ */

View File

@ -27,10 +27,14 @@ import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
@ -76,7 +80,8 @@ public class LocalConnector extends AbstractConnector
* returned to the level it was before the requests.
* <p>
* This methods waits until the connection is closed or
* is idle for 1s before returning the responses.
* is idle for 5s before returning the responses.
* <p>Use {@link #getResponse(String)} for an alternative that does not wait for idle.
* @param requests the requests
* @return the responses
* @throws Exception if the requests fail
@ -92,6 +97,7 @@ public class LocalConnector extends AbstractConnector
* <p>
* This methods waits until the connection is closed or
* an idle period before returning the responses.
* <p>Use {@link #getResponse(String)} for an alternative that does not wait for idle.
* @param requests the requests
* @param idleFor The time the response stream must be idle for before returning
* @param units The units of idleFor
@ -109,7 +115,8 @@ public class LocalConnector extends AbstractConnector
* returned to the level it was before the requests.
* <p>
* This methods waits until the connection is closed or
* is idle for 1s before returning the responses.
* is idle for 5s before returning the responses.
* <p>Use {@link #getResponse(ByteBuffer)} for an alternative that does not wait for idle.
* @param requestsBuffer the requests
* @return the responses
* @throws Exception if the requests fail
@ -147,7 +154,7 @@ public class LocalConnector extends AbstractConnector
/**
* Execute a request and return the EndPoint through which
* responses can be received.
* multiple responses can be received or more input provided.
* @param rawRequest the request
* @return the local endpoint
*/
@ -181,9 +188,70 @@ public class LocalConnector extends AbstractConnector
connection.onOpen();
}
/** Get a single response using a parser to search for the end of the message.
* @param requestsBuffer The request to send
* @return ByteBuffer containing response or null.
* @throws Exception If there is a problem
*/
public ByteBuffer getResponse(ByteBuffer requestsBuffer) throws Exception
{
return getResponse(requestsBuffer,false,10,TimeUnit.SECONDS);
}
/** Get a single response using a parser to search for the end of the message.
* @param requestsBuffer The request to send
* @param head True if the response is for a head request
* @param time The time to wait
* @param unit The units of the wait
* @return ByteBuffer containing response or null.
* @throws Exception If there is a problem
*/
public ByteBuffer getResponse(ByteBuffer requestsBuffer,boolean head, long time,TimeUnit unit) throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
LocalEndPoint endp = executeRequest(requestsBuffer);
return endp.waitForResponse(head,time,unit);
}
/** Get a single response using a parser to search for the end of the message.
* @param rawRequest The request to send
* @return ByteBuffer containing response or null.
* @throws Exception If there is a problem
*/
public String getResponse(String rawRequest) throws Exception
{
return getResponse(rawRequest,false,10,TimeUnit.SECONDS);
}
/** Get a single response using a parser to search for the end of the message.
* @param rawRequest The request to send
* @param head True if the response is for a head request
* @param time The time to wait
* @param unit The units of the wait
* @return ByteBuffer containing response or null.
* @throws Exception If there is a problem
*/
public String getResponse(String rawRequest,boolean head, long time,TimeUnit unit) throws Exception
{
ByteBuffer requestsBuffer = BufferUtil.toBuffer(rawRequest, StandardCharsets.ISO_8859_1);
if (LOG.isDebugEnabled())
LOG.debug("request {}", BufferUtil.toUTF8String(requestsBuffer));
LocalEndPoint endp = executeRequest(requestsBuffer);
return BufferUtil.toString(endp.waitForResponse(head,time,unit), StandardCharsets.ISO_8859_1);
}
/** Local EndPoint
*/
public class LocalEndPoint extends ByteArrayEndPoint
{
private final CountDownLatch _closed = new CountDownLatch(1);
private ByteBuffer _responseData;
public LocalEndPoint()
{
@ -264,5 +332,104 @@ public class LocalConnector extends AbstractConnector
}
}
}
/** Wait for a response using a parser to detect the end of message
* @param head
* @param time
* @param unit
* @return Buffer containing full response or null for EOF;
* @throws Exception
*/
public ByteBuffer waitForResponse(boolean head, long time,TimeUnit unit) throws Exception
{
HttpParser.ResponseHandler handler = new HttpParser.ResponseHandler()
{
@Override
public void parsedHeader(HttpField field)
{
}
@Override
public boolean messageComplete()
{
return true;
}
@Override
public boolean headerComplete()
{
return false;
}
@Override
public int getHeaderCacheSize()
{
return 0;
}
@Override
public void earlyEOF()
{
}
@Override
public boolean content(ByteBuffer item)
{
return false;
}
@Override
public void badMessage(int status, String reason)
{
}
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
return false;
}
};
HttpParser parser = new HttpParser(handler);
parser.setHeadResponse(head);
try(ByteArrayOutputStream2 bout = new ByteArrayOutputStream2();)
{
loop: while(true)
{
// read a chunk of response
ByteBuffer chunk = BufferUtil.hasContent(_responseData)
? _responseData : waitForOutput(time,unit);
_responseData=null;
// Parse the content of this chunk
while (BufferUtil.hasContent(chunk))
{
int pos=chunk.position();
boolean complete=parser.parseNext(chunk);
if (chunk.position()==pos)
{
// Nothing consumed
if (BufferUtil.isEmpty(chunk))
break;
return null;
}
// Add all consumed bytes to the output stream
bout.write(chunk.array(),chunk.arrayOffset()+pos,chunk.position()-pos);
// If we are complete then break the outer loop
if (complete)
{
if (BufferUtil.hasContent(chunk))
_responseData=chunk;
break loop;
}
}
}
return ByteBuffer.wrap(bout.getBuf(),0,bout.getCount());
}
}
}
}

View File

@ -223,7 +223,8 @@ public class DumpHandler extends AbstractHandler
writer.flush();
// commit now
response.setContentLength(buf.size()+1000);
if (!Boolean.valueOf(request.getParameter("no-content-length")))
response.setContentLength(buf.size()+1000);
response.addHeader("Before-Flush",response.isCommitted()?"Committed???":"Not Committed");
buf.writeTo(out);
out.flush();

View File

@ -283,9 +283,9 @@ public class HttpConnectionTest
public void testHead() throws Exception
{
String responsePOST=connector.getResponses("POST /R1 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Connection: close\r\n"+
"\r\n");
"Host: localhost\r\n"+
"Connection: close\r\n"+
"\r\n");
String responseHEAD=connector.getResponses("HEAD /R1 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
@ -330,6 +330,55 @@ public class HttpConnectionTest
assertTrue(postHeaders.equals(headHeaders));
}
@Test
public void testHeadChunked() throws Exception
{
String responsePOST=connector.getResponse("POST /R1?no-content-length=true HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n",false,1,TimeUnit.SECONDS);
String responseHEAD=connector.getResponse("HEAD /R1?no-content-length=true HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n",true,1,TimeUnit.SECONDS);
String postLine;
boolean postDate=false;
Set<String> postHeaders = new HashSet<>();
try(BufferedReader in = new BufferedReader(new StringReader(responsePOST)))
{
postLine = in.readLine();
String line=in.readLine();
while (line!=null && line.length()>0)
{
if (line.startsWith("Date:"))
postDate=true;
else
postHeaders.add(line);
line=in.readLine();
}
}
String headLine;
boolean headDate=false;
Set<String> headHeaders = new HashSet<>();
try(BufferedReader in = new BufferedReader(new StringReader(responseHEAD)))
{
headLine = in.readLine();
String line=in.readLine();
while (line!=null && line.length()>0)
{
if (line.startsWith("Date:"))
headDate=true;
else
headHeaders.add(line);
line=in.readLine();
}
}
assertThat(postLine,equalTo(headLine));
assertThat(postDate,equalTo(headDate));
assertTrue(postHeaders.equals(headHeaders));
}
@Test
public void testBadHostPort() throws Exception
{

View File

@ -22,10 +22,12 @@ import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -91,7 +93,35 @@ public class LocalConnectorTest
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("pathInfo=/R1"));
}
@Test
public void testOneResponse_10() throws Exception
{
String response=_connector.getResponse("GET /R1 HTTP/1.0\r\n\r\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("pathInfo=/R1"));
}
@Test
public void testOneResponse_10_keep_alive() throws Exception
{
String response=_connector.getResponse("GET /R1 HTTP/1.0\r\n" +
"Connection: keep-alive\r\n" +
"\r\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("pathInfo=/R1"));
}
@Test
public void testOneResponse_11() throws Exception
{
String response=_connector.getResponse("GET /R1 HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n");
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("pathInfo=/R1"));
}
@Test
public void testStopStart() throws Exception
{
@ -125,6 +155,27 @@ public class LocalConnectorTest
assertThat(response,containsString("pathInfo=/R2"));
}
@Test
public void testTwoGETsParsed() throws Exception
{
LocalConnector.LocalEndPoint endp = _connector.executeRequest(
"GET /R1 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R2 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n");
String response = BufferUtil.toString(endp.waitForResponse(false,10,TimeUnit.SECONDS),StandardCharsets.ISO_8859_1);
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("pathInfo=/R1"));
response = BufferUtil.toString(endp.waitForResponse(false,10,TimeUnit.SECONDS),StandardCharsets.ISO_8859_1);
assertThat(response,containsString("HTTP/1.1 200 OK"));
assertThat(response,containsString("pathInfo=/R2"));
}
@Test
public void testManyGETs() throws Exception
{

View File

@ -18,15 +18,12 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* <p>This is a lock designed to protect VERY short sections of
* critical code. Threads attempting to take the lock will wait
* until the lock is available, thus it is important that
* the code protected by this lock is extremely simple and non
* blocking.</p>
* Convenience Lock Wrapper.
*
* <pre>
* try(SpinLock.Lock lock = locker.lock())
* {
@ -36,62 +33,24 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class Locker
{
private static final boolean SPIN = Boolean.getBoolean(Locker.class.getName() + ".spin");
private final boolean _spin;
private final ReentrantLock _lock = new ReentrantLock();
private final AtomicReference<Thread> _spinLockState = new AtomicReference<>(null);
private final Lock _unlock = new Lock();
public Locker()
{
this(SPIN);
}
public Locker(boolean spin)
{
this._spin = spin;
}
public Lock lock()
{
if (_spin)
spinLock();
else
concLock();
return _unlock;
}
private void spinLock()
{
Thread current = Thread.currentThread();
while (true)
{
// Using test-and-test-and-set for better performance.
Thread locker = _spinLockState.get();
if (locker != null || !_spinLockState.compareAndSet(null, current))
{
if (locker == current)
throw new IllegalStateException("Locker is not reentrant");
continue;
}
return;
}
}
private void concLock()
{
if (_lock.isHeldByCurrentThread())
throw new IllegalStateException("Locker is not reentrant");
_lock.lock();
return _unlock;
}
public boolean isLocked()
{
if (_spin)
return _spinLockState.get() != null;
else
return _lock.isLocked();
return _lock.isLocked();
}
public class Lock implements AutoCloseable
@ -99,10 +58,12 @@ public class Locker
@Override
public void close()
{
if (_spin)
_spinLockState.set(null);
else
_lock.unlock();
_lock.unlock();
}
}
public Condition newCondition()
{
return _lock.newCondition();
}
}

View File

@ -18,38 +18,24 @@
package org.eclipse.jetty.util.thread;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class LockerTest
{
@Parameterized.Parameters
public static Collection<Object[]> parameters()
public LockerTest()
{
return Arrays.asList(new Object[]{true}, new Object[]{false});
}
private boolean spin;
public LockerTest(boolean spin)
{
this.spin = spin;
}
@Test
public void testLocked()
{
Locker lock = new Locker(spin);
Locker lock = new Locker();
assertFalse(lock.isLocked());
try(Locker.Lock l = lock.lock())
@ -67,7 +53,7 @@ public class LockerTest
@Test
public void testLockedException()
{
Locker lock = new Locker(spin);
Locker lock = new Locker();
assertFalse(lock.isLocked());
try(Locker.Lock l = lock.lock())
@ -90,7 +76,7 @@ public class LockerTest
@Test
public void testContend() throws Exception
{
final Locker lock = new Locker(spin);
final Locker lock = new Locker();
final CountDownLatch held0 = new CountDownLatch(1);
final CountDownLatch hold0 = new CountDownLatch(1);