First take at implementing the HttpClientTransport for HTTP2.

This commit is contained in:
Simone Bordet 2015-02-05 13:17:03 +01:00
parent 96132dbe45
commit 02b5732720
50 changed files with 1428 additions and 211 deletions

View File

@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.io.ByteBufferPool;
@ -85,6 +85,11 @@ public class HTTP2Client extends ContainerLifeCycle
}
public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
{
connect(sslContextFactory, address, listener, promise, new HashMap<String, Object>());
}
public void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
try
{
@ -92,7 +97,6 @@ public class HTTP2Client extends ContainerLifeCycle
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
Map<String, Object> context = new HashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
@ -115,7 +119,7 @@ public class HTTP2Client extends ContainerLifeCycle
private void closeConnections()
{
for (ISession session : sessions)
session.close(ErrorCodes.NO_ERROR, null, Callback.Adapter.INSTANCE);
session.close(ErrorCode.NO_ERROR.code, null, Callback.Adapter.INSTANCE);
sessions.clear();
}

View File

@ -55,7 +55,6 @@ public class HTTP2ClientSession extends HTTP2Session
}
else
{
stream.updateClose(frame.isEndStream(), false);
stream.process(frame, Callback.Adapter.INSTANCE);
notifyHeaders(stream, frame);
if (stream.isClosed())
@ -96,7 +95,6 @@ public class HTTP2ClientSession extends HTTP2Session
else
{
IStream pushStream = createRemoteStream(pushStreamId);
pushStream.updateClose(true, true);
pushStream.process(frame, Callback.Adapter.INSTANCE);
Stream.Listener listener = notifyPush(stream, pushStream, frame);
pushStream.setListener(listener);

View File

@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession;
@ -636,7 +636,7 @@ public class FlowControlTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (frame.getError() == ErrorCodes.FLOW_CONTROL_ERROR)
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
closeLatch.countDown();
}
});
@ -694,7 +694,7 @@ public class FlowControlTest extends AbstractTest
@Override
public void onClose(Session session, GoAwayFrame frame)
{
if (frame.getError() == ErrorCodes.FLOW_CONTROL_ERROR)
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
closeLatch.countDown();
}
});

View File

@ -377,7 +377,7 @@ public class IdleTimeoutTest extends AbstractTest
}
@Override
public void onFailure(Stream stream, Throwable x)
public void onTimeout(Stream stream, Throwable x)
{
assertThat(x, instanceOf(TimeoutException.class));
timeoutLatch.countDown();
@ -407,7 +407,7 @@ public class IdleTimeoutTest extends AbstractTest
return new Stream.Listener.Adapter()
{
@Override
public void onFailure(Stream stream, Throwable x)
public void onTimeout(Stream stream, Throwable x)
{
timeoutLatch.countDown();
}
@ -416,18 +416,18 @@ public class IdleTimeoutTest extends AbstractTest
});
final CountDownLatch resetLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
// Stream does not end here, but we won't send any DATA frame.
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
MetaData.Request metaData = newRequest("GET", new HttpFields());
// Stream does not end here, but we won't send any DATA frame.
HeadersFrame requestFrame = new HeadersFrame(0, metaData, null, false);
session.newStream(requestFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());
Assert.assertTrue(timeoutLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
@ -451,7 +451,7 @@ public class IdleTimeoutTest extends AbstractTest
return new Stream.Listener.Adapter()
{
@Override
public void onFailure(Stream stream, Throwable x)
public void onTimeout(Stream stream, Throwable x)
{
timeoutLatch.countDown();
}

View File

@ -32,7 +32,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
@ -201,7 +201,7 @@ public class PushCacheFilterTest extends AbstractTest
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
// Reset the stream as soon as we see the push.
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCodes.REFUSED_STREAM_ERROR);
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code);
stream.reset(resetFrame, Callback.Adapter.INSTANCE);
return new Adapter()
{

View File

@ -149,14 +149,7 @@ public class StreamCountTest extends AbstractTest
}
});
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
{
resetLatch.countDown();
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
@ -177,7 +170,14 @@ public class StreamCountTest extends AbstractTest
HeadersFrame frame2 = new HeadersFrame(3, metaData, null, false);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter());
session.newStream(frame2, streamPromise2, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
streamPromise2.get(5, TimeUnit.SECONDS);
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));

View File

@ -35,7 +35,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -62,7 +62,7 @@ public class StreamResetTest extends AbstractTest
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(requestFrame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR);
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code);
FutureCallback resetCallback = new FutureCallback();
stream.reset(resetFrame, resetCallback);
resetCallback.get(5, TimeUnit.SECONDS);
@ -78,13 +78,19 @@ public class StreamResetTest extends AbstractTest
startServer(new ServerSessionListener.Adapter()
{
@Override
public void onReset(Session session, ResetFrame frame)
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
Stream stream = session.getStream(frame.getStreamId());
Assert.assertNotNull(stream);
Assert.assertTrue(stream.isReset());
streamRef.set(stream);
resetLatch.countDown();
return new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
Assert.assertNotNull(stream);
Assert.assertTrue(stream.isReset());
streamRef.set(stream);
resetLatch.countDown();
}
};
}
});
@ -94,7 +100,7 @@ public class StreamResetTest extends AbstractTest
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(requestFrame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR);
ResetFrame resetFrame = new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code);
stream.reset(resetFrame, Callback.Adapter.INSTANCE);
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
@ -129,22 +135,21 @@ public class StreamResetTest extends AbstractTest
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.Adapter.INSTANCE);
serverDataLatch.countDown();
}
};
}
@Override
public void onReset(Session session, ResetFrame frame)
{
Stream stream = session.getStream(frame.getStreamId());
// Simulate that there is pending data to send.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
public void onReset(Stream stream, ResetFrame frame)
{
serverResetLatch.countDown();
// Simulate that there is pending data to send.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
serverResetLatch.countDown();
}
});
}
});
};
}
});
@ -185,7 +190,7 @@ public class StreamResetTest extends AbstractTest
});
Stream stream2 = promise2.get(5, TimeUnit.SECONDS);
ResetFrame resetFrame = new ResetFrame(stream1.getId(), ErrorCodes.CANCEL_STREAM_ERROR);
ResetFrame resetFrame = new ResetFrame(stream1.getId(), ErrorCode.CANCEL_STREAM_ERROR.code);
stream1.reset(resetFrame, Callback.Adapter.INSTANCE);
Assert.assertTrue(serverResetLatch.await(5, TimeUnit.SECONDS));
@ -249,7 +254,7 @@ public class StreamResetTest extends AbstractTest
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE);
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
resetLatch.countDown();
}
});
@ -328,7 +333,7 @@ public class StreamResetTest extends AbstractTest
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE);
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
resetLatch.countDown();
}
});

View File

@ -0,0 +1,58 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
import java.util.HashMap;
import java.util.Map;
public enum ErrorCode
{
NO_ERROR(0),
PROTOCOL_ERROR(1),
INTERNAL_ERROR(2),
FLOW_CONTROL_ERROR(3),
SETTINGS_TIMEOUT_ERROR(4),
STREAM_CLOSED_ERROR(5),
FRAME_SIZE_ERROR(6),
REFUSED_STREAM_ERROR(7),
CANCEL_STREAM_ERROR(8),
COMPRESSION_ERROR(9),
HTTP_CONNECT_ERROR(10),
ENHANCE_YOUR_CALM_ERROR(11),
INADEQUATE_SECURITY_ERROR(12),
HTTP_1_1_REQUIRED_ERROR(13);
public final int code;
private ErrorCode(int code)
{
this.code = code;
Codes.codes.put(code, this);
}
public static ErrorCode from(int error)
{
return Codes.codes.get(error);
}
private static class Codes
{
private static final Map<Integer, ErrorCode> codes = new HashMap<>();
}
}

View File

@ -32,7 +32,7 @@ public interface FlowControl
public void onDataReceived(ISession session, IStream stream, int length);
public void onDataConsumed(IStream stream, int length);
public void onDataConsumed(ISession session, IStream stream, int length);
public void onDataSending(IStream stream, int length);

View File

@ -82,36 +82,42 @@ public class HTTP2FlowControl implements FlowControl
{
int oldSize = session.updateRecvWindow(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
LOG.debug("Data received, updated session recv window {} -> {} for {}", oldSize, oldSize - length, session);
if (stream != null)
{
oldSize = stream.updateRecvWindow(-length);
if (LOG.isDebugEnabled())
LOG.debug("Updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
LOG.debug("Data received, updated stream recv window {} -> {} for {}", oldSize, oldSize - length, stream);
}
}
@Override
public void onDataConsumed(IStream stream, int length)
public void onDataConsumed(ISession session, IStream stream, int length)
{
// This is the algorithm for flow control.
// This method is called when a whole flow controlled frame has been consumed.
// We currently send a WindowUpdate every time, even if the frame was very small.
// Other policies may send the WindowUpdate only upon reaching a threshold.
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increasing windows by {} for {}", length, stream);
if (length > 0)
{
ISession session = stream.getSession();
WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
session.updateRecvWindow(length);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session);
// Negative streamId allow for generation of bytes for both stream and session.
WindowUpdateFrame frame = new WindowUpdateFrame(-stream.getId(), length);
session.control(stream, Callback.Adapter.INSTANCE, frame, Frame.EMPTY_ARRAY);
Frame[] streamFrame = null;
if (stream != null)
{
streamFrame = new Frame[1];
streamFrame[0] = new WindowUpdateFrame(stream.getId(), length);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
}
session.control(stream, Callback.Adapter.INSTANCE, sessionFrame, streamFrame == null ? Frame.EMPTY_ARRAY : streamFrame);
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
@ -73,7 +72,7 @@ public class HTTP2Flusher extends IteratingCallback
}
}
public void prepend(Entry entry)
public boolean prepend(Entry entry)
{
boolean fail = false;
synchronized (this)
@ -91,9 +90,10 @@ public class HTTP2Flusher extends IteratingCallback
}
if (fail)
closed(entry, new ClosedChannelException());
return !fail;
}
public void append(Entry entry)
public boolean append(Entry entry)
{
boolean fail = false;
synchronized (this)
@ -111,6 +111,7 @@ public class HTTP2Flusher extends IteratingCallback
}
if (fail)
closed(entry, new ClosedChannelException());
return !fail;
}
public int getQueueSize()
@ -189,7 +190,7 @@ public class HTTP2Flusher extends IteratingCallback
--size;
// If the stream has been reset, don't send the frame.
if (stream != null && stream.isReset())
if (stream != null && stream.isReset() && !entry.isProtocol())
{
reset.add(entry);
continue;
@ -356,6 +357,21 @@ public class HTTP2Flusher extends IteratingCallback
callback.failed(x);
}
public boolean isProtocol()
{
switch (frame.getType())
{
case PRIORITY:
case RST_STREAM:
case GO_AWAY:
case WINDOW_UPDATE:
case DISCONNECT:
return true;
default:
return false;
}
}
@Override
public String toString()
{

View File

@ -49,6 +49,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.GatheringCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -155,20 +156,26 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (stream != null)
{
stream.updateClose(frame.isEndStream(), false);
if (getRecvWindow() < 0)
{
close(ErrorCodes.FLOW_CONTROL_ERROR, "session_window_exceeded", Callback.Adapter.INSTANCE);
close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.Adapter.INSTANCE);
return false;
}
boolean result = stream.process(frame, new Callback.Adapter()
boolean result = stream.process(frame, new Callback()
{
@Override
public void succeeded()
{
flowControl.onDataConsumed(stream, flowControlLength);
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
@Override
public void failed(Throwable x)
{
// Consume also in case of failures, to free the
// session flow control window for other streams.
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
});
@ -180,6 +187,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
// We must enlarge the session flow control window,
// otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength);
return false;
}
}
@ -202,8 +212,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
IStream stream = getStream(frame.getStreamId());
if (stream != null)
stream.process(frame, Callback.Adapter.INSTANCE);
notifyReset(this, frame);
else
notifyReset(this, frame);
if (stream != null)
removeStream(stream, false);
@ -236,7 +246,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// SPEC: check the value is sane.
if (value != 0 && value != 1)
{
onConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_settings_enable_push");
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_enable_push");
return false;
}
pushEnabled = value == 1;
@ -263,7 +273,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// SPEC: check the max frame size is sane.
if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH)
{
onConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_settings_max_frame_size");
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");
return false;
}
generator.setMaxFrameSize(value);
@ -400,6 +410,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{
int streamId = streamIds.getAndAdd(2);
@ -414,10 +425,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
stream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
flusher.append(entry);
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
flusher.iterate();
if (queued)
flusher.iterate();
}
@Override
@ -425,6 +437,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
boolean queued;
synchronized (this)
{
int streamId = streamIds.getAndAdd(2);
@ -436,10 +449,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
pushStream.updateClose(true, false);
ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
flusher.append(entry);
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
flusher.iterate();
if (queued)
flusher.iterate();
}
@Override
@ -530,11 +544,23 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
@Override
public void control(IStream stream, Callback callback, Frame frame, Frame... frames)
{
// We want to generate as late as possible to allow re-prioritization.
// We want to generate as late as possible to allow re-prioritization;
// generation will happen while processing the entries.
// The callback needs to be notified only when the last frame completes.
int length = frames.length;
frame(new ControlEntry(frame, stream, callback), length == 0);
for (int i = 1; i <= length; ++i)
frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
if (length == 0)
{
frame(new ControlEntry(frame, stream, callback), true);
}
else
{
callback = new GatheringCallback(callback, 1 + length);
frame(new ControlEntry(frame, stream, callback), false);
for (int i = 1; i <= length; ++i)
frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
}
}
@Override
@ -549,11 +575,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (LOG.isDebugEnabled())
LOG.debug("Sending {}", entry.frame);
// Ping frames are prepended to process them as soon as possible.
if (entry.frame.getType() == FrameType.PING)
flusher.prepend(entry);
else
flusher.append(entry);
if (flush)
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush)
flusher.iterate();
}
@ -597,7 +620,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount >= maxCount)
{
reset(new ResetFrame(streamId, ErrorCodes.REFUSED_STREAM_ERROR), Callback.Adapter.INSTANCE);
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
@ -618,7 +641,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
else
{
close(ErrorCodes.PROTOCOL_ERROR, "duplicate_stream", Callback.Adapter.INSTANCE);
close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.Adapter.INSTANCE);
return null;
}
}
@ -788,7 +811,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
case NOT_CLOSED:
{
// Real idle timeout, just close.
close(ErrorCodes.NO_ERROR, "idle_timeout", Callback.Adapter.INSTANCE);
close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.Adapter.INSTANCE);
break;
}
case LOCALLY_CLOSED:

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
@ -48,7 +50,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
private final ISession session;
private final int streamId;
private volatile Listener listener;
private volatile boolean reset;
private volatile boolean localReset;
private volatile boolean remoteReset;
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId)
{
@ -94,6 +97,9 @@ public class HTTP2Stream extends IdleTimeout implements IStream
public void reset(ResetFrame frame, Callback callback)
{
notIdle();
if (isReset())
return;
localReset = true;
session.control(this, callback, frame, Frame.EMPTY_ARRAY);
}
@ -118,7 +124,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override
public boolean isReset()
{
return reset;
return localReset || remoteReset;
}
@Override
@ -127,6 +133,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream
return closeState.get() == CloseState.CLOSED;
}
private boolean isRemotelyClosed()
{
return closeState.get() == CloseState.REMOTELY_CLOSED;
}
@Override
public boolean isOpen()
{
@ -144,10 +155,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream
close();
// Tell the other peer that we timed out.
reset(new ResetFrame(getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE);
reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
// Notify the application.
notifyFailure(this, timeout);
notifyTimeout(this, timeout);
}
private ConcurrentMap<String, Object> attributes()
@ -207,40 +218,52 @@ public class HTTP2Stream extends IdleTimeout implements IStream
private boolean onHeaders(HeadersFrame frame, Callback callback)
{
// TODO: handle case where HEADERS after DATA.
updateClose(frame.isEndStream(), false);
callback.succeeded();
return false;
}
private boolean onData(DataFrame frame, Callback callback)
{
// TODO: handle cases where:
// TODO: A) stream already remotely close.
// TODO: B) DATA before HEADERS.
if (getRecvWindow() < 0)
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", callback);
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback);
return true;
}
else
// SPEC: remotely closed streams must be replied with a reset.
if (isRemotelyClosed())
{
notifyData(this, frame, callback);
return false;
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE);
callback.failed(new EOFException("stream_closed"));
return true;
}
if (isReset())
{
// Just drop the frame.
callback.failed(new IOException("stream_reset"));
return true;
}
updateClose(frame.isEndStream(), false);
notifyData(this, frame, callback);
return false;
}
private boolean onReset(ResetFrame frame, Callback callback)
{
reset = true;
remoteReset = true;
callback.succeeded();
notifyReset(this, frame);
return false;
}
private boolean onPush(PushPromiseFrame frame, Callback callback)
{
updateClose(true, true);
callback.succeeded();
return false;
}
@ -316,7 +339,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
onClose();
}
protected void notifyData(Stream stream, DataFrame frame, Callback callback)
private void notifyData(Stream stream, DataFrame frame, Callback callback)
{
final Listener listener = this.listener;
if (listener == null)
@ -331,14 +354,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream
}
}
private void notifyFailure(Stream stream, Throwable failure)
private void notifyReset(Stream stream, ResetFrame frame)
{
final Listener listener = this.listener;
if (listener == null)
return;
try
{
listener.onReset(stream, frame);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
private void notifyTimeout(Stream stream, Throwable failure)
{
Listener listener = this.listener;
if (listener == null)
return;
try
{
listener.onFailure(stream, failure);
listener.onTimeout(stream, failure);
}
catch (Throwable x)
{
@ -350,6 +388,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream
public String toString()
{
return String.format("%s@%x{id=%d,sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
hashCode(), getId(), sendWindow, recvWindow, reset, closeState);
hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState);
}
}

View File

@ -61,6 +61,7 @@ public interface Session
public void onClose(Session session, GoAwayFrame frame);
// TODO: how come this is not called ???
public void onFailure(Session session, Throwable failure);
public static class Adapter implements Session.Listener

View File

@ -63,7 +63,9 @@ public interface Stream
public void onData(Stream stream, DataFrame frame, Callback callback);
public void onFailure(Stream stream, Throwable x);
public void onReset(Stream stream, ResetFrame frame);
public void onTimeout(Stream stream, Throwable x);
public static class Adapter implements Listener
{
@ -85,7 +87,12 @@ public interface Stream
}
@Override
public void onFailure(Stream stream, Throwable x)
public void onReset(Stream stream, ResetFrame frame)
{
}
@Override
public void onTimeout(Stream stream, Throwable x)
{
}
}

View File

@ -43,6 +43,6 @@ public class WindowUpdateFrame extends Frame
@Override
public String toString()
{
return String.format("%s,delta=%d", super.toString(), windowDelta);
return String.format("%s#%d,delta=%d", super.toString(), streamId, windowDelta);
}
}

View File

@ -46,23 +46,6 @@ public class WindowUpdateGenerator extends FrameGenerator
if (windowUpdate < 0)
throw new IllegalArgumentException("Invalid window update: " + windowUpdate);
// A negative streamId means that we have to generate
// bytes for both the stream and session frames.
boolean both = false;
if (streamId < 0)
{
both = true;
streamId = -streamId;
}
if (both)
{
ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, 4, Flags.NONE, 0);
header.putInt(windowUpdate);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
}
ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, 4, Flags.NONE, streamId);
header.putInt(windowUpdate);
BufferUtil.flipToFlush(header, 0);

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
@ -53,7 +53,7 @@ public abstract class BodyParser
protected boolean emptyBody(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_frame");
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_frame");
return false;
}

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -50,7 +50,7 @@ public class DataBodyParser extends BodyParser
if (isPadding())
{
BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_data_frame");
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_data_frame");
return false;
}
return onData(BufferUtil.EMPTY_BUFFER, false, 0);
@ -70,7 +70,7 @@ public class DataBodyParser extends BodyParser
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_data_frame");
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_data_frame");
}
length = getBodyLength();
if (isPadding())
@ -94,7 +94,7 @@ public class DataBodyParser extends BodyParser
if (length < 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_data_frame_padding");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_data_frame_padding");
}
break;
}

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -72,7 +72,7 @@ public class GoAwayBodyParser extends BodyParser
if (length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_go_away_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
}
else
@ -91,7 +91,7 @@ public class GoAwayBodyParser extends BodyParser
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_go_away_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
if (cursor == 0)
{
@ -100,7 +100,7 @@ public class GoAwayBodyParser extends BodyParser
if (length == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_go_away_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
}
break;
@ -115,7 +115,7 @@ public class GoAwayBodyParser extends BodyParser
if (length < 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_go_away_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
if (length == 0)
{
@ -138,7 +138,7 @@ public class GoAwayBodyParser extends BodyParser
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_go_away_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_go_away_frame");
}
if (cursor == 0)
{

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PriorityFrame;
@ -78,14 +78,14 @@ public class HeadersBodyParser extends BodyParser
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_headers_frame");
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
}
// For now we don't support HEADERS frames that don't have END_HEADERS.
if (!hasFlag(Flags.END_HEADERS))
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.INTERNAL_ERROR, "unsupported_headers_frame");
return notifyConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "unsupported_headers_frame");
}
length = getBodyLength();
@ -114,7 +114,7 @@ public class HeadersBodyParser extends BodyParser
if (length < 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_headers_frame_padding");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame_padding");
}
break;
}
@ -138,7 +138,7 @@ public class HeadersBodyParser extends BodyParser
if (length < 1)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_headers_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
}
else
@ -157,7 +157,7 @@ public class HeadersBodyParser extends BodyParser
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_headers_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
if (cursor == 0)
{
@ -166,7 +166,7 @@ public class HeadersBodyParser extends BodyParser
if (length < 1)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_headers_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_headers_frame");
}
}
break;

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
@ -94,7 +94,7 @@ public class Parser
LOG.debug("Parsing {} frame", FrameType.from(type));
if (type < 0 || type >= bodyParsers.length)
{
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "unknown_frame_type_" + type);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unknown_frame_type_" + type);
BufferUtil.clear(buffer);
return false;
}
@ -152,7 +152,7 @@ public class Parser
{
if (LOG.isDebugEnabled())
LOG.debug(x);
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "parser_error");
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "parser_error");
BufferUtil.clear(buffer);
return false;
}

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -56,13 +56,13 @@ public class PingBodyParser extends BodyParser
if (getStreamId() != 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_ping_frame");
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_ping_frame");
}
// SPEC: wrong body length is treated as connection error.
if (getBodyLength() != 8)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_ping_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_ping_frame");
}
state = State.PAYLOAD;
break;

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -45,7 +45,7 @@ public class PrefaceParser
int currByte = buffer.get();
if (currByte != PrefaceFrame.PREFACE_BYTES[cursor])
{
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_preface");
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_preface");
BufferUtil.clear(buffer);
return false;
}

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -57,13 +57,13 @@ public class PriorityBodyParser extends BodyParser
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_priority_frame");
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_priority_frame");
}
int length = getBodyLength();
if (length != 5)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_priority_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_priority_frame");
}
state = State.EXCLUSIVE;
break;

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -64,14 +64,14 @@ public class PushPromiseBodyParser extends BodyParser
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_push_promise_frame");
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_push_promise_frame");
}
// For now we don't support PUSH_PROMISE frames that don't have END_HEADERS.
if (!hasFlag(Flags.END_HEADERS))
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.INTERNAL_ERROR, "unsupported_push_promise_frame");
return notifyConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "unsupported_push_promise_frame");
}
length = getBodyLength();
@ -95,7 +95,7 @@ public class PushPromiseBodyParser extends BodyParser
if (length < 4)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_push_promise_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_push_promise_frame");
}
break;
}
@ -125,7 +125,7 @@ public class PushPromiseBodyParser extends BodyParser
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_push_promise_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_push_promise_frame");
}
if (cursor == 0)
{

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -55,13 +55,13 @@ public class ResetBodyParser extends BodyParser
if (getStreamId() == 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_rst_stream_frame");
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_rst_stream_frame");
}
int length = getBodyLength();
if (length != 4)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_rst_stream_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_rst_stream_frame");
}
state = State.ERROR;
break;

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -76,7 +76,7 @@ public class ServerParser extends Parser
catch (Throwable x)
{
LOG.debug(x);
notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "parser_error");
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "parser_error");
return false;
}
}

View File

@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -73,7 +73,7 @@ public class SettingsBodyParser extends BodyParser
if (getStreamId() != 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "invalid_settings_frame");
return notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_frame");
}
length = getBodyLength();
settings = new HashMap<>();
@ -90,7 +90,7 @@ public class SettingsBodyParser extends BodyParser
if (length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_settings_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
}
}
else
@ -110,7 +110,7 @@ public class SettingsBodyParser extends BodyParser
if (length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_settings_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
}
if (cursor == 0)
{
@ -150,7 +150,7 @@ public class SettingsBodyParser extends BodyParser
if (cursor > 0 && length <= 0)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_settings_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_settings_frame");
}
if (cursor == 0)
{

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.BufferUtil;
@ -55,7 +55,7 @@ public class WindowUpdateBodyParser extends BodyParser
if (length != 4)
{
BufferUtil.clear(buffer);
return notifyConnectionFailure(ErrorCodes.FRAME_SIZE_ERROR, "invalid_window_update_frame");
return notifyConnectionFailure(ErrorCode.FRAME_SIZE_ERROR.code, "invalid_window_update_frame");
}
state = State.WINDOW_DELTA;
break;

View File

@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.generator.HeaderGenerator;
import org.eclipse.jetty.http2.generator.SettingsGenerator;
import org.eclipse.jetty.http2.parser.Parser;
@ -133,7 +133,7 @@ public class SettingsGenerateParseTest
}
}
Assert.assertEquals(ErrorCodes.FRAME_SIZE_ERROR, errorRef.get());
Assert.assertEquals(ErrorCode.FRAME_SIZE_ERROR.code, errorRef.get());
}
@Test

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-parent</artifactId>
<version>9.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>http2-http-client-transport</artifactId>
<name>Jetty :: HTTP2 :: HTTP Client Transport</name>
<properties>
<bundle-symbolic-name>${project.groupId}.client.http</bundle-symbolic-name>
</properties>
<build>
</build>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,83 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
public class HttpChannelOverHTTP2 extends HttpChannel
{
private final HttpConnectionOverHTTP2 connection;
private final Session session;
private final HttpSenderOverHTTP2 sender;
private final HttpReceiverOverHTTP2 receiver;
public HttpChannelOverHTTP2(HttpDestination destination, HttpConnectionOverHTTP2 connection, Session session)
{
super(destination);
this.connection = connection;
this.session = session;
this.sender = new HttpSenderOverHTTP2(this);
this.receiver = new HttpReceiverOverHTTP2(this);
}
public Session getSession()
{
return session;
}
public Stream.Listener getStreamListener()
{
return receiver;
}
public void send()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
sender.send(exchange);
}
@Override
public void proceed(HttpExchange exchange, Throwable failure)
{
sender.proceed(exchange, failure);
}
@Override
public boolean abort(Throwable cause)
{
boolean sendAborted = sender.abort(cause);
boolean receiveAborted = receiver.abort(cause);
// Variables cannot be inlined, otherwise abort
// calls may not be executed due to short-circuit.
return sendAborted || receiveAborted;
}
@Override
public void exchangeTerminated(Result result)
{
super.exchangeTerminated(result);
connection.release(this);
}
}

View File

@ -0,0 +1,98 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
public class HttpClientTransportOverHTTP2 implements HttpClientTransport
{
private final HTTP2Client client;
// private final ClientConnectionFactory connectionFactory;
private HttpClient httpClient;
public HttpClientTransportOverHTTP2(HTTP2Client client)
{
this.client = client;
}
@Override
public void setHttpClient(HttpClient client)
{
httpClient = client;
}
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP2(httpClient, origin);
}
@Override
public void connect(SocketAddress address, Map<String, Object> context)
{
final HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
final Promise<Connection> connection = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Session.Listener listener = new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
destination.abort(failure);
}
};
final Promise<Session> promise = new Promise<Session>()
{
@Override
public void succeeded(Session result)
{
connection.succeeded(new HttpConnectionOverHTTP2(destination, result));
}
@Override
public void failed(Throwable failure)
{
connection.failed(failure);
}
};
client.connect(httpClient.getSslContextFactory(), (InetSocketAddress)address, listener, promise, context);
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
final HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
}

View File

@ -0,0 +1,80 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.Set;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentHashSet;
public class HttpConnectionOverHTTP2 extends HttpConnection
{
private final Set<HttpChannel> channels = new ConcurrentHashSet<>();
private final Session session;
public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{
super(destination);
this.session = session;
}
@Override
protected void send(HttpExchange exchange)
{
normalizeRequest(exchange.getRequest());
// One connection maps to N channels, so for each exchange we create a new channel.
HttpChannel channel = new HttpChannelOverHTTP2(getHttpDestination(), this, session);
channels.add(channel);
channel.associate(exchange);
channel.send();
}
protected void release(HttpChannel channel)
{
channels.remove(channel);
}
@Override
public void close()
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
session.close(ErrorCode.NO_ERROR.code, null, Callback.Adapter.INSTANCE);
abort(new AsynchronousCloseException());
}
private void abort(Throwable failure)
{
for (HttpChannel channel : channels)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
}
channels.clear();
}
}

View File

@ -16,21 +16,23 @@
// ========================================================================
//
package org.eclipse.jetty.http2;
package org.eclipse.jetty.http2.client.http;
public interface ErrorCodes
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination<HttpConnectionOverHTTP2>
{
public static final int NO_ERROR = 0;
public static final int PROTOCOL_ERROR = 1;
public static final int INTERNAL_ERROR = 2;
public static final int FLOW_CONTROL_ERROR = 3;
public static final int SETTINGS_TIMEOUT_ERROR = 4;
public static final int STREAM_CLOSED_ERROR = 5;
public static final int FRAME_SIZE_ERROR = 6;
public static final int REFUSED_STREAM_ERROR = 7;
public static final int CANCEL_STREAM_ERROR = 8;
public static final int COMPRESSION_ERROR = 9;
public static final int HTTP_CONNECT_ERROR = 10;
public static final int ENHANCE_YOUR_CALM_ERROR = 11;
public static final int INADEQUATE_SECURITY_ERROR = 12;
public HttpDestinationOverHTTP2(HttpClient client, Origin origin)
{
super(client, origin);
}
@Override
protected void send(HttpConnectionOverHTTP2 connection, HttpExchange exchange)
{
connection.send(exchange);
}
}

View File

@ -0,0 +1,118 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listener
{
public HttpReceiverOverHTTP2(HttpChannel channel)
{
super(channel);
}
@Override
protected HttpChannelOverHTTP2 getHttpChannel()
{
return (HttpChannelOverHTTP2)super.getHttpChannel();
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpResponse response = exchange.getResponse();
MetaData.Response metaData = (MetaData.Response)frame.getMetaData();
response.version(metaData.getVersion()).status(metaData.getStatus()).reason(metaData.getReason());
if (responseBegin(exchange))
{
HttpFields headers = metaData.getFields();
for (HttpField header : headers)
{
if (!responseHeader(exchange, header))
return;
}
if (responseHeaders(exchange))
{
if (frame.isEndStream())
responseSuccess(exchange);
}
}
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
// Not supported.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
return null;
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
if (responseContent(exchange, frame.getData(), callback))
{
if (frame.isEndStream())
responseSuccess(exchange);
}
}
@Override
public void onReset(Stream stream, ResetFrame frame)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
ErrorCode error = ErrorCode.from(frame.getError());
String reason = error == null ? "reset" : error.name().toLowerCase();
exchange.getRequest().abort(new IOException(reason));
}
@Override
public void onTimeout(Stream stream, Throwable failure)
{
responseFailure(failure);
}
}

View File

@ -0,0 +1,106 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client.http;
import org.eclipse.jetty.client.HttpContent;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
public class HttpSenderOverHTTP2 extends HttpSender
{
private Stream stream;
public HttpSenderOverHTTP2(HttpChannelOverHTTP2 channel)
{
super(channel);
}
@Override
protected HttpChannelOverHTTP2 getHttpChannel()
{
return (HttpChannelOverHTTP2)super.getHttpChannel();
}
@Override
protected void sendHeaders(HttpExchange exchange, final HttpContent content, final Callback callback)
{
final Request request = exchange.getRequest();
HttpURI uri = new HttpURI(request.getScheme(), request.getHost(), request.getPort(), request.getPath(), null, request.getQuery(), null);
MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders());
HeadersFrame headersFrame = new HeadersFrame(0, metaData, null, !content.hasContent());
HttpChannelOverHTTP2 channel = getHttpChannel();
Promise<Stream> promise = new Promise<Stream>()
{
@Override
public void succeeded(Stream stream)
{
HttpSenderOverHTTP2.this.stream = stream;
stream.setIdleTimeout(request.getIdleTimeout());
if (content.hasContent() && !expects100Continue(request))
{
if (content.advance())
{
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), content.isLast());
stream.data(dataFrame, callback);
return;
}
}
callback.succeeded();
}
@Override
public void failed(Throwable failure)
{
callback.failed(failure);
}
};
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
}
@Override
protected void sendContent(HttpExchange exchange, HttpContent content, Callback callback)
{
if (content.isConsumed())
{
callback.succeeded();
}
else
{
DataFrame frame = new DataFrame(stream.getId(), content.getByteBuffer(), content.isLast());
stream.data(frame, callback);
}
}
@Override
protected void reset()
{
super.reset();
stream = null;
}
}

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.http2.server;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
@ -30,6 +30,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.io.ByteBufferPool;
@ -129,7 +130,13 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
}
@Override
public void onFailure(Stream stream, Throwable x)
public void onReset(Stream stream, ResetFrame frame)
{
// TODO:
}
@Override
public void onTimeout(Stream stream, Throwable x)
{
// TODO
}
@ -137,7 +144,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
private void close(Stream stream, String reason)
{
final Session session = stream.getSession();
session.close(ErrorCodes.PROTOCOL_ERROR, reason, Callback.Adapter.INSTANCE);
session.close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.Adapter.INSTANCE);
}
}

View File

@ -22,7 +22,7 @@ import java.util.Collections;
import java.util.Map;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControl;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
@ -75,7 +75,6 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
IStream stream = createRemoteStream(frame.getStreamId());
if (stream != null)
{
stream.updateClose(frame.isEndStream(), false);
stream.process(frame, Callback.Adapter.INSTANCE);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
@ -86,7 +85,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
}
else
{
onConnectionFailure(ErrorCodes.INTERNAL_ERROR, "invalid_request");
onConnectionFailure(ErrorCode.INTERNAL_ERROR.code, "invalid_request");
}
return false;
}
@ -94,7 +93,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
@Override
public boolean onPushPromise(PushPromiseFrame frame)
{
onConnectionFailure(ErrorCodes.PROTOCOL_ERROR, "push_promise");
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "push_promise");
return false;
}

View File

@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
@ -31,6 +31,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -189,6 +190,16 @@ public class HttpTransportOverHTTP2 implements HttpTransport
@Override
public void onCompleted()
{
if (!stream.isClosed())
{
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
// Now that this stream is reset, in-flight data frames will be consumed and discarded.
// Consume the existing queued data frames to avoid stalling the flow control.
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.getRequest().getHttpInput().consumeAll();
}
}
@Override
@ -196,8 +207,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} aborted", stream.getId());
if (!stream.isReset())
stream.reset(new ResetFrame(stream.getId(), ErrorCodes.INTERNAL_ERROR), Callback.Adapter.INSTANCE);
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.Adapter.INSTANCE);
}
private class CommitCallback implements Callback

View File

@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
@ -136,7 +136,7 @@ public class CloseTest extends AbstractServerTest
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
generator.control(lease, new GoAwayFrame(1, ErrorCodes.NO_ERROR, "OK".getBytes("UTF-8")));
generator.control(lease, new GoAwayFrame(1, ErrorCode.NO_ERROR.code, "OK".getBytes("UTF-8")));
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
@ -192,7 +192,7 @@ public class CloseTest extends AbstractServerTest
sessionRef.set(stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.Adapter.INSTANCE);
stream.getSession().close(ErrorCodes.NO_ERROR, "OK", Callback.Adapter.INSTANCE);
stream.getSession().close(ErrorCode.NO_ERROR.code, "OK", Callback.Adapter.INSTANCE);
return null;
}
});

View File

@ -33,7 +33,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCodes;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
@ -235,7 +235,7 @@ public class HTTP2ServerTest extends AbstractServerTest
@Override
public boolean onGoAway(GoAwayFrame frame)
{
Assert.assertEquals(ErrorCodes.FRAME_SIZE_ERROR, frame.getError());
Assert.assertEquals(ErrorCode.FRAME_SIZE_ERROR.code, frame.getError());
latch.countDown();
return false;
}
@ -272,7 +272,7 @@ public class HTTP2ServerTest extends AbstractServerTest
@Override
public boolean onGoAway(GoAwayFrame frame)
{
Assert.assertEquals(ErrorCodes.PROTOCOL_ERROR, frame.getError());
Assert.assertEquals(ErrorCode.PROTOCOL_ERROR.code, frame.getError());
latch.countDown();
return false;
}

View File

@ -17,6 +17,7 @@
<module>http2-common</module>
<module>http2-client</module>
<module>http2-server</module>
<module>http2-http-client-transport</module>
</modules>

View File

@ -0,0 +1,76 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicInteger;
public class GatheringCallback implements Callback
{
private final Callback callback;
private final AtomicInteger count;
public GatheringCallback(Callback callback, int count)
{
this.callback = callback;
this.count = new AtomicInteger(count);
}
@Override
public void succeeded()
{
// Forward success on the last success.
while (true)
{
int current = count.get();
// Already completed ?
if (current == 0)
return;
if (count.compareAndSet(current, current - 1))
{
if (current == 1)
{
callback.succeeded();
return;
}
}
}
}
@Override
public void failed(Throwable failure)
{
// Forward failure on the first failure.
while (true)
{
int current = count.get();
// Already completed ?
if (current == 0)
return;
if (count.compareAndSet(current, 0))
{
callback.failed(failure);
return;
}
}
}
}

View File

@ -49,5 +49,6 @@
<module>test-integration</module>
<module>test-quickstart</module>
<module>test-jmx</module>
<module>test-http-client-transport</module>
</modules>
</project>

View File

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.eclipse.jetty.tests</groupId>
<artifactId>tests-parent</artifactId>
<version>9.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>test-http-client-transport</artifactId>
<packaging>jar</packaging>
<name>Test :: HTTP Client Transports</name>
<properties>
<bundle-symbolic-name>${project.groupId}.client.http</bundle-symbolic-name>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<!-- DO NOT DEPLOY (or Release) -->
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-http-client-transport</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,118 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public abstract class AbstractTest
{
private static final HTTP2Client http2Client;
static
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("h2-client");
http2Client = new HTTP2Client(clientThreads);
}
@Parameterized.Parameters(name = "{index}: mod:{0}")
public static List<Object[]> parameters() throws Exception
{
HttpConfiguration httpConfiguration = new HttpConfiguration();
return Arrays.asList(
new Object[]{new HttpClientTransportOverHTTP(), new HttpConnectionFactory(httpConfiguration)},
new Object[]{new HttpClientTransportOverHTTP2(http2Client), new HTTP2ServerConnectionFactory(httpConfiguration)}
);
}
@BeforeClass
public static void prepare() throws Exception
{
http2Client.start();
}
@AfterClass
public static void dispose() throws Exception
{
http2Client.stop();
}
@Rule
public final TestTracker tracker = new TestTracker();
private final HttpClientTransport httpClientTransport;
private final ConnectionFactory serverConnectionFactory;
protected Server server;
protected ServerConnector connector;
protected HttpClient client;
public AbstractTest(HttpClientTransport httpClientTransport, ConnectionFactory serverConnectionFactory)
{
this.httpClientTransport = httpClientTransport;
this.serverConnectionFactory = serverConnectionFactory;
}
public void start(Handler handler) throws Exception
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server, serverConnectionFactory);
server.addConnector(connector);
server.setHandler(handler);
server.start();
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient(httpClientTransport, null);
client.setExecutor(clientThreads);
client.start();
}
@After
public void stop() throws Exception
{
client.stop();
server.stop();
}
}

View File

@ -0,0 +1,270 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientTest extends AbstractTest
{
public HttpClientTest(HttpClientTransport httpClientTransport, ConnectionFactory serverConnectionFactory)
{
super(httpClientTransport, serverConnectionFactory);
}
@Test
public void testRequestWithoutResponseContent() throws Exception
{
final int status = HttpStatus.NO_CONTENT_204;
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(status);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(status, response.getStatus());
Assert.assertEquals(0, response.getContent().length);
}
@Test
public void testRequestWithSmallResponseContent() throws Exception
{
testRequestWithResponseContent(1024);
}
@Test
public void testRequestWithLargeResponseContent() throws Exception
{
testRequestWithResponseContent(1024 * 1024);
}
private void testRequestWithResponseContent(int length) throws Exception
{
final byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(bytes);
}
});
org.eclipse.jetty.client.api.Request request = client.newRequest("localhost", connector.getLocalPort());
FutureResponseListener listener = new FutureResponseListener(request, length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(bytes, response.getContent());
}
@Test
public void testRequestWithSmallResponseContentChunked() throws Exception
{
testRequestWithResponseContentChunked(512);
}
@Test
public void testRequestWithLargeResponseContentChunked() throws Exception
{
testRequestWithResponseContentChunked(512 * 512);
}
private void testRequestWithResponseContentChunked(int length) throws Exception
{
final byte[] chunk1 = new byte[length];
final byte[] chunk2 = new byte[length];
Random random = new Random();
random.nextBytes(chunk1);
random.nextBytes(chunk2);
byte[] bytes = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, bytes, 0, chunk1.length);
System.arraycopy(chunk2, 0, bytes, chunk1.length, chunk2.length);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
output.write(chunk2);
}
});
org.eclipse.jetty.client.api.Request request = client.newRequest("localhost", connector.getLocalPort());
FutureResponseListener listener = new FutureResponseListener(request, 2 * length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(bytes, response.getContent());
}
@Test
public void testUploadZeroLengthWithoutResponseContent() throws Exception
{
testUploadWithoutResponseContent(0);
}
@Test
public void testUploadSmallWithoutResponseContent() throws Exception
{
testUploadWithoutResponseContent(1024);
}
@Test
public void testUploadLargeWithoutResponseContent() throws Exception
{
testUploadWithoutResponseContent(1024 * 1024);
}
private void testUploadWithoutResponseContent(int length) throws Exception
{
final byte[] bytes = new byte[length];
new Random().nextBytes(bytes);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
for (int i = 0; i < bytes.length; ++i)
Assert.assertEquals(bytes[i] & 0xFF, input.read());
Assert.assertEquals(-1, input.read());
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.content(new BytesContentProvider(bytes))
.timeout(15, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(0, response.getContent().length);
}
@Test
public void testUploadLargeWithExceptionThrownWhileReadingOnServer() throws Exception
{
final byte[] bytes = new byte[1024 * 1024];
new Random().nextBytes(bytes);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
byte[] buffer = new byte[512];
input.read(buffer);
throw new IOException();
}
});
try
{
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.content(new BytesContentProvider(bytes))
.timeout(555, TimeUnit.SECONDS)
.send();
Assert.assertEquals(500, response.getStatus());
}
catch (Exception e)
{
Thread.sleep(1000);
}
}
@Test
public void testUploadChunkedWithExceptionThrownWhileReadingOnServer() throws Exception
{
final byte[] chunk1 = new byte[512];
final byte[] chunk2 = new byte[512];
Random random = new Random();
random.nextBytes(chunk1);
random.nextBytes(chunk2);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
byte[] buffer = new byte[512];
input.read(buffer);
throw new IOException();
}
});
org.eclipse.jetty.client.api.Request request = client.newRequest("localhost", connector.getLocalPort());
DeferredContentProvider content = new DeferredContentProvider();
FutureResponseListener listener = new FutureResponseListener(request);
request.method(HttpMethod.POST)
.content(content)
.timeout(555, TimeUnit.SECONDS)
.send(listener);
content.offer(ByteBuffer.wrap(chunk1));
ContentResponse response = listener.get();
// content.offer(ByteBuffer.wrap(chunk2));
// content.close();
Assert.assertEquals(500, response.getStatus());
}
}

View File

@ -0,0 +1,6 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
org.eclipse.jetty.client.LEVEL=DEBUG
org.eclipse.jetty.http2.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
org.eclipse.jetty.http2.client.LEVEL=DEBUG