479678 - Support HTTP/1.1 Upgrade in HttpClient

+ Adding Connection.UpgradeFrom support to HttpConnectionOverHTTP
+ (Soft) Closing HttpConnectionOverHTTP on upgrade
+ Removing 'extends Connection' from Connection.UpgradeFrom and
  Connection.UpgradeTo to allow for use from components that
  delegate bytebuffer handling away from raw Connection
This commit is contained in:
Joakim Erdfelt 2015-10-15 09:15:50 -07:00
parent 72d97f77cc
commit 15ee24585e
5 changed files with 46 additions and 14 deletions

View File

@ -76,7 +76,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
} }
else else
{ {
if (HttpScheme.HTTPS.is(getScheme())||HttpScheme.WSS.is(getScheme())) if (HttpScheme.HTTPS.is(getScheme()) || HttpScheme.WSS.is(getScheme()))
connectionFactory = newSslClientConnectionFactory(connectionFactory); connectionFactory = newSslClientConnectionFactory(connectionFactory);
} }
this.connectionFactory = connectionFactory; this.connectionFactory = connectionFactory;

View File

@ -18,12 +18,13 @@
package org.eclipse.jetty.client.http; package org.eclipse.jetty.client.http;
import java.util.Locale;
import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpRequest; import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse; import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -58,13 +59,13 @@ public class HttpChannelOverHTTP extends HttpChannel
} }
@Override @Override
protected HttpSender getHttpSender() protected HttpSenderOverHTTP getHttpSender()
{ {
return sender; return sender;
} }
@Override @Override
protected HttpReceiver getHttpReceiver() protected HttpReceiverOverHTTP getHttpReceiver()
{ {
return receiver; return receiver;
} }
@ -96,10 +97,15 @@ public class HttpChannelOverHTTP extends HttpChannel
HttpResponse response = exchange.getResponse(); HttpResponse response = exchange.getResponse();
if ( (response.getVersion() == HttpVersion.HTTP_1_1) && if ((response.getVersion() == HttpVersion.HTTP_1_1) &&
(response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) && (response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101))
(response.getHeaders().get("Connection").equalsIgnoreCase("upgrade")) )
{ {
String connection = response.getHeaders().get(HttpHeader.CONNECTION);
if ((connection == null) || !connection.toLowerCase(Locale.US).contains("upgrade"))
{
return new Result(result,new HttpResponseException("101 Switching Protocols without Connection: Upgrade not supported",response));
}
// Upgrade Response // Upgrade Response
HttpRequest request = exchange.getRequest(); HttpRequest request = exchange.getRequest();
if (request instanceof HttpConnectionUpgrader) if (request instanceof HttpConnectionUpgrader)
@ -107,11 +113,11 @@ public class HttpChannelOverHTTP extends HttpChannel
HttpConnectionUpgrader listener = (HttpConnectionUpgrader)request; HttpConnectionUpgrader listener = (HttpConnectionUpgrader)request;
try try
{ {
listener.upgrade(response, getHttpConnection()); listener.upgrade(response,getHttpConnection());
} }
catch (Throwable x) catch (Throwable x)
{ {
return new Result(result, x); return new Result(result,x);
} }
} }
} }

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client.http; package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -36,7 +37,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper; import org.eclipse.jetty.util.thread.Sweeper;
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable
{ {
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class); private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
@ -89,6 +90,13 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
promise.succeeded(this); promise.succeeded(this);
} }
@Override
public void onClose()
{
softClose();
super.onClose();
}
public boolean isClosed() public boolean isClosed()
{ {
return closed.get(); return closed.get();
@ -119,6 +127,13 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
} }
} }
@Override
public ByteBuffer onUpgradeFrom()
{
HttpReceiverOverHTTP receiver = channel.getHttpReceiver();
return receiver.onUpgradeFrom();
}
public void release() public void release()
{ {
// Restore idle timeout // Restore idle timeout

View File

@ -88,6 +88,17 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
buffer = null; buffer = null;
} }
protected ByteBuffer onUpgradeFrom()
{
if (BufferUtil.hasContent(buffer))
{
ByteBuffer upgradeBuffer = buffer;
releaseBuffer(); // TODO: right place to do this?
return upgradeBuffer;
}
return null;
}
private void process() private void process()
{ {
try try

View File

@ -63,7 +63,7 @@ public interface Connection extends Closeable
public long getBytesOut(); public long getBytesOut();
public long getCreatedTimeStamp(); public long getCreatedTimeStamp();
public interface UpgradeFrom extends Connection public interface UpgradeFrom
{ {
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Take the input buffer from the connection on upgrade. /** Take the input buffer from the connection on upgrade.
@ -75,7 +75,7 @@ public interface Connection extends Closeable
ByteBuffer onUpgradeFrom(); ByteBuffer onUpgradeFrom();
} }
public interface UpgradeTo extends Connection public interface UpgradeTo
{ {
/** /**
* <p>Callback method invoked when this {@link Connection} is upgraded.</p> * <p>Callback method invoked when this {@link Connection} is upgraded.</p>