Updated SPDY code to HttpChannel refactorings.

This commit is contained in:
Simone Bordet 2014-06-17 10:53:20 +02:00
parent 20c38c2396
commit aafed92305
3 changed files with 61 additions and 127 deletions

View File

@ -21,19 +21,20 @@ package org.eclipse.jetty.spdy.server.http;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -43,9 +44,6 @@ public class HttpChannelOverSPDY extends HttpChannel
private static final Logger LOG = Log.getLogger(HttpChannelOverSPDY.class);
private final Stream stream;
private boolean dispatched; // Guarded by synchronization on tasks
private boolean redispatch; // Guarded by synchronization on tasks
private boolean headersComplete;
public HttpChannelOverSPDY(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInputOverSPDY input, Stream stream)
{
@ -53,54 +51,6 @@ public class HttpChannelOverSPDY extends HttpChannel
this.stream = stream;
}
@Override
public boolean headerComplete()
{
headersComplete = true;
return super.headerComplete();
}
private void dispatch()
{
synchronized (this)
{
if (dispatched)
redispatch=true;
else
{
LOG.debug("Dispatch {}", this);
dispatched=true;
execute(this);
}
}
}
@Override
public void run()
{
boolean execute=true;
while(execute)
{
try
{
LOG.debug("Executing {}",this);
super.run();
}
finally
{
LOG.debug("Completing {}", this);
synchronized (this)
{
dispatched = redispatch;
redispatch=false;
execute=dispatched;
}
}
}
}
public void requestStart(final Fields headers, final boolean endRequest)
{
if (!headers.isEmpty())
@ -109,63 +59,34 @@ public class HttpChannelOverSPDY extends HttpChannel
public void requestHeaders(Fields headers, boolean endRequest)
{
boolean proceed = performBeginRequest(headers);
boolean proceed = performRequest(headers);
if (!proceed)
return;
performHeaders(headers);
if (endRequest)
{
boolean dispatch = headerComplete();
if (messageComplete())
dispatch=true;
if (dispatch)
dispatch();
}
onRequestComplete();
execute(this);
}
public void requestContent(final DataInfo dataInfo, boolean endRequest)
{
boolean dispatch=false;
if (!headersComplete && headerComplete())
dispatch=true;
LOG.debug("HTTP > {} bytes of content", dataInfo.length());
// We need to copy the dataInfo since we do not know when its bytes
// will be consumed. When the copy is consumed, we consume also the
// original, so the implementation can send a window update.
ByteBuffer copyByteBuffer = dataInfo.asByteBuffer(false);
ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(copyByteBuffer, dataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
dataInfo.consume(delta);
}
};
LOG.debug("Queuing last={} content {}", endRequest, copyDataInfo);
if (content(copyDataInfo))
dispatch=true;
HttpInputOverSPDY.ContentOverSPDY content = new HttpInputOverSPDY.ContentOverSPDY(copyByteBuffer, dataInfo);
if (endRequest && messageComplete())
dispatch=true;
if (dispatch)
dispatch();
onContent(content);
if (endRequest)
onRequestComplete();
}
@Override
public boolean messageComplete()
{
super.messageComplete();
return false;
}
private boolean performBeginRequest(Fields headers)
private boolean performRequest(Fields headers)
{
short version = stream.getSession().getVersion();
Fields.Field methodHeader = headers.get(HTTPSPDYHeader.METHOD.name(version));
@ -174,7 +95,7 @@ public class HttpChannelOverSPDY extends HttpChannel
if (methodHeader == null || uriHeader == null || versionHeader == null)
{
badMessage(400, "Missing required request line elements");
onBadMessage(400, "Missing required request line elements");
return false;
}
@ -184,16 +105,17 @@ public class HttpChannelOverSPDY extends HttpChannel
HttpURI uri = new HttpURI(uriHeader.getValue());
LOG.debug("HTTP > {} {} {}", httpMethod, uriHeader.getValue(), httpVersion);
startRequest(methodHeader.getValue(), uri, httpVersion);
String scheme = "http";
Fields.Field schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(version));
if (schemeHeader != null)
getRequest().setScheme(schemeHeader.getValue());
return true;
}
{
scheme = schemeHeader.getValue();
getRequest().setScheme(scheme);
}
private void performHeaders(Fields headers)
{
String authority = null;
HttpFields fields = new HttpFields();
for (Fields.Field header : headers)
{
String name = header.getName();
@ -202,10 +124,11 @@ public class HttpChannelOverSPDY extends HttpChannel
HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(stream.getSession().getVersion(), name);
if (specialHeader != null)
{
if (specialHeader == HTTPSPDYHeader.HOST)
name = "host";
else
if (specialHeader != HTTPSPDYHeader.HOST)
continue;
name = "host";
authority = header.getValue();
}
switch (name)
@ -223,10 +146,31 @@ public class HttpChannelOverSPDY extends HttpChannel
// Spec says headers must be single valued
String value = header.getValue();
LOG.debug("HTTP > {}: {}", name, value);
parsedHeader(new HttpField(name,value));
fields.add(new HttpField(name, value));
break;
}
}
}
String host = null;
int port = 0;
if (authority != null)
{
int colon = authority.indexOf(':');
if (colon > 0)
{
host = authority.substring(0, colon);
port = Integer.valueOf(authority.substring(colon + 1));
}
else
{
host = authority;
port = HttpScheme.HTTPS.is(scheme) ? 443 : 80;
}
}
MetaData.Request request = new MetaData.Request(httpVersion, httpMethod.asString(), uri, fields, host, port);
onRequest(request);
return true;
}
}

View File

@ -18,32 +18,28 @@
package org.eclipse.jetty.spdy.server.http;
import java.nio.ByteBuffer;
import org.eclipse.jetty.server.QueuedHttpInput;
import org.eclipse.jetty.spdy.api.DataInfo;
public class HttpInputOverSPDY extends QueuedHttpInput
{
@Override
protected int remaining(DataInfo item)
protected void consume(Content content, int length)
{
return item.available();
ContentOverSPDY spdyContent = (ContentOverSPDY)content;
spdyContent.dataInfo.consume(length);
}
@Override
protected int get(DataInfo item, byte[] buffer, int offset, int length)
protected static class ContentOverSPDY extends Content
{
return item.readInto(buffer, offset, length);
}
@Override
protected void consume(DataInfo item, int length)
{
item.consume(length);
}
private final DataInfo dataInfo;
@Override
protected void onContentConsumed(DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
protected ContentOverSPDY(ByteBuffer content, DataInfo dataInfo)
{
super(content);
this.dataInfo = dataInfo;
}
}
}

View File

@ -53,12 +53,11 @@ import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParser.RequestHandler<ByteBuffer>
public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParser.RequestHandler
{
private final short version;
private final Fields headers = new Fields();
@ -76,7 +75,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
}
@Override
protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
protected HttpParser.RequestHandler newRequestHandler()
{
return this;
}
@ -102,11 +101,6 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
headers.put(field.getName(), field.getValue());
}
@Override
public void parsedHostHeader(String host, int port)
{
}
@Override
public boolean headerComplete()
{