Merge remote-tracking branch 'danielmitterdorfer/eager-content-length'

This commit is contained in:
Daniel Mitterdorfer 2016-05-09 15:10:01 +02:00
commit 0f36c744d0
1 changed files with 7 additions and 4 deletions

View File

@ -107,9 +107,10 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
} }
RestChannel responseChannel = channel; RestChannel responseChannel = channel;
try { try {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(request.content().length(), "<http_request>"); int contentLength = request.content().length();
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
// iff we could reserve bytes for the request we need to send the response also over this channel // iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService); responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
restController.dispatchRequest(request, responseChannel, threadContext); restController.dispatchRequest(request, responseChannel, threadContext);
} catch (Throwable t) { } catch (Throwable t) {
restController.sendErrorResponse(request, responseChannel, t); restController.sendErrorResponse(request, responseChannel, t);
@ -136,11 +137,13 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
private static final class ResourceHandlingHttpChannel implements RestChannel { private static final class ResourceHandlingHttpChannel implements RestChannel {
private final RestChannel delegate; private final RestChannel delegate;
private final CircuitBreakerService circuitBreakerService; private final CircuitBreakerService circuitBreakerService;
private final int contentLength;
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService) { public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) {
this.delegate = delegate; this.delegate = delegate;
this.circuitBreakerService = circuitBreakerService; this.circuitBreakerService = circuitBreakerService;
this.contentLength = contentLength;
} }
@Override @Override
@ -184,7 +187,7 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
if (closed.compareAndSet(false, true) == false) { if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed"); throw new IllegalStateException("Channel is already closed");
} }
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-request().content().length()); inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
} }
} }