479537 - Server preface sent after client preface reply.

Fixed by anticipating the onPreface() callback on server before
processing the client preface's SETTINGS frame.
This commit is contained in:
Simone Bordet 2015-10-12 17:50:09 +02:00
parent 2bdb4e7474
commit 3fd8a2e666
6 changed files with 269 additions and 13 deletions
jetty-http2
http2-client/src/test/java/org/eclipse/jetty/http2/client
http2-common/src/main/java/org/eclipse/jetty/http2/parser
http2-server/src/main/java/org/eclipse/jetty/http2/server

View File

@ -51,7 +51,7 @@ public class AbstractTest
protected ServerConnector connector; protected ServerConnector connector;
protected String servletPath = "/test"; protected String servletPath = "/test";
protected HTTP2Client client; protected HTTP2Client client;
private Server server; protected Server server;
protected void start(HttpServlet servlet) throws Exception protected void start(HttpServlet servlet) throws Exception
{ {
@ -78,12 +78,12 @@ public class AbstractTest
client.start(); client.start();
} }
private void prepareServer(ConnectionFactory connectionFactory) protected void prepareServer(ConnectionFactory... connectionFactories)
{ {
QueuedThreadPool serverExecutor = new QueuedThreadPool(); QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server"); serverExecutor.setName("server");
server = new Server(serverExecutor); server = new Server(serverExecutor);
connector = new ServerConnector(server, 1,1, connectionFactory); connector = new ServerConnector(server, 1, 1, connectionFactories);
server.addConnector(connector); server.addConnector(connector);
} }

View File

@ -18,19 +18,41 @@
package org.eclipse.jetty.http2.client; package org.eclipse.jetty.http2.client;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.junit.Assert; import org.junit.Assert;
@ -39,7 +61,7 @@ import org.junit.Test;
public class PrefaceTest extends AbstractTest public class PrefaceTest extends AbstractTest
{ {
@Test @Test
public void testServerPrefaceBeforeClientPreface() throws Exception public void testServerPrefaceReplySentAfterClientPreface() throws Exception
{ {
start(new ServerSessionListener.Adapter() start(new ServerSessionListener.Adapter()
{ {
@ -95,4 +117,211 @@ public class PrefaceTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testClientPrefaceReplySentAfterServerPreface() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 128);
return settings;
}
@Override
public void onPing(Session session, PingFrame frame)
{
session.close(ErrorCode.NO_ERROR.code, null, Callback.NOOP);
}
});
ByteBufferPool byteBufferPool = client.getByteBufferPool();
try (SocketChannel socket = SocketChannel.open())
{
socket.connect(new InetSocketAddress("localhost", connector.getLocalPort()));
Generator generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
Map<Integer, Integer> clientSettings = new HashMap<>();
clientSettings.put(SettingsFrame.ENABLE_PUSH, 0);
generator.control(lease, new SettingsFrame(clientSettings, false));
// The PING frame just to make sure the client stops reading.
generator.control(lease, new PingFrame(true));
List<ByteBuffer> buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[buffers.size()]));
Queue<SettingsFrame> settings = new ArrayQueue<>();
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public void onSettings(SettingsFrame frame)
{
settings.offer(frame);
}
}, 4096, 8192);
ByteBuffer buffer = byteBufferPool.acquire(1024, true);
while (true)
{
int read = socket.read(buffer);
buffer.flip();
if (read < 0)
break;
parser.parse(buffer);
buffer.clear();
}
Assert.assertEquals(2, settings.size());
SettingsFrame frame1 = settings.poll();
Assert.assertFalse(frame1.isReply());
SettingsFrame frame2 = settings.poll();
Assert.assertTrue(frame2.isReply());
}
}
@Test
public void testOnPrefaceNotifiedForStandardUpgrade() throws Exception
{
Integer maxConcurrentStreams = 128;
AtomicReference<CountDownLatch> serverPrefaceLatch = new AtomicReference<>(new CountDownLatch(1));
AtomicReference<CountDownLatch> serverSettingsLatch = new AtomicReference<>(new CountDownLatch(1));
HttpConfiguration config = new HttpConfiguration();
prepareServer(new HttpConnectionFactory(config), new HTTP2CServerConnectionFactory(config)
{
@Override
protected ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint)
{
return new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> serverSettings = new HashMap<>();
serverSettings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
serverPrefaceLatch.get().countDown();
return serverSettings;
}
@Override
public void onSettings(Session session, SettingsFrame frame)
{
serverSettingsLatch.get().countDown();
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
return null;
}
};
}
});
server.start();
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
try (SocketChannel socket = SocketChannel.open())
{
socket.connect(new InetSocketAddress("localhost", connector.getLocalPort()));
String upgradeRequest = "" +
"GET /one HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Connection: Upgrade, HTTP2-Settings\r\n" +
"Upgrade: h2c\r\n" +
"HTTP2-Settings: \r\n" +
"\r\n";
ByteBuffer upgradeBuffer = ByteBuffer.wrap(upgradeRequest.getBytes(StandardCharsets.ISO_8859_1));
socket.write(upgradeBuffer);
// Make sure onPreface() is called on server.
Assert.assertTrue(serverPrefaceLatch.get().await(5, TimeUnit.SECONDS));
Assert.assertTrue(serverSettingsLatch.get().await(5, TimeUnit.SECONDS));
// The 101 response is the reply to the client preface SETTINGS frame.
ByteBuffer buffer = byteBufferPool.acquire(1024, true);
http1: while (true)
{
buffer.clear();
int read = socket.read(buffer);
buffer.flip();
if (read < 0)
Assert.fail();
int crlfs = 0;
while (buffer.hasRemaining())
{
byte b = buffer.get();
if (b == '\r' || b == '\n')
++crlfs;
else
crlfs = 0;
if (crlfs == 4)
break http1;
}
}
// Reset the latches on server.
serverPrefaceLatch.set(new CountDownLatch(1));
serverSettingsLatch.set(new CountDownLatch(1));
// After the 101, the client must send the connection preface.
Generator generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
Map<Integer, Integer> clientSettings = new HashMap<>();
clientSettings.put(SettingsFrame.ENABLE_PUSH, 1);
generator.control(lease, new SettingsFrame(clientSettings, false));
List<ByteBuffer> buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[buffers.size()]));
// However, we should not call onPreface() again.
Assert.assertFalse(serverPrefaceLatch.get().await(1, TimeUnit.SECONDS));
// Although we should notify of the SETTINGS frame.
Assert.assertTrue(serverSettingsLatch.get().await(5, TimeUnit.SECONDS));
CountDownLatch clientSettingsLatch = new CountDownLatch(1);
AtomicBoolean responded = new AtomicBoolean();
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public void onSettings(SettingsFrame frame)
{
if (frame.isReply())
return;
Assert.assertEquals(maxConcurrentStreams, frame.getSettings().get(SettingsFrame.MAX_CONCURRENT_STREAMS));
clientSettingsLatch.countDown();
}
@Override
public void onHeaders(HeadersFrame frame)
{
if (frame.isEndStream())
responded.set(true);
}
}, 4096, 8192);
// HTTP/2 parsing.
while (true)
{
parser.parse(buffer);
if (responded.get())
break;
buffer.clear();
int read = socket.read(buffer);
buffer.flip();
if (read < 0)
Assert.fail();
}
Assert.assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
}
}
} }

View File

@ -197,6 +197,11 @@ public class Parser
return headerParser.getFrameType(); return headerParser.getFrameType();
} }
protected boolean hasFlag(int bit)
{
return headerParser.hasFlag(bit);
}
protected void notifyConnectionFailure(int error, String reason) protected void notifyConnectionFailure(int error, String reason)
{ {
try try

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -34,6 +35,7 @@ public class ServerParser extends Parser
private final Listener listener; private final Listener listener;
private final PrefaceParser prefaceParser; private final PrefaceParser prefaceParser;
private State state = State.PREFACE; private State state = State.PREFACE;
private boolean notifyPreface = true;
public ServerParser(ByteBufferPool byteBufferPool, Listener listener, int maxDynamicTableSize, int maxHeaderSize) public ServerParser(ByteBufferPool byteBufferPool, Listener listener, int maxDynamicTableSize, int maxHeaderSize)
{ {
@ -61,6 +63,16 @@ public class ServerParser extends Parser
prefaceParser.directUpgrade(); prefaceParser.directUpgrade();
} }
/**
* <p>The standard HTTP/1.1 upgrade path.</p>
*/
public void standardUpgrade()
{
if (state != State.PREFACE)
throw new IllegalStateException();
notifyPreface = false;
}
@Override @Override
public void parse(ByteBuffer buffer) public void parse(ByteBuffer buffer)
{ {
@ -77,6 +89,8 @@ public class ServerParser extends Parser
{ {
if (!prefaceParser.parse(buffer)) if (!prefaceParser.parse(buffer))
return; return;
if (notifyPreface)
onPreface();
state = State.SETTINGS; state = State.SETTINGS;
break; break;
} }
@ -84,7 +98,7 @@ public class ServerParser extends Parser
{ {
if (!parseHeader(buffer)) if (!parseHeader(buffer))
return; return;
if (getFrameType() != FrameType.SETTINGS.getType()) if (getFrameType() != FrameType.SETTINGS.getType() || hasFlag(Flags.ACK))
{ {
BufferUtil.clear(buffer); BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_preface"); notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_preface");
@ -92,7 +106,6 @@ public class ServerParser extends Parser
} }
if (!parseBody(buffer)) if (!parseBody(buffer))
return; return;
onPreface();
state = State.FRAMES; state = State.FRAMES;
break; break;
} }

View File

@ -19,6 +19,8 @@
package org.eclipse.jetty.http2.server; package org.eclipse.jetty.http2.server;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -33,7 +35,9 @@ import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.ServerParser; import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.http2.parser.SettingsBodyParser; import org.eclipse.jetty.http2.parser.SettingsBodyParser;
@ -53,7 +57,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>(); private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>();
private final ServerSessionListener listener; private final ServerSessionListener listener;
private final HttpConfiguration httpConfig; private final HttpConfiguration httpConfig;
private HeadersFrame upgradeRequest; private final List<Frame> upgradeFrames = new ArrayList<>();
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{ {
@ -79,10 +83,10 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
@Override @Override
public void onOpen() public void onOpen()
{ {
super.onOpen();
notifyAccept(getSession()); notifyAccept(getSession());
if (upgradeRequest != null) for (Frame frame : upgradeFrames)
getSession().onFrame(upgradeRequest); getSession().onFrame(frame);
super.onOpen();
} }
private void notifyAccept(ISession session) private void notifyAccept(ISession session)
@ -172,10 +176,12 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
throw new BadMessageException(); throw new BadMessageException();
} }
getSession().onFrame(settingsFrame); getParser().standardUpgrade();
upgradeFrames.add(new PrefaceFrame());
upgradeFrames.add(settingsFrame);
// Remember the request to send a response from onOpen(). // Remember the request to send a response from onOpen().
upgradeRequest = new HeadersFrame(1, request, null, true); upgradeFrames.add(new HeadersFrame(1, request, null, true));
} }
return true; return true;
} }

View File

@ -112,6 +112,9 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
{ {
switch (frame.getType()) switch (frame.getType())
{ {
case PREFACE:
onPreface();
break;
case SETTINGS: case SETTINGS:
// SPEC: the required reply to this SETTINGS frame is the 101 response. // SPEC: the required reply to this SETTINGS frame is the 101 response.
onSettings((SettingsFrame)frame, false); onSettings((SettingsFrame)frame, false);