Merge branch 'master' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project

This commit is contained in:
Joakim Erdfelt 2014-01-14 15:10:11 -07:00
commit e3e402c751
11 changed files with 160 additions and 149 deletions

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.client.http;
import java.util.Enumeration;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Result;
@ -79,28 +77,13 @@ public class HttpChannelOverHTTP extends HttpChannel
public void exchangeTerminated(Result result)
{
super.exchangeTerminated(result);
if (result.isSucceeded())
{
HttpFields responseHeaders = result.getResponse().getHeaders();
Enumeration<String> values = responseHeaders.getValues(HttpHeader.CONNECTION.asString(), ",");
if (values != null)
{
while (values.hasMoreElements())
{
if (HttpHeaderValue.CLOSE.asString().equalsIgnoreCase(values.nextElement()))
{
connection.close();
return;
}
}
}
connection.release();
}
else
{
boolean close = result.isFailed();
HttpFields responseHeaders = result.getResponse().getHeaders();
close |= responseHeaders.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
if (close)
connection.close();
}
else
connection.release();
}
@Override

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
@ -30,37 +29,19 @@ import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
* <p>{@link HttpInput} holds a queue of items passed to it by calls to {@link #content(T)}.</p>
* <p>{@link HttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
* but simply holds references to the item, thus the caller must organize for those buffers to valid while
* held by this class.</p>
* <p>To assist the caller, subclasses may override methods {@link #onContentQueued(T)},
* {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
* caller will know when buffers are queued and consumed.</p>
*/
/**
* @author gregw
*
* @param <T>
*/
/**
* @author gregw
*
* @param <T>
*/
public abstract class HttpInput<T> extends ServletInputStream implements Runnable
{
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final byte[] _oneByteBuffer = new byte[1];
private HttpChannelState _channelState;
private Throwable _onError;
private ReadListener _listener;
private boolean _notReady;
protected State _state = BLOCKING;
private State _eof=null;
private final Object _lock;
private HttpChannelState _channelState;
private ReadListener _listener;
private Throwable _onError;
private boolean _notReady;
private State _state = BLOCKING;
private State _eof;
private long _contentRead;
protected HttpInput()
@ -70,7 +51,15 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
protected HttpInput(Object lock)
{
_lock=lock==null?this:lock;
_lock = lock == null ? this : lock;
}
public void init(HttpChannelState state)
{
synchronized (lock())
{
_channelState = state;
}
}
public final Object lock()
@ -89,43 +78,6 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
}
}
/**
* Access the next content to be consumed from. Returning the next item does not consume it
* and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)}
* or {@link #consume(Object, int)} are required to consume data from the content.
* @return Content or null if none available.
* @throws IOException
*/
protected abstract T nextContent() throws IOException;
/**
* A convenience method to call nextContent and to check the return value, which if null then the
* a check is made for EOF and the state changed accordingly.
* @see #nextContent()
* @return Content or null if none available.
* @throws IOException
*/
protected T getNextContent() throws IOException
{
T content=nextContent();
if (content==null && _eof!=null)
{
LOG.debug("{} eof {}",this,_eof);
_state=_eof;
_eof=null;
}
return content;
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
return read < 0 ? -1 : 0xff & _oneByteBuffer[0];
}
@Override
public int available()
{
@ -143,6 +95,13 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
}
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
return read < 0 ? -1 : 0xff & _oneByteBuffer[0];
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
@ -171,6 +130,36 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
return l;
}
/**
* A convenience method to call nextContent and to check the return value, which if null then the
* a check is made for EOF and the state changed accordingly.
* @see #nextContent()
* @return Content or null if none available.
* @throws IOException
*/
protected T getNextContent() throws IOException
{
T content=nextContent();
if (content==null && _eof!=null)
{
LOG.debug("{} eof {}",this,_eof);
_state=_eof;
_eof=null;
}
return content;
}
/**
* Access the next content to be consumed from. Returning the next item does not consume it
* and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)}
* or {@link #consume(Object, int)} are required to consume data from the content.
* @return Content or null if none available.
* @throws IOException
*/
protected abstract T nextContent() throws IOException;
protected abstract int remaining(T item);
protected abstract int get(T item, byte[] buffer, int offset, int length);
@ -178,10 +167,15 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
protected abstract void consume(T item, int length);
protected abstract void blockForContent() throws IOException;
/** Add some content to the input stream
* @param item
*/
public abstract void content(T item);
protected boolean onAsyncRead()
{
if (_listener==null)
if (_listener == null)
return false;
_channelState.onReadPossible();
return true;
@ -194,11 +188,6 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
return _contentRead;
}
}
/** Add some content to the input stream
* @param item
*/
public abstract void content(T item);
/** This method should be called to signal to the HttpInput
@ -442,13 +431,4 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
return "EOF";
}
};
public void init(HttpChannelState state)
{
synchronized (lock())
{
_channelState=state;
}
}
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.util.ArrayQueue;
@ -44,8 +43,34 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
public QueuedHttpInput()
{}
{
}
/** Add some content to the input stream
* @param item
*/
public void content(T item)
{
// The buffer is not copied here. This relies on the caller not recycling the buffer
// until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
// the signals to the caller that the buffers can be recycled.
synchronized (lock())
{
boolean empty=_inputQ.isEmpty();
_inputQ.add(item);
if (empty)
{
if (!onAsyncRead())
lock().notify();
}
LOG.debug("{} queued {}", this, item);
}
}
public void recycle()
{
synchronized (lock())
@ -84,13 +109,11 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
return item;
}
protected abstract void onContentConsumed(T item);
protected void blockForContent() throws IOException
{
synchronized (lock())
{
while (_inputQ.isEmpty() && !_state.isEOF())
while (_inputQ.isEmpty() && !isFinished())
{
try
{
@ -105,41 +128,14 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
}
}
protected abstract void onContentConsumed(T item);
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal all available content has been consumed
*/
protected void onAllContentConsumed()
{
}
/* ------------------------------------------------------------ */
/** Add some content to the input stream
* @param item
*/
public void content(T item)
{
// The buffer is not copied here. This relies on the caller not recycling the buffer
// until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
// the signals to the caller that the buffers can be recycled.
synchronized (lock())
{
boolean empty=_inputQ.isEmpty();
_inputQ.add(item);
if (empty)
{
if (!onAsyncRead())
lock().notify();
}
LOG.debug("{} queued {}", this, item);
}
}
public void earlyEOF()
{
synchronized (lock())
@ -157,5 +153,4 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
lock().notify();
}
}
}

View File

@ -392,6 +392,13 @@ public class SPDYClient
return Collections.unmodifiableCollection(sessions);
}
@Override
protected void dumpThis(Appendable out) throws IOException
{
super.dumpThis(out);
dump(out, "", sessions);
}
private class ClientSelectorManager extends SelectorManager
{
private ClientSelectorManager(Executor executor, Scheduler scheduler)

View File

@ -180,6 +180,7 @@ public class Flusher
// Has the stream been reset for this data frame ?
if (stream != null && stream.isReset() && frameBytes instanceof StandardSession.DataFrameBytes)
{
// TODO: notify from within sync block !
frameBytes.failed(new StreamException(frameBytes.getStream().getId(),
StreamStatus.INVALID_STREAM, "Stream: " + frameBytes.getStream() + " is reset!"));
continue;

View File

@ -0,0 +1,9 @@
[name]
npn-boot
[files]
http://central.maven.org/maven2/org/mortbay/jetty/npn/npn-boot/1.1.6.v20130911/npn-boot-1.1.6.v20130911.jar:lib/npn/npn-boot-1.1.6.v20130911.jar
[ini-template]
--exec
-Xbootclasspath/p:lib/npn/npn-boot-1.1.6.v20130911.jar

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.spdy.server;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
@ -182,6 +183,13 @@ public class SPDYServerConnectionFactory extends AbstractConnectionFactory
return Collections.unmodifiableCollection(sessions);
}
@Override
protected void dumpThis(Appendable out) throws IOException
{
super.dumpThis(out);
dump(out, "", sessions);
}
private class ServerSPDYConnection extends SPDYConnection implements Runnable
{
private final ServerSessionFrameListener listener;

View File

@ -57,12 +57,12 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Ignore;
import org.junit.Test;
public class SynDataReplyDataLoadTest extends AbstractTest
{
private static final int TIMEOUT = 60000;
private static final int TIMEOUT = 60 * 1000;
private static final Logger logger = Log.getLogger(SynDataReplyDataLoadTest.class);
@Test(timeout = TIMEOUT)
@ -104,14 +104,20 @@ public class SynDataReplyDataLoadTest extends AbstractTest
};
}
};
short spdyVersion = SPDY.V2;
long idleTimeout = 2 * TIMEOUT;
server = newServer();
connector = new ServerConnector(server, null, null, serverBufferPool, 1,
Runtime.getRuntime().availableProcessors() / 2, new SPDYServerConnectionFactory(SPDY.V3, listener));
Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
new SPDYServerConnectionFactory(spdyVersion, listener));
connector.setIdleTimeout(idleTimeout);
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName(clientExecutor.getName() + "-client");
clientFactory = new SPDYClient.Factory(clientExecutor, null, clientBufferPool, null, 30000);
final Session session = startClient(SPDY.V3, startServer(SPDY.V3, listener), null);
clientFactory = new SPDYClient.Factory(clientExecutor, null, clientBufferPool, null, idleTimeout);
final Session session = startClient(spdyVersion, startServer(spdyVersion, listener), null);
final Thread testThread = Thread.currentThread();
Runnable timeout = new Runnable()
@ -162,7 +168,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
});
}
Scheduler.Task timeoutTask = clientFactory.getScheduler().schedule(timeout, TIMEOUT / 2, TimeUnit.MILLISECONDS);
Scheduler.Task syncTimeoutTask = clientFactory.getScheduler().schedule(timeout, TIMEOUT / 2, TimeUnit.MILLISECONDS);
{
long begin = System.nanoTime();
List<Future<Object>> futures = threadPool.invokeAll(tasks);
@ -172,7 +178,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
long end = System.nanoTime();
System.err.printf("SYN+GET+DATA+GET completed in %d ms%n", TimeUnit.NANOSECONDS.toMillis(end - begin));
}
timeoutTask.cancel();
syncTimeoutTask.cancel();
tasks.clear();
for (int i = 0; i < count; ++i)
@ -187,7 +193,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
});
}
timeoutTask = clientFactory.getScheduler().schedule(timeout, TIMEOUT / 2, TimeUnit.MILLISECONDS);
Scheduler.Task asyncTimeoutTask = clientFactory.getScheduler().schedule(timeout, TIMEOUT / 2, TimeUnit.MILLISECONDS);
{
long begin = System.nanoTime();
List<Future<Object>> futures = threadPool.invokeAll(tasks);
@ -197,7 +203,8 @@ public class SynDataReplyDataLoadTest extends AbstractTest
long end = System.nanoTime();
System.err.printf("SYN+COMPLETED+DATA completed in %d ms%n", TimeUnit.NANOSECONDS.toMillis(end - begin));
}
timeoutTask.cancel();
asyncTimeoutTask.cancel();
threadPool.shutdown();
Assert.assertEquals(0, leaks.get());
@ -206,7 +213,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
private void synCompletedData(Session session, Fields headers, int iterations) throws Exception
{
final Map<Integer, Integer> counter = new ConcurrentHashMap<>(iterations);
final CountDownLatch latch = new CountDownLatch(2 * iterations);
final CountDownLatch requestsLatch = new CountDownLatch(2 * iterations);
for (int i = 0; i < iterations; ++i)
{
final AtomicInteger count = new AtomicInteger(2);
@ -218,7 +225,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Assert.assertEquals(2, count.getAndDecrement());
latch.countDown();
requestsLatch.countDown();
}
@Override
@ -230,7 +237,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
{
Assert.assertEquals(1, count.getAndDecrement());
counter.remove(index);
latch.countDown();
requestsLatch.countDown();
}
}
}, new Promise.Adapter<Stream>()
@ -244,7 +251,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
);
}
Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS));
Assert.assertTrue(requestsLatch.await(iterations, TimeUnit.SECONDS));
Assert.assertTrue(counter.toString(), counter.isEmpty());
}

View File

@ -1,2 +1,2 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.spdy.LEVEL=WARN
#org.eclipse.jetty.spdy.LEVEL=DEBUG

View File

@ -0,0 +1,9 @@
[name]
npn-boot
[files]
http://central.maven.org/maven2/org/mortbay/jetty/npn/npn-boot/1.1.6.v20130911/npn-boot-1.1.6.v20130911.jar:lib/npn/npn-boot-1.1.6.v20130911.jar
[ini-template]
--exec
-Xbootclasspath/p:lib/npn/npn-boot-1.1.6.v20130911.jar

12
pom.xml
View File

@ -908,6 +908,18 @@
<npn.version>1.1.6.v20130911</npn.version>
</properties>
</profile>
<profile>
<id>7u51</id>
<activation>
<property>
<name>java.version</name>
<value>1.7.0_51</value>
</property>
</activation>
<properties>
<npn.version>1.1.6.v20130911</npn.version>
</properties>
</profile>
</profiles>
</project>