Added connection/read timeout support for HTTP
Until today connection and read timeout for http was not directly supported. This means that without setting oracle specific system properties at startup, calling a bad http service would by default hold the watch executing thread forever... is niet goed!!! This commit introduces connection & read timeouts. - Connection timeouts are timeouts for setting up the connection - Read timeouts are timeouts waiting for data to be read By default both timeouts are set to 10 seconds (overriding the default jdk to indefinite). It is possible to customize the default timeouts by settings `watcher.http.default_connection_timeout` and `watcher.http.default_read_timeout` settings). It is also possible to override these defaults per http request, meaning, per webhook and http input configuration in the watch. Original commit: elastic/x-pack-elasticsearch@224f50bc8b
This commit is contained in:
parent
a9448ab2a4
commit
b6e8df6a32
|
@ -14,6 +14,7 @@ import org.elasticsearch.common.collect.ImmutableMap;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.watcher.support.http.auth.ApplicableHttpAuth;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
|
||||
|
||||
|
@ -55,6 +56,8 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
|
|||
static final String SETTINGS_SSL_SHIELD_TRUSTSTORE_ALGORITHM = SETTINGS_SSL_SHIELD_PREFIX + "truststore.algorithm";
|
||||
|
||||
private final HttpAuthRegistry httpAuthRegistry;
|
||||
private final TimeValue defaultConnectionTimeout;
|
||||
private final TimeValue defaultReadTimeout;
|
||||
|
||||
private SSLSocketFactory sslSocketFactory;
|
||||
|
||||
|
@ -62,6 +65,8 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
|
|||
public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry) {
|
||||
super(settings);
|
||||
this.httpAuthRegistry = httpAuthRegistry;
|
||||
defaultConnectionTimeout = settings.getAsTime("watcher.http.default_connection_timeout", TimeValue.timeValueSeconds(10));
|
||||
defaultReadTimeout = settings.getAsTime("watcher.http.default_read_timeout", TimeValue.timeValueSeconds(10));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -142,6 +147,13 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
|
|||
urlConnection.getOutputStream().write(bytes);
|
||||
urlConnection.getOutputStream().close();
|
||||
}
|
||||
|
||||
TimeValue connectionTimeout = request.connectionTimeout != null ? request.connectionTimeout : defaultConnectionTimeout;
|
||||
urlConnection.setConnectTimeout((int) connectionTimeout.millis());
|
||||
|
||||
TimeValue readTimeout = request.readTimeout != null ? request.readTimeout : defaultReadTimeout;
|
||||
urlConnection.setReadTimeout((int) readTimeout.millis());
|
||||
|
||||
urlConnection.connect();
|
||||
|
||||
final int statusCode = urlConnection.getResponseCode();
|
||||
|
|
|
@ -9,12 +9,13 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.watcher.WatcherException;
|
||||
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
|
||||
import org.elasticsearch.watcher.support.WatcherUtils;
|
||||
import org.elasticsearch.watcher.support.http.auth.ApplicableHttpAuth;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
|
||||
|
||||
|
@ -32,10 +33,12 @@ public class HttpRequest implements ToXContent {
|
|||
final ImmutableMap<String, String> headers;
|
||||
final @Nullable HttpAuth auth;
|
||||
final @Nullable String body;
|
||||
final @Nullable TimeValue connectionTimeout;
|
||||
final @Nullable TimeValue readTimeout;
|
||||
|
||||
public HttpRequest(String host, int port, @Nullable Scheme scheme, @Nullable HttpMethod method, @Nullable String path,
|
||||
@Nullable ImmutableMap<String, String> params, @Nullable ImmutableMap<String, String> headers,
|
||||
@Nullable HttpAuth auth, @Nullable String body) {
|
||||
@Nullable HttpAuth auth, @Nullable String body, @Nullable TimeValue connectionTimeout, @Nullable TimeValue readTimeout) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.scheme = scheme != null ? scheme : Scheme.HTTP;
|
||||
|
@ -45,6 +48,8 @@ public class HttpRequest implements ToXContent {
|
|||
this.headers = headers != null ? headers : ImmutableMap.<String, String>of();
|
||||
this.auth = auth;
|
||||
this.body = body;
|
||||
this.connectionTimeout = connectionTimeout;
|
||||
this.readTimeout = readTimeout;
|
||||
}
|
||||
|
||||
public Scheme scheme() {
|
||||
|
@ -87,6 +92,14 @@ public class HttpRequest implements ToXContent {
|
|||
return body;
|
||||
}
|
||||
|
||||
public TimeValue connectionTimeout() {
|
||||
return connectionTimeout;
|
||||
}
|
||||
|
||||
public TimeValue readTimeout() {
|
||||
return readTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
@ -109,6 +122,12 @@ public class HttpRequest implements ToXContent {
|
|||
if (body != null) {
|
||||
builder.field(Field.BODY.getPreferredName(), body);
|
||||
}
|
||||
if (connectionTimeout != null) {
|
||||
builder.field(Field.CONNECTION_TIMEOUT.getPreferredName(), connectionTimeout.toString());
|
||||
}
|
||||
if (readTimeout != null) {
|
||||
builder.field(Field.READ_TIMEOUT.getPreferredName(), readTimeout.toString());
|
||||
}
|
||||
return builder.endObject();
|
||||
}
|
||||
|
||||
|
@ -127,6 +146,8 @@ public class HttpRequest implements ToXContent {
|
|||
if (!params.equals(that.params)) return false;
|
||||
if (!headers.equals(that.headers)) return false;
|
||||
if (auth != null ? !auth.equals(that.auth) : that.auth != null) return false;
|
||||
if (connectionTimeout != null ? !connectionTimeout.equals(that.connectionTimeout) : that.connectionTimeout != null) return false;
|
||||
if (readTimeout != null ? !readTimeout.equals(that.readTimeout) : that.readTimeout != null) return false;
|
||||
return !(body != null ? !body.equals(that.body) : that.body != null);
|
||||
|
||||
}
|
||||
|
@ -141,6 +162,8 @@ public class HttpRequest implements ToXContent {
|
|||
result = 31 * result + params.hashCode();
|
||||
result = 31 * result + headers.hashCode();
|
||||
result = 31 * result + (auth != null ? auth.hashCode() : 0);
|
||||
result = 31 * result + (connectionTimeout != null ? connectionTimeout.hashCode() : 0);
|
||||
result = 31 * result + (readTimeout != null ? readTimeout.hashCode() : 0);
|
||||
result = 31 * result + (body != null ? body.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
@ -154,6 +177,8 @@ public class HttpRequest implements ToXContent {
|
|||
"], method=[" + method +
|
||||
"], port=[" + port +
|
||||
"], host=[" + host + '\'' +
|
||||
"], connection_timeout=[" + connectionTimeout + '\'' +
|
||||
"], read_timeout=[" + readTimeout + '\'' +
|
||||
"]}";
|
||||
}
|
||||
|
||||
|
@ -179,6 +204,18 @@ public class HttpRequest implements ToXContent {
|
|||
currentFieldName = parser.currentName();
|
||||
} else if (Field.AUTH.match(currentFieldName)) {
|
||||
builder.auth(httpAuthRegistry.parse(parser));
|
||||
} else if (Field.CONNECTION_TIMEOUT.match(currentFieldName)) {
|
||||
try {
|
||||
builder.connectionTimeout(WatcherDateTimeUtils.parseTimeValue(parser, null));
|
||||
} catch (WatcherDateTimeUtils.ParseException pe) {
|
||||
throw new ParseException("could not parse http request. invalid time value for [{}] field", pe, currentFieldName);
|
||||
}
|
||||
} else if (Field.READ_TIMEOUT.match(currentFieldName)) {
|
||||
try {
|
||||
builder.readTimeout(WatcherDateTimeUtils.parseTimeValue(parser, null));
|
||||
} catch (WatcherDateTimeUtils.ParseException pe) {
|
||||
throw new ParseException("could not parse http request. invalid time value for [{}] field", pe, currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if (Field.HEADERS.match(currentFieldName)) {
|
||||
builder.setHeaders((Map) WatcherUtils.flattenModel(parser.map()));
|
||||
|
@ -187,7 +224,7 @@ public class HttpRequest implements ToXContent {
|
|||
} else if (Field.BODY.match(currentFieldName)) {
|
||||
builder.body(parser.text());
|
||||
} else {
|
||||
throw new ParseException("could not parse http request. unexpected object field [" + currentFieldName + "]");
|
||||
throw new ParseException("could not parse http request. unexpected object field [{}]", currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if (Field.SCHEME.match(currentFieldName)) {
|
||||
|
@ -201,25 +238,25 @@ public class HttpRequest implements ToXContent {
|
|||
} else if (Field.BODY.match(currentFieldName)) {
|
||||
builder.body(parser.text());
|
||||
} else {
|
||||
throw new ParseException("could not parse http request. unexpected string field [" + currentFieldName + "]");
|
||||
throw new ParseException("could not parse http request. unexpected string field [{}]", currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if (Field.PORT.match(currentFieldName)) {
|
||||
builder.port = parser.intValue();
|
||||
} else {
|
||||
throw new ParseException("could not parse http request. unexpected numeric field [" + currentFieldName + "]");
|
||||
throw new ParseException("could not parse http request. unexpected numeric field [{}]", currentFieldName);
|
||||
}
|
||||
} else {
|
||||
throw new ParseException("could not parse http request. unexpected token [" + token + "]");
|
||||
throw new ParseException("could not parse http request. unexpected token [{}]", token);
|
||||
}
|
||||
}
|
||||
|
||||
if (builder.host == null) {
|
||||
throw new ParseException("could not parse http request. missing required [host] field");
|
||||
throw new ParseException("could not parse http request. missing required [{}] field", Field.HOST.getPreferredName());
|
||||
}
|
||||
|
||||
if (builder.port < 0) {
|
||||
throw new ParseException("could not parse http request. missing required [port] field");
|
||||
throw new ParseException("could not parse http request. missing required [{}] field", Field.PORT.getPreferredName());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
|
@ -227,12 +264,12 @@ public class HttpRequest implements ToXContent {
|
|||
|
||||
public static class ParseException extends WatcherException {
|
||||
|
||||
public ParseException(String msg) {
|
||||
super(msg);
|
||||
public ParseException(String msg, Object... args) {
|
||||
super(msg, args);
|
||||
}
|
||||
|
||||
public ParseException(String msg, Throwable cause) {
|
||||
super(msg, cause);
|
||||
public ParseException(String msg, Throwable cause, Object... args) {
|
||||
super(msg, cause, args);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -248,6 +285,8 @@ public class HttpRequest implements ToXContent {
|
|||
private ImmutableMap.Builder<String, String> headers = ImmutableMap.builder();
|
||||
private HttpAuth auth;
|
||||
private String body;
|
||||
private TimeValue connectionTimeout;
|
||||
private TimeValue readTimeout;
|
||||
|
||||
private Builder(String host, int port) {
|
||||
this.host = host;
|
||||
|
@ -302,13 +341,22 @@ public class HttpRequest implements ToXContent {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder connectionTimeout(TimeValue timeout) {
|
||||
this.connectionTimeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder readTimeout(TimeValue timeout) {
|
||||
this.readTimeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HttpRequest build() {
|
||||
return new HttpRequest(host, port, scheme, method, path, params.build(), headers.build(), auth, body);
|
||||
return new HttpRequest(host, port, scheme, method, path, params.build(), headers.build(), auth, body, connectionTimeout, readTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
interface Field {
|
||||
public interface Field {
|
||||
ParseField SCHEME = new ParseField("scheme");
|
||||
ParseField HOST = new ParseField("host");
|
||||
ParseField PORT = new ParseField("port");
|
||||
|
@ -318,5 +366,7 @@ public class HttpRequest implements ToXContent {
|
|||
ParseField HEADERS = new ParseField("headers");
|
||||
ParseField AUTH = new ParseField("auth");
|
||||
ParseField BODY = new ParseField("body");
|
||||
ParseField CONNECTION_TIMEOUT = new ParseField("connection_timeout");
|
||||
ParseField READ_TIMEOUT = new ParseField("read_timeout");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,15 +6,17 @@
|
|||
package org.elasticsearch.watcher.support.http;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.watcher.WatcherException;
|
||||
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequest.Field;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
|
||||
import org.elasticsearch.watcher.support.template.Template;
|
||||
|
@ -39,10 +41,12 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
private final ImmutableMap<String, Template> headers;
|
||||
private final HttpAuth auth;
|
||||
private final Template body;
|
||||
private final @Nullable TimeValue connectionTimeout;
|
||||
private final @Nullable TimeValue readTimeout;
|
||||
|
||||
public HttpRequestTemplate(String host, int port, @Nullable Scheme scheme, @Nullable HttpMethod method, @Nullable Template path,
|
||||
Map<String, Template> params, Map<String, Template> headers, HttpAuth auth,
|
||||
Template body) {
|
||||
Template body, @Nullable TimeValue connectionTimeout, @Nullable TimeValue readTimeout) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.scheme = scheme != null ? scheme :Scheme.HTTP;
|
||||
|
@ -52,6 +56,8 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
this.headers = headers != null ? ImmutableMap.copyOf(headers) : ImmutableMap.<String, Template>of();
|
||||
this.auth = auth;
|
||||
this.body = body;
|
||||
this.connectionTimeout = connectionTimeout;
|
||||
this.readTimeout = readTimeout;
|
||||
}
|
||||
|
||||
public Scheme scheme() {
|
||||
|
@ -90,6 +96,14 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
return body;
|
||||
}
|
||||
|
||||
public TimeValue connectionTimeout() {
|
||||
return connectionTimeout;
|
||||
}
|
||||
|
||||
public TimeValue readTimeout() {
|
||||
return readTimeout;
|
||||
}
|
||||
|
||||
public HttpRequest render(TemplateEngine engine, Map<String, Object> model) {
|
||||
HttpRequest.Builder request = HttpRequest.builder(host, port);
|
||||
request.method(method);
|
||||
|
@ -123,39 +137,51 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
if (body != null) {
|
||||
request.body(engine.render(body, model));
|
||||
}
|
||||
if (connectionTimeout != null) {
|
||||
request.connectionTimeout(connectionTimeout);
|
||||
}
|
||||
if (readTimeout != null) {
|
||||
request.readTimeout(readTimeout);
|
||||
}
|
||||
return request.build();
|
||||
}
|
||||
|
||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(Parser.SCHEME_FIELD.getPreferredName(), scheme, params);
|
||||
builder.field(Parser.HOST_FIELD.getPreferredName(), host);
|
||||
builder.field(Parser.PORT_FIELD.getPreferredName(), port);
|
||||
builder.field(Parser.METHOD_FIELD.getPreferredName(), method, params);
|
||||
builder.field(Field.SCHEME.getPreferredName(), scheme, params);
|
||||
builder.field(Field.HOST.getPreferredName(), host);
|
||||
builder.field(Field.PORT.getPreferredName(), port);
|
||||
builder.field(Field.METHOD.getPreferredName(), method, params);
|
||||
if (path != null) {
|
||||
builder.field(Parser.PATH_FIELD.getPreferredName(), path, params);
|
||||
builder.field(Field.PATH.getPreferredName(), path, params);
|
||||
}
|
||||
if (this.params != null) {
|
||||
builder.startObject(Parser.PARAMS_FIELD.getPreferredName());
|
||||
builder.startObject(Field.PARAMS.getPreferredName());
|
||||
for (Map.Entry<String, Template> entry : this.params.entrySet()) {
|
||||
builder.field(entry.getKey(), entry.getValue(), params);
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
if (headers != null) {
|
||||
builder.startObject(Parser.HEADERS_FIELD.getPreferredName());
|
||||
builder.startObject(Field.HEADERS.getPreferredName());
|
||||
for (Map.Entry<String, Template> entry : headers.entrySet()) {
|
||||
builder.field(entry.getKey(), entry.getValue(), params);
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
if (auth != null) {
|
||||
builder.startObject(Parser.AUTH_FIELD.getPreferredName())
|
||||
builder.startObject(Field.AUTH.getPreferredName())
|
||||
.field(auth.type(), auth, params)
|
||||
.endObject();
|
||||
}
|
||||
if (body != null) {
|
||||
builder.field(Parser.BODY_FIELD.getPreferredName(), body, params);
|
||||
builder.field(Field.BODY.getPreferredName(), body, params);
|
||||
}
|
||||
if (connectionTimeout != null) {
|
||||
builder.field(Field.CONNECTION_TIMEOUT.getPreferredName(), connectionTimeout.toString());
|
||||
}
|
||||
if (readTimeout != null) {
|
||||
builder.field(Field.READ_TIMEOUT.getPreferredName(), readTimeout.toString());
|
||||
}
|
||||
return builder.endObject();
|
||||
}
|
||||
|
@ -175,6 +201,8 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
if (params != null ? !params.equals(that.params) : that.params != null) return false;
|
||||
if (headers != null ? !headers.equals(that.headers) : that.headers != null) return false;
|
||||
if (auth != null ? !auth.equals(that.auth) : that.auth != null) return false;
|
||||
if (connectionTimeout != null ? !connectionTimeout.equals(that.connectionTimeout) : that.connectionTimeout != null) return false;
|
||||
if (readTimeout != null ? !readTimeout.equals(that.readTimeout) : that.readTimeout != null) return false;
|
||||
return body != null ? body.equals(that.body) : that.body == null;
|
||||
}
|
||||
|
||||
|
@ -189,6 +217,8 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
result = 31 * result + (headers != null ? headers.hashCode() : 0);
|
||||
result = 31 * result + (auth != null ? auth.hashCode() : 0);
|
||||
result = 31 * result + (body != null ? body.hashCode() : 0);
|
||||
result = 31 * result + (connectionTimeout != null ? connectionTimeout.hashCode() : 0);
|
||||
result = 31 * result + (readTimeout != null ? readTimeout.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -198,17 +228,6 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
|
||||
public static class Parser {
|
||||
|
||||
public static final ParseField SCHEME_FIELD = new ParseField("scheme");
|
||||
public static final ParseField HOST_FIELD = new ParseField("host");
|
||||
public static final ParseField PORT_FIELD = new ParseField("port");
|
||||
public static final ParseField METHOD_FIELD = new ParseField("method");
|
||||
public static final ParseField PATH_FIELD = new ParseField("path");
|
||||
public static final ParseField PARAMS_FIELD = new ParseField("params");
|
||||
public static final ParseField HEADERS_FIELD = new ParseField("headers");
|
||||
public static final ParseField AUTH_FIELD = new ParseField("auth");
|
||||
public static final ParseField BODY_FIELD = new ParseField("body");
|
||||
public static final ParseField XBODY_FIELD = new ParseField("xbody");
|
||||
|
||||
private final HttpAuthRegistry httpAuthRegistry;
|
||||
|
||||
@Inject
|
||||
|
@ -225,32 +244,44 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (PATH_FIELD.match(currentFieldName)) {
|
||||
} else if (Field.PATH.match(currentFieldName)) {
|
||||
builder.path(parseFieldTemplate(currentFieldName, parser));
|
||||
} else if (HEADERS_FIELD.match(currentFieldName)) {
|
||||
} else if (Field.HEADERS.match(currentFieldName)) {
|
||||
builder.putHeaders(parseFieldTemplates(currentFieldName, parser));
|
||||
} else if (PARAMS_FIELD.match(currentFieldName)) {
|
||||
} else if (Field.PARAMS.match(currentFieldName)) {
|
||||
builder.putParams(parseFieldTemplates(currentFieldName, parser));
|
||||
} else if (BODY_FIELD.match(currentFieldName)) {
|
||||
} else if (Field.BODY.match(currentFieldName)) {
|
||||
builder.body(parseFieldTemplate(currentFieldName, parser));
|
||||
} else if (Field.CONNECTION_TIMEOUT.match(currentFieldName)) {
|
||||
try {
|
||||
builder.connectionTimeout(WatcherDateTimeUtils.parseTimeValue(parser, null));
|
||||
} catch (WatcherDateTimeUtils.ParseException pe) {
|
||||
throw new ParseException("could not parse http request template. invalid time value for [{}] field", pe, currentFieldName);
|
||||
}
|
||||
} else if (Field.READ_TIMEOUT.match(currentFieldName)) {
|
||||
try {
|
||||
builder.readTimeout(WatcherDateTimeUtils.parseTimeValue(parser, null));
|
||||
} catch (WatcherDateTimeUtils.ParseException pe) {
|
||||
throw new ParseException("could not parse http request template. invalid time value for [{}] field", pe, currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if (AUTH_FIELD.match(currentFieldName)) {
|
||||
if (Field.AUTH.match(currentFieldName)) {
|
||||
builder.auth(httpAuthRegistry.parse(parser));
|
||||
} else {
|
||||
throw new ParseException("could not parse http request template. unexpected object field [{}]", currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if (SCHEME_FIELD.match(currentFieldName)) {
|
||||
if (Field.SCHEME.match(currentFieldName)) {
|
||||
builder.scheme(Scheme.parse(parser.text()));
|
||||
} else if (METHOD_FIELD.match(currentFieldName)) {
|
||||
} else if (Field.METHOD.match(currentFieldName)) {
|
||||
builder.method(HttpMethod.parse(parser.text()));
|
||||
} else if (HOST_FIELD.match(currentFieldName)) {
|
||||
} else if (Field.HOST.match(currentFieldName)) {
|
||||
builder.host = parser.text();
|
||||
} else {
|
||||
throw new ParseException("could not parse http request template. unexpected string field [{}]", currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if (PORT_FIELD.match(currentFieldName)) {
|
||||
if (Field.PORT.match(currentFieldName)) {
|
||||
builder.port = parser.intValue();
|
||||
} else {
|
||||
throw new ParseException("could not parse http request template. unexpected numeric field [{}]", currentFieldName);
|
||||
|
@ -261,10 +292,10 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
}
|
||||
|
||||
if (builder.host == null) {
|
||||
throw new ParseException("could not parse http request template. missing required [{}] string field", HOST_FIELD.getPreferredName());
|
||||
throw new ParseException("could not parse http request template. missing required [{}] string field", Field.HOST.getPreferredName());
|
||||
}
|
||||
if (builder.port <= 0) {
|
||||
throw new ParseException("could not parse http request template. missing required [{}] numeric field", PORT_FIELD.getPreferredName());
|
||||
throw new ParseException("could not parse http request template. missing required [{}] numeric field", Field.PORT.getPreferredName());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
|
@ -317,6 +348,8 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
private final ImmutableMap.Builder<String, Template> headers = ImmutableMap.builder();
|
||||
private HttpAuth auth;
|
||||
private Template body;
|
||||
private TimeValue connectionTimeout;
|
||||
private TimeValue readTimeout;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
@ -407,8 +440,18 @@ public class HttpRequestTemplate implements ToXContent {
|
|||
return body(Template.inline(content));
|
||||
}
|
||||
|
||||
public Builder connectionTimeout(TimeValue timeout) {
|
||||
this.connectionTimeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder readTimeout(TimeValue timeout) {
|
||||
this.readTimeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HttpRequestTemplate build() {
|
||||
return new HttpRequestTemplate(host, port, scheme, method, path, params.build(), headers.build(), auth, body);
|
||||
return new HttpRequestTemplate(host, port, scheme, method, path, params.build(), headers.build(), auth, body, connectionTimeout, readTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -204,9 +204,9 @@ public class WebhookActionTests extends ElasticsearchTestCase {
|
|||
public void testParser_Failure() throws Exception {
|
||||
XContentBuilder builder = jsonBuilder().startObject();
|
||||
if (randomBoolean()) {
|
||||
builder.field(HttpRequestTemplate.Parser.HOST_FIELD.getPreferredName(), TEST_HOST);
|
||||
builder.field(HttpRequest.Field.HOST.getPreferredName(), TEST_HOST);
|
||||
} else {
|
||||
builder.field(HttpRequestTemplate.Parser.PORT_FIELD.getPreferredName(), TEST_PORT);
|
||||
builder.field(HttpRequest.Field.PORT.getPreferredName(), TEST_PORT);
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ import org.elasticsearch.watcher.actions.ActionBuilders;
|
|||
import org.elasticsearch.watcher.history.HistoryStore;
|
||||
import org.elasticsearch.watcher.history.WatchRecord;
|
||||
import org.elasticsearch.watcher.support.http.HttpClient;
|
||||
import org.elasticsearch.watcher.support.http.HttpClientTest;
|
||||
import org.elasticsearch.watcher.support.http.HttpClientTests;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
|
||||
import org.elasticsearch.watcher.support.http.Scheme;
|
||||
import org.elasticsearch.watcher.support.http.auth.basic.BasicAuth;
|
||||
|
@ -53,7 +53,7 @@ public class WebhookHttpsIntegrationTests extends AbstractWatcherIntegrationTest
|
|||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
Path resource;
|
||||
try {
|
||||
resource = Paths.get(HttpClientTest.class.getResource("/org/elasticsearch/shield/keystore/testnode.jks").toURI());
|
||||
resource = Paths.get(HttpClientTests.class.getResource("/org/elasticsearch/shield/keystore/testnode.jks").toURI());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ import static org.hamcrest.core.Is.is;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class HttpClientTest extends ElasticsearchTestCase {
|
||||
public class HttpClientTests extends ElasticsearchTestCase {
|
||||
|
||||
private MockWebServer webServer;
|
||||
private HttpClient httpClient;
|
||||
|
@ -145,7 +145,7 @@ public class HttpClientTest extends ElasticsearchTestCase {
|
|||
|
||||
@Test
|
||||
public void testHttps() throws Exception {
|
||||
Path resource = Paths.get(HttpClientTest.class.getResource("/org/elasticsearch/shield/keystore/truststore-testnode-only.jks").toURI());
|
||||
Path resource = Paths.get(HttpClientTests.class.getResource("/org/elasticsearch/shield/keystore/truststore-testnode-only.jks").toURI());
|
||||
|
||||
Settings settings;
|
||||
if (randomBoolean()) {
|
||||
|
@ -184,7 +184,7 @@ public class HttpClientTest extends ElasticsearchTestCase {
|
|||
|
||||
@Test
|
||||
public void testHttpsClientAuth() throws Exception {
|
||||
Path resource = Paths.get(HttpClientTest.class.getResource("/org/elasticsearch/shield/keystore/testnode.jks").toURI());
|
||||
Path resource = Paths.get(HttpClientTests.class.getResource("/org/elasticsearch/shield/keystore/testnode.jks").toURI());
|
||||
Settings settings;
|
||||
if (randomBoolean()) {
|
||||
settings = ImmutableSettings.builder()
|
||||
|
@ -217,7 +217,7 @@ public class HttpClientTest extends ElasticsearchTestCase {
|
|||
@Test
|
||||
public void testHttpClientReadKeyWithDifferentPassword() throws Exception {
|
||||
// This truststore doesn't have a cert with a valid SAN so hostname verification will fail if used
|
||||
Path resource = Paths.get(HttpClientTest.class.getResource("/org/elasticsearch/shield/keystore/testnode-different-passwords.jks").toURI());
|
||||
Path resource = Paths.get(HttpClientTests.class.getResource("/org/elasticsearch/shield/keystore/testnode-different-passwords.jks").toURI());
|
||||
|
||||
Settings settings;
|
||||
final boolean watcherSettings = randomBoolean();
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.support.http;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HttpConnectionTimeoutTests extends ElasticsearchTestCase {
|
||||
|
||||
// setting an unroutable IP to simulate a connection timeout
|
||||
private static final String UNROUTABLE_IP = "192.168.255.255";
|
||||
|
||||
@Test @Slow
|
||||
public void testDefaultTimeout() throws Exception {
|
||||
HttpClient httpClient = new HttpClient(ImmutableSettings.EMPTY, mock(HttpAuthRegistry.class)).start();
|
||||
|
||||
HttpRequest request = HttpRequest.builder(UNROUTABLE_IP, 12345)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/" + randomAsciiOfLength(5))
|
||||
.build();
|
||||
|
||||
long start = System.nanoTime();
|
||||
try {
|
||||
httpClient.execute(request);
|
||||
fail("expected timeout exception");
|
||||
} catch (ElasticsearchTimeoutException ete) {
|
||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||
logger.info("http connection timed out after {}", timeout.format());
|
||||
// it's supposed to be 10, but we'll give it an error margin of 2 seconds
|
||||
assertThat(timeout.seconds(), greaterThan(8L));
|
||||
assertThat(timeout.seconds(), lessThan(12L));
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test @Slow
|
||||
public void testDefaultTimeout_Custom() throws Exception {
|
||||
HttpClient httpClient = new HttpClient(ImmutableSettings.builder()
|
||||
.put("watcher.http.default_connection_timeout", "5s")
|
||||
.build()
|
||||
, mock(HttpAuthRegistry.class)).start();
|
||||
|
||||
HttpRequest request = HttpRequest.builder(UNROUTABLE_IP, 12345)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/" + randomAsciiOfLength(5))
|
||||
.build();
|
||||
|
||||
long start = System.nanoTime();
|
||||
try {
|
||||
httpClient.execute(request);
|
||||
fail("expected timeout exception");
|
||||
} catch (ElasticsearchTimeoutException ete) {
|
||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||
logger.info("http connection timed out after {}", timeout.format());
|
||||
// it's supposed to be 7, but we'll give it an error margin of 2 seconds
|
||||
assertThat(timeout.seconds(), greaterThan(3L));
|
||||
assertThat(timeout.seconds(), lessThan(7L));
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test @Slow
|
||||
public void testTimeout_CustomPerRequest() throws Exception {
|
||||
HttpClient httpClient = new HttpClient(ImmutableSettings.builder()
|
||||
.put("watcher.http.default_connection_timeout", "10s")
|
||||
.build()
|
||||
, mock(HttpAuthRegistry.class)).start();
|
||||
|
||||
HttpRequest request = HttpRequest.builder(UNROUTABLE_IP, 12345)
|
||||
.connectionTimeout(TimeValue.timeValueSeconds(5))
|
||||
.method(HttpMethod.POST)
|
||||
.path("/" + randomAsciiOfLength(5))
|
||||
.build();
|
||||
|
||||
long start = System.nanoTime();
|
||||
try {
|
||||
httpClient.execute(request);
|
||||
fail("expected timeout exception");
|
||||
} catch (ElasticsearchTimeoutException ete) {
|
||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||
logger.info("http connection timed out after {}", timeout.format());
|
||||
// it's supposed to be 7, but we'll give it an error margin of 2 seconds
|
||||
assertThat(timeout.seconds(), greaterThan(3L));
|
||||
assertThat(timeout.seconds(), lessThan(7L));
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.support.http;
|
||||
|
||||
import com.squareup.okhttp.mockwebserver.MockResponse;
|
||||
import com.squareup.okhttp.mockwebserver.MockWebServer;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
|
||||
import org.elasticsearch.watcher.support.secret.SecretService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.BindException;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HttpReadTimeoutTests extends ElasticsearchTestCase {
|
||||
|
||||
private MockWebServer webServer;
|
||||
private SecretService secretService;
|
||||
private int webPort;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
secretService = new SecretService.PlainText();
|
||||
for (webPort = 9200; webPort < 9300; webPort++) {
|
||||
try {
|
||||
webServer = new MockWebServer();
|
||||
webServer.start(webPort);
|
||||
return;
|
||||
} catch (BindException be) {
|
||||
logger.warn("port [{}] was already in use trying next port", webPort);
|
||||
}
|
||||
}
|
||||
throw new ElasticsearchException("unable to find open port between 9200 and 9300");
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
webServer.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultTimeout() throws Exception {
|
||||
HttpClient httpClient = new HttpClient(ImmutableSettings.EMPTY, mock(HttpAuthRegistry.class)).start();
|
||||
|
||||
// we're not going to enqueue an response... so the server will just hang
|
||||
|
||||
HttpRequest request = HttpRequest.builder("localhost", webPort)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/" + randomAsciiOfLength(5))
|
||||
.build();
|
||||
|
||||
long start = System.nanoTime();
|
||||
try {
|
||||
httpClient.execute(request);
|
||||
fail("expected read timeout after 10 seconds (default)");
|
||||
} catch (ElasticsearchTimeoutException ete) {
|
||||
// expected
|
||||
|
||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||
logger.info("http connection timed out after {}", timeout.format());
|
||||
|
||||
// it's supposed to be 10, but we'll give it an error margin of 2 seconds
|
||||
assertThat(timeout.seconds(), greaterThan(8L));
|
||||
assertThat(timeout.seconds(), lessThan(12L));
|
||||
|
||||
// lets enqueue a response to relese the server.
|
||||
webServer.enqueue(new MockResponse());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultTimeout_Custom() throws Exception {
|
||||
HttpClient httpClient = new HttpClient(ImmutableSettings.builder()
|
||||
.put("watcher.http.default_read_timeout", "5s")
|
||||
.build()
|
||||
, mock(HttpAuthRegistry.class)).start();
|
||||
|
||||
// we're not going to enqueue an response... so the server will just hang
|
||||
|
||||
HttpRequest request = HttpRequest.builder("localhost", webPort)
|
||||
.method(HttpMethod.POST)
|
||||
.path("/" + randomAsciiOfLength(5))
|
||||
.build();
|
||||
|
||||
long start = System.nanoTime();
|
||||
try {
|
||||
httpClient.execute(request);
|
||||
fail("expected read timeout after 5 seconds (default)");
|
||||
} catch (ElasticsearchTimeoutException ete) {
|
||||
// expected
|
||||
|
||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||
logger.info("http connection timed out after {}", timeout.format());
|
||||
|
||||
// it's supposed to be 5, but we'll give it an error margin of 2 seconds
|
||||
assertThat(timeout.seconds(), greaterThan(3L));
|
||||
assertThat(timeout.seconds(), lessThan(7L));
|
||||
|
||||
// lets enqueue a response to relese the server.
|
||||
webServer.enqueue(new MockResponse());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout_CustomPerRequest() throws Exception {
|
||||
HttpClient httpClient = new HttpClient(ImmutableSettings.builder()
|
||||
.put("watcher.http.default_read_timeout", "10s")
|
||||
.build()
|
||||
, mock(HttpAuthRegistry.class)).start();
|
||||
|
||||
// we're not going to enqueue an response... so the server will just hang
|
||||
|
||||
HttpRequest request = HttpRequest.builder("localhost", webPort)
|
||||
.readTimeout(TimeValue.timeValueSeconds(5))
|
||||
.method(HttpMethod.POST)
|
||||
.path("/" + randomAsciiOfLength(5))
|
||||
.build();
|
||||
|
||||
long start = System.nanoTime();
|
||||
try {
|
||||
httpClient.execute(request);
|
||||
fail("expected read timeout after 5 seconds (default)");
|
||||
} catch (ElasticsearchTimeoutException ete) {
|
||||
// expected
|
||||
|
||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||
logger.info("http connection timed out after {}", timeout.format());
|
||||
|
||||
// it's supposed to be 5, but we'll give it an error margin of 2 seconds
|
||||
assertThat(timeout.seconds(), greaterThan(3L));
|
||||
assertThat(timeout.seconds(), lessThan(7L));
|
||||
|
||||
// lets enqueue a response to relese the server.
|
||||
webServer.enqueue(new MockResponse());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,8 +6,10 @@
|
|||
package org.elasticsearch.watcher.support.http;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||
import com.carrotsearch.randomizedtesting.annotations.Seed;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -79,6 +81,14 @@ public class HttpRequestTemplateTests extends ElasticsearchTestCase {
|
|||
if (randomBoolean()) {
|
||||
builder.putHeader("_key", Template.inline("_value"));
|
||||
}
|
||||
long connectionTimeout = randomBoolean() ? 0 : randomIntBetween(5, 10);
|
||||
if (connectionTimeout > 0) {
|
||||
builder.connectionTimeout(TimeValue.timeValueSeconds(connectionTimeout));
|
||||
}
|
||||
long readTimeout = randomBoolean() ? 0 : randomIntBetween(5, 10);
|
||||
if (readTimeout > 0) {
|
||||
builder.readTimeout(TimeValue.timeValueSeconds(readTimeout));
|
||||
}
|
||||
|
||||
HttpRequestTemplate template = builder.build();
|
||||
|
||||
|
|
Loading…
Reference in New Issue