Refactored the send of the SETTINGS frame from the client.
It's now sent after a call to onPreface(), which has been moved to the common interface Session.Listener (from ServerSession.Listener), so that client applications can customize the SETTINGS to send to the server.
This commit is contained in:
parent
ab96bf775f
commit
f8086dc7c2
|
@ -19,21 +19,17 @@
|
|||
package org.eclipse.jetty.http2.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.http2.HTTP2Connection;
|
||||
import org.eclipse.jetty.http2.HTTP2FlowControl;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http2.generator.Generator;
|
||||
import org.eclipse.jetty.http2.parser.Parser;
|
||||
import org.eclipse.jetty.http2.parser.PrefaceParser;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.ClientConnectionFactory;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
|
@ -63,29 +59,33 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);
|
||||
|
||||
Generator generator = new Generator(byteBufferPool, 4096);
|
||||
HTTP2Session session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, new HTTP2FlowControl(65535));
|
||||
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, new HTTP2FlowControl(65535));
|
||||
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
|
||||
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise);
|
||||
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise, listener);
|
||||
}
|
||||
|
||||
private class HTTP2ClientConnection extends HTTP2Connection implements Callback
|
||||
{
|
||||
private final AtomicBoolean prefaceSent = new AtomicBoolean();
|
||||
private final HTTP2Client client;
|
||||
private final Promise<Session> promise;
|
||||
private final Session.Listener listener;
|
||||
|
||||
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise)
|
||||
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
|
||||
{
|
||||
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
|
||||
this.client = client;
|
||||
this.promise = promise;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
super.onOpen();
|
||||
getEndPoint().write(this, ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES));
|
||||
Map<Integer, Integer> settings = listener.onPreface(getSession());
|
||||
if (settings == null)
|
||||
settings = Collections.emptyMap();
|
||||
getSession().settings(new SettingsFrame(settings, false, true), this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,18 +98,8 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
if (prefaceSent.compareAndSet(false, true))
|
||||
{
|
||||
// SPEC: after the preface bytes, a SETTINGS frame must be sent.
|
||||
// TODO: configure settings.
|
||||
HashMap<Integer, Integer> settings = new HashMap<>();
|
||||
getSession().settings(new SettingsFrame(settings, false), this);
|
||||
}
|
||||
else
|
||||
{
|
||||
client.addSession(getSession());
|
||||
promise.succeeded(getSession());
|
||||
}
|
||||
client.addSession(getSession());
|
||||
promise.succeeded(getSession());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -531,4 +531,75 @@ public class FlowControlTest extends AbstractTest
|
|||
|
||||
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientSendingInitialSmallWindow() throws Exception
|
||||
{
|
||||
startServer(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
|
||||
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||
stream.headers(responseFrame, Callback.Adapter.INSTANCE);
|
||||
return new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
// Since we echo back the data
|
||||
// asynchronously we must copy it.
|
||||
ByteBuffer data = frame.getData();
|
||||
ByteBuffer copy = ByteBuffer.allocateDirect(data.remaining());
|
||||
copy.put(data).flip();
|
||||
stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), callback);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
final int initialWindow = 16;
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Map<Integer, Integer> onPreface(Session session)
|
||||
{
|
||||
Map<Integer, Integer> settings = new HashMap<>();
|
||||
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, initialWindow);
|
||||
return settings;
|
||||
}
|
||||
});
|
||||
|
||||
byte[] requestData = new byte[initialWindow * 4];
|
||||
new Random().nextBytes(requestData);
|
||||
|
||||
byte[] responseData = new byte[requestData.length];
|
||||
final ByteBuffer responseContent = ByteBuffer.wrap(responseData);
|
||||
MetaData.Request metaData = newRequest("GET", new HttpFields());
|
||||
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
|
||||
FuturePromise<Stream> streamPromise = new FuturePromise<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
responseContent.put(frame.getData());
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
|
||||
|
||||
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), requestContent, true);
|
||||
stream.data(dataFrame, Callback.Adapter.INSTANCE);
|
||||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
responseContent.flip();
|
||||
Assert.assertArrayEquals(requestData, responseData);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.http2.api;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.http2.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
|
@ -48,6 +49,8 @@ public interface Session
|
|||
|
||||
public interface Listener
|
||||
{
|
||||
public Map<Integer,Integer> onPreface(Session session);
|
||||
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame);
|
||||
|
||||
public void onSettings(Session session, SettingsFrame frame);
|
||||
|
@ -62,6 +65,12 @@ public interface Session
|
|||
|
||||
public static class Adapter implements Session.Listener
|
||||
{
|
||||
@Override
|
||||
public Map<Integer, Integer> onPreface(Session session)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
|
|
|
@ -18,27 +18,17 @@
|
|||
|
||||
package org.eclipse.jetty.http2.api.server;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
|
||||
public interface ServerSessionListener extends Session.Listener
|
||||
{
|
||||
public void onConnect(Session session);
|
||||
|
||||
public Map<Integer,Integer> onPreface(Session session);
|
||||
|
||||
public static class Adapter extends Session.Listener.Adapter implements ServerSessionListener
|
||||
{
|
||||
@Override
|
||||
public void onConnect(Session session)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Integer, Integer> onPreface(Session session)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,12 +30,19 @@ public class SettingsFrame extends Frame
|
|||
|
||||
private final Map<Integer, Integer> settings;
|
||||
private final boolean reply;
|
||||
private boolean preface;
|
||||
|
||||
public SettingsFrame(Map<Integer, Integer> settings, boolean reply)
|
||||
{
|
||||
this(settings, reply, false);
|
||||
}
|
||||
|
||||
public SettingsFrame(Map<Integer, Integer> settings, boolean reply, boolean preface)
|
||||
{
|
||||
super(FrameType.SETTINGS);
|
||||
this.settings = settings;
|
||||
this.reply = reply;
|
||||
this.preface = preface;
|
||||
}
|
||||
|
||||
public Map<Integer, Integer> getSettings()
|
||||
|
@ -47,4 +54,9 @@ public class SettingsFrame extends Frame
|
|||
{
|
||||
return reply;
|
||||
}
|
||||
|
||||
public boolean isPreface()
|
||||
{
|
||||
return preface;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.eclipse.jetty.http2.Flags;
|
|||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.http2.frames.FrameType;
|
||||
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http2.parser.PrefaceParser;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
||||
|
@ -39,11 +40,14 @@ public class SettingsGenerator extends FrameGenerator
|
|||
public void generate(ByteBufferPool.Lease lease, Frame frame)
|
||||
{
|
||||
SettingsFrame settingsFrame = (SettingsFrame)frame;
|
||||
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply());
|
||||
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply(), settingsFrame.isPreface());
|
||||
}
|
||||
|
||||
public void generateSettings(ByteBufferPool.Lease lease, Map<Integer, Integer> settings, boolean reply)
|
||||
public void generateSettings(ByteBufferPool.Lease lease, Map<Integer, Integer> settings, boolean reply, boolean preface)
|
||||
{
|
||||
if (preface)
|
||||
lease.append(ByteBuffer.wrap(PrefaceParser.PREFACE_BYTES), false);
|
||||
|
||||
// Two bytes for the identifier, four bytes for the value.
|
||||
int entryLength = 2 + 4;
|
||||
int length = entryLength * settings.size();
|
||||
|
|
|
@ -87,7 +87,7 @@ public class SettingsGenerateParseTest
|
|||
for (int i = 0; i < 2; ++i)
|
||||
{
|
||||
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
|
||||
generator.generateSettings(lease, settings, true);
|
||||
generator.generateSettings(lease, settings, true, false);
|
||||
|
||||
frames.clear();
|
||||
for (ByteBuffer buffer : lease.getByteBuffers())
|
||||
|
@ -120,7 +120,7 @@ public class SettingsGenerateParseTest
|
|||
Map<Integer, Integer> settings1 = new HashMap<>();
|
||||
settings1.put(13, 17);
|
||||
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
|
||||
generator.generateSettings(lease, settings1, true);
|
||||
generator.generateSettings(lease, settings1, true, false);
|
||||
// Modify the length of the frame to make it invalid
|
||||
ByteBuffer bytes = lease.getByteBuffers().get(0);
|
||||
bytes.putShort(1, (short)(bytes.getShort(1) - 1));
|
||||
|
@ -158,7 +158,7 @@ public class SettingsGenerateParseTest
|
|||
settings1.put(key, value);
|
||||
|
||||
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
|
||||
generator.generateSettings(lease, settings1, true);
|
||||
generator.generateSettings(lease, settings1, true, false);
|
||||
|
||||
for (ByteBuffer buffer : lease.getByteBuffers())
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue