459845 Support upgrade

Added the concept of UpgradeFrom and UpgradeTo connections that support
transferring a buffer with content before opening new connection.
Aded EndPoint.update method as utility
This commit is contained in:
Greg Wilkins 2015-02-13 11:41:18 +11:00
parent d788df9a57
commit 2b2a70a93a
19 changed files with 194 additions and 35 deletions

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

View File

@ -39,7 +39,8 @@ public enum HttpMethod
TRACE,
CONNECT,
MOVE,
PROXY;
PROXY,
PRI;
/* ------------------------------------------------------------ */
/**
@ -67,6 +68,8 @@ public enum HttpMethod
return PROXY;
if (bytes[position+1]=='U' && bytes[position+2]=='T' && bytes[position+3]==' ')
return PUT;
if (bytes[position+1]=='R' && bytes[position+2]=='I' && bytes[position+3]==' ')
return PRI;
break;
case 'H':
if (bytes[position+1]=='E' && bytes[position+2]=='A' && bytes[position+3]=='D' && length>=5 && bytes[position+4]==' ')

View File

@ -990,7 +990,7 @@ public class HttpParser
// End of headers!
// Was there a required host header?
if (!_host && _version!=HttpVersion.HTTP_1_0 && _requestHandler!=null)
if (!_host && _version==HttpVersion.HTTP_1_1 && _requestHandler!=null)
{
throw new BadMessage(HttpStatus.BAD_REQUEST_400,"No Host");
}

View File

@ -31,7 +31,7 @@ public enum HttpVersion
HTTP_0_9("HTTP/0.9",9),
HTTP_1_0("HTTP/1.0",10),
HTTP_1_1("HTTP/1.1",11),
HTTP_2_0("HTTP/2.0",20);
HTTP_2("HTTP/2.0",20);
/* ------------------------------------------------------------ */
public final static Trie<HttpVersion> CACHE= new ArrayTrie<HttpVersion>();
@ -74,7 +74,7 @@ public enum HttpVersion
switch(bytes[position+7])
{
case '0':
return HTTP_2_0;
return HTTP_2;
}
break;
}
@ -166,6 +166,7 @@ public enum HttpVersion
case 9: return HttpVersion.HTTP_0_9;
case 10: return HttpVersion.HTTP_1_0;
case 11: return HttpVersion.HTTP_1_1;
case 20: return HttpVersion.HTTP_2;
default: throw new IllegalArgumentException();
}
}

View File

@ -1535,6 +1535,30 @@ public class HttpParserTest
assertEquals("unknown",_val[4]);
}
@Test
public void testHTTP2Preface() throws Exception
{
ByteBuffer buffer= BufferUtil.toBuffer(
"PRI * HTTP/2.0\015\012" +
"\015\012" +
"SM\015\012"+
"\015\012");
HttpParser.RequestHandler<ByteBuffer> handler = new Handler();
HttpParser parser= new HttpParser(handler);
parseAll(parser,buffer);
assertTrue(_headerCompleted);
assertTrue(_messageCompleted);
assertEquals("PRI", _methodOrVersion);
assertEquals("*", _uriOrStatus);
assertEquals("HTTP/2.0", _versionOrReason);
assertEquals(-1, _headers);
assertEquals(null, _bad);
}
@Before
public void init()
{

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -163,6 +164,27 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
LOG.debug("Ignored idle endpoint {}",this);
}
@Override
public void upgrade(Connection newConnection)
{
Connection old_connection = getConnection();
if (LOG.isDebugEnabled())
LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection);
ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom)
?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null;
old_connection.onClose();
old_connection.getEndPoint().setConnection(newConnection);
if (newConnection instanceof Connection.UpgradeTo)
((Connection.UpgradeTo)newConnection).onUpgradeTo(prefilled);
else if (BufferUtil.hasContent(prefilled))
throw new IllegalStateException();
newConnection.onOpen();
}
@Override
public String toString()
{

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.eclipse.jetty.util.log.Log;
@ -51,7 +52,7 @@ public interface ClientConnectionFactory
* {@link EndPoint} associated with {@code oldConnection}, performing connection lifecycle management.
* <p />
* The {@code oldConnection} will be closed by invoking {@link org.eclipse.jetty.io.Connection#onClose()}
* and the {@code newConnection} will be opened by invoking {@link org.eclipse.jetty.io.Connection#onOpen()}.
* and the {@code newConnection} will be opened by invoking {@link org.eclipse.jetty.io.Connection#onOpen(ByteBuffer)}.
* @param oldConnection the old connection to replace
* @param newConnection the new connection replacement
*/

View File

@ -19,11 +19,12 @@
package org.eclipse.jetty.io;
import java.io.Closeable;
import java.nio.ByteBuffer;
/**
* <p>A {@link Connection} is associated to an {@link EndPoint} so that I/O events
* happening on the {@link EndPoint} can be processed by the {@link Connection}.</p>
* <p>A typical implementation of {@link Connection} overrides {@link #onOpen()} to
* <p>A typical implementation of {@link Connection} overrides {@link #onOpen(ByteBuffer)} to
* {@link EndPoint#fillInterested(Callback) set read interest} on the {@link EndPoint},
* and when the {@link EndPoint} signals read readyness, this {@link Connection} can
* read bytes from the network and interpret them.</p>
@ -32,10 +33,6 @@ public interface Connection extends Closeable
{
public void addListener(Listener listener);
/**
* <p>Callback method invoked when this {@link Connection} is opened.</p>
* <p>Creators of the connection implementation are responsible for calling this method.</p>
*/
public void onOpen();
/**
@ -48,7 +45,7 @@ public interface Connection extends Closeable
* @return the {@link EndPoint} associated with this {@link Connection}
*/
public EndPoint getEndPoint();
/**
* <p>Performs a logical close of this connection.</p>
* <p>For simple connections, this may just mean to delegate the close to the associated
@ -64,6 +61,30 @@ public interface Connection extends Closeable
public long getBytesOut();
public long getCreatedTimeStamp();
public interface UpgradeFrom extends Connection
{
/* ------------------------------------------------------------ */
/** Take the input buffer from the connection on upgrade.
* <p>This method is used to take any unconsumed input from
* a connection during an upgrade.
* @return A buffer of unconsumed input. The caller must return the buffer
* to the bufferpool when consumed and this connection must not.
*/
ByteBuffer onUpgradeFrom();
}
public interface UpgradeTo extends Connection
{
/**
* <p>Callback method invoked when this {@link Connection} is upgraded.</p>
* <p>This must be called before {@link #onOpen()}.</p>
* @param prefilledBuffer An optional buffer that can contain prefilled data. Typically this
* results from an upgrade of one protocol to the other where the old connection has buffered
* data destined for the new connection. The new connection must take ownership of the buffer
* and is responsible for returning it to the buffer pool
*/
void onUpgradeTo(ByteBuffer prefilled);
}
public interface Listener
{

View File

@ -222,6 +222,7 @@ public interface EndPoint extends Closeable
/**
* @param connection the {@link Connection} associated with this {@link EndPoint}
* @see #getConnection()
* @see #upgrade(Connection)
*/
void setConnection(Connection connection);
@ -237,5 +238,13 @@ public interface EndPoint extends Closeable
*/
void onClose();
/** Upgrade connections.
* Close the old connection, update the endpoint and open the new connection.
* If the oldConnection is an instance of {@link Connection.UpgradeFrom} then
* a prefilled buffer is requested and passed to the newConnection if it is an instance
* of {@link Connection.UpgradeTo}
* @param newConnection The connection to upgrade to
*/
public void upgrade(Connection newConnection);
}

View File

@ -19,8 +19,10 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.util.BufferUtil;

View File

@ -52,7 +52,8 @@ public class HttpChannelState
ASYNC_WOKEN, // A thread has been dispatch to handle from ASYNCWAIT
ASYNC_IO, // Has been dispatched for async IO
COMPLETING, // Request is completable
COMPLETED // Request is complete
COMPLETED, // Request is complete
UPGRADED // Request upgraded the connection
}
/**
@ -525,6 +526,8 @@ public class HttpChannelState
case DISPATCHED:
case ASYNC_IO:
throw new IllegalStateException(getStatusString());
case UPGRADED:
return;
default:
break;
}
@ -539,6 +542,31 @@ public class HttpChannelState
_event=null;
}
}
public void upgrade()
{
synchronized (this)
{
switch(_state)
{
case IDLE:
case COMPLETED:
break;
default:
throw new IllegalStateException(getStatusString());
}
_asyncListeners=null;
_state=State.UPGRADED;
_async=null;
_initial=true;
_asyncRead=false;
_asyncWrite=false;
_timeoutMs=DEFAULT_TIMEOUT;
cancelTimeout();
_event=null;
}
}
protected void scheduleDispatch()
{

View File

@ -45,7 +45,7 @@ import org.eclipse.jetty.util.log.Logger;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
{
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
private static final boolean REQUEST_BUFFER_DIRECT=false;
@ -163,6 +163,18 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return getHttpChannel().getRequests();
}
@Override
public ByteBuffer onUpgradeFrom()
{
if (BufferUtil.hasContent(_requestBuffer))
{
ByteBuffer buffer = _requestBuffer;
_requestBuffer=null;
return buffer;
}
return null;
}
void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())
@ -329,11 +341,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", this, connection);
onClose();
getEndPoint().setConnection(connection);
connection.onOpen();
_channel.getState().upgrade();
getEndPoint().upgrade(connection);
_channel.reset();
_parser.reset();
_generator.reset();
@ -533,6 +542,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
break;
}
case HTTP_2:
{
persistent=false;
badMessage(400,null);
return true;
}
default:
{
throw new IllegalStateException();

View File

@ -19,7 +19,9 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
@ -117,13 +119,8 @@ public abstract class NegotiatingServerConnection extends AbstractConnection
else
{
EndPoint endPoint = getEndPoint();
Connection oldConnection = endPoint.getConnection();
Connection newConnection = connectionFactory.newConnection(connector, endPoint);
if (LOG.isDebugEnabled())
LOG.debug("{} switching from {} to {}", this, oldConnection, newConnection);
oldConnection.onClose();
endPoint.setConnection(newConnection);
getEndPoint().getConnection().onOpen();
endPoint.upgrade(newConnection);
}
}
}

View File

@ -92,7 +92,16 @@ public class ScheduledExecutorScheduler extends AbstractLifeCycle implements Sch
@Override
public Task schedule(Runnable task, long delay, TimeUnit unit)
{
ScheduledFuture<?> result = scheduler.schedule(task, delay, unit);
ScheduledThreadPoolExecutor s = scheduler;
if (s==null)
return new Task(){
@Override
public boolean cancel()
{
return false;
}};
ScheduledFuture<?> result = s.schedule(task, delay, unit);
return new ScheduledFutureTask(result);
}

View File

@ -265,6 +265,7 @@ public class UpgradeConnection extends AbstractConnection
connectPromise.getClient().addManaged(session);
// Now swap out the connection
// TODO use endp.upgrade ???
endp.setConnection(connection);
connection.onOpen();
}

View File

@ -215,6 +215,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private WebSocketSession session;
private List<ExtensionConfig> extensions;
private boolean isFilling;
private ByteBuffer buffer;
private ReadMode readMode = ReadMode.PARSE;
private IOState ioState;
private Stats stats = new Stats();
@ -250,7 +251,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
/**
* Close the connection.
* <p>
* <p> fillInterested();
* This can result in a close handshake over the network, or a simple local abnormal close
*
* @param statusCode
@ -422,9 +424,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
switch (state)
{
case OPEN:
if (LOG.isDebugEnabled())
LOG.debug("fillInterested");
fillInterested();
if (BufferUtil.isEmpty(buffer))
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested");
fillInterested();
}
else
onFillable();
break;
case CLOSED:
if (ioState.wasAbnormalClose())
@ -458,7 +465,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable()",policy.getBehavior());
stats.countOnFillableEvents.incrementAndGet();
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
if (buffer==null)
buffer = bufferPool.acquire(getInputBufferSize(),true);
try
{
isFilling = true;
@ -466,7 +474,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if(readMode == ReadMode.PARSE)
{
readMode = readParse(buffer);
} else
}
else
{
readMode = readDiscard(buffer);
}
@ -474,6 +483,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
finally
{
bufferPool.release(buffer);
buffer=null;
}
if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
@ -496,6 +506,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
super.onFillInterestedFailed(cause);
}
protected void prefill(ByteBuffer prefilled)
{
buffer=prefilled;
}
@Override
public void onOpen()
{

View File

@ -293,7 +293,8 @@ public class IOState
{
notifyStateListeners(event);
if(abnormalEvent != null) {
if(abnormalEvent != null)
{
notifyStateListeners(abnormalEvent);
}
}

View File

@ -19,17 +19,20 @@
package org.eclipse.jetty.websocket.server;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
public class WebSocketServerConnection extends AbstractWebSocketConnection
public class WebSocketServerConnection extends AbstractWebSocketConnection implements Connection.UpgradeTo
{
private final AtomicBoolean opened = new AtomicBoolean(false);
@ -54,6 +57,12 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
return getEndPoint().getRemoteAddress();
}
@Override
public void onUpgradeTo(ByteBuffer prefilled)
{
prefill(prefilled);
}
@Override
public void onOpen()
{

View File

@ -85,7 +85,7 @@ public class TooFastClientTest
Generator generator = new Generator(WebSocketPolicy.newClientPolicy(),
new LeakTrackingBufferPool("Generator",new MappedByteBufferPool()));
String msg1 = "Echo 1";
String msg2 = "This is also an echo ... cho ... ho ... o";
String msg2 = "This is also an echooooo!";
generator.generateWholeFrame(new TextFrame().setPayload(msg1),initialPacket);
generator.generateWholeFrame(new TextFrame().setPayload(msg2),initialPacket);