NIFI-4858: Expose Request Timeout from HTTP Context Map and use that to set as the timeout of the 'AsyncContext' in HandleHttpRequest. Otherwise, the request will never timeout (which is OK because the HttpContextMap will handle this). However, Jetty behind the scenes is adding a task to Scheduled Executor for each request with a delay of whatever the timeout is set to. Since it's currently set to Long.MAX_VALUE, that task will never be run and as a result the ExecutorService's queue will grow indefinitely, eventually exhausting the JVM Heap

This closes #2460.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2018-02-08 15:02:06 -05:00 committed by Bryan Bende
parent a2f2ddd6b8
commit 4428fe28bf
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 29 additions and 1 deletions

View File

@ -281,6 +281,8 @@ public class HandleHttpRequest extends AbstractProcessor {
final String host = context.getProperty(HOSTNAME).getValue(); final String host = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger(); final int port = context.getProperty(PORT).asInteger();
final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final HttpContextMap httpContextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
final long requestTimeout = httpContextMap.getRequestTimeout(TimeUnit.MILLISECONDS);
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final boolean need; final boolean need;
@ -404,7 +406,7 @@ public class HandleHttpRequest extends AbstractProcessor {
// Right now, that information, though, is only in the ProcessSession, not the ProcessContext, // Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
// so it is not known to us. Should see if it can be added to the ProcessContext. // so it is not known to us. Should see if it can be added to the ProcessContext.
final AsyncContext async = baseRequest.startAsync(); final AsyncContext async = baseRequest.startAsync();
async.setTimeout(Long.MAX_VALUE); // timeout is handled by HttpContextMap async.setTimeout(requestTimeout);
final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async)); final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async));
if (added) { if (added) {

View File

@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -304,5 +305,10 @@ public class TestHandleHttpRequest {
public void setRegisterSuccessfully(boolean registerSuccessfully) { public void setRegisterSuccessfully(boolean registerSuccessfully) {
this.registerSuccessfully = registerSuccessfully; this.registerSuccessfully = registerSuccessfully;
} }
@Override
public long getRequestTimeout(TimeUnit timeUnit) {
return timeUnit.convert(30000, TimeUnit.MILLISECONDS);
}
} }
} }

View File

@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
@ -268,5 +269,10 @@ public class TestHandleHttpResponse {
public int getCompletionCount() { public int getCompletionCount() {
return completedCount.get(); return completedCount.get();
} }
@Override
public long getRequestTimeout(TimeUnit timeUnit) {
return timeUnit.convert(30000, TimeUnit.MILLISECONDS);
}
} }
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.http; package org.apache.nifi.http;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -66,4 +68,11 @@ public interface HttpContextMap extends ControllerService {
*/ */
void complete(String identifier); void complete(String identifier);
/**
* Returns the configured timeout for HTTP Requests
*
* @param timeUnit the desired time unit
* @return the configured timeout for HTTP Requests
*/
long getRequestTimeout(TimeUnit timeUnit);
} }

View File

@ -194,4 +194,9 @@ public class StandardHttpContextMap extends AbstractControllerService implements
} }
} }
} }
@Override
public long getRequestTimeout(final TimeUnit timeUnit) {
return timeUnit.convert(maxRequestNanos, TimeUnit.NANOSECONDS);
}
} }