It's impossible to tell why #50754 fails without this change. We're failing to close the `exchange` somewhere and there is no write timeout in the GCS SDK (something to look into separately) only a read timeout on the socket so if we're failing on an assertion without reading the full request body (at least into the read-buffer) we're locking up waiting forever on `write0`. This change ensure the `exchange` is closed in the tests where we could lock up on a write and logs the failure so we can find out what broke #50754.
This commit is contained in:
parent
0ac6786f41
commit
4a7e09f624
|
@ -23,6 +23,7 @@ import com.google.cloud.http.HttpTransportOptions;
|
||||||
import com.google.cloud.storage.StorageException;
|
import com.google.cloud.storage.StorageException;
|
||||||
import com.google.cloud.storage.StorageOptions;
|
import com.google.cloud.storage.StorageOptions;
|
||||||
import com.sun.net.httpserver.HttpContext;
|
import com.sun.net.httpserver.HttpContext;
|
||||||
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
import com.sun.net.httpserver.HttpServer;
|
import com.sun.net.httpserver.HttpServer;
|
||||||
import fixture.gcs.FakeOAuth2HttpHandler;
|
import fixture.gcs.FakeOAuth2HttpHandler;
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
|
@ -44,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||||
|
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.rest.RestUtils;
|
import org.elasticsearch.rest.RestUtils;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -151,13 +153,12 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
// Auth
|
// Auth
|
||||||
httpServer.createContext("/token", new FakeOAuth2HttpHandler()),
|
httpServer.createContext("/token", new FakeOAuth2HttpHandler()),
|
||||||
// Does bucket exists?
|
// Does bucket exists?
|
||||||
httpServer.createContext("/storage/v1/b/bucket", exchange -> {
|
httpServer.createContext("/storage/v1/b/bucket", safeHandler(exchange -> {
|
||||||
byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8);
|
byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8);
|
||||||
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
||||||
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
|
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
|
||||||
exchange.getResponseBody().write(response);
|
exchange.getResponseBody().write(response);
|
||||||
exchange.close();
|
}))
|
||||||
})
|
|
||||||
);
|
);
|
||||||
|
|
||||||
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, service);
|
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, service);
|
||||||
|
@ -240,7 +241,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
final CountDown countDown = new CountDown(maxRetries);
|
final CountDown countDown = new CountDown(maxRetries);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
||||||
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
|
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
|
||||||
if (countDown.countDown()) {
|
if (countDown.countDown()) {
|
||||||
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
|
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
|
||||||
|
@ -254,7 +255,6 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
} else {
|
} else {
|
||||||
exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1);
|
exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1);
|
||||||
}
|
}
|
||||||
exchange.close();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
@ -265,8 +265,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
exchange.close();
|
}));
|
||||||
});
|
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
||||||
|
@ -324,7 +323,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
final AtomicReference<String> sessionUploadId = new AtomicReference<>(UUIDs.randomBase64UUID());
|
final AtomicReference<String> sessionUploadId = new AtomicReference<>(UUIDs.randomBase64UUID());
|
||||||
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
|
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
|
||||||
|
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
||||||
final Map<String, String> params = new HashMap<>();
|
final Map<String, String> params = new HashMap<>();
|
||||||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||||
assertThat(params.get("uploadType"), equalTo("resumable"));
|
assertThat(params.get("uploadType"), equalTo("resumable"));
|
||||||
|
@ -338,7 +337,6 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
"/upload/storage/v1/b/bucket/o?uploadType=resumable&upload_id=" + sessionUploadId.get());
|
"/upload/storage/v1/b/bucket/o?uploadType=resumable&upload_id=" + sessionUploadId.get());
|
||||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||||
exchange.getResponseBody().write(response);
|
exchange.getResponseBody().write(response);
|
||||||
exchange.close();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (allowReadTimeout.get()) {
|
if (allowReadTimeout.get()) {
|
||||||
|
@ -353,7 +351,6 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
assertThat(wrongChunk, greaterThan(0));
|
assertThat(wrongChunk, greaterThan(0));
|
||||||
Streams.readFully(exchange.getRequestBody());
|
Streams.readFully(exchange.getRequestBody());
|
||||||
exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
|
exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
|
||||||
exchange.close();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,7 +370,6 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
|
|
||||||
Streams.readFully(exchange.getRequestBody());
|
Streams.readFully(exchange.getRequestBody());
|
||||||
exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
|
exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
|
||||||
exchange.close();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -394,13 +390,11 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
final Integer limit = getContentRangeLimit(range);
|
final Integer limit = getContentRangeLimit(range);
|
||||||
if (limit != null) {
|
if (limit != null) {
|
||||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
|
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
|
||||||
exchange.close();
|
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", rangeStart, rangeEnd));
|
exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", rangeStart, rangeEnd));
|
||||||
exchange.getResponseHeaders().add("Content-Length", "0");
|
exchange.getResponseHeaders().add("Content-Length", "0");
|
||||||
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
|
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
|
||||||
exchange.close();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -411,8 +405,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
||||||
}
|
}
|
||||||
exchange.close();
|
}));
|
||||||
});
|
|
||||||
|
|
||||||
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
|
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
|
||||||
|
|
||||||
|
@ -426,6 +419,17 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
||||||
assertThat(allow410Gone.get(), is(false));
|
assertThat(allow410Gone.get(), is(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private HttpHandler safeHandler(HttpHandler handler) {
|
||||||
|
final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
|
||||||
|
return exchange -> {
|
||||||
|
try {
|
||||||
|
loggingHandler.handle(exchange);
|
||||||
|
} finally {
|
||||||
|
exchange.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
private static byte[] randomBlobContent() {
|
private static byte[] randomBlobContent() {
|
||||||
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
|
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
|
||||||
}
|
}
|
||||||
|
|
|
@ -232,7 +232,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
||||||
/**
|
/**
|
||||||
* Wrap a {@link HttpHandler} to log any thrown exception using the given {@link Logger}.
|
* Wrap a {@link HttpHandler} to log any thrown exception using the given {@link Logger}.
|
||||||
*/
|
*/
|
||||||
private static HttpHandler wrap(final HttpHandler handler, final Logger logger) {
|
public static HttpHandler wrap(final HttpHandler handler, final Logger logger) {
|
||||||
return exchange -> {
|
return exchange -> {
|
||||||
try {
|
try {
|
||||||
handler.handle(exchange);
|
handler.handle(exchange);
|
||||||
|
|
Loading…
Reference in New Issue