Initial implementation of the HTTP over SPDY layer.

This commit is contained in:
Simone Bordet 2012-02-07 15:17:45 +01:00
parent 4a03daa3c2
commit d21ce5599a
11 changed files with 922 additions and 81 deletions

View File

@ -12,7 +12,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jetty.version>7.6.0.v20120127</jetty.version>
<jetty.version>7.6.1-SNAPSHOT</jetty.version>
<jetty.npn.version>1.0.0-SNAPSHOT</jetty.npn.version>
</properties>
@ -30,7 +30,7 @@
<configuration>
<rules>
<requireJavaVersion>
<version>1.7</version>
<version>[1.7,)</version>
</requireJavaVersion>
</rules>
</configuration>

View File

@ -0,0 +1,109 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.nio.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.nio.AsyncSPDYConnection;
import org.eclipse.jetty.spdy.parser.Parser;
public class ServerSPDY2AsyncConnectionFactory implements AsyncConnectionFactory
{
private final ServerSessionFrameListener listener;
public ServerSPDY2AsyncConnectionFactory()
{
this(null);
}
public ServerSPDY2AsyncConnectionFactory(ServerSessionFrameListener listener)
{
this.listener = listener;
}
@Override
public String getProtocol()
{
return "spdy/2";
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
CompressionFactory compressionFactory = new StandardCompressionFactory();
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(compressionFactory.newCompressor());
ServerSessionFrameListener listener = this.listener;
if (listener == null)
listener = newServerSessionFrameListener(endPoint, attachment);
ServerAsyncSPDYConnection connection = new ServerAsyncSPDYConnection(endPoint, parser, listener);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(connection, 2, listener, generator);
parser.addListener(session);
connection.setSession(session);
return connection;
}
protected ServerSessionFrameListener newServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
{
return listener;
}
private static class ServerAsyncSPDYConnection extends AsyncSPDYConnection
{
private final ServerSessionFrameListener listener;
private volatile Session session;
private ServerAsyncSPDYConnection(EndPoint endPoint, Parser parser, ServerSessionFrameListener listener)
{
super(endPoint, parser);
this.listener = listener;
}
@Override
public Connection handle() throws IOException
{
final Session session = this.session;
if (session != null)
{
// NPE guard to support tests
if (listener != null)
listener.onConnect(session);
this.session = null;
}
return super.handle();
}
private void setSession(Session session)
{
this.session = session;
}
}
}

View File

@ -17,6 +17,8 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory;
public class StandardStream implements IStream
{
private static final Logger logger = LoggerFactory.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final AtomicInteger windowSize = new AtomicInteger(65535);
private final ISession session;
private final SynStreamFrame frame;
@ -96,6 +99,24 @@ public class StandardStream implements IStream
return halfClosed;
}
@Override
public Object getAttribute(String key)
{
return attributes.get(key);
}
@Override
public void setAttribute(String key, Object value)
{
attributes.put(key, value);
}
@Override
public Object removeAttribute(String key)
{
return attributes.remove(key);
}
@Override
public void setFrameListener(FrameListener frameListener)
{

View File

@ -143,6 +143,12 @@ public class Headers implements Iterable<Headers.Header>
return headers.values().iterator();
}
@Override
public String toString()
{
return headers.toString();
}
public static class Header
{
private final String name;

View File

@ -38,6 +38,12 @@ public interface Stream
public boolean isHalfClosed();
public Object getAttribute(String key);
public void setAttribute(String key, Object value);
public Object removeAttribute(String key);
public interface FrameListener extends EventListener
{
public void onReply(Stream stream, ReplyInfo replyInfo);

View File

@ -23,9 +23,9 @@ import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
public class NoProtocolConnection extends AbstractConnection implements AsyncConnection
public class EmptyAsyncConnection extends AbstractConnection implements AsyncConnection
{
public NoProtocolConnection(AsyncEndPoint endPoint)
public EmptyAsyncConnection(AsyncEndPoint endPoint)
{
super(endPoint);
}

View File

@ -0,0 +1,226 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.nio;
import java.io.IOException;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.thread.Timeout;
public class EmptyAsyncEndPoint implements AsyncEndPoint
{
private boolean checkForIdle;
private Connection connection;
private boolean oshut;
private boolean ishut;
private boolean closed;
private int maxIdleTime;
@Override
public void asyncDispatch()
{
}
@Override
public void scheduleWrite()
{
}
@Override
public void onIdleExpired(long idleForMs)
{
}
@Override
public void setCheckForIdle(boolean check)
{
this.checkForIdle = check;
}
@Override
public boolean isCheckForIdle()
{
return checkForIdle;
}
@Override
public boolean isWritable()
{
return false;
}
@Override
public boolean hasProgressed()
{
return false;
}
@Override
public void scheduleTimeout(Timeout.Task task, long timeoutMs)
{
}
@Override
public void cancelTimeout(Timeout.Task task)
{
}
@Override
public Connection getConnection()
{
return connection;
}
@Override
public void setConnection(Connection connection)
{
this.connection = connection;
}
@Override
public void shutdownOutput() throws IOException
{
oshut = true;
}
@Override
public boolean isOutputShutdown()
{
return oshut;
}
@Override
public void shutdownInput() throws IOException
{
ishut = true;
}
@Override
public boolean isInputShutdown()
{
return ishut;
}
@Override
public void close() throws IOException
{
closed = true;
}
@Override
public int fill(Buffer buffer) throws IOException
{
return 0;
}
@Override
public int flush(Buffer buffer) throws IOException
{
return 0;
}
@Override
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
return 0;
}
@Override
public String getLocalAddr()
{
return null;
}
@Override
public String getLocalHost()
{
return null;
}
@Override
public int getLocalPort()
{
return -1;
}
@Override
public String getRemoteAddr()
{
return null;
}
@Override
public String getRemoteHost()
{
return null;
}
@Override
public int getRemotePort()
{
return -1;
}
@Override
public boolean isBlocking()
{
return false;
}
@Override
public boolean blockReadable(long millisecs) throws IOException
{
return false;
}
@Override
public boolean blockWritable(long millisecs) throws IOException
{
return false;
}
@Override
public boolean isOpen()
{
return !closed;
}
@Override
public Object getTransport()
{
return null;
}
@Override
public void flush() throws IOException
{
}
@Override
public int getMaxIdleTime()
{
return maxIdleTime;
}
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
this.maxIdleTime = timeMs;
}
}

View File

@ -254,7 +254,7 @@ public class SPDYClient
}
});
AsyncConnection connection = new NoProtocolConnection(sslEndPoint);
AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
sslEndPoint.setConnection(connection);
startHandshake(engine);

View File

@ -16,32 +16,25 @@
package org.eclipse.jetty.spdy.nio;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.spdy.CompressionFactory;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.ServerSPDY2AsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class SPDYServerConnector extends SelectChannelConnector
{
private final ServerSessionFrameListener listener;
private final List<AsyncConnectionFactory> factories = new CopyOnWriteArrayList<>();
private final SslContextFactory sslContextFactory;
public SPDYServerConnector(ServerSessionFrameListener listener)
@ -51,10 +44,16 @@ public class SPDYServerConnector extends SelectChannelConnector
public SPDYServerConnector(ServerSessionFrameListener listener, SslContextFactory sslContextFactory)
{
this.listener = listener;
this.sslContextFactory = sslContextFactory;
if (sslContextFactory != null)
addBean(sslContextFactory);
if (listener != null)
addAsyncConnectionFactory(new ServerSPDY2AsyncConnectionFactory(listener));
}
public void addAsyncConnectionFactory(AsyncConnectionFactory factory)
{
factories.add(factory);
}
@Override
@ -84,7 +83,7 @@ public class SPDYServerConnector extends SelectChannelConnector
}
});
AsyncConnection connection = new NoProtocolConnection(sslEndPoint);
AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
sslEndPoint.setConnection(connection);
startHandshake(engine);
@ -93,7 +92,7 @@ public class SPDYServerConnector extends SelectChannelConnector
}
else
{
AsyncConnectionFactory connectionFactory = new ServerSPDY2AsyncConnectionFactory();
AsyncConnectionFactory connectionFactory = getAsyncConnectionFactory("spdy/2");
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, null);
endPoint.setConnection(connection);
return connection;
@ -102,15 +101,20 @@ public class SPDYServerConnector extends SelectChannelConnector
protected List<String> provideProtocols()
{
// TODO: connectionFactories.map(AsyncConnectionFactory::getProtocol())
return Arrays.asList("spdy/2");
List<String> result = new ArrayList<>();
for (AsyncConnectionFactory factory : factories)
result.add(factory.getProtocol());
return result;
}
protected AsyncConnectionFactory getAsyncConnectionFactory(String protocol)
{
// TODO: select from existing AsyncConnectionFactories
return new ServerSPDY2AsyncConnectionFactory();
for (AsyncConnectionFactory factory : factories)
{
if (factory.getProtocol().equals(protocol))
return factory;
}
return null;
}
protected SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
@ -133,61 +137,4 @@ public class SPDYServerConnector extends SelectChannelConnector
throw new RuntimeException(x);
}
}
private class ServerSPDY2AsyncConnectionFactory implements AsyncConnectionFactory
{
@Override
public String getProtocol()
{
return "spdy/2";
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
CompressionFactory compressionFactory = new StandardCompressionFactory();
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(compressionFactory.newCompressor());
ServerAsyncSPDYConnection connection = new ServerAsyncSPDYConnection(endPoint, parser, listener);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(connection, 2, listener, generator);
parser.addListener(session);
connection.setSession(session);
return connection;
}
}
private class ServerAsyncSPDYConnection extends AsyncSPDYConnection
{
private final ServerSessionFrameListener listener;
private volatile Session session;
private ServerAsyncSPDYConnection(EndPoint endp, Parser parser, ServerSessionFrameListener listener)
{
super(endp, parser);
this.listener = listener;
}
@Override
public Connection handle() throws IOException
{
final Session session = this.session;
if (session != null)
{
// NPE guard to support tests
if (listener != null)
listener.onConnect(session);
this.session = null;
}
return super.handle();
}
private void setSession(Session session)
{
this.session = session;
}
}
}

View File

@ -0,0 +1,414 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.nio.http;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.server.AbstractHttpConnection;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.ServerSPDY2AsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.nio.EmptyAsyncEndPoint;
public class ServerHTTP11OverSPDY2AsyncConnectionFactory extends ServerSPDY2AsyncConnectionFactory
{
private final Connector connector;
public ServerHTTP11OverSPDY2AsyncConnectionFactory(Connector connector)
{
this.connector = connector;
}
@Override
protected ServerSessionFrameListener newServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
{
return new HTTPServerSessionFrameListener();
}
private class HTTPServerSessionFrameListener extends ServerSessionFrameListener.Adapter implements Stream.FrameListener
{
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
// Every time we have a SYN, it maps to a HTTP request.
// We can have multiple concurrent SYNs on the same connection,
// and this is very different from HTTP, where only one request/response
// cycle is processed at a time, so we need to fake an http connection
// for each SYN in order to run concurrently.
try
{
HTTPSPDYConnection connection = new HTTPSPDYConnection(connector, new HTTPSPDYAsyncEndPoint(stream), connector.getServer(), stream);
stream.setAttribute("connection", connection);
Headers headers = synInfo.getHeaders();
if (headers.isEmpty())
{
// SYN with no headers, perhaps they'll come in a HEADER frame
return this;
}
else
{
boolean processed = processRequest(stream, headers);
if (!processed)
{
respond(stream, HttpStatus.BAD_REQUEST_400);
return null;
}
if (synInfo.isClose())
{
forwardHeadersComplete(stream);
forwardRequestComplete(stream);
return null;
}
else
{
return this;
}
}
}
catch (HttpException x)
{
respond(stream, x.getStatus());
return null;
}
catch (IOException x)
{
close(stream);
return null;
}
}
private boolean processRequest(Stream stream, Headers headers) throws IOException
{
Boolean requestSeen = (Boolean)stream.getAttribute("request");
if (requestSeen == null || !requestSeen)
{
stream.setAttribute("request", Boolean.TRUE);
Headers.Header method = headers.get("method");
Headers.Header uri = headers.get("url");
Headers.Header version = headers.get("version");
if (method == null || uri == null || version == null)
return false;
forwardRequest(stream, method.value(), uri.value(), version.value());
}
forwardHeaders(stream, headers);
return true;
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do nothing, servers cannot get replies
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
// TODO: support trailers
Boolean dataSeen = (Boolean)stream.getAttribute("data");
if (dataSeen != null && dataSeen)
return;
try
{
processRequest(stream, headersInfo.getHeaders());
if (headersInfo.isClose())
{
forwardHeadersComplete(stream);
forwardRequestComplete(stream);
}
}
catch (HttpException x)
{
respond(stream, x.getStatus());
}
catch (IOException x)
{
close(stream);
}
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
try
{
forwardHeadersComplete(stream);
stream.setAttribute("data", Boolean.TRUE);
ByteBuffer buffer = ByteBuffer.allocate(dataInfo.getBytesCount());
dataInfo.getBytes(buffer);
forwardContent(stream, buffer);
if (dataInfo.isClose())
forwardRequestComplete(stream);
}
catch (HttpException x)
{
respond(stream, x.getStatus());
}
catch (IOException x)
{
close(stream);
}
}
private void respond(Stream stream, int status)
{
Headers headers = new Headers();
headers.put("status", String.valueOf(status));
headers.put("version", "HTTP/1.1");
stream.reply(new ReplyInfo(headers, true));
}
private void forwardRequest(Stream stream, String method, String uri, String version) throws IOException
{
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.startRequest(new ByteArrayBuffer(method), new ByteArrayBuffer(uri), new ByteArrayBuffer(version));
}
private void forwardHeaders(Stream stream, Headers headers) throws IOException
{
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
for (Headers.Header header : headers)
{
String name = header.name();
switch (name)
{
case "method":
case "version":
// Skip request line headers
continue;
case "url":
// Mangle the URL if the host header is missing
String host = parseHost(header.value());
// Jetty needs the host header, although HTTP 1.1 does not
// require it if it can be parsed from an absolute URI
if (host != null)
connection.parsedHeader(new ByteArrayBuffer("host"), new ByteArrayBuffer(host));
break;
case "connection":
case "keep-alive":
case "host":
// Spec says to ignore these headers
continue;
default:
// Spec says headers must be single valued
String value = header.value();
connection.parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
break;
}
}
}
private String parseHost(String url)
{
try
{
URI uri = new URI(url);
return uri.getHost() + (uri.getPort() > 0 ? ":" + uri.getPort() : "");
}
catch (URISyntaxException x)
{
return null;
}
}
private void forwardHeadersComplete(Stream stream) throws IOException
{
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.headerComplete();
}
private void forwardContent(Stream stream, ByteBuffer buffer) throws IOException
{
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.content(new IndirectNIOBuffer(buffer, false));
}
private void forwardRequestComplete(Stream stream) throws IOException
{
HTTPSPDYConnection connection = (HTTPSPDYConnection)stream.getAttribute("connection");
connection.messageComplete(0); // TODO: content length
}
private void close(Stream stream)
{
stream.getSession().goAway(stream.getVersion());
}
}
private class HTTPSPDYConnection extends AbstractHttpConnection
{
private HTTPSPDYConnection(Connector connector, EndPoint endPoint, Server server, Stream stream)
{
super(connector, endPoint, server,
new HttpParser(connector.getRequestBuffers(), endPoint, new HTTPSPDYParserHandler()),
new HTTPSPDYGenerator(connector.getResponseBuffers(), endPoint, stream), new HTTPSPDYRequest());
((HTTPSPDYRequest)getRequest()).setConnection(this);
getParser().setPersistent(true);
}
@Override
public Connection handle() throws IOException
{
return this;
}
public void startRequest(Buffer method, Buffer uri, Buffer version) throws IOException
{
super.startRequest(method, uri, version);
}
public void parsedHeader(Buffer name, Buffer value) throws IOException
{
super.parsedHeader(name, value);
}
public void headerComplete() throws IOException
{
super.headerComplete();
}
public void content(Buffer buffer) throws IOException
{
super.content(buffer);
}
public void messageComplete(long contentLength) throws IOException
{
super.messageComplete(contentLength);
}
}
private class HTTPSPDYAsyncEndPoint extends EmptyAsyncEndPoint
{
private final Stream stream;
private HTTPSPDYAsyncEndPoint(Stream stream)
{
this.stream = stream;
}
}
/**
* Empty implementation, since it won't parse anything
*/
private class HTTPSPDYParserHandler extends HttpParser.EventHandler
{
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
}
@Override
public void content(Buffer ref) throws IOException
{
}
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
}
}
private class HTTPSPDYGenerator extends HttpGenerator
{
private final Stream stream;
private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint, Stream stream)
{
super(buffers, endPoint);
this.stream = stream;
}
@Override
public void send1xx(int code) throws IOException
{
// TODO
}
@Override
public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
{
Headers headers = new Headers();
StringBuilder status = new StringBuilder().append(_status);
if (_reason != null)
status.append(" ").append(_reason.toString("UTF-8"));
headers.put("status", status.toString());
headers.put("version", "HTTP/1.1");
for (int i = 0; i < fields.size(); ++i)
{
HttpFields.Field field = fields.getField(i);
headers.put(field.getName(), field.getValue());
}
stream.reply(new ReplyInfo(headers, allContentAdded));
}
@Override
public void addContent(Buffer content, boolean last) throws IOException
{
// TODO
}
@Override
public void complete() throws IOException
{
// Nothing to do
}
}
/**
* Needed only to please the compiler
*/
private class HTTPSPDYRequest extends Request
{
private void setConnection(HTTPSPDYConnection connection)
{
super.setConnection(connection);
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.nio.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.nio.SPDYClient;
import org.eclipse.jetty.spdy.nio.SPDYServerConnector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class HTTPOverSPDYTest
{
private Server server;
private SPDYServerConnector connector;
private SPDYClient.Factory clientFactory;
private Session session;
public void start(Handler handler, Session.FrameListener listener) throws Exception
{
server = new Server();
connector = new SPDYServerConnector(null);
server.addConnector(connector);
connector.addAsyncConnectionFactory(new ServerHTTP11OverSPDY2AsyncConnectionFactory(connector));
server.setHandler(handler);
server.start();
clientFactory = new SPDYClient.Factory();
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient();
session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), listener).get(5, TimeUnit.SECONDS);
}
@After
public void stop() throws Exception
{
session.goAway((short)2);
clientFactory.stop();
clientFactory.join();
server.stop();
server.join();
}
@Test
public void testSimpleGET() throws Exception
{
final String path = "/foo";
final CountDownLatch handlerLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
request.setHandled(true);
Assert.assertEquals(path, target);
Assert.assertEquals(httpRequest.getRequestURI(), path);
Assert.assertEquals(httpRequest.getHeader("host"), "localhost:" + connector.getLocalPort());
handlerLatch.countDown();
}
}, null);
Headers headers = new Headers();
headers.put("method", "GET");
headers.put("url", "http://localhost:" + connector.getLocalPort() + path);
headers.put("version", "HTTP/1.1");
final CountDownLatch replyLatch = new CountDownLatch(1);
session.syn((short)2, new SynInfo(headers, true), new Stream.FrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Assert.assertTrue(replyInfo.isClose());
Headers replyHeaders = replyInfo.getHeaders();
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
});
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
}