[plugin] repository-azure: add configuration settings for connect/write/response/read timeouts (#1789)
* [plugin] repository-azure: add configuration settings for connect/write/response/read timeouts Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Addressing code review comments: renaming connectionXxx to connectXxx Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Addressing code review comments: adding timeout comment Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
This commit is contained in:
parent
96d55966a2
commit
5f100c5558
|
@ -96,7 +96,11 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
|
|||
AzureStorageSettings.MAX_RETRIES_SETTING,
|
||||
AzureStorageSettings.PROXY_TYPE_SETTING,
|
||||
AzureStorageSettings.PROXY_HOST_SETTING,
|
||||
AzureStorageSettings.PROXY_PORT_SETTING
|
||||
AzureStorageSettings.PROXY_PORT_SETTING,
|
||||
AzureStorageSettings.CONNECT_TIMEOUT_SETTING,
|
||||
AzureStorageSettings.WRITE_TIMEOUT_SETTING,
|
||||
AzureStorageSettings.READ_TIMEOUT_SETTING,
|
||||
AzureStorageSettings.RESPONSE_TIMEOUT_SETTING
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -64,6 +64,7 @@ import org.opensearch.common.settings.Settings;
|
|||
import org.opensearch.common.settings.SettingsException;
|
||||
import org.opensearch.common.unit.ByteSizeUnit;
|
||||
import org.opensearch.common.unit.ByteSizeValue;
|
||||
import org.opensearch.common.unit.TimeValue;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Proxy;
|
||||
|
@ -178,6 +179,26 @@ public class AzureStorageService implements AutoCloseable {
|
|||
clientBuilder.proxy(new ProxyOptions(type, (InetSocketAddress) proxy.address()));
|
||||
}
|
||||
|
||||
final TimeValue connectTimeout = azureStorageSettings.getConnectTimeout();
|
||||
if (connectTimeout != null) {
|
||||
clientBuilder.connectTimeout(Duration.ofMillis(connectTimeout.millis()));
|
||||
}
|
||||
|
||||
final TimeValue writeTimeout = azureStorageSettings.getWriteTimeout();
|
||||
if (writeTimeout != null) {
|
||||
clientBuilder.writeTimeout(Duration.ofMillis(writeTimeout.millis()));
|
||||
}
|
||||
|
||||
final TimeValue readTimeout = azureStorageSettings.getReadTimeout();
|
||||
if (readTimeout != null) {
|
||||
clientBuilder.readTimeout(Duration.ofMillis(readTimeout.millis()));
|
||||
}
|
||||
|
||||
final TimeValue responseTimeout = azureStorageSettings.getResponseTimeout();
|
||||
if (responseTimeout != null) {
|
||||
clientBuilder.responseTimeout(Duration.ofMillis(responseTimeout.millis()));
|
||||
}
|
||||
|
||||
builder.httpClient(clientBuilder.build());
|
||||
|
||||
// We define a default exponential retry policy
|
||||
|
|
|
@ -97,6 +97,7 @@ final class AzureStorageSettings {
|
|||
() -> KEY_SETTING
|
||||
);
|
||||
|
||||
// The overall operation timeout
|
||||
public static final AffixSetting<TimeValue> TIMEOUT_SETTING = Setting.affixKeySetting(
|
||||
AZURE_CLIENT_PREFIX_KEY,
|
||||
"timeout",
|
||||
|
@ -105,6 +106,42 @@ final class AzureStorageSettings {
|
|||
() -> KEY_SETTING
|
||||
);
|
||||
|
||||
// See please NettyAsyncHttpClientBuilder#DEFAULT_CONNECT_TIMEOUT
|
||||
public static final AffixSetting<TimeValue> CONNECT_TIMEOUT_SETTING = Setting.affixKeySetting(
|
||||
AZURE_CLIENT_PREFIX_KEY,
|
||||
"connect.timeout",
|
||||
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(10), Property.NodeScope),
|
||||
() -> ACCOUNT_SETTING,
|
||||
() -> KEY_SETTING
|
||||
);
|
||||
|
||||
// See please NettyAsyncHttpClientBuilder#DEFAULT_WRITE_TIMEOUT
|
||||
public static final AffixSetting<TimeValue> WRITE_TIMEOUT_SETTING = Setting.affixKeySetting(
|
||||
AZURE_CLIENT_PREFIX_KEY,
|
||||
"write.timeout",
|
||||
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
|
||||
() -> ACCOUNT_SETTING,
|
||||
() -> KEY_SETTING
|
||||
);
|
||||
|
||||
// See please NettyAsyncHttpClientBuilder#DEFAULT_READ_TIMEOUT
|
||||
public static final AffixSetting<TimeValue> READ_TIMEOUT_SETTING = Setting.affixKeySetting(
|
||||
AZURE_CLIENT_PREFIX_KEY,
|
||||
"read.timeout",
|
||||
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
|
||||
() -> ACCOUNT_SETTING,
|
||||
() -> KEY_SETTING
|
||||
);
|
||||
|
||||
// See please NettyAsyncHttpClientBuilder#DEFAULT_RESPONSE_TIMEOUT
|
||||
public static final AffixSetting<TimeValue> RESPONSE_TIMEOUT_SETTING = Setting.affixKeySetting(
|
||||
AZURE_CLIENT_PREFIX_KEY,
|
||||
"response.timeout",
|
||||
(key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(60), Property.NodeScope),
|
||||
() -> ACCOUNT_SETTING,
|
||||
() -> KEY_SETTING
|
||||
);
|
||||
|
||||
/** The type of the proxy to connect to azure through. Can be direct (no proxy, default), http or socks */
|
||||
public static final AffixSetting<Proxy.Type> PROXY_TYPE_SETTING = Setting.affixKeySetting(
|
||||
AZURE_CLIENT_PREFIX_KEY,
|
||||
|
@ -142,6 +179,10 @@ final class AzureStorageSettings {
|
|||
private final int maxRetries;
|
||||
private final Proxy proxy;
|
||||
private final LocationMode locationMode;
|
||||
private final TimeValue connectTimeout;
|
||||
private final TimeValue writeTimeout;
|
||||
private final TimeValue readTimeout;
|
||||
private final TimeValue responseTimeout;
|
||||
|
||||
// copy-constructor
|
||||
private AzureStorageSettings(
|
||||
|
@ -151,7 +192,11 @@ final class AzureStorageSettings {
|
|||
TimeValue timeout,
|
||||
int maxRetries,
|
||||
Proxy proxy,
|
||||
LocationMode locationMode
|
||||
LocationMode locationMode,
|
||||
TimeValue connectTimeout,
|
||||
TimeValue writeTimeout,
|
||||
TimeValue readTimeout,
|
||||
TimeValue responseTimeout
|
||||
) {
|
||||
this.account = account;
|
||||
this.connectString = connectString;
|
||||
|
@ -160,6 +205,10 @@ final class AzureStorageSettings {
|
|||
this.maxRetries = maxRetries;
|
||||
this.proxy = proxy;
|
||||
this.locationMode = locationMode;
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.writeTimeout = writeTimeout;
|
||||
this.readTimeout = readTimeout;
|
||||
this.responseTimeout = responseTimeout;
|
||||
}
|
||||
|
||||
private AzureStorageSettings(
|
||||
|
@ -171,7 +220,11 @@ final class AzureStorageSettings {
|
|||
int maxRetries,
|
||||
Proxy.Type proxyType,
|
||||
String proxyHost,
|
||||
Integer proxyPort
|
||||
Integer proxyPort,
|
||||
TimeValue connectTimeout,
|
||||
TimeValue writeTimeout,
|
||||
TimeValue readTimeout,
|
||||
TimeValue responseTimeout
|
||||
) {
|
||||
this.account = account;
|
||||
this.connectString = buildConnectString(account, key, sasToken, endpointSuffix);
|
||||
|
@ -197,6 +250,10 @@ final class AzureStorageSettings {
|
|||
}
|
||||
}
|
||||
this.locationMode = LocationMode.PRIMARY_ONLY;
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.writeTimeout = writeTimeout;
|
||||
this.readTimeout = readTimeout;
|
||||
this.responseTimeout = responseTimeout;
|
||||
}
|
||||
|
||||
public String getEndpointSuffix() {
|
||||
|
@ -245,6 +302,22 @@ final class AzureStorageSettings {
|
|||
return locationMode;
|
||||
}
|
||||
|
||||
public TimeValue getConnectTimeout() {
|
||||
return connectTimeout;
|
||||
}
|
||||
|
||||
public TimeValue getWriteTimeout() {
|
||||
return writeTimeout;
|
||||
}
|
||||
|
||||
public TimeValue getReadTimeout() {
|
||||
return readTimeout;
|
||||
}
|
||||
|
||||
public TimeValue getResponseTimeout() {
|
||||
return responseTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder("AzureStorageSettings{");
|
||||
|
@ -254,6 +327,10 @@ final class AzureStorageSettings {
|
|||
sb.append(", maxRetries=").append(maxRetries);
|
||||
sb.append(", proxy=").append(proxy);
|
||||
sb.append(", locationMode='").append(locationMode).append('\'');
|
||||
sb.append(", connectTimeout='").append(connectTimeout).append('\'');
|
||||
sb.append(", writeTimeout='").append(writeTimeout).append('\'');
|
||||
sb.append(", readTimeout='").append(readTimeout).append('\'');
|
||||
sb.append(", responseTimeout='").append(responseTimeout).append('\'');
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
@ -296,7 +373,11 @@ final class AzureStorageSettings {
|
|||
getValue(settings, clientName, MAX_RETRIES_SETTING),
|
||||
getValue(settings, clientName, PROXY_TYPE_SETTING),
|
||||
getValue(settings, clientName, PROXY_HOST_SETTING),
|
||||
getValue(settings, clientName, PROXY_PORT_SETTING)
|
||||
getValue(settings, clientName, PROXY_PORT_SETTING),
|
||||
getValue(settings, clientName, CONNECT_TIMEOUT_SETTING),
|
||||
getValue(settings, clientName, WRITE_TIMEOUT_SETTING),
|
||||
getValue(settings, clientName, READ_TIMEOUT_SETTING),
|
||||
getValue(settings, clientName, RESPONSE_TIMEOUT_SETTING)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -327,7 +408,11 @@ final class AzureStorageSettings {
|
|||
entry.getValue().timeout,
|
||||
entry.getValue().maxRetries,
|
||||
entry.getValue().proxy,
|
||||
locationMode
|
||||
locationMode,
|
||||
entry.getValue().connectTimeout,
|
||||
entry.getValue().writeTimeout,
|
||||
entry.getValue().readTimeout,
|
||||
entry.getValue().responseTimeout
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.opensearch.common.settings.MockSecureSettings;
|
|||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.settings.SettingsException;
|
||||
import org.opensearch.common.settings.SettingsModule;
|
||||
import org.opensearch.common.unit.TimeValue;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -217,6 +218,62 @@ public class AzureStorageServiceTests extends OpenSearchTestCase {
|
|||
assertThat(azureStorageService.getBlobRequestTimeout("azure3"), is(Duration.ofSeconds(30)));
|
||||
}
|
||||
|
||||
public void testClientDefaultConnectTimeout() {
|
||||
final Settings settings = Settings.builder()
|
||||
.setSecureSettings(buildSecureSettings())
|
||||
.put("azure.client.azure3.connect.timeout", "25s")
|
||||
.build();
|
||||
final AzureStorageService mock = storageServiceWithSettingsValidation(settings);
|
||||
final TimeValue timeout = mock.storageSettings.get("azure3").getConnectTimeout();
|
||||
|
||||
assertThat(timeout, notNullValue());
|
||||
assertThat(timeout, equalTo(TimeValue.timeValueSeconds(25)));
|
||||
assertThat(mock.storageSettings.get("azure2").getConnectTimeout(), notNullValue());
|
||||
assertThat(mock.storageSettings.get("azure2").getConnectTimeout(), equalTo(TimeValue.timeValueSeconds(10)));
|
||||
}
|
||||
|
||||
public void testClientDefaultWriteTimeout() {
|
||||
final Settings settings = Settings.builder()
|
||||
.setSecureSettings(buildSecureSettings())
|
||||
.put("azure.client.azure3.write.timeout", "85s")
|
||||
.build();
|
||||
final AzureStorageService mock = storageServiceWithSettingsValidation(settings);
|
||||
final TimeValue timeout = mock.storageSettings.get("azure3").getWriteTimeout();
|
||||
|
||||
assertThat(timeout, notNullValue());
|
||||
assertThat(timeout, equalTo(TimeValue.timeValueSeconds(85)));
|
||||
assertThat(mock.storageSettings.get("azure2").getWriteTimeout(), notNullValue());
|
||||
assertThat(mock.storageSettings.get("azure2").getWriteTimeout(), equalTo(TimeValue.timeValueSeconds(60)));
|
||||
}
|
||||
|
||||
public void testClientDefaultReadTimeout() {
|
||||
final Settings settings = Settings.builder()
|
||||
.setSecureSettings(buildSecureSettings())
|
||||
.put("azure.client.azure3.read.timeout", "120s")
|
||||
.build();
|
||||
final AzureStorageService mock = storageServiceWithSettingsValidation(settings);
|
||||
final TimeValue timeout = mock.storageSettings.get("azure3").getReadTimeout();
|
||||
|
||||
assertThat(timeout, notNullValue());
|
||||
assertThat(timeout, equalTo(TimeValue.timeValueSeconds(120)));
|
||||
assertThat(mock.storageSettings.get("azure2").getReadTimeout(), notNullValue());
|
||||
assertThat(mock.storageSettings.get("azure2").getReadTimeout(), equalTo(TimeValue.timeValueSeconds(60)));
|
||||
}
|
||||
|
||||
public void testClientDefaultResponseTimeout() {
|
||||
final Settings settings = Settings.builder()
|
||||
.setSecureSettings(buildSecureSettings())
|
||||
.put("azure.client.azure3.response.timeout", "1ms")
|
||||
.build();
|
||||
final AzureStorageService mock = storageServiceWithSettingsValidation(settings);
|
||||
final TimeValue timeout = mock.storageSettings.get("azure3").getResponseTimeout();
|
||||
|
||||
assertThat(timeout, notNullValue());
|
||||
assertThat(timeout, equalTo(TimeValue.timeValueMillis(1)));
|
||||
assertThat(mock.storageSettings.get("azure2").getResponseTimeout(), notNullValue());
|
||||
assertThat(mock.storageSettings.get("azure2").getResponseTimeout(), equalTo(TimeValue.timeValueSeconds(60)));
|
||||
}
|
||||
|
||||
public void testGetSelectedClientNoTimeout() {
|
||||
final AzureStorageService azureStorageService = storageServiceWithSettingsValidation(buildSettings());
|
||||
assertThat(azureStorageService.getBlobRequestTimeout("azure1"), nullValue());
|
||||
|
|
Loading…
Reference in New Issue