Add GCS support for searchable snapshots (#55403)

Adds ranged read support for GCS repositories in order to enable searchable snapshot support
for GCS.

As part of this PR, I've extracted some of the test infrastructure to make sure that
GoogleCloudStorageBlobContainerRetriesTests and S3BlobContainerRetriesTests are covering
similar test (as I saw those diverging in what they cover)
This commit is contained in:
Yannick Welsch 2020-04-20 10:52:26 +02:00
parent 258f4b3be3
commit b9da307cd1
10 changed files with 786 additions and 489 deletions

View File

@ -43,7 +43,8 @@ restResources {
}
}
testFixtures.useFixture(':test:fixtures:gcs-fixture')
testFixtures.useFixture(':test:fixtures:gcs-fixture', 'gcs-fixture')
testFixtures.useFixture(':test:fixtures:gcs-fixture', 'gcs-fixture-third-party')
boolean useFixture = false
String gcsServiceAccount = System.getenv("google_storage_service_account")

View File

@ -62,6 +62,11 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
return blobStore.readBlob(buildKey(blobName));
}
@Override
public InputStream readBlob(final String blobName, final long position, final long length) throws IOException {
return blobStore.readBlob(buildKey(blobName), position, length);
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.Streams;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -196,7 +197,29 @@ class GoogleCloudStorageBlobStore implements BlobStore {
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName) throws IOException {
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), 0, Long.MAX_VALUE);
}
/**
* Returns an {@link java.io.InputStream} for the given blob's position and length
*
* @param blobName name of the blob
* @param position starting position to read from
* @param length length of bytes to read
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName, long position, long length) throws IOException {
if (position < 0L) {
throw new IllegalArgumentException("position must be non-negative");
}
if (length < 0) {
throw new IllegalArgumentException("length must be non-negative");
}
if (length == 0) {
return new ByteArrayInputStream(new byte[0]);
} else {
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), position, length);
}
}
/**

View File

@ -32,7 +32,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
@ -54,6 +54,9 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private final BlobId blobId;
private final long start;
private final long length;
private final int maxRetries;
private InputStream currentStream;
@ -62,33 +65,81 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId, long start, long length) throws IOException {
this.client = client;
this.blobId = blobId;
this.start = start;
this.length = length;
this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
currentStream = openStream();
}
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private InputStream openStream() throws IOException {
try {
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
if (currentOffset > 0L) {
readChannel.seek(currentOffset);
}
return Channels.newInputStream(new ReadableByteChannel() {
final long end = start + length < 0L ? Long.MAX_VALUE : start + length; // inclusive
final SeekableByteChannel adaptedChannel = new SeekableByteChannel() {
long position;
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int read(ByteBuffer dst) throws IOException {
final long remainingBytesToRead = end - position;
assert remainingBytesToRead >= 0L;
// The SDK uses the maximum between chunk size and dst.remaining() to determine fetch size
// We can be smarter here and only fetch what's needed when we know the length
if (remainingBytesToRead < DEFAULT_CHUNK_SIZE) {
readChannel.setChunkSize(Math.toIntExact(remainingBytesToRead));
}
if (remainingBytesToRead < dst.remaining()) {
dst.limit(dst.position() + Math.toIntExact(remainingBytesToRead));
}
try {
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
int read = SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
if (read > 0) {
position += read;
}
return read;
} catch (StorageException e) {
if (e.getCode() == HTTP_NOT_FOUND) {
throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist");
throw new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + e.getMessage());
}
throw e;
} finally {
readChannel.setChunkSize(0); // set to default again
}
}
@Override
public int write(ByteBuffer src) {
throw new UnsupportedOperationException();
}
@Override
public long position() {
return position;
}
@Override
public SeekableByteChannel position(long newPosition) throws IOException {
readChannel.seek(newPosition);
this.position = newPosition;
return this;
}
@Override
public long size() {
return length;
}
@Override
public SeekableByteChannel truncate(long size) {
throw new UnsupportedOperationException();
}
@Override
public boolean isOpen() {
return readChannel.isOpen();
@ -98,7 +149,11 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
}
});
};
if (currentOffset > 0 || start > 0) {
adaptedChannel.position(Math.addExact(start, currentOffset));
}
return Channels.newInputStream(adaptedChannel);
} catch (StorageException e) {
throw addSuppressedExceptions(e);
}
@ -147,7 +202,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
throw addSuppressedExceptions(e);
}
logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e);
blobId, currentOffset, attempt, maxRetries), e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);

View File

@ -24,7 +24,6 @@ import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import fixture.gcs.FakeOAuth2HttpHandler;
import org.apache.http.HttpStatus;
import org.elasticsearch.common.Nullable;
@ -42,24 +41,20 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.threeten.bp.Duration;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -91,9 +86,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@SuppressForbidden(reason = "use a http server")
public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
private HttpServer httpServer;
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
private String httpServerUrl() {
assertThat(httpServer, notNullValue());
@ -106,20 +99,33 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
GoogleCloudStorageBlobStoreRepositoryTests.assumeNotJava8();
}
@Before
public void setUp() throws Exception {
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
super.setUp();
// Google's SDK ignores Content-Length header when no bytes are sent, see SizeValidatingInputStream
// TODO: fix this in the SDK
@Override
protected int minIncompleteContentToSend() {
return 1;
}
@After
public void tearDown() throws Exception {
httpServer.stop(0);
super.tearDown();
@Override
protected String downloadStorageEndpoint(String blob) {
return "/download/storage/v1/b/bucket/o/" + blob;
}
private BlobContainer createBlobContainer(final int maxRetries, final @Nullable TimeValue readTimeout) {
@Override
protected String bytesContentType() {
return "application/octet-stream";
}
@Override
protected Class<? extends Exception> unresponsiveExceptionType() {
return StorageException.class;
}
@Override
protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize) {
final Settings.Builder clientSettings = Settings.builder();
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(client).getKey(), httpServerUrl());
@ -137,20 +143,22 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
final HttpTransportOptions httpTransportOptions) {
StorageOptions options = super.createStorageOptions(clientSettings, httpTransportOptions);
RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder()
.setTotalTimeout(options.getRetrySettings().getTotalTimeout())
.setInitialRetryDelay(Duration.ofMillis(10L))
.setRetryDelayMultiplier(1.0d)
.setMaxRetryDelay(Duration.ofSeconds(1L))
.setJittered(false)
.setInitialRpcTimeout(Duration.ofSeconds(1))
.setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier())
.setMaxRpcTimeout(Duration.ofSeconds(1));
if (maxRetries != null) {
retrySettingsBuilder.setMaxAttempts(maxRetries);
}
return options.toBuilder()
.setHost(options.getHost())
.setCredentials(options.getCredentials())
.setRetrySettings(RetrySettings.newBuilder()
.setTotalTimeout(options.getRetrySettings().getTotalTimeout())
.setInitialRetryDelay(Duration.ofMillis(10L))
.setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier())
.setMaxRetryDelay(Duration.ofSeconds(1L))
.setMaxAttempts(maxRetries)
.setJittered(false)
.setInitialRpcTimeout(options.getRetrySettings().getInitialRpcTimeout())
.setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier())
.setMaxRpcTimeout(options.getRetrySettings().getMaxRpcTimeout())
.build())
.setRetrySettings(retrySettingsBuilder.build())
.build();
}
};
@ -174,40 +182,6 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
return new GoogleCloudStorageBlobContainer(BlobPath.cleanPath(), blobStore);
}
public void testReadNonexistentBlobThrowsNoSuchFileException() {
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null);
final Exception exception = expectThrows(NoSuchFileException.class,
() -> Streams.readFully(blobContainer.readBlob("read_nonexistent_blob")));
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob [read_nonexistent_blob] does not exist"));
}
public void testReadBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/download/storage/v1/b/bucket/o/read_blob_max_retries", exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), bytes.length);
exchange.getResponseBody().write(bytes);
exchange.close();
return;
}
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
if (randomBoolean()) {
exchange.close();
}
});
final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 500)));
try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
assertThat(countDown.isCountedDown(), is(true));
}
}
public void testReadLargeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final AtomicInteger countDown = new AtomicInteger(maxRetries);
@ -231,47 +205,12 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
exchange.close();
});
final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
}
}
public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomIntBetween(1, 3);
final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));
// HTTP server does not send a response
httpServer.createContext("/download/storage/v1/b/bucket/o/read_blob_unresponsive", exchange -> {});
StorageException storageException = expectThrows(StorageException.class,
() -> Streams.readFully(blobContainer.readBlob("read_blob_unresponsive")));
assertThat(storageException.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
assertThat(storageException.getCause(), instanceOf(SocketTimeoutException.class));
// HTTP server sends a partial response
final byte[] bytes = randomBlobContent();
httpServer.createContext("/download/storage/v1/b/bucket/o/read_blob_incomplete", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
final int bytesToSend = randomIntBetween(0, bytes.length - 1);
if (bytesToSend > 0) {
exchange.getResponseBody().write(bytes, 0, bytesToSend);
}
if (randomBoolean()) {
exchange.getResponseBody().flush();
}
});
storageException = expectThrows(StorageException.class, () -> {
try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) {
Streams.readFully(stream);
}
});
assertThat(storageException.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
assertThat(storageException.getCause(), instanceOf(SocketTimeoutException.class));
}
public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);
@ -303,7 +242,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
}
}));
final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
}
@ -313,7 +252,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);
// HTTP server does not send a response
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
@ -441,7 +380,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout);
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
blobContainer.writeBlob("write_large_blob", stream, data.length, false);
}
@ -461,8 +400,4 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
}
};
}
private static byte[] randomBlobContent() {
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
}
}

View File

@ -21,18 +21,13 @@ package org.elasticsearch.repositories.s3;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
import com.amazonaws.util.Base16;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpStatus;
import org.apache.http.NoHttpResponseException;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@ -44,27 +39,18 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
@ -72,41 +58,47 @@ import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SET
import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/**
* This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs.
*/
@SuppressForbidden(reason = "use a http server")
public class S3BlobContainerRetriesTests extends ESTestCase {
public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
private static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1;
private HttpServer httpServer;
private S3Service service;
@Before
public void setUp() throws Exception {
service = new S3Service();
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
super.setUp();
}
@After
public void tearDown() throws Exception {
IOUtils.close(service);
httpServer.stop(0);
super.tearDown();
}
private BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
@Override
protected String downloadStorageEndpoint(String blob) {
return "/bucket/" + blob;
}
@Override
protected String bytesContentType() {
return "text/plain; charset=utf-8";
}
@Override
protected Class<? extends Exception> unresponsiveExceptionType() {
return SdkClientException.class;
}
@Override
protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize) {
@ -144,210 +136,6 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
repositoryMetadata));
}
public void testReadNonexistentBlobThrowsNoSuchFileException() {
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
final long position = randomLongBetween(0, MAX_RANGE_VAL);
final int length = randomIntBetween(0, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
final Exception exception = expectThrows(NoSuchFileException.class,
() -> {
if (randomBoolean()) {
blobContainer.readBlob("read_nonexistent_blob");
} else {
blobContainer.readBlob("read_nonexistent_blob", 0, 1);
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
assertThat(expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob", position, length))
.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
}
public void testReadBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/read_blob_max_retries", exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
assertEquals(Optional.empty(), getRangeEnd(exchange));
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart);
exchange.close();
return;
}
if (randomBoolean()) {
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
} else if (randomBoolean()) {
sendIncompleteContent(exchange, bytes);
}
if (randomBoolean()) {
exchange.close();
}
});
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
// read stream only partly
readLimit = randomIntBetween(0, bytes.length);
wrappedStream = Streams.limitStream(inputStream, readLimit);
} else {
readLimit = bytes.length;
wrappedStream = inputStream;
}
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
logger.info("maxRetries={}, readLimit={}, byteSize={}, bytesRead={}",
maxRetries, readLimit, bytes.length, bytesRead.length);
assertArrayEquals(Arrays.copyOfRange(bytes, 0, readLimit), bytesRead);
if (readLimit < bytes.length) {
// we might have completed things based on an incomplete response, and we're happy with that
} else {
assertTrue(countDown.isCountedDown());
}
}
}
public void testReadRangeBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/read_range_blob_max_retries", exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
assertTrue(getRangeEnd(exchange).isPresent());
final int rangeEnd = getRangeEnd(exchange).get();
assertThat(rangeEnd, greaterThanOrEqualTo(rangeStart));
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd);
final int length = (effectiveRangeEnd - rangeStart) + 1;
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
exchange.getResponseBody().write(bytes, rangeStart, length);
exchange.close();
return;
}
if (randomBoolean()) {
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
} else if (randomBoolean()) {
sendIncompleteContent(exchange, bytes);
}
if (randomBoolean()) {
exchange.close();
}
});
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
final int position = randomIntBetween(0, bytes.length - 1);
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
// read stream only partly
readLimit = randomIntBetween(0, length);
wrappedStream = Streams.limitStream(inputStream, readLimit);
} else {
readLimit = length;
wrappedStream = inputStream;
}
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
logger.info("maxRetries={}, position={}, length={}, readLimit={}, byteSize={}, bytesRead={}",
maxRetries, position, length, readLimit, bytes.length, bytesRead.length);
assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + readLimit)), bytesRead);
if (readLimit == 0 || (readLimit < length && readLimit == bytesRead.length)) {
// we might have completed things based on an incomplete response, and we're happy with that
} else {
assertTrue(countDown.isCountedDown());
}
}
}
public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomInt(5);
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
// HTTP server does not send a response
httpServer.createContext("/bucket/read_blob_unresponsive", exchange -> {});
Exception exception = expectThrows(SdkClientException.class, () -> blobContainer.readBlob("read_blob_unresponsive"));
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class));
// HTTP server sends a partial response
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/read_blob_incomplete", exchange -> sendIncompleteContent(exchange, bytes));
final int position = randomIntBetween(0, bytes.length - 1);
final int length = randomIntBetween(1, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
exception = expectThrows(IOException.class, () -> {
try (InputStream stream = randomBoolean() ?
blobContainer.readBlob("read_blob_incomplete") :
blobContainer.readBlob("read_blob_incomplete", position, length)) {
Streams.readFully(stream);
}
});
assertThat(exception, either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class)));
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("read timed out")).or(
containsString("premature end of chunk coded message body: closing chunk expected")));
assertThat(exception.getSuppressed().length, equalTo(maxRetries));
}
public void testReadBlobWithNoHttpResponse() {
final BlobContainer blobContainer = createBlobContainer(randomInt(5), null, null, null);
// HTTP server closes connection immediately
httpServer.createContext("/bucket/read_blob_no_response", HttpExchange::close);
Exception exception = expectThrows(SdkClientException.class,
() -> {
if (randomBoolean()) {
blobContainer.readBlob("read_blob_no_response");
} else {
blobContainer.readBlob("read_blob_no_response", 0, 1);
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("the target server failed to respond"));
assertThat(exception.getCause(), instanceOf(NoHttpResponseException.class));
assertThat(exception.getSuppressed().length, equalTo(0));
}
public void testReadBlobWithPrematureConnectionClose() {
final int maxRetries = randomInt(20);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
// HTTP server sends a partial response
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/read_blob_incomplete", exchange -> {
sendIncompleteContent(exchange, bytes);
exchange.close();
});
final Exception exception = expectThrows(ConnectionClosedException.class, () -> {
try (InputStream stream = randomBoolean() ?
blobContainer.readBlob("read_blob_incomplete", 0, 1):
blobContainer.readBlob("read_blob_incomplete")) {
Streams.readFully(stream);
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT),
either(containsString("premature end of chunk coded message body: closing chunk expected"))
.or(containsString("premature end of content-length delimited message body")));
assertThat(exception.getSuppressed().length, equalTo(Math.min(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS, maxRetries)));
}
public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
@ -372,7 +160,7 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
} else {
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
}
}
exchange.close();
@ -504,140 +292,4 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
assertThat(countDownUploads.get(), equalTo(0));
assertThat(countDownComplete.isCountedDown(), is(true));
}
private static byte[] randomBlobContent() {
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
}
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$");
private static Tuple<Long, Long> getRange(HttpExchange exchange) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
if (rangeHeader == null) {
return Tuple.tuple(0L, MAX_RANGE_VAL);
}
final Matcher matcher = RANGE_PATTERN.matcher(rangeHeader);
assertTrue(rangeHeader + " matches expected pattern", matcher.matches());
long rangeStart = Long.parseLong(matcher.group(1));
long rangeEnd = Long.parseLong(matcher.group(2));
assertThat(rangeStart, lessThanOrEqualTo(rangeEnd));
return Tuple.tuple(rangeStart, rangeEnd);
}
private static int getRangeStart(HttpExchange exchange) {
return Math.toIntExact(getRange(exchange).v1());
}
private static Optional<Integer> getRangeEnd(HttpExchange exchange) {
final long rangeEnd = getRange(exchange).v2();
if (rangeEnd == MAX_RANGE_VAL) {
return Optional.empty();
}
return Optional.of(Math.toIntExact(rangeEnd));
}
private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
final Optional<Integer> rangeEnd = getRangeEnd(exchange);
final int length;
if (rangeEnd.isPresent()) {
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
final int effectiveRangeEnd = Math.min(rangeEnd.get(), bytes.length - 1);
length = effectiveRangeEnd - rangeStart + 1;
} else {
length = bytes.length - rangeStart;
}
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
final int bytesToSend = randomIntBetween(0, length - 1);
if (bytesToSend > 0) {
exchange.getResponseBody().write(bytes, rangeStart, bytesToSend);
}
if (randomBoolean()) {
exchange.getResponseBody().flush();
}
}
/**
* A resettable InputStream that only serves zeros.
**/
private static class ZeroInputStream extends InputStream {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final long length;
private final AtomicLong reads;
private volatile long mark;
private ZeroInputStream(final long length) {
this.length = length;
this.reads = new AtomicLong(0);
this.mark = -1;
}
@Override
public int read() throws IOException {
ensureOpen();
return (reads.incrementAndGet() <= length) ? 0 : -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
if (len == 0) {
return 0;
}
final int available = available();
if (available == 0) {
return -1;
}
final int toCopy = Math.min(len, available);
Arrays.fill(b, off, off + toCopy, (byte) 0);
reads.addAndGet(toCopy);
return toCopy;
}
@Override
public boolean markSupported() {
return true;
}
@Override
public synchronized void mark(int readlimit) {
mark = reads.get();
}
@Override
public synchronized void reset() throws IOException {
ensureOpen();
reads.set(mark);
}
@Override
public int available() throws IOException {
ensureOpen();
if (reads.get() >= length) {
return 0;
}
try {
return Math.toIntExact(length - reads.get());
} catch (ArithmeticException e) {
return Integer.MAX_VALUE;
}
}
@Override
public void close() {
closed.set(true);
}
private void ensureOpen() throws IOException {
if (closed.get()) {
throw new IOException("Stream closed");
}
}
}
}

View File

@ -23,4 +23,16 @@ services:
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
- "80"
gcs-fixture-other:
build:
context: .
args:
port: 80
bucket: "bucket"
token: "o/oauth2/token"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"

View File

@ -0,0 +1,445 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.blobstore;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpStatus;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@SuppressForbidden(reason = "use a http server")
public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
private static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1;
protected HttpServer httpServer;
@Before
public void setUp() throws Exception {
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
super.setUp();
}
@After
public void tearDown() throws Exception {
httpServer.stop(0);
super.tearDown();
}
protected abstract String downloadStorageEndpoint(String blob);
protected abstract String bytesContentType();
protected abstract Class<? extends Exception> unresponsiveExceptionType();
protected abstract BlobContainer createBlobContainer(@Nullable Integer maxRetries,
@Nullable TimeValue readTimeout,
@Nullable Boolean disableChunkedEncoding,
@Nullable ByteSizeValue bufferSize);
public void testReadNonexistentBlobThrowsNoSuchFileException() {
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
final long position = randomLongBetween(0, MAX_RANGE_VAL);
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
final Exception exception = expectThrows(
NoSuchFileException.class,
() -> {
if (randomBoolean()) {
Streams.readFully(blobContainer.readBlob("read_nonexistent_blob"));
} else {
Streams.readFully(blobContainer.readBlob("read_nonexistent_blob", 0, 1));
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
assertThat(expectThrows(NoSuchFileException.class,
() -> Streams.readFully(blobContainer.readBlob("read_nonexistent_blob", position, length)))
.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
}
public void testReadBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint("read_blob_max_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart);
exchange.close();
return;
}
if (randomBoolean()) {
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
} else if (randomBoolean()) {
sendIncompleteContent(exchange, bytes);
}
if (randomBoolean()) {
exchange.close();
}
});
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
// read stream only partly
readLimit = randomIntBetween(0, bytes.length);
wrappedStream = Streams.limitStream(inputStream, readLimit);
} else {
readLimit = bytes.length;
wrappedStream = inputStream;
}
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
logger.info("maxRetries={}, readLimit={}, byteSize={}, bytesRead={}",
maxRetries, readLimit, bytes.length, bytesRead.length);
assertArrayEquals(Arrays.copyOfRange(bytes, 0, readLimit), bytesRead);
if (readLimit < bytes.length) {
// we might have completed things based on an incomplete response, and we're happy with that
} else {
assertTrue(countDown.isCountedDown());
}
}
}
public void testReadRangeBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint("read_range_blob_max_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
assertTrue(getRangeEnd(exchange).isPresent());
final int rangeEnd = getRangeEnd(exchange).get();
assertThat(rangeEnd, greaterThanOrEqualTo(rangeStart));
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd);
final int length = (effectiveRangeEnd - rangeStart) + 1;
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
exchange.getResponseBody().write(bytes, rangeStart, length);
exchange.close();
return;
}
if (randomBoolean()) {
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
} else if (randomBoolean()) {
sendIncompleteContent(exchange, bytes);
}
if (randomBoolean()) {
exchange.close();
}
});
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
final int position = randomIntBetween(0, bytes.length - 1);
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
// read stream only partly
readLimit = randomIntBetween(0, length);
wrappedStream = Streams.limitStream(inputStream, readLimit);
} else {
readLimit = length;
wrappedStream = inputStream;
}
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
logger.info("maxRetries={}, position={}, length={}, readLimit={}, byteSize={}, bytesRead={}",
maxRetries, position, length, readLimit, bytes.length, bytesRead.length);
assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + readLimit)), bytesRead);
if (readLimit == 0 || (readLimit < length && readLimit == bytesRead.length)) {
// we might have completed things based on an incomplete response, and we're happy with that
} else {
assertTrue(countDown.isCountedDown());
}
}
}
public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomInt(5);
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
// HTTP server does not send a response
httpServer.createContext(downloadStorageEndpoint("read_blob_unresponsive"), exchange -> {});
Exception exception = expectThrows(unresponsiveExceptionType(),
() -> Streams.readFully(blobContainer.readBlob("read_blob_unresponsive")));
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class));
// HTTP server sends a partial response
final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint("read_blob_incomplete"), exchange -> sendIncompleteContent(exchange, bytes));
final int position = randomIntBetween(0, bytes.length - 1);
final int length = randomIntBetween(1, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
exception = expectThrows(Exception.class, () -> {
try (InputStream stream = randomBoolean() ?
blobContainer.readBlob("read_blob_incomplete") :
blobContainer.readBlob("read_blob_incomplete", position, length)) {
Streams.readFully(stream);
}
});
assertThat(exception, either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class))
.or(instanceOf(RuntimeException.class)));
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("read timed out")).or(
containsString("premature end of chunk coded message body: closing chunk expected")).or(containsString("Read timed out")));
assertThat(exception.getSuppressed().length, equalTo(maxRetries));
}
public void testReadBlobWithNoHttpResponse() {
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null);
// HTTP server closes connection immediately
httpServer.createContext(downloadStorageEndpoint("read_blob_no_response"), HttpExchange::close);
Exception exception = expectThrows(unresponsiveExceptionType(),
() -> {
if (randomBoolean()) {
Streams.readFully(blobContainer.readBlob("read_blob_no_response"));
} else {
Streams.readFully(blobContainer.readBlob("read_blob_no_response", 0, 1));
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("the target server failed to respond"))
.or(containsString("unexpected end of file from server")));
}
public void testReadBlobWithPrematureConnectionClose() {
final int maxRetries = randomInt(20);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
// HTTP server sends a partial response
final byte[] bytes = randomBlobContent(minIncompleteContentToSend() + 1);
httpServer.createContext(downloadStorageEndpoint("read_blob_incomplete"), exchange -> {
sendIncompleteContent(exchange, bytes);
exchange.close();
});
final Exception exception = expectThrows(Exception.class, () -> {
try (InputStream stream = randomBoolean() ?
blobContainer.readBlob("read_blob_incomplete", 0, minIncompleteContentToSend() + 1):
blobContainer.readBlob("read_blob_incomplete")) {
Streams.readFully(stream);
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT),
either(containsString("premature end of chunk coded message body: closing chunk expected"))
.or(containsString("premature end of content-length delimited message body"))
.or(containsString("connection closed prematurely")));
assertThat(exception.getSuppressed().length, equalTo(Math.min(10, maxRetries)));
}
protected static byte[] randomBlobContent() {
return randomBlobContent(1);
}
protected static byte[] randomBlobContent(int minSize) {
return randomByteArrayOfLength(randomIntBetween(minSize, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
}
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$");
private static Tuple<Long, Long> getRange(HttpExchange exchange) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
if (rangeHeader == null) {
return Tuple.tuple(0L, MAX_RANGE_VAL);
}
final Matcher matcher = RANGE_PATTERN.matcher(rangeHeader);
assertTrue(rangeHeader + " matches expected pattern", matcher.matches());
long rangeStart = Long.parseLong(matcher.group(1));
long rangeEnd = Long.parseLong(matcher.group(2));
assertThat(rangeStart, lessThanOrEqualTo(rangeEnd));
return Tuple.tuple(rangeStart, rangeEnd);
}
protected static int getRangeStart(HttpExchange exchange) {
return Math.toIntExact(getRange(exchange).v1());
}
protected static Optional<Integer> getRangeEnd(HttpExchange exchange) {
final long rangeEnd = getRange(exchange).v2();
if (rangeEnd == MAX_RANGE_VAL) {
return Optional.empty();
}
return Optional.of(Math.toIntExact(rangeEnd));
}
protected void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
final Optional<Integer> rangeEnd = getRangeEnd(exchange);
final int length;
if (rangeEnd.isPresent()) {
// adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
final int effectiveRangeEnd = Math.min(rangeEnd.get(), bytes.length - 1);
length = effectiveRangeEnd - rangeStart + 1;
} else {
length = bytes.length - rangeStart;
}
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
int minSend = Math.min(minIncompleteContentToSend(), length - 1);
final int bytesToSend = randomIntBetween(minSend, length - 1);
if (bytesToSend > 0) {
exchange.getResponseBody().write(bytes, rangeStart, bytesToSend);
}
if (randomBoolean()) {
exchange.getResponseBody().flush();
}
}
protected int minIncompleteContentToSend() {
return 0;
}
/**
* A resettable InputStream that only serves zeros.
**/
public static class ZeroInputStream extends InputStream {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final long length;
private final AtomicLong reads;
private volatile long mark;
public ZeroInputStream(final long length) {
this.length = length;
this.reads = new AtomicLong(0);
this.mark = -1;
}
@Override
public int read() throws IOException {
ensureOpen();
return (reads.incrementAndGet() <= length) ? 0 : -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
if (len == 0) {
return 0;
}
final int available = available();
if (available == 0) {
return -1;
}
final int toCopy = Math.min(len, available);
Arrays.fill(b, off, off + toCopy, (byte) 0);
reads.addAndGet(toCopy);
return toCopy;
}
@Override
public boolean markSupported() {
return true;
}
@Override
public synchronized void mark(int readlimit) {
mark = reads.get();
}
@Override
public synchronized void reset() throws IOException {
ensureOpen();
reads.set(mark);
}
@Override
public int available() throws IOException {
ensureOpen();
if (reads.get() >= length) {
return 0;
}
try {
return Math.toIntExact(length - reads.get());
} catch (ArithmeticException e) {
return Integer.MAX_VALUE;
}
}
@Override
public void close() {
closed.set(true);
}
private void ensureOpen() throws IOException {
if (closed.get()) {
throw new IOException("Stream closed");
}
}
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.MavenFilteringHack
import java.nio.file.Files
import java.security.KeyPair
import java.security.KeyPairGenerator
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
final Project fixture = project(':test:fixtures:gcs-fixture')
final Project repositoryPlugin = project(':plugins:repository-gcs')
dependencies {
testCompile project(path: xpackModule('searchable-snapshots'), configuration: 'testArtifacts')
testCompile repositoryPlugin
}
restResources {
restApi {
includeCore 'indices', 'search', 'bulk', 'snapshot', 'nodes', '_common'
includeXpack 'searchable_snapshots'
}
}
boolean useFixture = false
String gcsServiceAccount = System.getenv("google_storage_service_account")
String gcsBucket = System.getenv("google_storage_bucket")
String gcsBasePath = System.getenv("google_storage_base_path")
File serviceAccountFile = null
if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) {
serviceAccountFile = new File(project.buildDir, 'generated-resources/service_account_test.json')
gcsBucket = 'bucket'
gcsBasePath = 'integration_test'
useFixture = true
} else if (!gcsServiceAccount || !gcsBucket || !gcsBasePath) {
throw new IllegalArgumentException("not all options specified to run tests against external GCS service are present")
} else {
serviceAccountFile = new File(gcsServiceAccount)
}
def encodedCredentials = {
Base64.encoder.encodeToString(Files.readAllBytes(serviceAccountFile.toPath()))
}
/** A service account file that points to the Google Cloud Storage service emulated by the fixture **/
task createServiceAccountFile() {
doLast {
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(1024)
KeyPair keyPair = keyPairGenerator.generateKeyPair()
String encodedKey = Base64.getEncoder().encodeToString(keyPair.private.getEncoded())
serviceAccountFile.parentFile.mkdirs()
serviceAccountFile.setText("{\n" +
' "type": "service_account",\n' +
' "project_id": "integration_test",\n' +
' "private_key_id": "' + UUID.randomUUID().toString() + '",\n' +
' "private_key": "-----BEGIN PRIVATE KEY-----\\n' + encodedKey + '\\n-----END PRIVATE KEY-----\\n",\n' +
' "client_email": "integration_test@appspot.gserviceaccount.com",\n' +
' "client_id": "123456789101112130594"\n' +
'}', 'UTF-8')
}
}
def fixtureAddress = { f ->
assert useFixture: 'closure should not be used without a fixture'
int ephemeralPort = project(':test:fixtures:gcs-fixture').postProcessFixture.ext."test.fixtures.${f}.tcp.80"
assert ephemeralPort > 0
'http://127.0.0.1:' + ephemeralPort
}
Map<String, Object> expansions = [
'bucket' : gcsBucket,
'base_path': gcsBasePath + "_integration_tests"
]
processTestResources {
inputs.properties(expansions)
MavenFilteringHack.filter(it, expansions)
}
if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(fixture.path, 'gcs-fixture-other')
}
integTest {
dependsOn repositoryPlugin.bundlePlugin
runner {
systemProperty 'test.gcs.bucket', gcsBucket
systemProperty 'test.gcs.base_path', gcsBasePath + "/searchable_snapshots_tests"
}
}
testClusters.integTest {
testDistribution = 'DEFAULT'
plugin repositoryPlugin.bundlePlugin.archiveFile
if (BuildParams.isSnapshotBuild() == false) {
systemProperty 'es.searchable_snapshots_feature_enabled', 'true'
}
keystore 'gcs.client.searchable_snapshots.credentials_file', serviceAccountFile, IGNORE_VALUE
if (useFixture) {
tasks.integTest.dependsOn createServiceAccountFile
/* Use a closure on the string to delay evaluation until tests are executed */
setting 'gcs.client.searchable_snapshots.endpoint', { "${-> fixtureAddress('gcs-fixture-other')}" }, IGNORE_VALUE
setting 'gcs.client.searchable_snapshots.token_uri', { "${-> fixtureAddress('gcs-fixture-other')}/o/oauth2/token" }, IGNORE_VALUE
} else {
println "Using an external service to test " + project.name
}
setting 'xpack.license.self_generated.type', 'trial'
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;
import org.elasticsearch.common.settings.Settings;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.not;
public class GCSSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTestCase {
@Override
protected String repositoryType() {
return "gcs";
}
@Override
protected Settings repositorySettings() {
final String bucket = System.getProperty("test.gcs.bucket");
assertThat(bucket, not(blankOrNullString()));
final String basePath = System.getProperty("test.gcs.base_path");
assertThat(basePath, not(blankOrNullString()));
return Settings.builder().put("client", "searchable_snapshots").put("bucket", bucket).put("base_path", basePath).build();
}
}