Adding backoff from retries on GCE errors
In case of any error while trying to get GCE instances list from GCE API, elasticsearch will slow down its API calls.
This commit is contained in:
parent
ff9999876c
commit
6a6e168b8b
|
@ -46,6 +46,19 @@ discovery:
|
||||||
type: gce
|
type: gce
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
|
|
||||||
|
The following gce settings (prefixed with `cloud.gce`) are supported:
|
||||||
|
|
||||||
|
`retry`::
|
||||||
|
|
||||||
|
If set to `true`, client will use
|
||||||
|
https://developers.google.com/api-client-library/java/google-http-java-client/backoff[ExponentialBackOff]
|
||||||
|
policy to retry the failed http request. Defaults to `true`.
|
||||||
|
|
||||||
|
`max_wait`::
|
||||||
|
|
||||||
|
The maximum elapsed time in milliseconds after the client instantiating retry. If the time elapsed goes past the
|
||||||
|
`max_wait`, client stops to retry. Defaults to 15 minutes (900000 milliseconds).
|
||||||
|
|
||||||
|
|
||||||
[IMPORTANT]
|
[IMPORTANT]
|
||||||
.Binding the network host
|
.Binding the network host
|
||||||
|
|
|
@ -32,6 +32,9 @@ public interface GceComputeService extends LifecycleComponent<GceComputeService>
|
||||||
public static final String REFRESH = "cloud.gce.refresh_interval";
|
public static final String REFRESH = "cloud.gce.refresh_interval";
|
||||||
public static final String TAGS = "discovery.gce.tags";
|
public static final String TAGS = "discovery.gce.tags";
|
||||||
public static final String VERSION = "Elasticsearch/GceCloud/1.0";
|
public static final String VERSION = "Elasticsearch/GceCloud/1.0";
|
||||||
|
|
||||||
|
public static final String RETRY = "cloud.gce.retry";
|
||||||
|
public static final String MAXWAIT = "cloud.gce.max_wait";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -190,14 +191,29 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
|
||||||
|
|
||||||
logger.debug("token [{}] will expire in [{}] s", credential.getAccessToken(), credential.getExpiresInSeconds());
|
logger.debug("token [{}] will expire in [{}] s", credential.getAccessToken(), credential.getExpiresInSeconds());
|
||||||
if (credential.getExpiresInSeconds() != null) {
|
if (credential.getExpiresInSeconds() != null) {
|
||||||
refreshInterval = TimeValue.timeValueSeconds(credential.getExpiresInSeconds()-1);
|
refreshInterval = TimeValue.timeValueSeconds(credential.getExpiresInSeconds() - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once done, let's use this token
|
boolean ifRetry = settings.getAsBoolean(Fields.RETRY, true);
|
||||||
this.client = new Compute.Builder(getGceHttpTransport(), gceJsonFactory, null)
|
Compute.Builder builder = new Compute.Builder(getGceHttpTransport(), gceJsonFactory, null)
|
||||||
.setApplicationName(Fields.VERSION)
|
.setApplicationName(Fields.VERSION);
|
||||||
.setHttpRequestInitializer(credential)
|
|
||||||
.build();
|
if (ifRetry) {
|
||||||
|
int maxWait = settings.getAsInt(Fields.MAXWAIT, -1);
|
||||||
|
RetryHttpInitializerWrapper retryHttpInitializerWrapper;
|
||||||
|
|
||||||
|
if (maxWait > 0) {
|
||||||
|
retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, maxWait);
|
||||||
|
} else {
|
||||||
|
retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential);
|
||||||
|
}
|
||||||
|
builder.setHttpRequestInitializer(retryHttpInitializerWrapper);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
builder.setHttpRequestInitializer(credential);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.client = builder.build();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("unable to start GCE discovery service", e);
|
logger.warn("unable to start GCE discovery service", e);
|
||||||
throw new IllegalArgumentException("unable to start GCE discovery service", e);
|
throw new IllegalArgumentException("unable to start GCE discovery service", e);
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.discovery.gce;
|
||||||
|
|
||||||
|
import com.google.api.client.auth.oauth2.Credential;
|
||||||
|
import com.google.api.client.http.*;
|
||||||
|
import com.google.api.client.util.ExponentialBackOff;
|
||||||
|
import com.google.api.client.util.Sleeper;
|
||||||
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
|
||||||
|
|
||||||
|
private int maxWait;
|
||||||
|
|
||||||
|
private static final ESLogger logger =
|
||||||
|
ESLoggerFactory.getLogger(RetryHttpInitializerWrapper.class.getName());
|
||||||
|
|
||||||
|
// Intercepts the request for filling in the "Authorization"
|
||||||
|
// header field, as well as recovering from certain unsuccessful
|
||||||
|
// error codes wherein the Credential must refresh its token for a
|
||||||
|
// retry.
|
||||||
|
private final Credential wrappedCredential;
|
||||||
|
|
||||||
|
// A sleeper; you can replace it with a mock in your test.
|
||||||
|
private final Sleeper sleeper;
|
||||||
|
|
||||||
|
public RetryHttpInitializerWrapper(Credential wrappedCredential) {
|
||||||
|
this(wrappedCredential, Sleeper.DEFAULT, ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RetryHttpInitializerWrapper(Credential wrappedCredential, int maxWait) {
|
||||||
|
this(wrappedCredential, Sleeper.DEFAULT, maxWait);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use only for testing.
|
||||||
|
RetryHttpInitializerWrapper(
|
||||||
|
Credential wrappedCredential, Sleeper sleeper, int maxWait) {
|
||||||
|
this.wrappedCredential = Objects.requireNonNull(wrappedCredential);
|
||||||
|
this.sleeper = sleeper;
|
||||||
|
this.maxWait = maxWait;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(HttpRequest httpRequest) {
|
||||||
|
final HttpUnsuccessfulResponseHandler backoffHandler =
|
||||||
|
new HttpBackOffUnsuccessfulResponseHandler(
|
||||||
|
new ExponentialBackOff.Builder()
|
||||||
|
.setMaxElapsedTimeMillis(maxWait)
|
||||||
|
.build())
|
||||||
|
.setSleeper(sleeper);
|
||||||
|
|
||||||
|
httpRequest.setInterceptor(wrappedCredential);
|
||||||
|
httpRequest.setUnsuccessfulResponseHandler(
|
||||||
|
new HttpUnsuccessfulResponseHandler() {
|
||||||
|
int retry = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException {
|
||||||
|
if (wrappedCredential.handleResponse(
|
||||||
|
request, response, supportsRetry)) {
|
||||||
|
// If credential decides it can handle it,
|
||||||
|
// the return code or message indicated
|
||||||
|
// something specific to authentication,
|
||||||
|
// and no backoff is desired.
|
||||||
|
return true;
|
||||||
|
} else if (backoffHandler.handleResponse(
|
||||||
|
request, response, supportsRetry)) {
|
||||||
|
// Otherwise, we defer to the judgement of
|
||||||
|
// our internal backoff handler.
|
||||||
|
logger.debug("Retrying [{}] times : [{}]", retry, request.getUrl());
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
httpRequest.setIOExceptionHandler(
|
||||||
|
new HttpBackOffIOExceptionHandler(
|
||||||
|
new ExponentialBackOff.Builder()
|
||||||
|
.setMaxElapsedTimeMillis(maxWait)
|
||||||
|
.build())
|
||||||
|
.setSleeper(sleeper)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,174 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.discovery.gce;
|
||||||
|
|
||||||
|
import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential;
|
||||||
|
import com.google.api.client.http.*;
|
||||||
|
import com.google.api.client.json.JsonFactory;
|
||||||
|
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||||
|
import com.google.api.client.testing.http.MockHttpTransport;
|
||||||
|
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
|
||||||
|
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
|
||||||
|
import com.google.api.client.testing.util.MockSleeper;
|
||||||
|
import com.google.api.services.compute.Compute;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
public class RetryHttpInitializerWrapperTests {
|
||||||
|
|
||||||
|
static private class FailThenSuccessBackoffTransport extends MockHttpTransport {
|
||||||
|
|
||||||
|
public int lowLevelExecCalls;
|
||||||
|
int errorStatusCode;
|
||||||
|
int callsBeforeSuccess;
|
||||||
|
boolean throwException;
|
||||||
|
|
||||||
|
protected FailThenSuccessBackoffTransport(int errorStatusCode, int callsBeforeSuccess) {
|
||||||
|
this.errorStatusCode = errorStatusCode;
|
||||||
|
this.callsBeforeSuccess = callsBeforeSuccess;
|
||||||
|
this.throwException = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected FailThenSuccessBackoffTransport(int errorStatusCode, int callsBeforeSuccess, boolean throwException) {
|
||||||
|
this.errorStatusCode = errorStatusCode;
|
||||||
|
this.callsBeforeSuccess = callsBeforeSuccess;
|
||||||
|
this.throwException = throwException;
|
||||||
|
}
|
||||||
|
|
||||||
|
public LowLevelHttpRequest retryableGetRequest = new MockLowLevelHttpRequest() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LowLevelHttpResponse execute() throws IOException {
|
||||||
|
lowLevelExecCalls++;
|
||||||
|
|
||||||
|
if (lowLevelExecCalls <= callsBeforeSuccess) {
|
||||||
|
if (throwException) {
|
||||||
|
throw new IOException("Test IOException");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return failure on the first call
|
||||||
|
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
|
||||||
|
response.setContent("Request should fail");
|
||||||
|
response.setStatusCode(errorStatusCode);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
// Return success on the second
|
||||||
|
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
|
||||||
|
response.setStatusCode(200);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LowLevelHttpRequest buildRequest(String method, String url) {
|
||||||
|
return retryableGetRequest;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleRetry() throws Exception {
|
||||||
|
|
||||||
|
FailThenSuccessBackoffTransport fakeTransport =
|
||||||
|
new FailThenSuccessBackoffTransport(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, 3);
|
||||||
|
|
||||||
|
MockGoogleCredential credential = new MockGoogleCredential.Builder()
|
||||||
|
.build();
|
||||||
|
MockSleeper mockSleeper = new MockSleeper();
|
||||||
|
|
||||||
|
RetryHttpInitializerWrapper retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, mockSleeper, 5000);
|
||||||
|
|
||||||
|
Compute client = new Compute.Builder(fakeTransport, new JacksonFactory(), null)
|
||||||
|
.setHttpRequestInitializer(retryHttpInitializerWrapper)
|
||||||
|
.setApplicationName("test")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
HttpRequest request = client.getRequestFactory().buildRequest("Get", new GenericUrl("http://elasticsearch.com"), null);
|
||||||
|
HttpResponse response = request.execute();
|
||||||
|
|
||||||
|
assertThat(mockSleeper.getCount(), equalTo(3));
|
||||||
|
assertThat(response.getStatusCode(), equalTo(200));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryWaitTooLong() throws Exception {
|
||||||
|
int maxWait = 50;
|
||||||
|
|
||||||
|
FailThenSuccessBackoffTransport fakeTransport =
|
||||||
|
new FailThenSuccessBackoffTransport(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, 50);
|
||||||
|
JsonFactory jsonFactory = new JacksonFactory();
|
||||||
|
MockGoogleCredential credential = new MockGoogleCredential.Builder()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
MockSleeper oneTimeSleeper = new MockSleeper() {
|
||||||
|
@Override
|
||||||
|
public void sleep(long millis) throws InterruptedException {
|
||||||
|
Thread.sleep(maxWait);
|
||||||
|
super.sleep(0); // important number, use this to get count
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
RetryHttpInitializerWrapper retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, oneTimeSleeper, maxWait);
|
||||||
|
|
||||||
|
Compute client = new Compute.Builder(fakeTransport, jsonFactory, null)
|
||||||
|
.setHttpRequestInitializer(retryHttpInitializerWrapper)
|
||||||
|
.setApplicationName("test")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
HttpRequest request1 = client.getRequestFactory().buildRequest("Get", new GenericUrl("http://elasticsearch.com"), null);
|
||||||
|
try {
|
||||||
|
request1.execute();
|
||||||
|
fail("Request should fail if wait too long");
|
||||||
|
} catch (HttpResponseException e) {
|
||||||
|
assertThat(e.getStatusCode(), equalTo(HttpStatusCodes.STATUS_CODE_SERVER_ERROR));
|
||||||
|
assertThat(e.getMessage(), equalTo("500\nRequest should fail"));
|
||||||
|
// should only retry once.
|
||||||
|
assertThat(oneTimeSleeper.getCount(), equalTo(1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIOExceptionRetry() throws Exception {
|
||||||
|
|
||||||
|
FailThenSuccessBackoffTransport fakeTransport =
|
||||||
|
new FailThenSuccessBackoffTransport(HttpStatusCodes.STATUS_CODE_SERVER_ERROR, 1, true);
|
||||||
|
|
||||||
|
MockGoogleCredential credential = new MockGoogleCredential.Builder()
|
||||||
|
.build();
|
||||||
|
MockSleeper mockSleeper = new MockSleeper();
|
||||||
|
RetryHttpInitializerWrapper retryHttpInitializerWrapper = new RetryHttpInitializerWrapper(credential, mockSleeper, 500);
|
||||||
|
|
||||||
|
Compute client = new Compute.Builder(fakeTransport, new JacksonFactory(), null)
|
||||||
|
.setHttpRequestInitializer(retryHttpInitializerWrapper)
|
||||||
|
.setApplicationName("test")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
HttpRequest request = client.getRequestFactory().buildRequest("Get", new GenericUrl("http://elasticsearch.com"), null);
|
||||||
|
HttpResponse response = request.execute();
|
||||||
|
|
||||||
|
assertThat(mockSleeper.getCount(), equalTo(1));
|
||||||
|
assertThat(response.getStatusCode(), equalTo(200));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue