Jetty9 - Greatly simplified HttpChannelOverSPDY.

Removed old code that was not necessary anymore.
This commit is contained in:
Simone Bordet 2012-08-28 09:33:11 +02:00
parent 34ee7101cc
commit 156f07d493
4 changed files with 93 additions and 309 deletions

View File

@ -18,13 +18,10 @@
package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
@ -33,7 +30,6 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
@ -41,17 +37,14 @@ import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpChannelOverSPDY extends HttpChannel
public class HttpChannelOverSPDY extends HttpChannel<DataInfo>
{
private static final Logger LOG = Log.getLogger(HttpChannelOverSPDY.class);
private final Queue<Runnable> tasks = new LinkedList<>();
private final Stream stream;
private Headers headers; // No need for volatile, guarded by state
private DataInfo dataInfo; // No need for volatile, guarded by state
private ByteBuffer buffer; // No need for volatile, guarded by state
private volatile State state = State.INITIAL;
private boolean dispatched; // Guarded by synchronization on tasks
private boolean headersComplete;
public HttpChannelOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInputOverSPDY input, Stream stream)
{
@ -59,109 +52,11 @@ public class HttpChannelOverSPDY extends HttpChannel
this.stream = stream;
}
private void handle()
@Override
public boolean headerComplete()
{
switch (state)
{
case INITIAL:
{
break;
}
case REQUEST:
{
short version = stream.getSession().getVersion();
Headers.Header methodHeader = headers.get(HTTPSPDYHeader.METHOD.name(version));
Headers.Header uriHeader = headers.get(HTTPSPDYHeader.URI.name(version));
Headers.Header versionHeader = headers.get(HTTPSPDYHeader.VERSION.name(version));
if (methodHeader == null || uriHeader == null || versionHeader == null)
{
badMessage(400, "Missing required request line elements");
break;
}
HttpMethod httpMethod = HttpMethod.fromString(methodHeader.value());
HttpVersion httpVersion = HttpVersion.fromString(versionHeader.value());
String uriString = uriHeader.value();
LOG.debug("HTTP > {} {} {}", httpMethod, uriString, httpVersion);
startRequest(httpMethod, httpMethod.asString(), uriString, httpVersion);
Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(version));
if (schemeHeader != null)
getRequest().setScheme(schemeHeader.value());
updateState(State.HEADERS);
handle();
break;
}
case HEADERS:
{
for (Headers.Header header : headers)
{
String name = header.name();
HttpHeader httpHeader = HttpHeader.CACHE.get(name);
// Skip special SPDY headers, unless it's the "host" header
HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(stream.getSession().getVersion(), name);
if (specialHeader != null)
{
if (specialHeader == HTTPSPDYHeader.HOST)
name = "host";
else
continue;
}
switch (name)
{
case "connection":
case "keep-alive":
case "proxy-connection":
case "transfer-encoding":
{
// Spec says to ignore these headers
continue;
}
default:
{
// Spec says headers must be single valued
String value = header.value();
LOG.debug("HTTP > {}: {}", name, value);
parsedHeader(httpHeader, name, value);
break;
}
}
}
break;
}
case HEADERS_COMPLETE:
{
if (headerComplete())
run();
break;
}
case CONTENT:
{
run();
break;
}
case FINAL:
{
if (messageComplete(0))
run();
break;
}
case ASYNC:
{
// TODO:
// handleRequest();
break;
}
default:
{
throw new IllegalStateException();
}
}
headersComplete = true;
return super.headerComplete();
}
private void post(Runnable task)
@ -202,50 +97,39 @@ public class HttpChannelOverSPDY extends HttpChannel
}
}
private void updateState(State newState)
public void requestStart(final Headers headers, final boolean endRequest)
{
LOG.debug("State update {} -> {}", state, newState);
state = newState;
if (!headers.isEmpty())
requestHeaders(headers, endRequest);
}
private void close(Stream stream)
public void requestHeaders(Headers headers, boolean endRequest)
{
stream.getSession().goAway();
}
boolean proceed = performBeginRequest(headers);
if (!proceed)
return;
public void beginRequest(final Headers headers, final boolean endRequest)
{
this.headers = headers.isEmpty() ? null : headers;
post(new Runnable()
performHeaders(headers);
if (endRequest)
{
@Override
public void run()
{
if (!headers.isEmpty())
updateState(State.REQUEST);
handle();
if (endRequest)
performEndRequest();
}
});
if (headerComplete())
post(this);
if (messageComplete(-1))
post(this);
}
}
public void headers(Headers headers)
public void requestContent(final DataInfo dataInfo, boolean endRequest)
{
this.headers = headers;
post(new Runnable()
if (!headersComplete)
{
@Override
public void run()
{
updateState(state == State.INITIAL ? State.REQUEST : State.HEADERS);
handle();
}
});
}
if (headerComplete())
post(this);
}
LOG.debug("HTTP > {} bytes of content", dataInfo.length());
public void content(final DataInfo dataInfo, boolean endRequest)
{
// We need to copy the dataInfo since we do not know when its bytes
// will be consumed. When the copy is consumed, we consume also the
// original, so the implementation can send a window update.
@ -261,132 +145,78 @@ public class HttpChannelOverSPDY extends HttpChannel
};
LOG.debug("Queuing last={} content {}", endRequest, copyDataInfo);
HttpInputOverSPDY input = (HttpInputOverSPDY)getRequest().getHttpInput();
input.offer(copyDataInfo, endRequest);
input.content(copyDataInfo);
if (content(copyDataInfo))
post(this);
if (endRequest)
input.shutdown();
post(new Runnable()
{
@Override
public void run()
{
LOG.debug("HTTP > {} bytes of content", dataInfo.length());
if (state == State.HEADERS)
{
updateState(State.HEADERS_COMPLETE);
handle();
}
updateState(State.CONTENT);
handle();
}
});
}
public void endRequest()
{
post(new Runnable()
{
public void run()
{
performEndRequest();
}
});
}
private void performEndRequest()
{
if (state == State.HEADERS)
{
updateState(State.HEADERS_COMPLETE);
handle();
if (messageComplete(-1))
post(this);
}
updateState(State.FINAL);
handle();
}
// @Override
protected int write(ByteBuffer content) throws IOException
private boolean performBeginRequest(Headers headers)
{
LOG.debug("write");
return 0;
}
// @Override
protected void commitResponse(HttpGenerator.ResponseInfo info, ByteBuffer content) throws IOException
{
LOG.debug("commitResponse");
}
// @Override
protected int getContentBufferSize()
{
LOG.debug("getContentBufferSize");
return 0;
}
// @Override
protected void increaseContentBufferSize(int size)
{
LOG.debug("increaseContentBufferSize");
}
// @Override
protected void resetBuffer()
{
LOG.debug("resetBuffer");
}
// @Override
protected void flushResponse() throws IOException
{
LOG.debug("flushResponse");
}
// @Override
protected void completeResponse() throws IOException
{
LOG.debug("completeResponse");
// commit();
Response response = getResponse();
Headers headers = new Headers();
short version = stream.getSession().getVersion();
headers.put(HTTPSPDYHeader.VERSION.name(version), HttpVersion.HTTP_1_1.asString());
StringBuilder status = new StringBuilder().append(response.getStatus());
String reason = response.getReason();
if (reason != null)
status.append(" ").append(reason.toString());
headers.put(HTTPSPDYHeader.STATUS.name(version), status.toString());
LOG.debug("HTTP < {} {}", HttpVersion.HTTP_1_1, status);
Headers.Header methodHeader = headers.get(HTTPSPDYHeader.METHOD.name(version));
Headers.Header uriHeader = headers.get(HTTPSPDYHeader.URI.name(version));
Headers.Header versionHeader = headers.get(HTTPSPDYHeader.VERSION.name(version));
HttpFields httpFields = response.getHttpFields();
if (httpFields != null)
if (methodHeader == null || uriHeader == null || versionHeader == null)
{
for (HttpFields.Field httpField : httpFields)
{
String name = httpField.getName().toLowerCase();
String value = httpField.getValue();
headers.put(name, value);
LOG.debug("HTTP < {}: {}", name, value);
}
badMessage(400, "Missing required request line elements");
return false;
}
// We have to query the HttpGenerator and its buffers to know
// whether there is content buffered and update the generator state
// reply(stream, new ReplyInfo(headers, response.getContentCount() < 1));
HttpMethod httpMethod = HttpMethod.fromString(methodHeader.value());
HttpVersion httpVersion = HttpVersion.fromString(versionHeader.value());
String uriString = uriHeader.value();
//TODO: sent content
LOG.debug("HTTP > {} {} {}", httpMethod, uriString, httpVersion);
startRequest(httpMethod, httpMethod.asString(), uriString, httpVersion);
Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(version));
if (schemeHeader != null)
getRequest().setScheme(schemeHeader.value());
return true;
}
// @Override
protected void completed()
private void performHeaders(Headers headers)
{
LOG.debug("completed");
}
for (Headers.Header header : headers)
{
String name = header.name();
HttpHeader httpHeader = HttpHeader.CACHE.get(name);
private enum State
{
INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
// Skip special SPDY headers, unless it's the "host" header
HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(stream.getSession().getVersion(), name);
if (specialHeader != null)
{
if (specialHeader == HTTPSPDYHeader.HOST)
name = "host";
else
continue;
}
switch (name)
{
case "connection":
case "keep-alive":
case "proxy-connection":
case "transfer-encoding":
{
// Spec says to ignore these headers
continue;
}
default:
{
// Spec says headers must be single valued
String value = header.value();
LOG.debug("HTTP > {}: {}", name, value);
parsedHeader(httpHeader, name, value);
break;
}
}
}
}
}

View File

@ -18,27 +18,11 @@
package org.eclipse.jetty.spdy.http;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.util.BufferUtil;
public class HttpInputOverSPDY extends HttpInput<DataInfo>
{
private static final DataInfo END_OF_CONTENT = new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true);
private final Queue<DataInfo> dataInfos = new ConcurrentLinkedQueue<>();
public void offer(DataInfo dataInfo, boolean lastContent)
{
dataInfos.offer(dataInfo);
// if (lastContent)
// dataInfos.offer(END_OF_CONTENT); // TODO: necessary ?
}
@Override
protected int remaining(DataInfo item)
{
@ -54,8 +38,6 @@ public class HttpInputOverSPDY extends HttpInput<DataInfo>
@Override
protected void onContentConsumed(DataInfo dataInfo)
{
boolean removed = dataInfos.remove(dataInfo);
if (removed)
dataInfo.consume(dataInfo.length());
dataInfo.consume(dataInfo.length());
}
}

View File

@ -106,6 +106,9 @@ public class HttpTransportOverSPDY implements HttpTransport
@Override
public void send(ByteBuffer content, boolean lastContent) throws IOException
{
// Guard against a last 0 bytes write
if (stream.isClosed() && BufferUtil.isEmpty(content) && lastContent)
return;
stream.data(new ByteBufferDataInfo(content, lastContent));
}
@ -142,7 +145,7 @@ public class HttpTransportOverSPDY implements HttpTransport
public void completed(Stream pushStream)
{
HttpChannelOverSPDY pushChannel = newHttpChannelOverSPDY(pushStream, pushRequestHeaders);
pushChannel.beginRequest(pushRequestHeaders, true);
pushChannel.requestStart(pushRequestHeaders, true);
}
});
}

View File

@ -87,7 +87,7 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYConnectionFa
HttpChannelOverSPDY channel = new HttpChannelOverSPDY(connector, configuration, endPoint, transport, input, stream);
stream.setAttribute(CHANNEL_ATTRIBUTE, channel);
channel.beginRequest(headers, synInfo.isClose());
channel.requestStart(headers, synInfo.isClose());
if (headers.isEmpty())
{
@ -114,9 +114,7 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYConnectionFa
{
logger.debug("Received {} on {}", headersInfo, stream);
HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
channel.headers(headersInfo.getHeaders());
if (headersInfo.isClose())
channel.endRequest();
channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose());
}
@Override
@ -124,36 +122,7 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYConnectionFa
{
logger.debug("Received {} on {}", dataInfo, stream);
HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
channel.content(dataInfo, dataInfo.isClose());
if (dataInfo.isClose())
channel.endRequest();
// We need to copy the dataInfo since we do not know when its bytes
// will be consumed. When the copy is consumed, we consume also the
// original, so the implementation can send a window update.
// ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
// {
// @Override
// public void consume(int delta)
// {
// super.consume(delta);
// dataInfo.consume(delta);
// }
// };
// logger.debug("Queuing last={} content {}", dataInfo.isClose(), copyDataInfo);
// channel.content(copyDataInfo.asByteBuffer(true));
// dataInfos.offer(copyDataInfo); //TODO:
// .content()
// if (endRequest)
// dataInfos.offer(END_OF_CONTENT);
// updateState(State.CONTENT);
// handle();
// }
// });
//
// connection.content(dataInfo, dataInfo.isClose());
// if (dataInfo.isClose())
// channel.endRequest();
channel.requestContent(dataInfo, dataInfo.isClose());
}
}
}