Watcher: Use Apache HttpClient for internal Watcher HttpClient (elastic/elasticsearch#4434)

Watcher: Use Apache HttpClient for internal Watcher HttpClient

The current implementation based on URLConnection has several drawbacks.

* If server returned HTTP header but then got stuck, no timeout would help, the connection remained stuck
* GET requests with a body were not supported, the method was silently changed to POST
* More complex handling of input/error stream handling, the body could not be read from a single input stream

NOTE: This is a BWC breaker. From now on every part of the URL needs to be encoded properly before it is configured in the requeust builder. This requires an upgrade of all watches.

Closes elastic/elasticsearch#1141

Original commit: elastic/x-pack-elasticsearch@bbc8f85dd8
This commit is contained in:
Alexander Reelsen 2017-01-05 14:25:58 +01:00 committed by GitHub
parent 125a2c9c03
commit 63f4bbba98
15 changed files with 410 additions and 168 deletions

View File

@ -5,8 +5,31 @@
*/
package org.elasticsearch.xpack.common.http;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.SpecialPermission;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.NameValuePair;
import org.apache.http.auth.AuthScope;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIUtils;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.Streams;
@ -17,34 +40,27 @@ import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.ssl.SSLService;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.URLEncoder;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Client class to wrap http connections
*/
public class HttpClient extends AbstractComponent {
private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl.";
private final HttpAuthRegistry httpAuthRegistry;
private final CloseableHttpClient client;
private final Integer proxyPort;
private final String proxyHost;
private final TimeValue defaultConnectionTimeout;
private final TimeValue defaultReadTimeout;
private final boolean isHostnameVerificationEnabled;
private final SSLSocketFactory sslSocketFactory;
private final HttpProxy proxy;
public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLService sslService) {
super(settings);
@ -52,148 +68,158 @@ public class HttpClient extends AbstractComponent {
this.defaultConnectionTimeout = HttpSettings.CONNECTION_TIMEOUT.get(settings);
this.defaultReadTimeout = HttpSettings.READ_TIMEOUT.get(settings);
final Integer proxyPort;
if (HttpSettings.PROXY_HOST.exists(settings)) {
proxyPort = HttpSettings.PROXY_PORT.get(settings);
} else {
proxyPort = null;
}
final String proxyHost = HttpSettings.PROXY_HOST.get(settings);
if (proxyPort != null && Strings.hasText(proxyHost)) {
this.proxy = new HttpProxy(proxyHost, proxyPort);
// proxy setup
this.proxyHost = HttpSettings.PROXY_HOST.get(settings);
this.proxyPort = HttpSettings.PROXY_PORT.get(settings);
if (proxyPort != 0 && Strings.hasText(proxyHost)) {
logger.info("Using default proxy for http input and slack/hipchat/pagerduty/webhook actions [{}:{}]", proxyHost, proxyPort);
} else if (proxyPort == null && Strings.hasText(proxyHost) == false) {
this.proxy = HttpProxy.NO_PROXY;
} else {
throw new IllegalArgumentException("HTTP Proxy requires both settings: [" + HttpSettings.PROXY_HOST_KEY + "] and [" +
HttpSettings.PROXY_PORT_KEY + "]");
} else if (proxyPort != 0 ^ Strings.hasText(proxyHost)) {
throw new IllegalArgumentException("HTTP proxy requires both settings: [" + HttpSettings.PROXY_HOST.getKey() + "] and [" +
HttpSettings.PROXY_PORT.getKey() + "]");
}
Settings sslSettings = settings.getByPrefix(HttpSettings.SSL_KEY_PREFIX);
this.sslSocketFactory = sslService.sslSocketFactory(settings.getByPrefix(HttpSettings.SSL_KEY_PREFIX));
this.isHostnameVerificationEnabled = sslService.getVerificationMode(sslSettings, Settings.EMPTY).isHostnameVerificationEnabled();
HttpClientBuilder clientBuilder = HttpClientBuilder.create();
// ssl setup
Settings sslSettings = settings.getByPrefix(SETTINGS_SSL_PREFIX);
boolean isHostnameVerificationEnabled = sslService.getVerificationMode(sslSettings, Settings.EMPTY).isHostnameVerificationEnabled();
HostnameVerifier verifier = isHostnameVerificationEnabled ? new DefaultHostnameVerifier() : NoopHostnameVerifier.INSTANCE;
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier);
clientBuilder.setSSLSocketFactory(factory);
client = clientBuilder.build();
}
public HttpResponse execute(HttpRequest request) throws IOException {
try {
return doExecute(request);
} catch (SocketTimeoutException ste) {
throw new ElasticsearchTimeoutException("failed to execute http request. timeout expired", ste);
}
}
URI uri = createURI(request);
public HttpResponse doExecute(HttpRequest request) throws IOException {
String queryString = null;
if (request.params() != null && !request.params().isEmpty()) {
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, String> entry : request.params().entrySet()) {
if (builder.length() != 0) {
builder.append('&');
}
builder.append(URLEncoder.encode(entry.getKey(), "UTF-8"))
.append('=')
.append(URLEncoder.encode(entry.getValue(), "UTF-8"));
HttpRequestBase internalRequest;
if (request.method == HttpMethod.HEAD) {
internalRequest = new HttpHead(uri);
} else {
HttpMethodWithEntity methodWithEntity = new HttpMethodWithEntity(uri, request.method.name());
if (request.body != null) {
methodWithEntity.setEntity(new StringEntity(request.body));
}
queryString = builder.toString();
internalRequest = methodWithEntity;
}
internalRequest.setHeader(HttpHeaders.ACCEPT_CHARSET, StandardCharsets.UTF_8.name());
String path = Strings.hasLength(request.path) ? request.path : "";
if (Strings.hasLength(queryString)) {
path += "?" + queryString;
}
URL url = new URL(request.scheme.scheme(), request.host, request.port, path);
RequestConfig.Builder config = RequestConfig.custom();
logger.debug("making [{}] request to [{}]", request.method().method(), url);
logger.trace("sending [{}] as body of request", request.body());
// proxy configured in the request always wins!
HttpProxy proxyToUse = request.proxy != null ? request.proxy : proxy;
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection(proxyToUse.proxy());
if (urlConnection instanceof HttpsURLConnection) {
final HttpsURLConnection httpsConn = (HttpsURLConnection) urlConnection;
final SSLSocketFactory factory = sslSocketFactory;
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(new SpecialPermission());
}
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
httpsConn.setSSLSocketFactory(factory);
if (isHostnameVerificationEnabled == false) {
httpsConn.setHostnameVerifier(NoopHostnameVerifier.INSTANCE);
}
return null;
}
});
}
urlConnection.setRequestMethod(request.method().method());
if (request.headers() != null) {
for (Map.Entry<String, String> entry : request.headers().entrySet()) {
urlConnection.setRequestProperty(entry.getKey(), entry.getValue());
// headers
if (request.headers().isEmpty() == false) {
for (Map.Entry<String, String> entry : request.headers.entrySet()) {
internalRequest.setHeader(entry.getKey(), entry.getValue());
}
}
// proxy
if (request.proxy != null && request.proxy.equals(HttpProxy.NO_PROXY) == false) {
HttpHost proxy = new HttpHost(request.proxy.getHost(), request.proxy.getPort(), request.scheme.scheme());
config.setProxy(proxy);
} else if (proxyPort != null && Strings.hasText(proxyHost)) {
HttpHost proxy = new HttpHost(proxyHost, proxyPort, request.scheme.scheme());
config.setProxy(proxy);
}
HttpClientContext localContext = HttpClientContext.create();
// auth
if (request.auth() != null) {
logger.trace("applying auth headers");
ApplicableHttpAuth applicableAuth = httpAuthRegistry.createApplicable(request.auth);
applicableAuth.apply(urlConnection);
}
urlConnection.setUseCaches(false);
urlConnection.setRequestProperty("Accept-Charset", StandardCharsets.UTF_8.name());
if (request.body() != null) {
urlConnection.setDoOutput(true);
byte[] bytes = request.body().getBytes(StandardCharsets.UTF_8.name());
urlConnection.setRequestProperty("Content-Length", String.valueOf(bytes.length));
urlConnection.getOutputStream().write(bytes);
urlConnection.getOutputStream().close();
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
applicableAuth.apply(credentialsProvider, new AuthScope(request.host, request.port));
localContext.setCredentialsProvider(credentialsProvider);
// preemptive auth, no need to wait for a 401 first
AuthCache authCache = new BasicAuthCache();
BasicScheme basicAuth = new BasicScheme();
authCache.put(new HttpHost(request.host, request.port, request.scheme.scheme()), basicAuth);
localContext.setAuthCache(authCache);
}
TimeValue connectionTimeout = request.connectionTimeout != null ? request.connectionTimeout : defaultConnectionTimeout;
urlConnection.setConnectTimeout((int) connectionTimeout.millis());
// timeouts
if (request.connectionTimeout() != null) {
TimeValue readTimeout = request.readTimeout != null ? request.readTimeout : defaultReadTimeout;
urlConnection.setReadTimeout((int) readTimeout.millis());
urlConnection.connect();
final int statusCode = urlConnection.getResponseCode();
// no status code, not considered a valid HTTP response then
if (statusCode == -1) {
throw new IOException("Not a valid HTTP response, no status code in response");
config.setConnectTimeout(Math.toIntExact(request.connectionTimeout.millis()));
} else {
config.setConnectTimeout(Math.toIntExact(defaultConnectionTimeout.millis()));
}
Map<String, String[]> responseHeaders = new HashMap<>(urlConnection.getHeaderFields().size());
for (Map.Entry<String, List<String>> header : urlConnection.getHeaderFields().entrySet()) {
// HttpURLConnection#getHeaderFields returns the first status line as a header
// with a `null` key (facepalm)... so we have to skip that one.
if (header.getKey() != null) {
responseHeaders.put(header.getKey(), header.getValue().toArray(new String[header.getValue().size()]));
if (request.readTimeout() != null) {
config.setSocketTimeout(Math.toIntExact(request.readTimeout.millis()));
config.setConnectionRequestTimeout(Math.toIntExact(request.readTimeout.millis()));
} else {
config.setSocketTimeout(Math.toIntExact(defaultReadTimeout.millis()));
config.setConnectionRequestTimeout(Math.toIntExact(defaultReadTimeout.millis()));
}
internalRequest.setConfig(config.build());
try (CloseableHttpResponse response = client.execute(internalRequest, localContext)) {
// headers
Header[] headers = response.getAllHeaders();
Map<String, String[]> responseHeaders = new HashMap<>(headers.length);
for (Header header : headers) {
if (responseHeaders.containsKey(header.getName())) {
String[] old = responseHeaders.get(header.getName());
String[] values = new String[old.length + 1];
System.arraycopy(old, 0, values, 0, old.length);
values[values.length-1] = header.getValue();
responseHeaders.put(header.getName(), values);
} else {
responseHeaders.put(header.getName(), new String[]{header.getValue()});
}
}
}
logger.debug("http status code [{}]", statusCode);
final byte[] body;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
try (InputStream is = urlConnection.getInputStream()) {
Streams.copy(is, outputStream);
} catch (Exception e) {
if (urlConnection.getErrorStream() != null) {
try (InputStream is = urlConnection.getErrorStream()) {
final byte[] body;
// not every response has a content, i.e. 204
if (response.getEntity() == null) {
body = new byte[0];
} else {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
try (InputStream is = response.getEntity().getContent()) {
Streams.copy(is, outputStream);
}
body = outputStream.toByteArray();
}
}
body = outputStream.toByteArray();
return new HttpResponse(response.getStatusLine().getStatusCode(), body, responseHeaders);
}
return new HttpResponse(statusCode, body, responseHeaders);
}
private static final class NoopHostnameVerifier implements HostnameVerifier {
private URI createURI(HttpRequest request) {
// this could be really simple, as the apache http client has a UriBuilder class, however this class is always doing
// url path escaping, and we have done this already, so this would result in double escaping
try {
List<NameValuePair> qparams = new ArrayList<>(request.params.size());
request.params.forEach((k, v)-> qparams.add(new BasicNameValuePair(k, v)));
URI uri = URIUtils.createURI(request.scheme.scheme(), request.host, request.port, request.path,
URLEncodedUtils.format(qparams, "UTF-8"), null);
private static final HostnameVerifier INSTANCE = new NoopHostnameVerifier();
return uri;
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* Helper class to have all HTTP methods except HEAD allow for an body, including GET
*/
final class HttpMethodWithEntity extends HttpEntityEnclosingRequestBase {
private final String methodName;
HttpMethodWithEntity(final URI uri, String methodName) {
this.methodName = methodName;
setURI(uri);
}
@Override
public boolean verify(String s, SSLSession sslSession) {
return true;
public String getMethod() {
return methodName;
}
}
}

View File

@ -460,8 +460,8 @@ public class HttpRequest implements ToXContent {
scheme = Scheme.parse(uri.getScheme());
port = uri.getPort() > 0 ? uri.getPort() : scheme.defaultPort();
host = uri.getHost();
if (Strings.hasLength(uri.getPath())) {
path = uri.getPath();
if (Strings.hasLength(uri.getRawPath())) {
path = uri.getRawPath();
}
String rawQuery = uri.getRawQuery();
if (Strings.hasLength(rawQuery)) {

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.common.http.auth;
import org.apache.http.auth.AuthScope;
import org.apache.http.client.CredentialsProvider;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -13,7 +15,7 @@ import java.net.HttpURLConnection;
public abstract class ApplicableHttpAuth<Auth extends HttpAuth> implements ToXContent {
private final Auth auth;
protected final Auth auth;
public ApplicableHttpAuth(Auth auth) {
this.auth = auth;
@ -25,6 +27,8 @@ public abstract class ApplicableHttpAuth<Auth extends HttpAuth> implements ToXCo
public abstract void apply(HttpURLConnection connection);
public abstract void apply(CredentialsProvider credsProvider, AuthScope authScope);
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return auth.toXContent(builder, params);

View File

@ -5,20 +5,25 @@
*/
package org.elasticsearch.xpack.common.http.auth.basic;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.elasticsearch.xpack.common.http.auth.ApplicableHttpAuth;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.elasticsearch.xpack.common.http.auth.ApplicableHttpAuth;
import org.elasticsearch.xpack.security.crypto.CryptoService;
public class ApplicableBasicAuth extends ApplicableHttpAuth<BasicAuth> {
private final String basicAuth;
private final CryptoService cryptoService;
public ApplicableBasicAuth(BasicAuth auth, CryptoService service) {
super(auth);
basicAuth = headerValue(auth.username, auth.password.text(service));
this.cryptoService = service;
}
public static String headerValue(String username, char[] password) {
@ -29,4 +34,10 @@ public class ApplicableBasicAuth extends ApplicableHttpAuth<BasicAuth> {
connection.setRequestProperty("Authorization", basicAuth);
}
@Override
public void apply(CredentialsProvider credsProvider, AuthScope authScope) {
credsProvider.setCredentials(authScope,
new UsernamePasswordCredentials(auth.username, new String(auth.password.text(cryptoService))));
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.common.http.HttpClient;
import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpProxy;
import org.elasticsearch.xpack.common.http.HttpRequest;
import org.elasticsearch.xpack.common.http.HttpResponse;
@ -68,6 +69,7 @@ public class SlackAccount {
public SentMessages.SentMessage send(final String to, final SlackMessage message, final HttpProxy proxy) {
HttpRequest request = HttpRequest.builder(url.getHost(), url.getPort())
.path(url.getPath())
.method(HttpMethod.POST)
.proxy(proxy)
.scheme(Scheme.parse(url.getScheme()))
.jsonBody(new ToXContent() {

View File

@ -2,9 +2,8 @@ grant {
// needed because of problems in unbound LDAP library
permission java.util.PropertyPermission "*", "read,write";
// needed to set expert SSL options, etc
// required to configure the custom mailcap for watcher
permission java.lang.RuntimePermission "setFactory";
permission javax.net.ssl.SSLPermission "setHostnameVerifier";
// needed when sending emails for javax.activation
// otherwise a classnotfound exception is thrown due to trying

View File

@ -7,6 +7,7 @@ package org.elasticsearch.test.http;
import org.elasticsearch.common.SuppressForbidden;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -48,7 +49,16 @@ public class Headers {
* @param value Value of the header
*/
void add(String name, String value) {
this.headers.put(name, Collections.singletonList(value));
this.headers.compute(name, (k, v) -> {
if (v == null) {
return Collections.singletonList(value);
} else {
List<String> list = new ArrayList<>();
list.addAll(v);
list.add(value);
return list;
}
});
}
/**

View File

@ -6,8 +6,10 @@
package org.elasticsearch.xpack.common.http;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.http.client.ClientProtocolException;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
@ -15,7 +17,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.http.MockResponse;
import org.elasticsearch.test.http.MockWebServer;
import org.elasticsearch.test.junit.annotations.Network;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuthFactory;
@ -32,15 +33,19 @@ import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@ -89,7 +94,6 @@ public class HttpClientTests extends ESTestCase {
HttpRequest request = requestBuilder.build();
HttpResponse response = httpClient.execute(request);
assertThat(response.status(), equalTo(responseCode));
assertThat(response.body().utf8ToString(), equalTo(body));
assertThat(webServer.requests(), hasSize(1));
@ -98,7 +102,6 @@ public class HttpClientTests extends ESTestCase {
assertThat(webServer.requests().get(0).getHeader(headerKey), equalTo(headerValue));
}
@TestLogging("org.elasticsearch.http.test:TRACE")
public void testNoQueryString() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
@ -114,7 +117,7 @@ public class HttpClientTests extends ESTestCase {
assertThat(webServer.requests().get(0).getBody(), is(nullValue()));
}
public void testUrlEncodingWithQueryStrings() throws Exception{
public void testUrlEncodingWithQueryStrings() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
.method(HttpMethod.GET)
@ -216,9 +219,9 @@ public class HttpClientTests extends ESTestCase {
public void testHttpsClientAuth() throws Exception {
Path resource = getDataPath("/org/elasticsearch/xpack/security/keystore/testnode.jks");
Settings settings = Settings.builder()
.put("xpack.ssl.keystore.path", resource.toString())
.put("xpack.ssl.keystore.password", "testnode")
.build();
.put("xpack.ssl.keystore.path", resource.toString())
.put("xpack.ssl.keystore.password", "testnode")
.build();
TestsSSLService sslService = new TestsSSLService(settings, environment);
httpClient = new HttpClient(settings, authRegistry, sslService);
@ -329,11 +332,31 @@ public class HttpClientTests extends ESTestCase {
}
}
public void testThatProxyConfigurationRequiresHostAndPort() {
Settings.Builder settings = Settings.builder();
if (randomBoolean()) {
settings.put(HttpSettings.PROXY_HOST.getKey(), "localhost");
} else {
settings.put(HttpSettings.PROXY_PORT.getKey(), 8080);
}
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new HttpClient(settings.build(), authRegistry, new SSLService(settings.build(), environment)));
assertThat(e.getMessage(),
containsString("HTTP proxy requires both settings: [xpack.http.proxy.host] and [xpack.http.proxy.port]"));
}
public void testThatUrlPathIsNotEncoded() throws Exception {
// %2F is a slash that needs to be encoded to not be misinterpreted as a path
String path = "/%3Clogstash-%7Bnow%2Fd%7D%3E/_search";
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("foo"));
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort()).path(path).build();
HttpRequest request;
if (randomBoolean()) {
request = HttpRequest.builder("localhost", webServer.getPort()).path(path).build();
} else {
// ensure that fromUrl acts the same way than the above builder
request = HttpRequest.builder().fromUrl(String.format(Locale.ROOT, "http://localhost:%s%s", webServer.getPort(), path)).build();
}
httpClient.execute(request);
assertThat(webServer.requests(), hasSize(1));
@ -344,6 +367,36 @@ public class HttpClientTests extends ESTestCase {
assertThat(webServer.requests().get(0).getUri().getPath(), is("/<logstash-{now/d}>/_search"));
}
public void testThatDuplicateHeaderKeysAreReturned() throws Exception {
MockResponse mockResponse = new MockResponse().setResponseCode(200).setBody("foo")
.addHeader("foo", "bar")
.addHeader("foo", "baz")
.addHeader("Content-Length", "3");
webServer.enqueue(mockResponse);
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort()).path("/").build();
HttpResponse httpResponse = httpClient.execute(request);
assertThat(webServer.requests(), hasSize(1));
assertThat(httpResponse.headers(), hasKey("foo"));
assertThat(httpResponse.headers().get("foo"), containsInAnyOrder("bar", "baz"));
}
// finally fixing https://github.com/elastic/x-plugins/issues/1141 - yay! Fixed due to switching to apache http client internally!
public void testThatClientTakesTimeoutsIntoAccountAfterHeadersAreSent() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("foo").setBodyDelay(TimeValue.timeValueSeconds(2)));
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort()).path("/foo")
.method(HttpMethod.POST)
.body("foo")
.connectionTimeout(TimeValue.timeValueMillis(500))
.readTimeout(TimeValue.timeValueMillis(500))
.build();
SocketTimeoutException e = expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
assertThat(e.getMessage(), is("Read timed out"));
}
public void testThatHttpClientFailsOnNonHttpResponse() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
AtomicReference<Exception> hasExceptionHappened = new AtomicReference();
@ -360,11 +413,19 @@ public class HttpClientTests extends ESTestCase {
}
});
HttpRequest request = HttpRequest.builder("localhost", serverSocket.getLocalPort()).path("/").build();
IOException e = expectThrows(IOException.class, () -> httpClient.execute(request));
assertThat(e.getMessage(), is("Not a valid HTTP response, no status code in response"));
expectThrows(ClientProtocolException.class, () -> httpClient.execute(request));
assertThat("A server side exception occured, but shouldnt", hasExceptionHappened.get(), is(nullValue()));
} finally {
terminate(executor);
}
}
public void testNoContentResponse() throws Exception {
int noContentStatusCode = 204;
webServer.enqueue(new MockResponse().setResponseCode(noContentStatusCode));
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort()).path("/foo").build();
HttpResponse response = httpClient.execute(request);
assertThat(response.status(), is(noContentStatusCode));
assertThat(response.body(), is(nullValue()));
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.common.http;
import org.apache.http.conn.ConnectTimeoutException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -37,7 +38,7 @@ public class HttpConnectionTimeoutTests extends ESTestCase {
try {
httpClient.execute(request);
fail("expected timeout exception");
} catch (ElasticsearchTimeoutException ete) {
} catch (ConnectTimeoutException ete) {
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
logger.info("http connection timed out after {}", timeout.format());
// it's supposed to be 10, but we'll give it an error margin of 2 seconds
@ -63,7 +64,7 @@ public class HttpConnectionTimeoutTests extends ESTestCase {
try {
httpClient.execute(request);
fail("expected timeout exception");
} catch (ElasticsearchTimeoutException ete) {
} catch (ConnectTimeoutException ete) {
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
logger.info("http connection timed out after {}", timeout.format());
// it's supposed to be 7, but we'll give it an error margin of 2 seconds
@ -90,7 +91,7 @@ public class HttpConnectionTimeoutTests extends ESTestCase {
try {
httpClient.execute(request);
fail("expected timeout exception");
} catch (ElasticsearchTimeoutException ete) {
} catch (ConnectTimeoutException ete) {
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
logger.info("http connection timed out after {}", timeout.format());
// it's supposed to be 7, but we'll give it an error margin of 2 seconds

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.common.http;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
@ -17,6 +17,8 @@ import org.elasticsearch.xpack.ssl.SSLService;
import org.junit.After;
import org.junit.Before;
import java.net.SocketTimeoutException;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.Mockito.mock;
@ -47,7 +49,7 @@ public class HttpReadTimeoutTests extends ESTestCase {
.build();
long start = System.nanoTime();
expectThrows(ElasticsearchTimeoutException.class, () -> httpClient.execute(request));
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
logger.info("http connection timed out after {}", timeout.format());
@ -69,7 +71,7 @@ public class HttpReadTimeoutTests extends ESTestCase {
.build();
long start = System.nanoTime();
expectThrows(ElasticsearchTimeoutException.class, () -> httpClient.execute(request));
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
logger.info("http connection timed out after {}", timeout.format());
@ -86,18 +88,18 @@ public class HttpReadTimeoutTests extends ESTestCase {
, mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment));
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
.readTimeout(TimeValue.timeValueSeconds(5))
.readTimeout(TimeValue.timeValueSeconds(3))
.method(HttpMethod.POST)
.path("/")
.build();
long start = System.nanoTime();
expectThrows(ElasticsearchTimeoutException.class, () -> httpClient.execute(request));
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
logger.info("http connection timed out after {}", timeout.format());
// it's supposed to be 5, but we'll give it an error margin of 2 seconds
assertThat(timeout.seconds(), greaterThan(3L));
assertThat(timeout.seconds(), lessThan(7L));
// it's supposed to be 3, but we'll give it an error margin of 2 seconds
assertThat(timeout.seconds(), greaterThan(1L));
assertThat(timeout.seconds(), lessThan(5L));
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.http.MockResponse;
import org.elasticsearch.test.http.MockWebServer;
import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.http.Scheme;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
@ -69,7 +70,8 @@ public class WebhookHttpsIntegrationTests extends AbstractWatcherIntegrationTest
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webServer.getPort())
.scheme(Scheme.HTTPS)
.path(new TextTemplate("/test/_id"))
.body(new TextTemplate("{key=value}"));
.body(new TextTemplate("{key=value}"))
.method(HttpMethod.POST);
watcherClient().preparePutWatch("_id")
.setSource(watchBuilder()
@ -91,6 +93,7 @@ public class WebhookHttpsIntegrationTests extends AbstractWatcherIntegrationTest
SearchResponse response =
searchWatchRecords(b -> b.setQuery(QueryBuilders.termQuery(WatchRecord.Field.STATE.getPreferredName(), "executed")));
assertNoFailures(response);
XContentSource source = xContentSource(response.getHits().getAt(0).sourceRef());
String body = source.getValue("result.actions.0.webhook.response.body");
@ -108,7 +111,8 @@ public class WebhookHttpsIntegrationTests extends AbstractWatcherIntegrationTest
.scheme(Scheme.HTTPS)
.auth(new BasicAuth("_username", "_password".toCharArray()))
.path(new TextTemplate("/test/_id"))
.body(new TextTemplate("{key=value}"));
.body(new TextTemplate("{key=value}"))
.method(HttpMethod.POST);
watcherClient().preparePutWatch("_id")
.setSource(watchBuilder()

View File

@ -5,10 +5,15 @@
*/
package org.elasticsearch.xpack.watcher.actions.webhook;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.http.MockResponse;
import org.elasticsearch.test.http.MockWebServer;
import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
import org.elasticsearch.xpack.common.text.TextTemplate;
@ -20,6 +25,8 @@ import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.junit.After;
import org.junit.Before;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
@ -36,6 +43,16 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
private MockWebServer webServer = new MockWebServer();;
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("http.enabled", true).build();
}
@Override
protected boolean enableSecurity() {
return true;
}
@Before
public void startWebservice() throws Exception {
webServer.start();
@ -52,7 +69,8 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
.path(new TextTemplate("/test/_id"))
.putParam("param1", new TextTemplate("value1"))
.putParam("watch_id", new TextTemplate("_id"))
.body(new TextTemplate("_body"));
.body(new TextTemplate("_body"))
.method(HttpMethod.POST);
watcherClient().preparePutWatch("_id")
.setSource(watchBuilder()
@ -68,7 +86,6 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
}
assertWatchWithMinimumPerformedActionsCount("_id", 1, false);
assertThat(webServer.requests(), hasSize(1));
assertThat(webServer.requests().get(0).getUri().getQuery(),
anyOf(equalTo("watch_id=_id&param1=value1"), equalTo("param1=value1&watch_id=_id")));
@ -94,7 +111,8 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
.path(new TextTemplate("/test/_id"))
.putParam("param1", new TextTemplate("value1"))
.putParam("watch_id", new TextTemplate("_id"))
.body(new TextTemplate("_body"));
.body(new TextTemplate("_body"))
.method(HttpMethod.POST);
watcherClient().preparePutWatch("_id")
.setSource(watchBuilder()
@ -117,4 +135,31 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
assertThat(webServer.requests().get(0).getBody(), is("_body"));
assertThat(webServer.requests().get(0).getHeader("Authorization"), is(("Basic X3VzZXJuYW1lOl9wYXNzd29yZA==")));
}
public void testWebhookWithTimebasedIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("<logstash-{now/d}>").get());
HttpServerTransport serverTransport = internalCluster().getDataNodeInstance(HttpServerTransport.class);
TransportAddress publishAddress = serverTransport.boundAddress().publishAddress();
String host = publishAddress.address().getHostString();
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder(host, publishAddress.getPort())
.path(new TextTemplate("/%3Clogstash-%7Bnow%2Fd%7D%3E/log/1"))
.body(new TextTemplate("{\"foo\":\"bar\"}"))
.auth(new BasicAuth("test", "changeme".toCharArray()))
.method(HttpMethod.PUT);
watcherClient().preparePutWatch("_id")
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(simpleInput("key", "value"))
.condition(AlwaysCondition.INSTANCE)
.addAction("_id", ActionBuilders.webhookAction(builder)))
.get();
watcherClient().prepareExecuteWatch("_id").get();
GetResponse response = client().prepareGet("<logstash-{now/d}>", "log", "1").get();
assertExists(response);
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.test.http.MockResponse;
import org.elasticsearch.test.http.MockWebServer;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
@ -66,6 +67,7 @@ public class HistoryTemplateHttpMappingsTests extends AbstractWatcherIntegration
.condition(AlwaysCondition.INSTANCE)
.addAction("_webhook", webhookAction(HttpRequestTemplate.builder("localhost", webServer.getPort())
.path("/webhook/path")
.method(HttpMethod.POST)
.body("_body"))))
.get();

View File

@ -47,8 +47,6 @@ public class SlackServiceTests extends AbstractWatcherIntegrationTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// this is for the `test-watcher-integration` group level integration in HipChat
.put("xpack.notification.slack.account.test_account.url",
"https://hooks.slack.com/services/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")
.build();
@ -81,7 +79,7 @@ public class SlackServiceTests extends AbstractWatcherIntegrationTestCase {
public void testWatchWithSlackAction() throws Exception {
String account = "test_account";
SlackAction.Builder actionBuilder = slackAction(account, SlackMessage.Template.builder()
.setText("slack integration test` " + DateTime.now())
.setText("slack integration test `testWatchWithSlackAction()` " + DateTime.now())
.addTo("#watcher-test", "#watcher-test-2"));
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("1").setSource(watchBuilder()

View File

@ -0,0 +1,77 @@
---
"Test url escaping with url mustache function":
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: <date-index-{now/d}>
type: log
id: 1
refresh: true
body: { foo: bar }
- do: {xpack.watcher.stats:{}}
- match: { "watcher_state": "started" }
- match: { "watch_count": 0 }
- do:
xpack.watcher.put_watch:
id: "test_watch"
body: >
{
"metadata" : {
"index" : "<date-index-{now/d}>"
},
"trigger": {
"schedule": {
"interval": "1h"
}
},
"input": {
"http" : {
"request" : {
"host" : "localhost",
"port" : 9400,
"path" : "/{{#url}}{{ctx.metadata.index}}{{/url}}/_search"
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"eq" : 1
}
}
},
"actions": {
"output": {
"webhook" : {
"method" : "PUT",
"host" : "localhost",
"port" : 9400,
"path" : "/{{#url}}{{ctx.metadata.index}}{{/url}}/log/2",
"params" : {
"refresh" : "true"
},
"body" : "{ \"foo\": \"bar\" }"
}
}
}
}
- match: { _id: "test_watch" }
- match: { created: true }
- do:
xpack.watcher.execute_watch:
id: "test_watch"
- do:
count:
index: <date-index-{now/d}>
type: log
- match: {count : 2}