477270 - Add ability to send a single PRIORITY frame.

Also fixed the mistake of sending the stream id as the parent stream id.
This commit is contained in:
Simone Bordet 2015-09-14 11:50:44 +02:00
parent dbd66b131b
commit edce119c0e
7 changed files with 236 additions and 52 deletions

View File

@ -0,0 +1,144 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class PriorityTest extends AbstractTest
{
@Test
public void testPriorityBeforeHeaders() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(responseFrame, Callback.NOOP);
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
int streamId = session.priority(new PriorityFrame(0, 13, false), Callback.NOOP);
Assert.assertTrue(streamId > 0);
CountDownLatch latch = new CountDownLatch(2);
MetaData metaData = newRequest("GET", new HttpFields());
HeadersFrame headersFrame = new HeadersFrame(streamId, metaData, null, true);
session.newStream(headersFrame, new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream result)
{
Assert.assertEquals(streamId, result.getId());
latch.countDown();
}
}, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testPriorityAfterHeaders() throws Exception
{
CountDownLatch beforeRequests = new CountDownLatch(1);
CountDownLatch afterRequests = new CountDownLatch(2);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
try
{
beforeRequests.await(5, TimeUnit.SECONDS);
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(responseFrame, Callback.NOOP);
afterRequests.countDown();
return null;
}
catch (InterruptedException x)
{
x.printStackTrace();
return null;
}
}
});
CountDownLatch responses = new CountDownLatch(2);
Stream.Listener.Adapter listener = new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
responses.countDown();
}
};
Session session = newClient(new Session.Listener.Adapter());
MetaData metaData1 = newRequest("GET", "/one", new HttpFields());
HeadersFrame headersFrame1 = new HeadersFrame(metaData1, null, true);
FuturePromise<Stream> promise1 = new FuturePromise<>();
session.newStream(headersFrame1, promise1, listener);
Stream stream1 = promise1.get(5, TimeUnit.SECONDS);
MetaData metaData2 = newRequest("GET", "/two", new HttpFields());
HeadersFrame headersFrame2 = new HeadersFrame(metaData2, null, true);
FuturePromise<Stream> promise2 = new FuturePromise<>();
session.newStream(headersFrame2, promise2, listener);
Stream stream2 = promise2.get(5, TimeUnit.SECONDS);
int streamId = session.priority(new PriorityFrame(stream1.getId(), stream2.getId(), 13, false), Callback.NOOP);
Assert.assertEquals(stream1.getId(), streamId);
// Give time to the PRIORITY frame to arrive to server.
Thread.sleep(1000);
beforeRequests.countDown();
Assert.assertTrue(afterRequests.await(5, TimeUnit.SECONDS));
Assert.assertTrue(responses.await(5, TimeUnit.SECONDS));
}
}

View File

@ -201,6 +201,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);
}
@Override
@ -418,11 +421,15 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
boolean queued;
synchronized (this)
{
int streamId = streamIds.getAndAdd(2);
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(),
priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
int streamId = frame.getStreamId();
if (streamId <= 0)
{
streamId = streamIds.getAndAdd(2);
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
}
final IStream stream = createLocalStream(streamId, promise);
if (stream == null)
return;
@ -436,6 +443,21 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flusher.iterate();
}
@Override
public int priority(PriorityFrame frame, Callback callback)
{
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
if (stream == null)
{
streamId = streamIds.getAndAdd(2);
frame = new PriorityFrame(streamId, frame.getParentStreamId(),
frame.getWeight(), frame.isExclusive());
}
control(stream, callback, frame);
return streamId;
}
@Override
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
{
@ -652,20 +674,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
public IStream createUpgradeStream()
{
// SPEC: upgrade stream is id=1 and can't exceed maximum
remoteStreamCount.incrementAndGet();
IStream stream = newStream(1);
streams.put(1,stream);
updateLastStreamId(1);
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream, false);
if (LOG.isDebugEnabled())
LOG.debug("Created upgrade {}", stream);
return stream;
}
protected IStream newStream(int streamId)
{
return new HTTP2Stream(scheduler, this, streamId);
@ -1176,7 +1184,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
private class PromiseCallback<C> implements Callback
private static class PromiseCallback<C> implements Callback
{
private final Promise<C> promise;
private final C value;

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.Callback;
@ -62,6 +63,19 @@ public interface Session
*/
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener);
/**
* <p>Sends the given PRIORITY {@code frame}.</p>
* <p>If the {@code frame} references a {@code streamId} that does not exist
* (for example {@code 0}), then a new {@code streamId} will be allocated, to
* support <em>unused anchor streams</em> that act as parent for other streams.</p>
*
* @param frame the PRIORITY frame to send
* @param callback the callback that gets notified when the frame has been sent
* @return the new stream id generated by the PRIORITY frame, or the stream id
* that it is already referencing
*/
public int priority(PriorityFrame frame, Callback callback);
/**
* <p>Sends the given SETTINGS {@code frame} to configure the session.</p>
*

View File

@ -21,15 +21,20 @@ package org.eclipse.jetty.http2.frames;
public class PriorityFrame extends Frame
{
private final int streamId;
private final int dependentStreamId;
private final int parentStreamId;
private final int weight;
private final boolean exclusive;
public PriorityFrame(int streamId, int dependentStreamId, int weight, boolean exclusive)
public PriorityFrame(int parentStreamId, int weight, boolean exclusive)
{
this(0, parentStreamId, weight, exclusive);
}
public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive)
{
super(FrameType.PRIORITY);
this.streamId = streamId;
this.dependentStreamId = dependentStreamId;
this.parentStreamId = parentStreamId;
this.weight = weight;
this.exclusive = exclusive;
}
@ -39,9 +44,18 @@ public class PriorityFrame extends Frame
return streamId;
}
/**
* @deprecated use {@link #getParentStreamId()} instead.
*/
@Deprecated
public int getDependentStreamId()
{
return dependentStreamId;
return getParentStreamId();
}
public int getParentStreamId()
{
return parentStreamId;
}
public int getWeight()
@ -57,6 +71,6 @@ public class PriorityFrame extends Frame
@Override
public String toString()
{
return String.format("%s#%d/#%d{weight=%d,ex=%b}", super.toString(), streamId, dependentStreamId, weight, exclusive);
return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), streamId, parentStreamId, weight, exclusive);
}
}

View File

@ -38,22 +38,22 @@ public class PriorityGenerator extends FrameGenerator
public void generate(ByteBufferPool.Lease lease, Frame frame)
{
PriorityFrame priorityFrame = (PriorityFrame)frame;
generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getDependentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive());
generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getParentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive());
}
public void generatePriority(ByteBufferPool.Lease lease, int streamId, int dependentStreamId, int weight, boolean exclusive)
public void generatePriority(ByteBufferPool.Lease lease, int streamId, int parentStreamId, int weight, boolean exclusive)
{
if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId);
if (dependentStreamId < 0)
throw new IllegalArgumentException("Invalid dependent stream id: " + dependentStreamId);
if (parentStreamId < 0)
throw new IllegalArgumentException("Invalid parent stream id: " + parentStreamId);
ByteBuffer header = generateHeader(lease, FrameType.PRIORITY, 5, Flags.NONE, dependentStreamId);
ByteBuffer header = generateHeader(lease, FrameType.PRIORITY, 5, Flags.NONE, streamId);
if (exclusive)
streamId |= 0x80_00_00_00;
parentStreamId |= 0x80_00_00_00;
header.putInt(streamId);
header.putInt(parentStreamId);
header.put((byte)weight);

View File

@ -28,7 +28,7 @@ public class PriorityBodyParser extends BodyParser
private State state = State.PREPARE;
private int cursor;
private boolean exclusive;
private int streamId;
private int parentStreamId;
public PriorityBodyParser(HeaderParser headerParser, Parser.Listener listener)
{
@ -40,7 +40,7 @@ public class PriorityBodyParser extends BodyParser
state = State.PREPARE;
cursor = 0;
exclusive = false;
streamId = 0;
parentStreamId = 0;
}
@Override
@ -67,40 +67,44 @@ public class PriorityBodyParser extends BodyParser
// because the 31 least significant bits represent the stream id.
int currByte = buffer.get(buffer.position());
exclusive = (currByte & 0x80) == 0x80;
state = State.STREAM_ID;
state = State.PARENT_STREAM_ID;
break;
}
case STREAM_ID:
case PARENT_STREAM_ID:
{
if (buffer.remaining() >= 4)
{
streamId = buffer.getInt();
streamId &= 0x7F_FF_FF_FF;
parentStreamId = buffer.getInt();
parentStreamId &= 0x7F_FF_FF_FF;
state = State.WEIGHT;
}
else
{
state = State.STREAM_ID_BYTES;
state = State.PARENT_STREAM_ID_BYTES;
cursor = 4;
}
break;
}
case STREAM_ID_BYTES:
case PARENT_STREAM_ID_BYTES:
{
int currByte = buffer.get() & 0xFF;
--cursor;
streamId += currByte << (8 * cursor);
parentStreamId += currByte << (8 * cursor);
if (cursor == 0)
{
streamId &= 0x7F_FF_FF_FF;
parentStreamId &= 0x7F_FF_FF_FF;
state = State.WEIGHT;
}
break;
}
case WEIGHT:
{
// SPEC: stream cannot depend on itself.
if (getStreamId() == parentStreamId)
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_priority_frame");
int weight = buffer.get() & 0xFF;
return onPriority(streamId, weight, exclusive);
return onPriority(parentStreamId, weight, exclusive);
}
default:
{
@ -111,9 +115,9 @@ public class PriorityBodyParser extends BodyParser
return false;
}
private boolean onPriority(int streamId, int weight, boolean exclusive)
private boolean onPriority(int parentStreamId, int weight, boolean exclusive)
{
PriorityFrame frame = new PriorityFrame(streamId, getStreamId(), weight, exclusive);
PriorityFrame frame = new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive);
reset();
notifyPriority(frame);
return true;
@ -121,6 +125,6 @@ public class PriorityBodyParser extends BodyParser
private enum State
{
PREPARE, EXCLUSIVE, STREAM_ID, STREAM_ID_BYTES, WEIGHT
PREPARE, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT
}
}

View File

@ -50,7 +50,7 @@ public class PriorityGenerateParseTest
}, 4096, 8192);
int streamId = 13;
int dependentStreamId = 17;
int parentStreamId = 17;
int weight = 3;
boolean exclusive = true;
@ -58,7 +58,7 @@ public class PriorityGenerateParseTest
for (int i = 0; i < 2; ++i)
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generatePriority(lease, streamId, dependentStreamId, weight, exclusive);
generator.generatePriority(lease, streamId, parentStreamId, weight, exclusive);
frames.clear();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -73,7 +73,7 @@ public class PriorityGenerateParseTest
Assert.assertEquals(1, frames.size());
PriorityFrame frame = frames.get(0);
Assert.assertEquals(streamId, frame.getStreamId());
Assert.assertEquals(dependentStreamId, frame.getDependentStreamId());
Assert.assertEquals(parentStreamId, frame.getParentStreamId());
Assert.assertEquals(weight, frame.getWeight());
Assert.assertEquals(exclusive, frame.isExclusive());
}
@ -94,12 +94,12 @@ public class PriorityGenerateParseTest
}, 4096, 8192);
int streamId = 13;
int dependentStreamId = 17;
int parentStreamId = 17;
int weight = 3;
boolean exclusive = true;
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generatePriority(lease, streamId, dependentStreamId, weight, exclusive);
generator.generatePriority(lease, streamId, parentStreamId, weight, exclusive);
for (ByteBuffer buffer : lease.getByteBuffers())
{
@ -112,7 +112,7 @@ public class PriorityGenerateParseTest
Assert.assertEquals(1, frames.size());
PriorityFrame frame = frames.get(0);
Assert.assertEquals(streamId, frame.getStreamId());
Assert.assertEquals(dependentStreamId, frame.getDependentStreamId());
Assert.assertEquals(parentStreamId, frame.getParentStreamId());
Assert.assertEquals(weight, frame.getWeight());
Assert.assertEquals(exclusive, frame.isExclusive());
}