Put some more flesh on the HTTP2 implementation.

This commit is contained in:
Simone Bordet 2014-06-11 18:23:51 +02:00
parent 78cbed1236
commit 8e4c6b7fdd
21 changed files with 1021 additions and 61 deletions

View File

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-parent</artifactId>
<version>10.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>http2-client</artifactId>
<name>Jetty :: HTTP2 :: Client</name>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>manifest</goal>
</goals>
<configuration>
<instructions>
<Import-Package>javax.servlet.*;version="[2.6.0,3.2)",javax.net.*,*</Import-Package>
</instructions>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,169 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.parser.PrefaceParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2Client extends ContainerLifeCycle
{
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final Executor executor;
private final Scheduler scheduler;
private final SelectorManager selector;
private final ByteBufferPool byteBufferPool;
public HTTP2Client()
{
this(new QueuedThreadPool());
}
public HTTP2Client(Executor executor)
{
this.executor = executor;
addBean(executor);
this.scheduler = new ScheduledExecutorScheduler();
addBean(scheduler, true);
this.selector = new ClientSelectorManager(executor, scheduler);
addBean(selector, true);
this.byteBufferPool = new MappedByteBufferPool();
addBean(byteBufferPool, true);
}
@Override
protected void doStop() throws Exception
{
closeConnections();
super.doStop();
}
public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
{
try
{
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
channel.connect(address);
selector.connect(channel, new Context(listener, promise));
}
catch (Throwable x)
{
promise.failed(x);
}
}
private void closeConnections()
{
for (Session session : sessions)
session.close(ErrorCode.NO_ERROR, null, Callback.Adapter.INSTANCE);
sessions.clear();
}
private class ClientSelectorManager extends SelectorManager
{
private ClientSelectorManager(Executor executor, Scheduler scheduler)
{
super(executor, scheduler);
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), 30000);
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
Context context = (Context)attachment;
Generator generator = new Generator(byteBufferPool, 4096);
HTTP2Session session = new HTTP2ClientSession(endpoint, generator, context.listener, new HTTP2FlowControl(), 65535);
Parser parser = new Parser(byteBufferPool, session);
Connection connection = new HTTP2ClientConnection(byteBufferPool, getExecutor(), endpoint, parser, 8192, session);
context.promise.succeeded(session);
return connection;
}
}
private class HTTP2ClientConnection extends HTTP2Connection
{
private final Session session;
public HTTP2ClientConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, int bufferSize, Session session)
{
super(byteBufferPool, executor, endpoint, parser, bufferSize);
this.session = session;
}
@Override
public void onOpen()
{
super.onOpen();
sessions.offer(session);
getEndPoint().write(Callback.Adapter.INSTANCE, ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES));
}
@Override
public void onClose()
{
super.onClose();
sessions.remove(session);
}
}
private class Context
{
private final Session.Listener listener;
private final Promise<Session> promise;
private Context(Session.Listener listener, Promise<Session> promise)
{
this.listener = listener;
this.promise = promise;
}
}
}

View File

@ -0,0 +1,77 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2ClientSession extends HTTP2Session
{
private static final Logger LOG = Log.getLogger(HTTP2ClientSession.class);
public HTTP2ClientSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize)
{
super(endPoint, generator, listener, flowControl, initialWindowSize, 1);
}
@Override
public boolean onHeaders(HeadersFrame frame)
{
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream == null)
{
ResetFrame reset = new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR);
reset(reset, disconnectCallback);
}
else
{
stream.updateClose(frame.isEndStream(), false);
stream.process(frame);
notifyHeaders(stream, frame);
if (stream.isClosed())
removeStream(stream, false);
}
return false;
}
private void notifyHeaders(IStream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
if (listener == null)
return;
try
{
listener.onHeaders(stream, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
}

View File

@ -0,0 +1,186 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.hpack.MetaData;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HTTP2Test
{
private Server server;
private ServerConnector connector;
private String path;
private HTTP2Client client;
private void startServer(HttpServlet servlet) throws Exception
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
connector = new ServerConnector(server, new HTTP2ServerConnectionFactory(new HttpConfiguration()));
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/");
path = "/test";
context.addServlet(new ServletHolder(servlet), path);
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client = new HTTP2Client(clientExecutor);
server.addBean(client);
server.start();
}
@After
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testRequestNoContentResponseNoContent() throws Exception
{
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
}
});
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
InetSocketAddress address = new InetSocketAddress(host, port);
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(address, new Session.Listener.Adapter(), promise);
Session session = promise.get();
HttpFields fields = new HttpFields();
MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, "GET", authority, host, port, path, fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
final CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Assert.assertTrue(stream.isClosed());
Assert.assertTrue(stream.getId() > 0);
Assert.assertTrue(frame.isEndStream());
Assert.assertEquals(stream.getId(), frame.getStreamId());
Assert.assertTrue(frame.getMetaData().isResponse());
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(200, response.getStatus());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testRequestNoContentResponseContent() throws Exception
{
final byte[] content = "Hello World!".getBytes(StandardCharsets.UTF_8);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
resp.getOutputStream().write(content);
}
});
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
InetSocketAddress address = new InetSocketAddress(host, port);
FuturePromise<Session> promise = new FuturePromise<>();
client.connect(address, new Session.Listener.Adapter(), promise);
Session session = promise.get();
HttpFields fields = new HttpFields();
MetaData.Request metaData = new MetaData.Request(HttpScheme.HTTP, "GET", authority, host, port, path, fields);
HeadersFrame frame = new HeadersFrame(1, metaData, null, true);
final CountDownLatch latch = new CountDownLatch(2);
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Assert.assertFalse(stream.isClosed());
Assert.assertTrue(stream.getId() > 0);
Assert.assertFalse(frame.isEndStream());
Assert.assertEquals(stream.getId(), frame.getStreamId());
Assert.assertTrue(frame.getMetaData().isResponse());
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(200, response.getStatus());
latch.countDown();
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
Assert.assertTrue(stream.isClosed());
Assert.assertTrue(frame.isEndStream());
Assert.assertEquals(ByteBuffer.wrap(content), frame.getData());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -0,0 +1,2 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.http2.LEVEL=DEBUG

View File

@ -0,0 +1,28 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
public interface FlowControl
{
public void onNewStream(IStream stream);
public int getWindowSize(ISession session);
public void setWindowSize(ISession session, int windowSize);
}

View File

@ -0,0 +1,38 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
public class HTTP2FlowControl implements FlowControl
{
@Override
public void onNewStream(IStream stream)
{
}
@Override
public int getWindowSize(ISession session)
{
return 0;
}
@Override
public void setWindowSize(ISession session, int windowSize)
{
}
}

View File

@ -19,10 +19,16 @@
package org.eclipse.jetty.http2;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
@ -34,9 +40,11 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ErrorCode;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise;
@ -47,24 +55,74 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
private static final Logger LOG = Log.getLogger(HTTP2Session.class);
protected final Callback disconnectCallback = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
disconnect();
}
};
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger streamCount = new AtomicInteger();
private final Flusher flusher = new Flusher();
private final EndPoint endPoint;
private final Generator generator;
private final Listener listener;
private final FlowControl flowControl;
private final int initialWindowSize;
private volatile int maxStreamCount;
public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener)
public HTTP2Session(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize, int initialStreamId)
{
this.endPoint = endPoint;
this.generator = generator;
this.listener = listener;
this.flowControl = flowControl;
this.initialWindowSize = initialWindowSize;
this.maxStreamCount = -1;
this.streamIds.set(initialStreamId);
}
public Generator getGenerator()
{
return generator;
}
public int getInitialWindowSize()
{
return initialWindowSize;
}
public int getMaxStreamCount()
{
return maxStreamCount;
}
public FlowControl getFlowControl()
{
return flowControl;
}
@Override
public boolean onData(DataFrame frame)
{
IStream stream = streams.get(frame.getStreamId());
return stream.process(frame);
int streamId = frame.getStreamId();
IStream stream = getStream(streamId);
if (stream != null)
{
stream.updateClose(frame.isEndStream(), false);
return stream.process(frame);
}
else
{
ResetFrame resetFrame = new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR);
reset(resetFrame, disconnectCallback);
return false;
}
}
@Override
@ -85,6 +143,20 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public boolean onSettings(SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
{
maxStreamCount = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
LOG.debug("Updated max concurrent streams to {}", maxStreamCount);
}
if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
{
int windowSize = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE);
setWindowSize(windowSize);
LOG.debug("Updated window size to {}", windowSize);
}
// TODO: handle other settings
notifySettings(this, frame);
return false;
}
@ -113,9 +185,29 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
@Override
public void newStream(HeadersFrame frame, Stream.Listener listener, Promise<Stream> promise)
public void newStream(HeadersFrame frame, final Promise<Stream> promise, Stream.Listener listener)
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
synchronized (this)
{
int streamId = streamIds.getAndAdd(2);
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(),
priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
final IStream stream = createLocalStream(frame);
if (stream == null)
{
promise.failed(new IllegalStateException());
return;
}
stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener);
flusher.offer(generator.generate(frame, new PromiseCallback<>(promise, stream)));
}
// Iterate outside the synchronized block.
flusher.iterate();
}
@Override
@ -137,9 +229,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
@Override
public void close(GoAwayFrame frame, Callback callback)
public void close(int error, String reason, Callback callback)
{
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
frame(frame, callback);
}
@Override
@ -154,9 +248,96 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
endPoint.close();
}
protected IStream putIfAbsent(IStream stream)
protected IStream createLocalStream(HeadersFrame frame)
{
return streams.putIfAbsent(stream.getId(), stream);
IStream stream = newStream(frame);
int streamId = stream.getId();
updateLastStreamId(streamId);
if (streams.putIfAbsent(streamId, stream) == null)
{
LOG.debug("Created local {}", stream);
return stream;
}
else
{
return null;
}
}
protected IStream createRemoteStream(HeadersFrame frame)
{
int streamId = frame.getStreamId();
// SPEC: exceeding max concurrent streams is treated as stream error.
while (true)
{
int currentStreams = streamCount.get();
int maxStreams = maxStreamCount;
if (maxStreams >= 0 && currentStreams >= maxStreams)
{
reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR), disconnectCallback);
return null;
}
if (streamCount.compareAndSet(currentStreams, currentStreams + 1))
break;
}
IStream stream = newStream(frame);
// SPEC: duplicate stream is treated as connection error.
if (streams.putIfAbsent(streamId, stream) == null)
{
updateLastStreamId(streamId);
LOG.debug("Created remote {}", stream);
return stream;
}
else
{
close(ErrorCode.PROTOCOL_ERROR, "duplicate_stream", disconnectCallback);
return null;
}
}
protected IStream newStream(HeadersFrame frame)
{
return new HTTP2Stream(this, frame);
}
protected void removeStream(IStream stream, boolean local)
{
IStream removed = streams.remove(stream.getId());
if (removed != null)
{
assert removed == stream;
if (local)
streamCount.decrementAndGet();
LOG.debug("Removed {}", stream);
}
}
@Override
public Collection<Stream> getStreams()
{
List<Stream> result = new ArrayList<>();
result.addAll(streams.values());
return result;
}
public IStream getStream(int streamId)
{
return streams.get(streamId);
}
private void updateLastStreamId(int streamId)
{
Atomics.updateMax(lastStreamId, streamId);
}
public void setWindowSize(int initialWindowSize)
{
flowControl.setWindowSize(this, initialWindowSize);
}
protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
@ -172,17 +353,34 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
protected void notifySettings(Session session, SettingsFrame frame)
{
try
{
listener.onSettings(session, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
private class Flusher extends IteratingCallback
{
private final ArrayQueue<Generator.LeaseCallback> queue = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH);
private Generator.LeaseCallback active;
private void flush(Generator.LeaseCallback lease)
private void offer(Generator.LeaseCallback lease)
{
synchronized (queue)
{
queue.offer(lease);
}
}
private void flush(Generator.LeaseCallback lease)
{
offer(lease);
iterate();
}
@ -222,4 +420,28 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
}
}
public class PromiseCallback<C> implements Callback
{
private final Promise<C> promise;
private final C value;
public PromiseCallback(Promise<C> promise, C value)
{
this.promise = promise;
this.value = value;
}
@Override
public void succeeded()
{
promise.succeeded(value);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
}
}

View File

@ -23,12 +23,18 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2Stream implements IStream
{
private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final ISession session;
private final HeadersFrame frame;
private Listener listener;
@ -81,6 +87,12 @@ public class HTTP2Stream implements IStream
return attributes().remove(key);
}
@Override
public boolean isClosed()
{
return closeState.get() == CloseState.CLOSED;
}
private ConcurrentMap<String, Object> attributes()
{
ConcurrentMap<String, Object> map = attributes.get();
@ -96,35 +108,108 @@ public class HTTP2Stream implements IStream
}
@Override
public void setListener(Listener listener)
public Listener getListener()
{
return listener;
}
@Override
public boolean process(DataFrame frame)
public void setListener(Listener listener)
{
return notifyData(frame);
this.listener = listener;
}
@Override
public boolean process(Frame frame)
{
switch (frame.getType())
{
case DATA:
{
return notifyData((DataFrame)frame);
}
case HEADERS:
{
return false;
}
default:
throw new UnsupportedOperationException();
}
}
@Override
public void updateClose(boolean update, boolean local)
{
if (LOG.isDebugEnabled())
LOG.debug("Update close for {} close={} local={}", this, update, local);
if (!update)
return;
while (true)
{
CloseState current = closeState.get();
switch (current)
{
case NOT_CLOSED:
{
CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
if (closeState.compareAndSet(current, newValue))
return;
break;
}
case LOCALLY_CLOSED:
{
if (local)
return;
if (closeState.compareAndSet(current, CloseState.CLOSED))
return;
break;
}
case REMOTELY_CLOSED:
{
if (!local)
return;
if (closeState.compareAndSet(current, CloseState.CLOSED))
return;
break;
}
default:
{
return;
}
}
}
}
protected boolean notifyData(DataFrame frame)
{
final Listener listener = this.listener;
listener.onData(this, frame, new Callback()
if (listener == null)
return false;
try
{
@Override
public void succeeded()
listener.onData(this, frame, new Callback()
{
// TODO: notify flow control
}
@Override
public void succeeded()
{
// TODO: notify flow control
}
@Override
public void failed(Throwable x)
{
// TODO: bail out
}
});
return true;
@Override
public void failed(Throwable x)
{
// TODO: bail out
}
});
return false;
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return false;
}
}
@Override
@ -132,4 +217,9 @@ public class HTTP2Stream implements IStream
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
private enum CloseState
{
NOT_CLOSED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
}
}

View File

@ -24,5 +24,8 @@ import org.eclipse.jetty.util.Callback;
public interface ISession extends Session
{
@Override
IStream getStream(int streamId);
public void frame(Frame frame, Callback callback);
}

View File

@ -19,14 +19,27 @@
package org.eclipse.jetty.http2;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
public interface IStream extends Stream
{
@Override
public ISession getSession();
public Listener getListener();
public void setListener(Listener listener);
public boolean process(DataFrame frame);
public boolean process(Frame frame);
/**
* Updates the close state of this stream.
*
* @param update whether to update the close state
* @param local whether the update comes from a local operation
* (such as sending a frame that ends the stream)
* or a remote operation (such as receiving a frame
* that ends the stream).
*/
public void updateClose(boolean update, boolean local);
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.http2.api;
import java.util.Collection;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
@ -28,7 +30,7 @@ import org.eclipse.jetty.util.Promise;
public interface Session
{
public void newStream(HeadersFrame frame, Stream.Listener listener, Promise<Stream> promise);
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener);
public void settings(SettingsFrame frame, Callback callback);
@ -36,7 +38,11 @@ public interface Session
public void reset(ResetFrame frame, Callback callback);
public void close(GoAwayFrame frame, Callback callback);
public void close(int error, String payload, Callback callback);
public Collection<Stream> getStreams();
public Stream getStream(int streamId);
// TODO: getStreams(), remote and local address, etc. see SPDY's Session

View File

@ -38,28 +38,36 @@ public interface Stream
public Object removeAttribute(String key);
public boolean isClosed();
// TODO: see SPDY's Stream
public interface Listener
{
public void onHeaders(Stream stream, HeadersFrame frame);
public void onData(Stream stream, DataFrame frame, Callback callback);
// TODO: is this method needed ?
public void onFailure(Stream stream, Throwable x);
// TODO: See SPDY's StreamFrameListener
public static class Adapter implements Listener
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
}
@Override
public void onFailure(Stream stream, Throwable x)
{
}
}
}

View File

@ -22,6 +22,11 @@ import java.util.Map;
public class SettingsFrame extends Frame
{
public static final int HEADER_TABLE_SIZE = 1;
public static final int ENABLE_PUSH = 2;
public static final int MAX_CONCURRENT_STREAMS = 3;
public static final int INITIAL_WINDOW_SIZE = 4;
private final Map<Integer, Integer> settings;
private final boolean reply;

View File

@ -27,11 +27,18 @@ import org.eclipse.jetty.util.Callback;
public class Generator
{
private final ByteBufferPool byteBufferPool;
private final int headerTableSize;
private final FrameGenerator[] generators;
public Generator(ByteBufferPool byteBufferPool)
{
this(byteBufferPool, 4096);
}
public Generator(ByteBufferPool byteBufferPool, int headerTableSize)
{
this.byteBufferPool = byteBufferPool;
this.headerTableSize = headerTableSize;
HeaderGenerator headerGenerator = new HeaderGenerator();
HpackEncoder encoder = new HpackEncoder(headerTableSize);
@ -52,6 +59,11 @@ public class Generator
}
public int getHeaderTableSize()
{
return headerTableSize;
}
public LeaseCallback generate(Frame frame, Callback callback)
{
LeaseCallback lease = new LeaseCallback(byteBufferPool, callback);

View File

@ -31,7 +31,6 @@ import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.hpack.HpackDecoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -74,11 +73,7 @@ public class Parser
public boolean parse(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
{
int l=Math.min(buffer.remaining(),32);
LOG.debug("Parsing "+TypeUtil.toHexString(buffer.array(),buffer.arrayOffset()+buffer.position(),l)+(l<buffer.remaining()?"...":""));
}
LOG.debug("Parsing {}", buffer);
while (true)
{
@ -132,7 +127,7 @@ public class Parser
case COMPLETE:
{
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame, synchronous processing: {}", FrameType.from(type),bodyParser);
LOG.debug("Parsed {} frame, synchronous processing", FrameType.from(type));
reset();
break;
}

View File

@ -55,6 +55,7 @@ public class PrefaceParser
if (cursor == PREFACE_BYTES.length)
{
cursor = 0;
LOG.debug("Parsed preface bytes");
return true;
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.HTTP2FlowControl;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
@ -44,6 +45,7 @@ public class HTTP2ServerConnectionFactory extends AbstractConnectionFactory
private final HttpConfiguration httpConfiguration;
private int headerTableSize = 4096;
private int initialWindowSize = 65535;
public HTTP2ServerConnectionFactory(HttpConfiguration httpConfiguration)
{
@ -61,13 +63,24 @@ public class HTTP2ServerConnectionFactory extends AbstractConnectionFactory
this.headerTableSize = headerTableSize;
}
public int getInitialWindowSize()
{
return initialWindowSize;
}
public void setInitialWindowSize(int initialWindowSize)
{
this.initialWindowSize = initialWindowSize;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
Session.Listener listener = new HTTPServerSessionListener(connector, httpConfiguration, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getHeaderTableSize());
HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener);
HTTP2ServerSession session = new HTTP2ServerSession(endPoint, generator, listener, new HTTP2FlowControl(),
getInitialWindowSize());
Parser parser = new ServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2Connection(connector.getByteBufferPool(), connector.getExecutor(),
@ -104,6 +117,12 @@ public class HTTP2ServerConnectionFactory extends AbstractConnectionFactory
return frame.isEndStream() ? null : this;
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
// Servers do not receive responses.
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{

View File

@ -20,8 +20,8 @@ package org.eclipse.jetty.http2.server;
import java.util.HashMap;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
@ -29,43 +29,42 @@ import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Listener
{
public HTTP2ServerSession(EndPoint endPoint, Generator generator, Listener listener)
public HTTP2ServerSession(EndPoint endPoint, Generator generator, Listener listener, FlowControl flowControl, int initialWindowSize)
{
super(endPoint, generator, listener);
super(endPoint, generator, listener, flowControl, initialWindowSize, 2);
}
@Override
public boolean onPreface()
{
frame(new SettingsFrame(new HashMap<Integer, Integer>(), false), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
// If cannot write the SETTINGS frame, hard disconnect.
disconnect();
}
});
// SPEC: send a SETTINGS frame upon receiving the preface.
HashMap<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.HEADER_TABLE_SIZE, getGenerator().getHeaderTableSize());
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialWindowSize());
int maxConcurrentStreams = getMaxStreamCount();
if (maxConcurrentStreams >= 0)
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
SettingsFrame frame = new SettingsFrame(settings, false);
settings(frame, disconnectCallback);
return false;
}
@Override
public boolean onHeaders(HeadersFrame frame)
{
// TODO: handle max concurrent streams
// TODO: handle duplicate streams
// TODO: handle empty headers
IStream stream = new HTTP2Stream(this, frame);
IStream existing = putIfAbsent(stream);
if (existing == null)
IStream stream = createRemoteStream(frame);
if (stream != null)
{
stream.updateClose(frame.isEndStream(), false);
stream.process(frame);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
// The listener may have sent a frame that closed the stream.
if (stream.isClosed())
removeStream(stream, false);
}
return false;
}

View File

@ -19,10 +19,10 @@
package org.eclipse.jetty.http2.server;
import java.nio.ByteBuffer;
import java.util.List;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
@ -78,12 +78,20 @@ public class HttpChannelOverHTTP2 extends HttpChannel<ByteBufferCallback>
parsedHostHeader(requestMetaData.getHost(), requestMetaData.getPort());
// The specification says user agents MUST support gzip encoding.
// Based on that, some browser does not send the header, but it's
// important that applications can find it (e.g. GzipFilter).
boolean hasAcceptEncodingGzip = false;
HttpFields fields = requestMetaData.getFields();
for (int i = 0; i < fields.size(); ++i)
{
HttpField field = fields.getField(i);
if (HttpHeader.ACCEPT_ENCODING.is(field.getName()))
hasAcceptEncodingGzip = field.getValue().contains("gzip");
parsedHeader(field);
}
if (!hasAcceptEncodingGzip)
parsedHeader(new HttpField(HttpHeader.ACCEPT_ENCODING, "gzip"));
headerComplete();

View File

@ -15,6 +15,7 @@
<modules>
<module>http2-hpack</module>
<module>http2-common</module>
<module>http2-client</module>
<module>http2-server</module>
</modules>