Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.
This commit is contained in:
commit
e2a493d7b0
|
@ -23,9 +23,14 @@ import java.net.URI;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.LongUnaryOperator;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
|
@ -35,7 +40,7 @@ import org.eclipse.jetty.http.HttpHeader;
|
|||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IteratingNestedCallback;
|
||||
import org.eclipse.jetty.util.MathUtils;
|
||||
import org.eclipse.jetty.util.component.Destroyable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
@ -71,9 +76,11 @@ public abstract class HttpReceiver
|
|||
|
||||
private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
|
||||
private final HttpChannel channel;
|
||||
private List<Response.AsyncContentListener> contentListeners;
|
||||
private ContentDecoder decoder;
|
||||
private ContentListeners contentListeners;
|
||||
private Decoder decoder;
|
||||
private Throwable failure;
|
||||
private long demand;
|
||||
private boolean stalled;
|
||||
|
||||
protected HttpReceiver(HttpChannel channel)
|
||||
{
|
||||
|
@ -85,6 +92,55 @@ public abstract class HttpReceiver
|
|||
return channel;
|
||||
}
|
||||
|
||||
void demand(long n)
|
||||
{
|
||||
if (n <= 0)
|
||||
throw new IllegalArgumentException("Invalid demand " + n);
|
||||
|
||||
boolean resume = false;
|
||||
synchronized (this)
|
||||
{
|
||||
demand = MathUtils.cappedAdd(demand, n);
|
||||
if (stalled)
|
||||
{
|
||||
stalled = false;
|
||||
resume = true;
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response demand={}/{}, resume={}", n, demand, resume);
|
||||
}
|
||||
|
||||
if (resume)
|
||||
{
|
||||
if (decoder != null)
|
||||
decoder.resume();
|
||||
else
|
||||
receive();
|
||||
}
|
||||
}
|
||||
|
||||
private long demand()
|
||||
{
|
||||
return demand(LongUnaryOperator.identity());
|
||||
}
|
||||
|
||||
private long demand(LongUnaryOperator operator)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return demand = operator.applyAsLong(demand);
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean hasDemandOrStall()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
stalled = demand <= 0;
|
||||
return !stalled;
|
||||
}
|
||||
}
|
||||
|
||||
protected HttpExchange getHttpExchange()
|
||||
{
|
||||
return channel.getHttpExchange();
|
||||
|
@ -100,6 +156,10 @@ public abstract class HttpReceiver
|
|||
return responseState.get() == ResponseState.FAILURE;
|
||||
}
|
||||
|
||||
protected void receive()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to be invoked when the response status code is available.
|
||||
* <p>
|
||||
|
@ -116,13 +176,12 @@ public abstract class HttpReceiver
|
|||
if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
|
||||
return false;
|
||||
|
||||
final HttpConversation conversation = exchange.getConversation();
|
||||
final HttpResponse response = exchange.getResponse();
|
||||
HttpConversation conversation = exchange.getConversation();
|
||||
HttpResponse response = exchange.getResponse();
|
||||
// Probe the protocol handlers
|
||||
final HttpDestination destination = getHttpDestination();
|
||||
final HttpClient client = destination.getHttpClient();
|
||||
final ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
|
||||
|
||||
HttpDestination destination = getHttpDestination();
|
||||
HttpClient client = destination.getHttpClient();
|
||||
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
|
||||
Response.Listener handlerListener = null;
|
||||
if (protocolHandler != null)
|
||||
{
|
||||
|
@ -241,23 +300,17 @@ public abstract class HttpReceiver
|
|||
*/
|
||||
protected boolean responseHeaders(HttpExchange exchange)
|
||||
{
|
||||
out:
|
||||
while (true)
|
||||
{
|
||||
ResponseState current = responseState.get();
|
||||
switch (current)
|
||||
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
|
||||
{
|
||||
case BEGIN:
|
||||
case HEADER:
|
||||
{
|
||||
if (updateResponseState(current, ResponseState.TRANSIENT))
|
||||
break out;
|
||||
if (updateResponseState(current, ResponseState.TRANSIENT))
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,29 +320,35 @@ public abstract class HttpReceiver
|
|||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
List<Response.ResponseListener> responseListeners = exchange.getConversation().getResponseListeners();
|
||||
notifier.notifyHeaders(responseListeners, response);
|
||||
contentListeners = responseListeners.stream()
|
||||
.filter(Response.AsyncContentListener.class::isInstance)
|
||||
.map(Response.AsyncContentListener.class::cast)
|
||||
.collect(Collectors.toList());
|
||||
contentListeners = new ContentListeners(responseListeners);
|
||||
contentListeners.notifyBeforeContent(response);
|
||||
|
||||
List<String> contentEncodings = response.getHeaders().getCSV(HttpHeader.CONTENT_ENCODING.asString(), false);
|
||||
if (contentEncodings != null && !contentEncodings.isEmpty())
|
||||
if (!contentListeners.isEmpty())
|
||||
{
|
||||
for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
|
||||
List<String> contentEncodings = response.getHeaders().getCSV(HttpHeader.CONTENT_ENCODING.asString(), false);
|
||||
if (contentEncodings != null && !contentEncodings.isEmpty())
|
||||
{
|
||||
for (String encoding : contentEncodings)
|
||||
for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
|
||||
{
|
||||
if (factory.getEncoding().equalsIgnoreCase(encoding))
|
||||
for (String encoding : contentEncodings)
|
||||
{
|
||||
this.decoder = factory.newContentDecoder();
|
||||
break;
|
||||
if (factory.getEncoding().equalsIgnoreCase(encoding))
|
||||
{
|
||||
decoder = new Decoder(response, factory.newContentDecoder());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
|
||||
return true;
|
||||
{
|
||||
boolean hasDemand = hasDemandOrStall();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response headers {}, hasDemand={}", response, hasDemand);
|
||||
return hasDemand;
|
||||
}
|
||||
|
||||
terminateResponse(exchange);
|
||||
return false;
|
||||
|
@ -307,45 +366,56 @@ public abstract class HttpReceiver
|
|||
*/
|
||||
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
|
||||
{
|
||||
out:
|
||||
while (true)
|
||||
{
|
||||
ResponseState current = responseState.get();
|
||||
switch (current)
|
||||
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
|
||||
{
|
||||
case HEADERS:
|
||||
case CONTENT:
|
||||
{
|
||||
if (updateResponseState(current, ResponseState.TRANSIENT))
|
||||
break out;
|
||||
if (updateResponseState(current, ResponseState.TRANSIENT))
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
callback.failed(new IllegalStateException("Invalid response state " + current));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
callback.failed(new IllegalStateException("Invalid response state " + current));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (demand() <= 0)
|
||||
{
|
||||
callback.failed(new IllegalStateException("No demand for response content"));
|
||||
return false;
|
||||
}
|
||||
|
||||
HttpResponse response = exchange.getResponse();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
|
||||
|
||||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
|
||||
ContentDecoder decoder = this.decoder;
|
||||
if (decoder == null)
|
||||
if (contentListeners.isEmpty())
|
||||
{
|
||||
notifier.notifyContent(response, buffer, callback, contentListeners);
|
||||
callback.succeeded();
|
||||
}
|
||||
else
|
||||
{
|
||||
new Decoder(notifier, response, decoder, buffer, callback).iterate();
|
||||
Decoder decoder = this.decoder;
|
||||
if (decoder == null)
|
||||
{
|
||||
contentListeners.notifyContent(response, buffer, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!decoder.decode(buffer, callback))
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
|
||||
return true;
|
||||
{
|
||||
boolean hasDemand = hasDemandOrStall();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
|
||||
return hasDemand;
|
||||
}
|
||||
|
||||
terminateResponse(exchange);
|
||||
return false;
|
||||
|
@ -386,8 +456,7 @@ public abstract class HttpReceiver
|
|||
|
||||
// Mark atomically the response as terminated, with
|
||||
// respect to concurrency between request and response.
|
||||
Result result = exchange.terminateResponse();
|
||||
terminateResponse(exchange, result);
|
||||
terminateResponse(exchange);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -451,7 +520,7 @@ public abstract class HttpReceiver
|
|||
}
|
||||
|
||||
/**
|
||||
* Resets this {@link HttpReceiver} state.
|
||||
* Resets the state of this HttpReceiver.
|
||||
* <p>
|
||||
* Subclasses should override (but remember to call {@code super}) to reset their own state.
|
||||
* <p>
|
||||
|
@ -459,13 +528,11 @@ public abstract class HttpReceiver
|
|||
*/
|
||||
protected void reset()
|
||||
{
|
||||
contentListeners = null;
|
||||
destroyDecoder(decoder);
|
||||
decoder = null;
|
||||
cleanup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Disposes this {@link HttpReceiver} state.
|
||||
* Disposes the state of this HttpReceiver.
|
||||
* <p>
|
||||
* Subclasses should override (but remember to call {@code super}) to dispose their own state.
|
||||
* <p>
|
||||
|
@ -473,41 +540,32 @@ public abstract class HttpReceiver
|
|||
*/
|
||||
protected void dispose()
|
||||
{
|
||||
destroyDecoder(decoder);
|
||||
decoder = null;
|
||||
cleanup();
|
||||
}
|
||||
|
||||
private static void destroyDecoder(ContentDecoder decoder)
|
||||
private void cleanup()
|
||||
{
|
||||
if (decoder instanceof Destroyable)
|
||||
{
|
||||
((Destroyable)decoder).destroy();
|
||||
}
|
||||
contentListeners = null;
|
||||
if (decoder != null)
|
||||
decoder.destroy();
|
||||
decoder = null;
|
||||
demand = 0;
|
||||
stalled = false;
|
||||
}
|
||||
|
||||
public boolean abort(HttpExchange exchange, Throwable failure)
|
||||
{
|
||||
// Update the state to avoid more response processing.
|
||||
boolean terminate;
|
||||
out:
|
||||
while (true)
|
||||
{
|
||||
ResponseState current = responseState.get();
|
||||
switch (current)
|
||||
if (current == ResponseState.FAILURE)
|
||||
return false;
|
||||
if (updateResponseState(current, ResponseState.FAILURE))
|
||||
{
|
||||
case FAILURE:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
default:
|
||||
{
|
||||
if (updateResponseState(current, ResponseState.FAILURE))
|
||||
{
|
||||
terminate = current != ResponseState.TRANSIENT;
|
||||
break out;
|
||||
}
|
||||
break;
|
||||
}
|
||||
terminate = current != ResponseState.TRANSIENT;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -526,8 +584,7 @@ public abstract class HttpReceiver
|
|||
{
|
||||
// Mark atomically the response as terminated, with
|
||||
// respect to concurrency between request and response.
|
||||
Result result = exchange.terminateResponse();
|
||||
terminateResponse(exchange, result);
|
||||
terminateResponse(exchange);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
|
@ -594,46 +651,171 @@ public abstract class HttpReceiver
|
|||
FAILURE
|
||||
}
|
||||
|
||||
private class Decoder extends IteratingNestedCallback
|
||||
/**
|
||||
* <p>Wraps a list of content listeners, notifies them about content events and
|
||||
* tracks individual listener demand to produce a global demand for content.</p>
|
||||
*/
|
||||
private class ContentListeners
|
||||
{
|
||||
private final Map<Object, Long> demands = new ConcurrentHashMap<>();
|
||||
private final LongConsumer demand = HttpReceiver.this::demand;
|
||||
private final List<Response.DemandedContentListener> listeners;
|
||||
|
||||
private ContentListeners(List<Response.ResponseListener> responseListeners)
|
||||
{
|
||||
listeners = responseListeners.stream()
|
||||
.filter(Response.DemandedContentListener.class::isInstance)
|
||||
.map(Response.DemandedContentListener.class::cast)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private boolean isEmpty()
|
||||
{
|
||||
return listeners.isEmpty();
|
||||
}
|
||||
|
||||
private void notifyBeforeContent(HttpResponse response)
|
||||
{
|
||||
if (isEmpty())
|
||||
{
|
||||
// If no listeners, we want to proceed and consume any content.
|
||||
demand.accept(1);
|
||||
}
|
||||
else
|
||||
{
|
||||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
notifier.notifyBeforeContent(response, this::demand, listeners);
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyContent(HttpResponse response, ByteBuffer buffer, Callback callback)
|
||||
{
|
||||
HttpReceiver.this.demand(d -> d - 1);
|
||||
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
|
||||
notifier.notifyContent(response, this::demand, buffer, callback, listeners);
|
||||
}
|
||||
|
||||
private void demand(Object context, long value)
|
||||
{
|
||||
if (listeners.size() > 1)
|
||||
accept(context, value);
|
||||
else
|
||||
demand.accept(value);
|
||||
}
|
||||
|
||||
private void accept(Object context, long value)
|
||||
{
|
||||
// Increment the demand for the given listener.
|
||||
demands.merge(context, value, MathUtils::cappedAdd);
|
||||
|
||||
// Check if we have demand from all listeners.
|
||||
if (demands.size() == listeners.size())
|
||||
{
|
||||
long minDemand = Long.MAX_VALUE;
|
||||
for (Long demand : demands.values())
|
||||
{
|
||||
if (demand < minDemand)
|
||||
minDemand = demand;
|
||||
}
|
||||
if (minDemand > 0)
|
||||
{
|
||||
// We are going to demand for minDemand content
|
||||
// chunks, so decrement the listener's demand by
|
||||
// minDemand and remove those that have no demand left.
|
||||
Iterator<Map.Entry<Object, Long>> iterator = demands.entrySet().iterator();
|
||||
while (iterator.hasNext())
|
||||
{
|
||||
Map.Entry<Object, Long> entry = iterator.next();
|
||||
long newValue = entry.getValue() - minDemand;
|
||||
if (newValue == 0)
|
||||
iterator.remove();
|
||||
else
|
||||
entry.setValue(newValue);
|
||||
}
|
||||
|
||||
// Demand more content chunks for all the listeners.
|
||||
demand.accept(minDemand);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Implements the decoding of content, producing decoded buffers only if there is demand for content.</p>
|
||||
*/
|
||||
private class Decoder implements Destroyable
|
||||
{
|
||||
private final ResponseNotifier notifier;
|
||||
private final HttpResponse response;
|
||||
private final ContentDecoder decoder;
|
||||
private final ByteBuffer buffer;
|
||||
private ByteBuffer decoded;
|
||||
private ByteBuffer encoded;
|
||||
private Callback callback;
|
||||
|
||||
public Decoder(ResponseNotifier notifier, HttpResponse response, ContentDecoder decoder, ByteBuffer buffer, Callback callback)
|
||||
private Decoder(HttpResponse response, ContentDecoder decoder)
|
||||
{
|
||||
super(callback);
|
||||
this.notifier = notifier;
|
||||
this.response = response;
|
||||
this.decoder = decoder;
|
||||
this.buffer = buffer;
|
||||
this.decoder = Objects.requireNonNull(decoder);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Action process()
|
||||
private boolean decode(ByteBuffer encoded, Callback callback)
|
||||
{
|
||||
while (true)
|
||||
try
|
||||
{
|
||||
decoded = decoder.decode(buffer);
|
||||
if (decoded.hasRemaining())
|
||||
break;
|
||||
if (!buffer.hasRemaining())
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
|
||||
while (true)
|
||||
{
|
||||
ByteBuffer buffer;
|
||||
while (true)
|
||||
{
|
||||
buffer = decoder.decode(encoded);
|
||||
if (buffer.hasRemaining())
|
||||
break;
|
||||
if (!encoded.hasRemaining())
|
||||
{
|
||||
callback.succeeded();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
ByteBuffer decoded = buffer;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
|
||||
|
||||
notifier.notifyContent(response, decoded, this, contentListeners);
|
||||
return Action.SCHEDULED;
|
||||
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
if (demand() <= 0)
|
||||
{
|
||||
this.encoded = encoded;
|
||||
this.callback = callback;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private void resume()
|
||||
{
|
||||
ByteBuffer encoded;
|
||||
Callback callback;
|
||||
synchronized (this)
|
||||
{
|
||||
encoded = this.encoded;
|
||||
callback = this.callback;
|
||||
}
|
||||
if (decode(encoded, callback))
|
||||
receive();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
public void destroy()
|
||||
{
|
||||
decoder.release(decoded);
|
||||
super.succeeded();
|
||||
if (decoder instanceof Destroyable)
|
||||
((Destroyable)decoder).destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentProvider;
|
||||
|
@ -307,7 +308,7 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public List<HttpCookie> getCookies()
|
||||
{
|
||||
return cookies != null ? cookies : Collections.<HttpCookie>emptyList();
|
||||
return cookies != null ? cookies : Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -331,7 +332,7 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public Map<String, Object> getAttributes()
|
||||
{
|
||||
return attributes != null ? attributes : Collections.<String, Object>emptyMap();
|
||||
return attributes != null ? attributes : Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -347,7 +348,7 @@ public class HttpRequest implements Request
|
|||
// This method is invoked often in a request/response conversation,
|
||||
// so we avoid allocation if there is no need to filter.
|
||||
if (type == null || requestListeners == null)
|
||||
return requestListeners != null ? (List<T>)requestListeners : Collections.<T>emptyList();
|
||||
return requestListeners != null ? (List<T>)requestListeners : Collections.emptyList();
|
||||
|
||||
ArrayList<T> result = new ArrayList<>();
|
||||
for (RequestListener listener : requestListeners)
|
||||
|
@ -508,15 +509,16 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public Request onResponseContent(final Response.ContentListener listener)
|
||||
{
|
||||
this.responseListeners.add(new Response.AsyncContentListener()
|
||||
this.responseListeners.add(new Response.DemandedContentListener()
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, ByteBuffer content, Callback callback)
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onContent(response, content);
|
||||
callback.succeeded();
|
||||
demand.accept(1);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -530,12 +532,30 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public Request onResponseContentAsync(final Response.AsyncContentListener listener)
|
||||
{
|
||||
this.responseListeners.add(new Response.AsyncContentListener()
|
||||
this.responseListeners.add(new Response.DemandedContentListener()
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, ByteBuffer content, Callback callback)
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
listener.onContent(response, content, callback);
|
||||
listener.onContent(response, content, Callback.from(() ->
|
||||
{
|
||||
callback.succeeded();
|
||||
demand.accept(1);
|
||||
}, callback::failed));
|
||||
}
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Request onResponseContentDemanded(Response.DemandedContentListener listener)
|
||||
{
|
||||
this.responseListeners.add(new Response.DemandedContentListener()
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
listener.onContent(response, demand, content, callback);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
|
@ -885,6 +905,6 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s %s %s]@%x", this.getClass().getSimpleName(), getMethod(), getPath(), getVersion(), hashCode());
|
||||
return String.format("%s[%s %s %s]@%x", getClass().getSimpleName(), getMethod(), getPath(), getVersion(), hashCode());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.eclipse.jetty.client;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.ObjLongConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
|
@ -103,36 +105,54 @@ public class ResponseNotifier
|
|||
}
|
||||
}
|
||||
|
||||
public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer, Callback callback)
|
||||
public void notifyBeforeContent(Response response, ObjLongConsumer<Object> demand, List<Response.DemandedContentListener> contentListeners)
|
||||
{
|
||||
List<Response.AsyncContentListener> contentListeners = listeners.stream()
|
||||
.filter(Response.AsyncContentListener.class::isInstance)
|
||||
.map(Response.AsyncContentListener.class::cast)
|
||||
.collect(Collectors.toList());
|
||||
notifyContent(response, buffer, callback, contentListeners);
|
||||
for (Response.DemandedContentListener listener : contentListeners)
|
||||
{
|
||||
notifyBeforeContent(listener, response, d -> demand.accept(listener, d));
|
||||
}
|
||||
}
|
||||
|
||||
public void notifyContent(Response response, ByteBuffer buffer, Callback callback, List<Response.AsyncContentListener> contentListeners)
|
||||
private void notifyBeforeContent(Response.DemandedContentListener listener, Response response, LongConsumer demand)
|
||||
{
|
||||
if (contentListeners.isEmpty())
|
||||
try
|
||||
{
|
||||
listener.onBeforeContent(response, demand);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
public void notifyContent(Response response, ObjLongConsumer<Object> demand, ByteBuffer buffer, Callback callback, List<Response.DemandedContentListener> contentListeners)
|
||||
{
|
||||
int count = contentListeners.size();
|
||||
if (count == 0)
|
||||
{
|
||||
callback.succeeded();
|
||||
demand.accept(null, 1);
|
||||
}
|
||||
else if (count == 1)
|
||||
{
|
||||
Response.DemandedContentListener listener = contentListeners.get(0);
|
||||
notifyContent(listener, response, d -> demand.accept(listener, d), buffer.slice(), callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
CountingCallback counter = new CountingCallback(callback, contentListeners.size());
|
||||
for (Response.AsyncContentListener listener : contentListeners)
|
||||
callback = new CountingCallback(callback, count);
|
||||
for (Response.DemandedContentListener listener : contentListeners)
|
||||
{
|
||||
notifyContent(listener, response, buffer.slice(), counter);
|
||||
notifyContent(listener, response, d -> demand.accept(listener, d), buffer.slice(), callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyContent(Response.AsyncContentListener listener, Response response, ByteBuffer buffer, Callback callback)
|
||||
private void notifyContent(Response.DemandedContentListener listener, Response response, LongConsumer demand, ByteBuffer buffer, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onContent(response, buffer, callback);
|
||||
listener.onContent(response, demand, buffer, callback);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -236,7 +256,15 @@ public class ResponseNotifier
|
|||
{
|
||||
byte[] content = ((ContentResponse)response).getContent();
|
||||
if (content != null && content.length > 0)
|
||||
notifyContent(listeners, response, ByteBuffer.wrap(content), Callback.NOOP);
|
||||
{
|
||||
List<Response.DemandedContentListener> contentListeners = listeners.stream()
|
||||
.filter(Response.DemandedContentListener.class::isInstance)
|
||||
.map(Response.DemandedContentListener.class::cast)
|
||||
.collect(Collectors.toList());
|
||||
ObjLongConsumer<Object> demand = (context, value) -> {};
|
||||
notifyBeforeContent(response, demand, contentListeners);
|
||||
notifyContent(response, demand, ByteBuffer.wrap(content), Callback.NOOP, contentListeners);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -370,6 +370,12 @@ public interface Request
|
|||
*/
|
||||
Request onResponseContentAsync(Response.AsyncContentListener listener);
|
||||
|
||||
/**
|
||||
* @param listener an asynchronous listener for response content events
|
||||
* @return this request object
|
||||
*/
|
||||
Request onResponseContentDemanded(Response.DemandedContentListener listener);
|
||||
|
||||
/**
|
||||
* @param listener a listener for response success event
|
||||
* @return this request object
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.client.api;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.EventListener;
|
||||
import java.util.List;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
|
@ -109,7 +110,7 @@ public interface Response
|
|||
public interface HeaderListener extends ResponseListener
|
||||
{
|
||||
/**
|
||||
* Callback method invoked when a response header has been received,
|
||||
* Callback method invoked when a response header has been received and parsed,
|
||||
* returning whether the header should be processed or not.
|
||||
*
|
||||
* @param response the response containing the response line data and the headers so far
|
||||
|
@ -125,7 +126,7 @@ public interface Response
|
|||
public interface HeadersListener extends ResponseListener
|
||||
{
|
||||
/**
|
||||
* Callback method invoked when the response headers have been received and parsed.
|
||||
* Callback method invoked when all the response headers have been received and parsed.
|
||||
*
|
||||
* @param response the response containing the response line data and the headers
|
||||
*/
|
||||
|
@ -133,14 +134,16 @@ public interface Response
|
|||
}
|
||||
|
||||
/**
|
||||
* Listener for the response content events.
|
||||
* Synchronous listener for the response content events.
|
||||
*
|
||||
* @see AsyncContentListener
|
||||
*/
|
||||
public interface ContentListener extends ResponseListener
|
||||
{
|
||||
/**
|
||||
* Callback method invoked when the response content has been received.
|
||||
* This method may be invoked multiple times, and the {@code content} buffer must be consumed
|
||||
* before returning from this method.
|
||||
* Callback method invoked when the response content has been received, parsed and there is demand.
|
||||
* This method may be invoked multiple times, and the {@code content} buffer
|
||||
* must be consumed (or copied) before returning from this method.
|
||||
*
|
||||
* @param response the response containing the response line data and the headers
|
||||
* @param content the content bytes received
|
||||
|
@ -148,18 +151,60 @@ public interface Response
|
|||
public void onContent(Response response, ByteBuffer content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronous listener for the response content events.
|
||||
*
|
||||
* @see DemandedContentListener
|
||||
*/
|
||||
public interface AsyncContentListener extends ResponseListener
|
||||
{
|
||||
/**
|
||||
* Callback method invoked asynchronously when the response content has been received.
|
||||
* Callback method invoked when the response content has been received, parsed and there is demand.
|
||||
* The {@code callback} object should be succeeded to signal that the
|
||||
* {@code content} buffer has been consumed and to demand more content.
|
||||
*
|
||||
* @param response the response containing the response line data and the headers
|
||||
* @param content the content bytes received
|
||||
* @param callback the callback to call when the content is consumed.
|
||||
* @param callback the callback to call when the content is consumed and to demand more content
|
||||
*/
|
||||
public void onContent(Response response, ByteBuffer content, Callback callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronous listener for the response content events.
|
||||
*/
|
||||
public interface DemandedContentListener extends ResponseListener
|
||||
{
|
||||
/**
|
||||
* Callback method invoked before response content events.
|
||||
* The {@code demand} object should be used to demand content, otherwise
|
||||
* the demand remains at zero (no demand) and
|
||||
* {@link #onContent(Response, LongConsumer, ByteBuffer, Callback)} will
|
||||
* not be invoked even if content has been received and parsed.
|
||||
*
|
||||
* @param response the response containing the response line data and the headers
|
||||
* @param demand the object that allows to demand content buffers
|
||||
*/
|
||||
public default void onBeforeContent(Response response, LongConsumer demand)
|
||||
{
|
||||
demand.accept(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback method invoked when the response content has been received.
|
||||
* The {@code callback} object should be succeeded to signal that the
|
||||
* {@code content} buffer has been consumed.
|
||||
* The {@code demand} object should be used to demand more content,
|
||||
* similarly to ReactiveStreams's {@code Subscription#request(long)}.
|
||||
*
|
||||
* @param response the response containing the response line data and the headers
|
||||
* @param demand the object that allows to demand content buffers
|
||||
* @param content the content bytes received
|
||||
* @param callback the callback to call when the content is consumed
|
||||
*/
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener for the response succeeded event.
|
||||
*/
|
||||
|
@ -212,7 +257,7 @@ public interface Response
|
|||
/**
|
||||
* Listener for all response events.
|
||||
*/
|
||||
public interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, AsyncContentListener, SuccessListener, FailureListener, CompleteListener
|
||||
public interface Listener extends BeginListener, HeaderListener, HeadersListener, ContentListener, AsyncContentListener, DemandedContentListener, SuccessListener, FailureListener, CompleteListener
|
||||
{
|
||||
/**
|
||||
* An empty implementation of {@link Listener}
|
||||
|
@ -254,6 +299,16 @@ public interface Response
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
onContent(response, content, Callback.from(() ->
|
||||
{
|
||||
callback.succeeded();
|
||||
demand.accept(1);
|
||||
}, callback::failed));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Response response)
|
||||
{
|
||||
|
|
|
@ -34,13 +34,14 @@ import org.eclipse.jetty.http.HttpStatus;
|
|||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.RetainableByteBuffer;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.CompletableCallback;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
||||
public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler
|
||||
{
|
||||
private final HttpParser parser;
|
||||
private ByteBuffer buffer;
|
||||
private RetainableByteBuffer networkBuffer;
|
||||
private boolean shutdown;
|
||||
private boolean complete;
|
||||
|
||||
|
@ -63,41 +64,66 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
|
|||
|
||||
protected ByteBuffer getResponseBuffer()
|
||||
{
|
||||
return buffer;
|
||||
return networkBuffer == null ? null : networkBuffer.getBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receive()
|
||||
{
|
||||
if (buffer == null)
|
||||
acquireBuffer();
|
||||
if (networkBuffer == null)
|
||||
acquireNetworkBuffer();
|
||||
process();
|
||||
}
|
||||
|
||||
private void acquireBuffer()
|
||||
private void acquireNetworkBuffer()
|
||||
{
|
||||
HttpClient client = getHttpDestination().getHttpClient();
|
||||
ByteBufferPool bufferPool = client.getByteBufferPool();
|
||||
buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
|
||||
networkBuffer = newNetworkBuffer();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Acquired {}", networkBuffer);
|
||||
}
|
||||
|
||||
private void releaseBuffer()
|
||||
private void reacquireNetworkBuffer()
|
||||
{
|
||||
if (buffer == null)
|
||||
RetainableByteBuffer currentBuffer = networkBuffer;
|
||||
if (currentBuffer == null)
|
||||
throw new IllegalStateException();
|
||||
if (BufferUtil.hasContent(buffer))
|
||||
|
||||
if (currentBuffer.hasRemaining())
|
||||
throw new IllegalStateException();
|
||||
|
||||
currentBuffer.release();
|
||||
networkBuffer = newNetworkBuffer();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Reacquired {} <- {}", currentBuffer, networkBuffer);
|
||||
}
|
||||
|
||||
private RetainableByteBuffer newNetworkBuffer()
|
||||
{
|
||||
HttpClient client = getHttpDestination().getHttpClient();
|
||||
ByteBufferPool bufferPool = client.getByteBufferPool();
|
||||
bufferPool.release(buffer);
|
||||
buffer = null;
|
||||
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), true);
|
||||
}
|
||||
|
||||
private void releaseNetworkBuffer()
|
||||
{
|
||||
if (networkBuffer == null)
|
||||
throw new IllegalStateException();
|
||||
if (networkBuffer.hasRemaining())
|
||||
throw new IllegalStateException();
|
||||
networkBuffer.release();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Released {}", networkBuffer);
|
||||
networkBuffer = null;
|
||||
}
|
||||
|
||||
protected ByteBuffer onUpgradeFrom()
|
||||
{
|
||||
if (BufferUtil.hasContent(buffer))
|
||||
if (networkBuffer.hasRemaining())
|
||||
{
|
||||
ByteBuffer upgradeBuffer = ByteBuffer.allocate(buffer.remaining());
|
||||
upgradeBuffer.put(buffer).flip();
|
||||
ByteBuffer upgradeBuffer = BufferUtil.allocate(networkBuffer.remaining());
|
||||
BufferUtil.clearToFill(upgradeBuffer);
|
||||
BufferUtil.put(networkBuffer.getBuffer(), upgradeBuffer);
|
||||
BufferUtil.flipToFlush(upgradeBuffer, 0);
|
||||
return upgradeBuffer;
|
||||
}
|
||||
return null;
|
||||
|
@ -111,39 +137,42 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
|
|||
EndPoint endPoint = connection.getEndPoint();
|
||||
while (true)
|
||||
{
|
||||
boolean upgraded = connection != endPoint.getConnection();
|
||||
// Always parse even empty buffers to advance the parser.
|
||||
boolean stopProcessing = parse();
|
||||
|
||||
// Connection may be closed or upgraded in a parser callback.
|
||||
boolean upgraded = connection != endPoint.getConnection();
|
||||
if (connection.isClosed() || upgraded)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed");
|
||||
releaseBuffer();
|
||||
releaseNetworkBuffer();
|
||||
return;
|
||||
}
|
||||
|
||||
if (parse())
|
||||
if (stopProcessing)
|
||||
return;
|
||||
|
||||
int read = endPoint.fill(buffer);
|
||||
if (networkBuffer.getReferences() > 1)
|
||||
reacquireNetworkBuffer();
|
||||
|
||||
int read = endPoint.fill(networkBuffer.getBuffer());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Read {} bytes {} from {}", read, BufferUtil.toDetailString(buffer), endPoint);
|
||||
LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint);
|
||||
|
||||
if (read > 0)
|
||||
{
|
||||
connection.addBytesIn(read);
|
||||
if (parse())
|
||||
return;
|
||||
}
|
||||
else if (read == 0)
|
||||
{
|
||||
releaseBuffer();
|
||||
releaseNetworkBuffer();
|
||||
fillInterested();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
releaseBuffer();
|
||||
releaseNetworkBuffer();
|
||||
shutdown();
|
||||
return;
|
||||
}
|
||||
|
@ -153,9 +182,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(x);
|
||||
BufferUtil.clear(buffer);
|
||||
if (buffer != null)
|
||||
releaseBuffer();
|
||||
networkBuffer.clear();
|
||||
releaseNetworkBuffer();
|
||||
failAndClose(x);
|
||||
}
|
||||
}
|
||||
|
@ -169,20 +197,20 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
|
|||
{
|
||||
while (true)
|
||||
{
|
||||
boolean handle = parser.parseNext(buffer);
|
||||
boolean handle = parser.parseNext(networkBuffer.getBuffer());
|
||||
boolean complete = this.complete;
|
||||
this.complete = false;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Parsed {}, remaining {} {}", handle, BufferUtil.length(buffer), parser);
|
||||
LOG.debug("Parsed {}, remaining {} {}", handle, networkBuffer.remaining(), parser);
|
||||
if (handle)
|
||||
return true;
|
||||
if (!BufferUtil.hasContent(buffer))
|
||||
if (networkBuffer.isEmpty())
|
||||
return false;
|
||||
if (complete)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Discarding unexpected content after response: {}", BufferUtil.toDetailString(buffer));
|
||||
BufferUtil.clear(buffer);
|
||||
LOG.debug("Discarding unexpected content after response: {}", networkBuffer);
|
||||
networkBuffer.clear();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -276,26 +304,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
|
|||
if (exchange == null)
|
||||
return false;
|
||||
|
||||
CompletableCallback callback = new CompletableCallback()
|
||||
{
|
||||
@Override
|
||||
public void resume()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Content consumed asynchronously, resuming processing");
|
||||
process();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(Throwable x)
|
||||
{
|
||||
failAndClose(x);
|
||||
}
|
||||
};
|
||||
// Do not short circuit these calls.
|
||||
boolean proceed = responseContent(exchange, buffer, callback);
|
||||
boolean async = callback.tryComplete();
|
||||
return !proceed || async;
|
||||
networkBuffer.retain();
|
||||
return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, this::failAndClose));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.io.InterruptedIOException;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
|
@ -44,7 +44,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
||||
public class HttpClientGZIPTest extends AbstractHttpClientServerTest
|
||||
|
@ -217,16 +217,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
});
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scenario.getScheme())
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isFailed())
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
assertThrows(ExecutionException.class, () ->
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scenario.getScheme())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
|
@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.LongConsumer;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -1117,6 +1118,13 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
counter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
// Should not be invoked
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Response response)
|
||||
{
|
||||
|
|
|
@ -81,6 +81,11 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
return sender.isFailed() || receiver.isFailed();
|
||||
}
|
||||
|
||||
void receive()
|
||||
{
|
||||
connection.process();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(HttpExchange exchange)
|
||||
{
|
||||
|
|
|
@ -49,8 +49,9 @@ import org.eclipse.jetty.http.HttpHeaderValue;
|
|||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.RetainableByteBuffer;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.CompletableCallback;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
@ -68,7 +69,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
private final Flusher flusher;
|
||||
private final Delegate delegate;
|
||||
private final ClientParser parser;
|
||||
private ByteBuffer buffer;
|
||||
private RetainableByteBuffer networkBuffer;
|
||||
|
||||
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
|
||||
{
|
||||
|
@ -114,69 +115,80 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
@Override
|
||||
public void onFillable()
|
||||
{
|
||||
buffer = acquireBuffer();
|
||||
process(buffer);
|
||||
networkBuffer = newNetworkBuffer();
|
||||
process();
|
||||
}
|
||||
|
||||
private ByteBuffer acquireBuffer()
|
||||
private void reacquireNetworkBuffer()
|
||||
{
|
||||
if (networkBuffer == null)
|
||||
throw new IllegalStateException();
|
||||
if (networkBuffer.hasRemaining())
|
||||
throw new IllegalStateException();
|
||||
networkBuffer.release();
|
||||
networkBuffer = newNetworkBuffer();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Reacquired {}", networkBuffer);
|
||||
}
|
||||
|
||||
private RetainableByteBuffer newNetworkBuffer()
|
||||
{
|
||||
HttpClient client = destination.getHttpClient();
|
||||
ByteBufferPool bufferPool = client.getByteBufferPool();
|
||||
return bufferPool.acquire(client.getResponseBufferSize(), true);
|
||||
// TODO: configure directness.
|
||||
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), true);
|
||||
}
|
||||
|
||||
private void releaseBuffer(ByteBuffer buffer)
|
||||
private void releaseNetworkBuffer()
|
||||
{
|
||||
@SuppressWarnings("ReferenceEquality")
|
||||
boolean isCurrentBuffer = (this.buffer == buffer);
|
||||
assert (isCurrentBuffer);
|
||||
HttpClient client = destination.getHttpClient();
|
||||
ByteBufferPool bufferPool = client.getByteBufferPool();
|
||||
bufferPool.release(buffer);
|
||||
this.buffer = null;
|
||||
if (networkBuffer == null)
|
||||
throw new IllegalStateException();
|
||||
if (networkBuffer.hasRemaining())
|
||||
throw new IllegalStateException();
|
||||
networkBuffer.release();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Released {}", networkBuffer);
|
||||
this.networkBuffer = null;
|
||||
}
|
||||
|
||||
private void process(ByteBuffer buffer)
|
||||
void process()
|
||||
{
|
||||
try
|
||||
{
|
||||
EndPoint endPoint = getEndPoint();
|
||||
boolean looping = false;
|
||||
while (true)
|
||||
{
|
||||
if (!looping && parse(buffer))
|
||||
if (parse(networkBuffer.getBuffer()))
|
||||
return;
|
||||
|
||||
int read = endPoint.fill(buffer);
|
||||
if (networkBuffer.getReferences() > 1)
|
||||
reacquireNetworkBuffer();
|
||||
|
||||
// The networkBuffer may have been reacquired.
|
||||
int read = endPoint.fill(networkBuffer.getBuffer());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Read {} bytes from {}", read, endPoint);
|
||||
|
||||
if (read > 0)
|
||||
if (read == 0)
|
||||
{
|
||||
if (parse(buffer))
|
||||
return;
|
||||
}
|
||||
else if (read == 0)
|
||||
{
|
||||
releaseBuffer(buffer);
|
||||
releaseNetworkBuffer();
|
||||
fillInterested();
|
||||
return;
|
||||
}
|
||||
else
|
||||
else if (read < 0)
|
||||
{
|
||||
releaseBuffer(buffer);
|
||||
releaseNetworkBuffer();
|
||||
shutdown();
|
||||
return;
|
||||
}
|
||||
|
||||
looping = true;
|
||||
}
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(x);
|
||||
releaseBuffer(buffer);
|
||||
networkBuffer.clear();
|
||||
releaseNetworkBuffer();
|
||||
close(x);
|
||||
}
|
||||
}
|
||||
|
@ -423,26 +435,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
HttpChannelOverFCGI channel = activeChannels.get(request);
|
||||
if (channel != null)
|
||||
{
|
||||
CompletableCallback callback = new CompletableCallback()
|
||||
{
|
||||
@Override
|
||||
public void resume()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Content consumed asynchronously, resuming processing");
|
||||
process(HttpConnectionOverFCGI.this.buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(Throwable x)
|
||||
{
|
||||
close(x);
|
||||
}
|
||||
};
|
||||
// Do not short circuit these calls.
|
||||
boolean proceed = channel.content(buffer, callback);
|
||||
boolean async = callback.tryComplete();
|
||||
return !proceed || async;
|
||||
networkBuffer.retain();
|
||||
return !channel.content(buffer, Callback.from(networkBuffer::release, HttpConnectionOverFCGI.this::close));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -33,6 +33,12 @@ public class HttpReceiverOverFCGI extends HttpReceiver
|
|||
super(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpChannelOverFCGI getHttpChannel()
|
||||
{
|
||||
return (HttpChannelOverFCGI)super.getHttpChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean responseBegin(HttpExchange exchange)
|
||||
{
|
||||
|
@ -68,4 +74,10 @@ public class HttpReceiverOverFCGI extends HttpReceiver
|
|||
{
|
||||
return super.responseFailure(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receive()
|
||||
{
|
||||
getHttpChannel().receive();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -331,7 +331,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
|
|||
if (currentBuffer == null)
|
||||
throw new IllegalStateException();
|
||||
|
||||
if (currentBuffer.getBuffer().hasRemaining())
|
||||
if (currentBuffer.hasRemaining())
|
||||
throw new IllegalStateException();
|
||||
|
||||
currentBuffer.release();
|
||||
|
|
|
@ -62,7 +62,6 @@ import org.eclipse.jetty.util.Callback;
|
|||
import org.eclipse.jetty.util.CountingCallback;
|
||||
import org.eclipse.jetty.util.MathUtils;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.Retainable;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
|
@ -1482,7 +1481,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
}
|
||||
|
||||
private class DataCallback extends Callback.Nested implements Retainable
|
||||
private class DataCallback extends Callback.Nested
|
||||
{
|
||||
private final IStream stream;
|
||||
private final int flowControlLength;
|
||||
|
@ -1494,14 +1493,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
this.flowControlLength = flowControlLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void retain()
|
||||
{
|
||||
Callback callback = getCallback();
|
||||
if (callback instanceof Retainable)
|
||||
((Retainable)callback).retain();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
|
|
|
@ -49,12 +49,10 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
|
|||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.Retainable;
|
||||
|
||||
public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.Client
|
||||
{
|
||||
private final ContentNotifier contentNotifier = new ContentNotifier();
|
||||
private final ContentNotifier contentNotifier = new ContentNotifier(this);
|
||||
|
||||
public HttpReceiverOverHTTP2(HttpChannel channel)
|
||||
{
|
||||
|
@ -67,6 +65,12 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
return (HttpChannelOverHTTP2)super.getHttpChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receive()
|
||||
{
|
||||
contentNotifier.process(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reset()
|
||||
{
|
||||
|
@ -205,16 +209,33 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
|
||||
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
|
||||
{
|
||||
contentNotifier.offer(new DataInfo(exchange, frame, callback));
|
||||
contentNotifier.iterate();
|
||||
contentNotifier.offer(exchange, frame, callback);
|
||||
}
|
||||
|
||||
private class ContentNotifier extends IteratingCallback implements Retainable
|
||||
private static class ContentNotifier
|
||||
{
|
||||
private final Queue<DataInfo> queue = new ArrayDeque<>();
|
||||
private final HttpReceiverOverHTTP2 receiver;
|
||||
private DataInfo dataInfo;
|
||||
private boolean active;
|
||||
private boolean resume;
|
||||
private boolean stalled;
|
||||
|
||||
private void offer(DataInfo dataInfo)
|
||||
private ContentNotifier(HttpReceiverOverHTTP2 receiver)
|
||||
{
|
||||
this.receiver = receiver;
|
||||
}
|
||||
|
||||
private void offer(HttpExchange exchange, DataFrame frame, Callback callback)
|
||||
{
|
||||
DataInfo dataInfo = new DataInfo(exchange, frame, callback);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queueing content {}", dataInfo);
|
||||
enqueue(dataInfo);
|
||||
process(false);
|
||||
}
|
||||
|
||||
private void enqueue(DataInfo dataInfo)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
|
@ -222,73 +243,125 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Action process()
|
||||
private void process(boolean resume)
|
||||
{
|
||||
if (dataInfo != null)
|
||||
{
|
||||
dataInfo.callback.succeeded();
|
||||
if (dataInfo.frame.isEndStream())
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
// Allow only one thread at a time.
|
||||
if (active(resume))
|
||||
return;
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (dataInfo != null)
|
||||
{
|
||||
if (dataInfo.frame.isEndStream())
|
||||
{
|
||||
receiver.responseSuccess(dataInfo.exchange);
|
||||
// Return even if active, as reset() will be called later.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
dataInfo = queue.poll();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Dequeued content {}", dataInfo);
|
||||
if (dataInfo == null)
|
||||
{
|
||||
active = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ByteBuffer buffer = dataInfo.frame.getData();
|
||||
Callback callback = dataInfo.callback;
|
||||
if (buffer.hasRemaining())
|
||||
{
|
||||
boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x)));
|
||||
if (!proceed)
|
||||
{
|
||||
// Should stall, unless just resumed.
|
||||
if (stall())
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean active(boolean resume)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
dataInfo = queue.poll();
|
||||
if (active)
|
||||
{
|
||||
if (resume)
|
||||
this.resume = true;
|
||||
return true;
|
||||
}
|
||||
if (stalled && !resume)
|
||||
return true;
|
||||
active = true;
|
||||
stalled = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean stall()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (resume)
|
||||
{
|
||||
resume = false;
|
||||
return false;
|
||||
}
|
||||
active = false;
|
||||
stalled = true;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private void reset()
|
||||
{
|
||||
dataInfo = null;
|
||||
synchronized (this)
|
||||
{
|
||||
queue.clear();
|
||||
active = false;
|
||||
resume = false;
|
||||
stalled = false;
|
||||
}
|
||||
}
|
||||
|
||||
private void fail(Callback callback, Throwable failure)
|
||||
{
|
||||
callback.failed(failure);
|
||||
receiver.responseFailure(failure);
|
||||
}
|
||||
|
||||
private static class DataInfo
|
||||
{
|
||||
private final HttpExchange exchange;
|
||||
private final DataFrame frame;
|
||||
private final Callback callback;
|
||||
|
||||
private DataInfo(HttpExchange exchange, DataFrame frame, Callback callback)
|
||||
{
|
||||
this.exchange = exchange;
|
||||
this.frame = frame;
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
if (dataInfo == null)
|
||||
return Action.IDLE;
|
||||
|
||||
ByteBuffer buffer = dataInfo.frame.getData();
|
||||
if (buffer.hasRemaining())
|
||||
responseContent(dataInfo.exchange, buffer, this);
|
||||
else
|
||||
succeeded();
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void retain()
|
||||
{
|
||||
Callback callback = dataInfo.callback;
|
||||
if (callback instanceof Retainable)
|
||||
((Retainable)callback).retain();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
responseSuccess(dataInfo.exchange);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteFailure(Throwable failure)
|
||||
{
|
||||
dataInfo.callback.failed(failure);
|
||||
responseFailure(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean reset()
|
||||
{
|
||||
queue.clear();
|
||||
dataInfo = null;
|
||||
return super.reset();
|
||||
}
|
||||
}
|
||||
|
||||
private static class DataInfo
|
||||
{
|
||||
private final HttpExchange exchange;
|
||||
private final DataFrame frame;
|
||||
private final Callback callback;
|
||||
|
||||
private DataInfo(HttpExchange exchange, DataFrame frame, Callback callback)
|
||||
{
|
||||
this.exchange = exchange;
|
||||
this.frame = frame;
|
||||
this.callback = callback;
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), frame);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -81,14 +81,24 @@ public class RetainableByteBuffer implements Retainable
|
|||
return ref;
|
||||
}
|
||||
|
||||
public int remaining()
|
||||
{
|
||||
return buffer.remaining();
|
||||
}
|
||||
|
||||
public boolean hasRemaining()
|
||||
{
|
||||
return buffer.hasRemaining();
|
||||
return remaining() > 0;
|
||||
}
|
||||
|
||||
public boolean isEmpty()
|
||||
{
|
||||
return !buffer.hasRemaining();
|
||||
return !hasRemaining();
|
||||
}
|
||||
|
||||
public void clear()
|
||||
{
|
||||
BufferUtil.clear(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -2,7 +2,9 @@
|
|||
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
|
||||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
<New id="OpenIdConfiguration" class="org.eclipse.jetty.security.openid.OpenIdConfiguration">
|
||||
<Arg><Property name="jetty.openid.openIdProvider"/></Arg>
|
||||
<Arg><Property name="jetty.openid.provider" deprecated="jetty.openid.openIdProvider"/></Arg>
|
||||
<Arg><Property name="jetty.openid.provider.authorizationEndpoint"/></Arg>
|
||||
<Arg><Property name="jetty.openid.provider.tokenEndpoint"/></Arg>
|
||||
<Arg><Property name="jetty.openid.clientId"/></Arg>
|
||||
<Arg><Property name="jetty.openid.clientSecret"/></Arg>
|
||||
<Call name="addScopes">
|
||||
|
|
|
@ -18,11 +18,17 @@ etc/openid-baseloginservice.xml
|
|||
etc/jetty-openid.xml
|
||||
|
||||
[ini-template]
|
||||
## The OpenID Identity Provider
|
||||
# jetty.openid.openIdProvider=https://accounts.google.com/
|
||||
## The OpenID Identity Provider's issuer ID (the entire URL *before* ".well-known/openid-configuration")
|
||||
# jetty.openid.provider=https://id.example.com/~
|
||||
|
||||
## The OpenID Identity Provider's authorization endpoint (optional if the metadata of the OP is accessible)
|
||||
# jetty.openid.provider.authorizationEndpoint=https://id.example.com/authorization
|
||||
|
||||
## The OpenID Identity Provider's token endpoint (optional if the metadata of the OP is accessible)
|
||||
# jetty.openid.provider.tokenEndpoint=https://id.example.com/token
|
||||
|
||||
## The Client Identifier
|
||||
# jetty.openid.clientId=test1234.apps.googleusercontent.com
|
||||
# jetty.openid.clientId=test1234
|
||||
|
||||
## The Client Secret
|
||||
# jetty.openid.clientSecret=XT_Mafv_aUCGheuCaKY8P
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
|
@ -43,13 +44,11 @@ public class OpenIdConfiguration implements Serializable
|
|||
private static final long serialVersionUID = 2227941990601349102L;
|
||||
private static final String CONFIG_PATH = "/.well-known/openid-configuration";
|
||||
|
||||
private final String openIdProvider;
|
||||
private final String issuer;
|
||||
private final String authEndpoint;
|
||||
private final String tokenEndpoint;
|
||||
private final String clientId;
|
||||
private final String clientSecret;
|
||||
private final Map<String, Object> discoveryDocument;
|
||||
private final List<String> scopes = new ArrayList<>();
|
||||
|
||||
/**
|
||||
|
@ -60,10 +59,50 @@ public class OpenIdConfiguration implements Serializable
|
|||
*/
|
||||
public OpenIdConfiguration(String provider, String clientId, String clientSecret)
|
||||
{
|
||||
this.openIdProvider = provider;
|
||||
this(provider, null, null, clientId, clientSecret);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an OpenID configuration for a specific OIDC provider.
|
||||
* @param issuer The URL of the OpenID provider.
|
||||
* @param authorizationEndpoint the URL of the OpenID provider's authorization endpoint if configured.
|
||||
* @param tokenEndpoint the URL of the OpenID provider's token endpoint if configured.
|
||||
* @param clientId OAuth 2.0 Client Identifier valid at the Authorization Server.
|
||||
* @param clientSecret The client secret known only by the Client and the Authorization Server.
|
||||
*/
|
||||
public OpenIdConfiguration(String issuer, String authorizationEndpoint, String tokenEndpoint, String clientId, String clientSecret)
|
||||
{
|
||||
this.issuer = issuer;
|
||||
this.clientId = clientId;
|
||||
this.clientSecret = clientSecret;
|
||||
|
||||
if (issuer == null)
|
||||
throw new IllegalArgumentException("Provider was not configured");
|
||||
|
||||
if (tokenEndpoint == null || authorizationEndpoint == null)
|
||||
{
|
||||
Map<String, Object> discoveryDocument = fetchOpenIdConnectMetadata(issuer);
|
||||
|
||||
this.authEndpoint = (String)discoveryDocument.get("authorization_endpoint");
|
||||
if (this.authEndpoint == null)
|
||||
throw new IllegalArgumentException("authorization_endpoint");
|
||||
|
||||
this.tokenEndpoint = (String)discoveryDocument.get("token_endpoint");
|
||||
if (this.tokenEndpoint == null)
|
||||
throw new IllegalArgumentException("token_endpoint");
|
||||
|
||||
if (!Objects.equals(discoveryDocument.get("issuer"), issuer))
|
||||
LOG.warn("The provider in the metadata is not correct.");
|
||||
}
|
||||
else
|
||||
{
|
||||
this.authEndpoint = authorizationEndpoint;
|
||||
this.tokenEndpoint = tokenEndpoint;
|
||||
}
|
||||
}
|
||||
|
||||
private static Map<String, Object> fetchOpenIdConnectMetadata(String provider)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (provider.endsWith("/"))
|
||||
|
@ -72,31 +111,16 @@ public class OpenIdConfiguration implements Serializable
|
|||
URI providerUri = URI.create(provider + CONFIG_PATH);
|
||||
InputStream inputStream = providerUri.toURL().openConnection().getInputStream();
|
||||
String content = IO.toString(inputStream);
|
||||
discoveryDocument = (Map)JSON.parse(content);
|
||||
Map<String, Object> discoveryDocument = (Map)JSON.parse(content);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("discovery document {}", discoveryDocument);
|
||||
|
||||
return discoveryDocument;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
throw new IllegalArgumentException("invalid identity provider", e);
|
||||
}
|
||||
|
||||
issuer = (String)discoveryDocument.get("issuer");
|
||||
if (issuer == null)
|
||||
throw new IllegalArgumentException();
|
||||
|
||||
authEndpoint = (String)discoveryDocument.get("authorization_endpoint");
|
||||
if (authEndpoint == null)
|
||||
throw new IllegalArgumentException("authorization_endpoint");
|
||||
|
||||
tokenEndpoint = (String)discoveryDocument.get("token_endpoint");
|
||||
if (tokenEndpoint == null)
|
||||
throw new IllegalArgumentException("token_endpoint");
|
||||
}
|
||||
|
||||
public Map<String, Object> getDiscoveryDocument()
|
||||
{
|
||||
return discoveryDocument;
|
||||
}
|
||||
|
||||
public String getAuthEndpoint()
|
||||
|
@ -119,11 +143,6 @@ public class OpenIdConfiguration implements Serializable
|
|||
return issuer;
|
||||
}
|
||||
|
||||
public String getOpenIdProvider()
|
||||
{
|
||||
return openIdProvider;
|
||||
}
|
||||
|
||||
public String getTokenEndpoint()
|
||||
{
|
||||
return tokenEndpoint;
|
||||
|
|
|
@ -242,7 +242,7 @@ public class OpenIdCredentials implements Serializable
|
|||
{
|
||||
connection.setDoOutput(true);
|
||||
connection.setRequestMethod("POST");
|
||||
connection.setRequestProperty("Host", configuration.getOpenIdProvider());
|
||||
connection.setRequestProperty("Host", configuration.getIssuer());
|
||||
connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
|
||||
|
||||
try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream()))
|
||||
|
|
|
@ -67,7 +67,7 @@ public class OpenIdLoginService extends ContainerLifeCycle implements LoginServi
|
|||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return _configuration.getOpenIdProvider();
|
||||
return _configuration.getIssuer();
|
||||
}
|
||||
|
||||
public OpenIdConfiguration getConfiguration()
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.net.ConnectException;
|
|||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -216,7 +217,7 @@ public class ForwardProxyTLSServerTest
|
|||
ContentResponse response = httpClient.newRequest(host, serverConnector.getLocalPort())
|
||||
.scheme(HttpScheme.HTTPS.asString())
|
||||
.method(HttpMethod.GET)
|
||||
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
|
||||
.path("/echo?body=" + URLEncoder.encode(body, StandardCharsets.UTF_8))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
|
@ -248,7 +249,7 @@ public class ForwardProxyTLSServerTest
|
|||
ContentResponse response1 = httpClient.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.scheme(HttpScheme.HTTPS.asString())
|
||||
.method(HttpMethod.GET)
|
||||
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
|
||||
.path("/echo?body=" + URLEncoder.encode(body, StandardCharsets.UTF_8))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
|
@ -297,7 +298,7 @@ public class ForwardProxyTLSServerTest
|
|||
ContentResponse response1 = httpClient.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.scheme(HttpScheme.HTTPS.asString())
|
||||
.method(HttpMethod.GET)
|
||||
.path("/echo?body=" + URLEncoder.encode(content1, "UTF-8"))
|
||||
.path("/echo?body=" + URLEncoder.encode(content1, StandardCharsets.UTF_8))
|
||||
.onRequestCommit(request ->
|
||||
{
|
||||
Destination destination = httpClient.resolveDestination(request);
|
||||
|
@ -384,7 +385,7 @@ public class ForwardProxyTLSServerTest
|
|||
ContentResponse response = httpClient.newRequest(host, serverConnector.getLocalPort())
|
||||
.scheme(HttpScheme.HTTPS.asString())
|
||||
.method(HttpMethod.GET)
|
||||
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
|
||||
.path("/echo?body=" + URLEncoder.encode(body, StandardCharsets.UTF_8))
|
||||
// Long idle timeout for the request.
|
||||
.idleTimeout(10 * idleTimeout, TimeUnit.MILLISECONDS)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
|
@ -420,7 +421,7 @@ public class ForwardProxyTLSServerTest
|
|||
httpClient.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.scheme(HttpScheme.HTTPS.asString())
|
||||
.method(HttpMethod.GET)
|
||||
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
|
||||
.path("/echo?body=" + URLEncoder.encode(body, StandardCharsets.UTF_8))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
});
|
||||
|
@ -449,7 +450,7 @@ public class ForwardProxyTLSServerTest
|
|||
httpClient.newRequest("localhost", serverPort)
|
||||
.scheme(HttpScheme.HTTPS.asString())
|
||||
.method(HttpMethod.GET)
|
||||
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
|
||||
.path("/echo?body=" + URLEncoder.encode(body, StandardCharsets.UTF_8))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
});
|
||||
|
@ -611,7 +612,7 @@ public class ForwardProxyTLSServerTest
|
|||
ContentResponse response = httpClient.newRequest(host, serverConnector.getLocalPort())
|
||||
.scheme(HttpScheme.HTTPS.asString())
|
||||
.method(HttpMethod.GET)
|
||||
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
|
||||
.path("/echo?body=" + URLEncoder.encode(body, StandardCharsets.UTF_8))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
|
|
|
@ -30,13 +30,16 @@ import java.util.EnumSet;
|
|||
import java.util.Enumeration;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.DispatcherType;
|
||||
import javax.servlet.Filter;
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.FilterConfig;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.WriteListener;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
@ -77,6 +80,8 @@ public class GzipHandlerTest
|
|||
"Aliquam purus mauris, consectetur nec convallis lacinia, porta sed ante. Suspendisse " +
|
||||
"et cursus magna. Donec orci enim, molestie a lobortis eu, imperdiet vitae neque.";
|
||||
|
||||
private static final byte[] __bytes = __content.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
private static final String __micro = __content.substring(0, 10);
|
||||
|
||||
private static final String __contentETag = String.format("W/\"%x\"", __content.hashCode());
|
||||
|
@ -110,6 +115,7 @@ public class GzipHandlerTest
|
|||
servlets.addServletWithMapping(IncludeServlet.class, "/include");
|
||||
servlets.addServletWithMapping(EchoServlet.class, "/echo/*");
|
||||
servlets.addServletWithMapping(DumpServlet.class, "/dump/*");
|
||||
servlets.addServletWithMapping(AsyncServlet.class, "/async/*");
|
||||
servlets.addFilterWithMapping(CheckFilter.class, "/*", EnumSet.of(DispatcherType.REQUEST));
|
||||
|
||||
_server.start();
|
||||
|
@ -175,6 +181,46 @@ public class GzipHandlerTest
|
|||
}
|
||||
}
|
||||
|
||||
public static class AsyncServlet extends HttpServlet
|
||||
{
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
String writes = req.getParameter("writes");
|
||||
final AsyncContext context = req.startAsync();
|
||||
final ServletOutputStream out = response.getOutputStream();
|
||||
|
||||
out.setWriteListener(new WriteListener()
|
||||
{
|
||||
int count = writes == null ? 1 : Integer.valueOf(writes);
|
||||
{
|
||||
response.setContentLength(count * __bytes.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWritePossible() throws IOException
|
||||
{
|
||||
while(out.isReady())
|
||||
{
|
||||
if (count-- == 0)
|
||||
{
|
||||
out.close();
|
||||
break;
|
||||
}
|
||||
|
||||
out.write(__bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
t.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public static class EchoServlet extends HttpServlet
|
||||
{
|
||||
@Override
|
||||
|
@ -259,7 +305,7 @@ public class GzipHandlerTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGzipHandler() throws Exception
|
||||
public void testBlockingResponse() throws Exception
|
||||
{
|
||||
// generated and parsed test
|
||||
HttpTester.Request request = HttpTester.newRequest();
|
||||
|
@ -285,6 +331,62 @@ public class GzipHandlerTest
|
|||
assertEquals(__content, testOut.toString("UTF8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncResponse() throws Exception
|
||||
{
|
||||
// generated and parsed test
|
||||
HttpTester.Request request = HttpTester.newRequest();
|
||||
HttpTester.Response response;
|
||||
|
||||
request.setMethod("GET");
|
||||
request.setURI("/ctx/async/info?writes=1");
|
||||
request.setVersion("HTTP/1.0");
|
||||
request.setHeader("Host", "tester");
|
||||
request.setHeader("accept-encoding", "gzip");
|
||||
|
||||
response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
|
||||
|
||||
assertThat(response.getStatus(), is(200));
|
||||
assertThat(response.get("Content-Encoding"), Matchers.equalToIgnoringCase("gzip"));
|
||||
assertThat(response.getCSV("Vary", false), Matchers.contains("Accept-Encoding"));
|
||||
|
||||
InputStream testIn = new GZIPInputStream(new ByteArrayInputStream(response.getContentBytes()));
|
||||
ByteArrayOutputStream testOut = new ByteArrayOutputStream();
|
||||
IO.copy(testIn, testOut);
|
||||
|
||||
assertEquals(__content, testOut.toString("UTF8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncLargeResponse() throws Exception
|
||||
{
|
||||
int writes = 100;
|
||||
// generated and parsed test
|
||||
HttpTester.Request request = HttpTester.newRequest();
|
||||
HttpTester.Response response;
|
||||
|
||||
request.setMethod("GET");
|
||||
request.setURI("/ctx/async/info?writes=" + writes);
|
||||
request.setVersion("HTTP/1.0");
|
||||
request.setHeader("Host", "tester");
|
||||
request.setHeader("accept-encoding", "gzip");
|
||||
|
||||
response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
|
||||
|
||||
assertThat(response.getStatus(), is(200));
|
||||
assertThat(response.get("Content-Encoding"), Matchers.equalToIgnoringCase("gzip"));
|
||||
assertThat(response.getCSV("Vary", false), Matchers.contains("Accept-Encoding"));
|
||||
|
||||
InputStream testIn = new GZIPInputStream(new ByteArrayInputStream(response.getContentBytes()));
|
||||
ByteArrayOutputStream testOut = new ByteArrayOutputStream();
|
||||
IO.copy(testIn, testOut);
|
||||
|
||||
byte[] bytes = testOut.toByteArray();
|
||||
|
||||
for (int i = 0; i < writes; i++)
|
||||
assertEquals(__content, new String(Arrays.copyOfRange(bytes,i * __bytes.length, (i + 1) * __bytes.length), StandardCharsets.UTF_8), "chunk " + i);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGzipHandlerWithMultipleAcceptEncodingHeaders() throws Exception
|
||||
{
|
||||
|
|
|
@ -56,7 +56,10 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
* else
|
||||
* // continue processing, async operation already done
|
||||
* </pre>
|
||||
*
|
||||
* @deprecated not used anymore
|
||||
*/
|
||||
@Deprecated
|
||||
public abstract class CompletableCallback implements Callback
|
||||
{
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
|
||||
|
|
|
@ -0,0 +1,363 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.LongConsumer;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class HttpClientDemandTest extends AbstractTest<TransportScenario>
|
||||
{
|
||||
@Override
|
||||
public void init(Transport transport) throws IOException
|
||||
{
|
||||
setScenario(new TransportScenario(transport));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testDemandInTwoChunks(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
// Tests a special case where the first chunk is automatically
|
||||
// delivered, and the second chunk is explicitly demanded and
|
||||
// completes the response content.
|
||||
CountDownLatch contentLatch = new CountDownLatch(1);
|
||||
scenario.start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
response.setContentLength(2);
|
||||
ServletOutputStream out = response.getOutputStream();
|
||||
out.write('A');
|
||||
out.flush();
|
||||
contentLatch.await();
|
||||
out.write('B');
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
private final AtomicInteger chunks = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
if (chunks.incrementAndGet() == 1)
|
||||
contentLatch.countDown();
|
||||
// Need to demand also after the second
|
||||
// chunk to allow the parser to proceed
|
||||
// and complete the response.
|
||||
demand.accept(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
Response response = result.getResponse();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
resultLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testDemand(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
int bufferSize = 512;
|
||||
byte[] content = new byte[10 * bufferSize];
|
||||
new Random().nextBytes(content);
|
||||
scenario.startServer(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
response.setContentLength(content.length);
|
||||
response.getOutputStream().write(content);
|
||||
}
|
||||
});
|
||||
scenario.startClient(client ->
|
||||
{
|
||||
// A small buffer size so the response content is read in multiple buffers.
|
||||
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
|
||||
client.setResponseBufferSize(bufferSize);
|
||||
});
|
||||
|
||||
Queue<LongConsumer> demandQueue = new ConcurrentLinkedQueue<>();
|
||||
Queue<ByteBuffer> contentQueue = new ConcurrentLinkedQueue<>();
|
||||
Queue<Callback> callbackQueue = new ConcurrentLinkedQueue<>();
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
|
||||
{
|
||||
// Don't demand and don't succeed callbacks.
|
||||
demandQueue.offer(demand);
|
||||
contentQueue.offer(content);
|
||||
callbackQueue.offer(callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
assertTrue(result.isSucceeded());
|
||||
Response response = result.getResponse();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
resultLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for the client to receive data from the server.
|
||||
// Wait a bit more to be sure it only receives 1 buffer.
|
||||
Thread.sleep(1000);
|
||||
assertEquals(1, demandQueue.size());
|
||||
assertEquals(1, contentQueue.size());
|
||||
assertEquals(1, callbackQueue.size());
|
||||
|
||||
// Demand more buffers.
|
||||
int count = 2;
|
||||
LongConsumer demand = demandQueue.poll();
|
||||
assertNotNull(demand);
|
||||
demand.accept(count);
|
||||
// The client should have received just `count` more buffers.
|
||||
Thread.sleep(1000);
|
||||
assertEquals(count, demandQueue.size());
|
||||
assertEquals(1 + count, contentQueue.size());
|
||||
assertEquals(1 + count, callbackQueue.size());
|
||||
|
||||
// Demand all the rest.
|
||||
demand = demandQueue.poll();
|
||||
assertNotNull(demand);
|
||||
demand.accept(Long.MAX_VALUE);
|
||||
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
byte[] received = new byte[content.length];
|
||||
AtomicInteger offset = new AtomicInteger();
|
||||
contentQueue.forEach(buffer ->
|
||||
{
|
||||
int length = buffer.remaining();
|
||||
buffer.get(received, offset.getAndAdd(length), length);
|
||||
});
|
||||
assertArrayEquals(content, received);
|
||||
|
||||
callbackQueue.forEach(Callback::succeeded);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testContentWhileStalling(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
CountDownLatch serverContentLatch = new CountDownLatch(1);
|
||||
scenario.start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
response.setContentLength(2);
|
||||
ServletOutputStream out = response.getOutputStream();
|
||||
out.write('A');
|
||||
out.flush();
|
||||
serverContentLatch.await();
|
||||
out.write('B');
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
long delay = 1000;
|
||||
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
|
||||
CountDownLatch clientContentLatch = new CountDownLatch(2);
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.onResponseContentDemanded((response, demand, content, callback) ->
|
||||
{
|
||||
try
|
||||
{
|
||||
if (demandRef.getAndSet(demand) == null)
|
||||
{
|
||||
// Produce more content just before stalling.
|
||||
serverContentLatch.countDown();
|
||||
// Wait for the content to arrive to the client.
|
||||
Thread.sleep(delay);
|
||||
}
|
||||
clientContentLatch.countDown();
|
||||
// Succeed the callback but don't demand.
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
})
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
|
||||
Response response = result.getResponse();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
resultLatch.countDown();
|
||||
});
|
||||
|
||||
// We did not demand, so we only expect one chunk of content.
|
||||
assertFalse(clientContentLatch.await(2 * delay, TimeUnit.MILLISECONDS));
|
||||
assertEquals(1, clientContentLatch.getCount());
|
||||
|
||||
// Now demand, we should be notified of the second chunk.
|
||||
demandRef.get().accept(1);
|
||||
|
||||
assertTrue(clientContentLatch.await(5, TimeUnit.SECONDS));
|
||||
demandRef.get().accept(1);
|
||||
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testTwoListenersWithDifferentDemand(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
int bufferSize = 512;
|
||||
byte[] content = new byte[10 * bufferSize];
|
||||
new Random().nextBytes(content);
|
||||
scenario.startServer(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
response.setContentLength(content.length);
|
||||
response.getOutputStream().write(content);
|
||||
}
|
||||
});
|
||||
scenario.startClient(client ->
|
||||
{
|
||||
client.setByteBufferPool(new MappedByteBufferPool(bufferSize));
|
||||
client.setResponseBufferSize(bufferSize);
|
||||
});
|
||||
|
||||
AtomicInteger chunks = new AtomicInteger();
|
||||
Response.DemandedContentListener listener1 = (response, demand, content1, callback) ->
|
||||
{
|
||||
callback.succeeded();
|
||||
// The first time, demand infinitely.
|
||||
if (chunks.incrementAndGet() == 1)
|
||||
demand.accept(Long.MAX_VALUE);
|
||||
};
|
||||
BlockingQueue<ByteBuffer> contentQueue = new LinkedBlockingQueue<>();
|
||||
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
|
||||
AtomicReference<CountDownLatch> demandLatch = new AtomicReference<>(new CountDownLatch(1));
|
||||
Response.DemandedContentListener listener2 = (response, demand, content12, callback) ->
|
||||
{
|
||||
contentQueue.offer(content12);
|
||||
demandRef.set(demand);
|
||||
demandLatch.get().countDown();
|
||||
};
|
||||
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.onResponseContentDemanded(listener1)
|
||||
.onResponseContentDemanded(listener2)
|
||||
.send(result ->
|
||||
{
|
||||
assertFalse(result.isFailed(), String.valueOf(result.getFailure()));
|
||||
Response response = result.getResponse();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
resultLatch.countDown();
|
||||
});
|
||||
|
||||
assertTrue(demandLatch.get().await(5, TimeUnit.SECONDS));
|
||||
LongConsumer demand = demandRef.get();
|
||||
assertNotNull(demand);
|
||||
assertEquals(1, contentQueue.size());
|
||||
assertNotNull(contentQueue.poll());
|
||||
|
||||
// Must not get additional content because listener2 did not demand.
|
||||
assertNull(contentQueue.poll(1, TimeUnit.SECONDS));
|
||||
|
||||
// Now demand, we should get content in both listeners.
|
||||
demandLatch.set(new CountDownLatch(1));
|
||||
demand.accept(1);
|
||||
|
||||
assertNotNull(contentQueue.poll(5, TimeUnit.SECONDS));
|
||||
assertEquals(2, chunks.get());
|
||||
|
||||
// Demand the rest and verify the result.
|
||||
assertTrue(demandLatch.get().await(5, TimeUnit.SECONDS));
|
||||
demand = demandRef.get();
|
||||
assertNotNull(demand);
|
||||
demand.accept(Long.MAX_VALUE);
|
||||
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
#org.eclipse.jetty.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.client.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.fcgi.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.proxy.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.http2.LEVEL=DEBUG
|
||||
org.eclipse.jetty.http2.hpack.LEVEL=INFO
|
||||
|
|
Loading…
Reference in New Issue