Remove exception on failure response from GCS delete API (#16047)

* Throw 404 Exception on failure response from GCS delete API

* Replace String.format

* Apply suggestions from code review

Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>

* Remove exception for file not found and fix tests

* Add warn log and fix intellij inspection errors

* More intellij inspection fixes

* * Change to debug log
* change runtime exception class for code coverage
* Add file paths for batch delete failures

* Move failedPaths computation to inside isDebugEnabled flag

* Correct handling of StorageException

* Address review comments

* Remove unused exceptions

* Address code coverage and review comments

* Minor corrections

---------

Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>
This commit is contained in:
Vishesh Garg 2024-03-07 17:57:17 +05:30 committed by GitHub
parent 5f588fa45c
commit bed5d9c3b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 400 additions and 137 deletions

View File

@ -19,13 +19,12 @@
package org.apache.druid.storage.google; package org.apache.druid.storage.google;
import com.google.api.client.http.HttpResponseException; import com.google.cloud.storage.StorageException;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.SegmentLoadingException;
@ -70,35 +69,24 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller
// anymore, but we still delete them if exists. // anymore, but we still delete them if exists.
deleteIfPresent(bucket, descriptorPath); deleteIfPresent(bucket, descriptorPath);
} }
catch (IOException e) { catch (StorageException e) {
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), e.getMessage()); throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), e.getMessage());
} }
} }
private void deleteIfPresent(String bucket, String path) throws IOException private void deleteIfPresent(String bucket, String path)
{ {
try { try {
RetryUtils.retry( GoogleUtils.retryGoogleCloudStorageOperation(() -> {
(RetryUtils.Task<Void>) () -> { storage.delete(bucket, path);
storage.delete(bucket, path); return null;
return null; });
},
GoogleUtils::isRetryable,
1,
5
);
} }
catch (HttpResponseException e) { catch (StorageException e) {
if (e.getStatusCode() != 404) { throw e;
throw e;
}
LOG.debug("Already deleted: [%s] [%s]", bucket, path);
}
catch (IOException ioe) {
throw ioe;
} }
catch (Exception e) { catch (Exception e) {
throw new RE(e, "Failed to delete [%s] [%s]", bucket, path); throw new RE(e, "Failed to delete google cloud storage object from bucket [%s] and path [%s].", bucket, path);
} }
} }

View File

@ -31,6 +31,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
@ -52,6 +53,8 @@ public class GoogleStorage
* <p> * <p>
* See OmniDataSegmentKiller for how DataSegmentKillers are initialized. * See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
*/ */
private static final Logger log = new Logger(GoogleStorage.class);
private final Supplier<Storage> storage; private final Supplier<Storage> storage;
private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB"); private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB");
@ -131,7 +134,7 @@ public class GoogleStorage
{ {
Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values())); Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
if (blob == null) { if (blob == null) {
throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); throw new IOE("Failed to fetch google cloud storage object from bucket [%s] and path[%s].", bucket, path);
} }
return new GoogleStorageObjectMetadata( return new GoogleStorageObjectMetadata(
blob.getBucket(), blob.getBucket(),
@ -142,28 +145,41 @@ public class GoogleStorage
); );
} }
public void delete(final String bucket, final String path) throws IOException
/**
* Deletes an object in a bucket on the specified path
* A false response from GCS delete API is indicative of file not found. Any other error is raised as a StorageException
* and should be explicitly handled.
Ref: <a href="https://github.com/googleapis/java-storage/blob/v2.29.1/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java">HttpStorageRpc.java</a>
*
* @param bucket GCS bucket
* @param path Object path
*/
public void delete(final String bucket, final String path)
{ {
if (!storage.get().delete(bucket, path)) { if (!storage.get().delete(bucket, path)) {
throw new IOE( log.debug("Google cloud storage object to be deleted not found in bucket [%s] and path [%s].", bucket, path);
"Failed deleting google cloud storage object [bucket: %s path: %s]",
bucket,
path
);
} }
} }
/** /**
* Deletes a list of objects in a bucket * Deletes a list of objects in a bucket
* A false response from GCS delete API is indicative of file not found. Any other error is raised as a StorageException
* and should be explicitly handled.
* Ref: <a href="https://github.com/googleapis/java-storage/blob/v2.29.1/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java">HttpStorageRpc.java</a>
* *
* @param bucket GCS bucket * @param bucket GCS bucket
* @param paths Iterable for absolute paths of objects to be deleted inside the bucket * @param paths Iterable for absolute paths of objects to be deleted inside the bucket
*/ */
public void batchDelete(final String bucket, final Iterable<String> paths) throws IOException public void batchDelete(final String bucket, final Iterable<String> paths)
{ {
List<Boolean> statuses = storage.get().delete(Iterables.transform(paths, input -> BlobId.of(bucket, input))); final List<Boolean> statuses = storage.get().delete(Iterables.transform(paths, input -> BlobId.of(bucket, input)));
if (statuses.contains(false)) { if (statuses.contains(false)) {
throw new IOE("Failed deleting google cloud storage object(s)"); log.debug(
"Google cloud storage object(s) to be deleted not found in bucket [%s].",
bucket
);
} }
} }
@ -177,7 +193,7 @@ public class GoogleStorage
{ {
Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
if (blob == null) { if (blob == null) {
throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); throw new IOE("Failed to fetch google cloud storage object from bucket [%s] and path [%s].", bucket, path);
} }
return blob.getSize(); return blob.getSize();
} }
@ -186,7 +202,7 @@ public class GoogleStorage
{ {
Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.GENERATION)); Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.GENERATION));
if (blob == null) { if (blob == null) {
throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); throw new IOE("Failed to fetch google cloud storage object from bucket [%s] and path [%s].", bucket, path);
} }
return blob.getGeneratedId(); return blob.getGeneratedId();
} }
@ -223,7 +239,7 @@ public class GoogleStorage
Page<Blob> blobPage = storage.get().list(bucket, options.toArray(new Storage.BlobListOption[0])); Page<Blob> blobPage = storage.get().list(bucket, options.toArray(new Storage.BlobListOption[0]));
if (blobPage == null) { if (blobPage == null) {
throw new IOE("Failed fetching google cloud storage object [bucket: %s, prefix: %s]", bucket, prefix); throw new IOE("Failed to fetch google cloud storage object from bucket [%s] and prefix [%s].", bucket, prefix);
} }
@ -247,6 +263,5 @@ public class GoogleStorage
{ {
BlobId blobId = BlobId.of(bucket, path); BlobId blobId = BlobId.of(bucket, path);
return BlobInfo.newBuilder(blobId).build(); return BlobInfo.newBuilder(blobId).build();
} }
} }

View File

@ -20,6 +20,7 @@
package org.apache.druid.storage.google; package org.apache.druid.storage.google;
import com.google.api.client.http.HttpResponseException; import com.google.api.client.http.HttpResponseException;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CloudObjectLocation;
@ -40,6 +41,9 @@ public class GoogleUtils
if (t instanceof HttpResponseException) { if (t instanceof HttpResponseException) {
final HttpResponseException e = (HttpResponseException) t; final HttpResponseException e = (HttpResponseException) t;
return e.getStatusCode() == 429 || (e.getStatusCode() / 500 == 1); return e.getStatusCode() == 429 || (e.getStatusCode() / 500 == 1);
} else if (t instanceof StorageException) {
final StorageException e = (StorageException) t;
return e.isRetryable();
} }
return t instanceof IOException; return t instanceof IOException;
} }

View File

@ -73,7 +73,7 @@ public class GoogleStorageConnector extends ChunkingStorageConnector<GoogleInput
catch (IOException e) { catch (IOException e) {
throw new RE( throw new RE(
e, e,
StringUtils.format("Cannot create tempDir : [%s] for google storage connector", config.getTempDir()) StringUtils.format("Cannot create tempDir [%s] for google storage connector", config.getTempDir())
); );
} }
} }
@ -95,7 +95,7 @@ public class GoogleStorageConnector extends ChunkingStorageConnector<GoogleInput
{ {
try { try {
final String fullPath = objectPath(path); final String fullPath = objectPath(path);
log.debug("Deleting file at bucket: [%s], path: [%s]", config.getBucket(), fullPath); log.debug("Deleting file at bucket [%s] and path [%s].", config.getBucket(), fullPath);
GoogleUtils.retryGoogleCloudStorageOperation( GoogleUtils.retryGoogleCloudStorageOperation(
() -> { () -> {
@ -105,7 +105,7 @@ public class GoogleStorageConnector extends ChunkingStorageConnector<GoogleInput
); );
} }
catch (Exception e) { catch (Exception e) {
log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage()); log.error("Failed to delete object at bucket [%s] and path [%s].", config.getBucket(), path);
throw new IOException(e); throw new IOException(e);
} }
} }
@ -113,7 +113,17 @@ public class GoogleStorageConnector extends ChunkingStorageConnector<GoogleInput
@Override @Override
public void deleteFiles(Iterable<String> paths) throws IOException public void deleteFiles(Iterable<String> paths) throws IOException
{ {
storage.batchDelete(config.getBucket(), Iterables.transform(paths, this::objectPath)); try {
GoogleUtils.retryGoogleCloudStorageOperation(() -> {
storage.batchDelete(config.getBucket(), Iterables.transform(paths, this::objectPath));
return null;
});
}
catch (Exception e) {
log.error("Failed to delete object(s) at bucket [%s].", config.getBucket());
throw new IOException(e);
}
} }
@Override @Override
@ -127,10 +137,19 @@ public class GoogleStorageConnector extends ChunkingStorageConnector<GoogleInput
inputDataConfig.getMaxListingLength() inputDataConfig.getMaxListingLength()
); );
storage.batchDelete( try {
config.getBucket(), GoogleUtils.retryGoogleCloudStorageOperation(() -> {
() -> Iterators.transform(storageObjects, GoogleStorageObjectMetadata::getName) storage.batchDelete(
); config.getBucket(),
() -> Iterators.transform(storageObjects, GoogleStorageObjectMetadata::getName)
);
return null;
});
}
catch (Exception e) {
log.error("Failed to delete object(s) at bucket [%s] and prefix [%s].", config.getBucket(), fullPath);
throw new IOException(e);
}
} }
@Override @Override

View File

@ -19,11 +19,7 @@
package org.apache.druid.storage.google; package org.apache.druid.storage.google;
import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.cloud.storage.StorageException;
import com.google.api.client.googleapis.testing.json.GoogleJsonResponseExceptionFactoryTesting;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
@ -54,8 +50,8 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
private static final long TIME_0 = 0L; private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L; private static final long TIME_1 = 1L;
private static final int MAX_KEYS = 1; private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(429, "recoverable", new HttpHeaders()).build(); private static final Exception RECOVERABLE_EXCEPTION = new StorageException(429, "recoverable");
private static final Exception NON_RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(404, "non recoverable", new HttpHeaders()).build(); private static final Exception NON_RECOVERABLE_EXCEPTION = new StorageException(404, "non-recoverable");
private static final DataSegment DATA_SEGMENT = new DataSegment( private static final DataSegment DATA_SEGMENT = new DataSegment(
@ -83,7 +79,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
} }
@Test @Test
public void killTest() throws SegmentLoadingException, IOException public void killTest() throws SegmentLoadingException
{ {
storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(INDEX_PATH)); storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(INDEX_PATH));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
@ -99,38 +95,30 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
verifyAll(); verifyAll();
} }
@Test(expected = SegmentLoadingException.class) @Test
public void killWithErrorTest() throws SegmentLoadingException, IOException public void killWithErrorTest()
{ {
final GoogleJsonResponseException exception = GoogleJsonResponseExceptionFactoryTesting.newMock( Assert.assertThrows(SegmentLoadingException.class, () -> {
JacksonFactory.getDefaultInstance(), storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(INDEX_PATH));
300, EasyMock.expectLastCall().andThrow(NON_RECOVERABLE_EXCEPTION);
"test"
);
storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(INDEX_PATH));
EasyMock.expectLastCall().andThrow(exception);
replayAll(); replayAll();
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.kill(DATA_SEGMENT); killer.kill(DATA_SEGMENT);
verifyAll(); verifyAll();
});
} }
@Test @Test
public void killRetryWithErrorTest() throws SegmentLoadingException, IOException public void killRetryWithErrorTest() throws SegmentLoadingException
{ {
final GoogleJsonResponseException exception = GoogleJsonResponseExceptionFactoryTesting.newMock(
JacksonFactory.getDefaultInstance(),
500,
"test"
);
storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(INDEX_PATH)); storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(INDEX_PATH));
EasyMock.expectLastCall().andThrow(exception).once().andVoid().once(); EasyMock.expectLastCall().andThrow(RECOVERABLE_EXCEPTION).once().andVoid().once();
storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(DESCRIPTOR_PATH)); storage.delete(EasyMock.eq(BUCKET), EasyMock.eq(DESCRIPTOR_PATH));
EasyMock.expectLastCall().andThrow(exception).once().andVoid().once(); EasyMock.expectLastCall().andThrow(RECOVERABLE_EXCEPTION).once().andVoid().once();
replayAll(); replayAll();
@ -142,25 +130,16 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
} }
@Test @Test
public void test_killAll_accountConfigWithNullBucketAndPrefix_throwsISEException() throws IOException public void test_killAll_accountConfigWithNullBucketAndPrefix_throwsISEException()
{ {
EasyMock.expect(accountConfig.getBucket()).andReturn(null).atLeastOnce(); EasyMock.expect(accountConfig.getBucket()).andReturn(null).atLeastOnce();
EasyMock.expect(accountConfig.getPrefix()).andReturn(null).anyTimes(); EasyMock.expect(accountConfig.getPrefix()).andReturn(null).anyTimes();
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
EasyMock.replay(storage, inputDataConfig, accountConfig);
boolean thrownISEException = false; Assert.assertThrows(ISE.class, killer::killAll);
try {
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
EasyMock.replay(storage, inputDataConfig, accountConfig);
killer.killAll();
}
catch (ISE e) {
thrownISEException = true;
}
Assert.assertTrue(thrownISEException);
EasyMock.verify(accountConfig, inputDataConfig, storage); EasyMock.verify(accountConfig, inputDataConfig, storage);
} }
@ -217,34 +196,27 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
} }
@Test @Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws IOException
{ {
boolean ioExceptionThrown = false; GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
try {
GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0);
GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1));
GoogleTestUtils.expectDeleteObjects( GoogleTestUtils.expectDeleteObjects(
storage, storage,
ImmutableList.of(), ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION) ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
); );
EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes(); EasyMock.expect(accountConfig.getBucket()).andReturn(BUCKET).anyTimes();
EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.replay(accountConfig, inputDataConfig, storage); EasyMock.replay(accountConfig, inputDataConfig, storage);
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig);
killer.killAll();
}
catch (IOException e) {
ioExceptionThrown = true;
}
Assert.assertTrue(ioExceptionThrown); Assert.assertThrows(IOException.class, killer::killAll);
EasyMock.verify(accountConfig, inputDataConfig, storage); EasyMock.verify(accountConfig, inputDataConfig, storage);
} }

View File

@ -23,9 +23,11 @@ import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob; import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.easymock.Capture; import org.easymock.Capture;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -50,6 +52,8 @@ public class GoogleStorageTest
static final String PATH = "/path"; static final String PATH = "/path";
static final long SIZE = 100; static final long SIZE = 100;
static final OffsetDateTime UPDATE_TIME = OffsetDateTime.MIN; static final OffsetDateTime UPDATE_TIME = OffsetDateTime.MIN;
private static final Exception STORAGE_EXCEPTION = new StorageException(404, "Runtime Storage Exception");
@Before @Before
public void setUp() public void setUp()
@ -62,7 +66,7 @@ public class GoogleStorageTest
} }
@Test @Test
public void testDeleteSuccess() throws IOException public void testDeleteSuccess()
{ {
EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true); EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true);
EasyMock.replay(mockStorage); EasyMock.replay(mockStorage);
@ -70,23 +74,23 @@ public class GoogleStorageTest
} }
@Test @Test
public void testDeleteFailure() public void testDeleteFileNotFound()
{ {
EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false); EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false);
EasyMock.replay(mockStorage); EasyMock.replay(mockStorage);
boolean thrownIOException = false; googleStorage.delete(BUCKET, PATH);
try {
googleStorage.delete(BUCKET, PATH);
}
catch (IOException e) {
thrownIOException = true;
}
assertTrue(thrownIOException);
} }
@Test @Test
public void testBatchDeleteSuccess() throws IOException public void testDeleteFailure()
{
EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(STORAGE_EXCEPTION);
EasyMock.replay(mockStorage);
Assert.assertThrows(StorageException.class, () -> googleStorage.delete(BUCKET, PATH));
}
@Test
public void testBatchDeleteSuccess()
{ {
List<String> paths = ImmutableList.of("/path1", "/path2"); List<String> paths = ImmutableList.of("/path1", "/path2");
final Capture<Iterable<BlobId>> pathIterable = Capture.newInstance(); final Capture<Iterable<BlobId>> pathIterable = Capture.newInstance();
@ -103,6 +107,29 @@ public class GoogleStorageTest
assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll( assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll(
paths)); paths));
assertEquals(BUCKET, recordedBlobIds.get(0).getBucket()); assertEquals(BUCKET, recordedBlobIds.get(0).getBucket());
}
@Test
public void testBatchDeleteFileNotFound()
{
List<String> paths = ImmutableList.of("/path1", "/path2");
final Capture<Iterable<BlobId>> pathIterable = Capture.newInstance();
EasyMock.expect(mockStorage.delete(EasyMock.capture(pathIterable))).andReturn(ImmutableList.of(true, false));
EasyMock.replay(mockStorage);
googleStorage.batchDelete(BUCKET, paths);
List<BlobId> recordedBlobIds = new ArrayList<>();
pathIterable.getValue().iterator().forEachRemaining(recordedBlobIds::add);
List<String> recordedPaths = recordedBlobIds.stream().map(BlobId::getName).collect(Collectors.toList());
assertTrue(paths.size() == recordedPaths.size());
assertTrue(paths.containsAll(recordedPaths));
assertTrue(recordedPaths.containsAll(paths));
assertEquals(BUCKET, recordedBlobIds.get(0).getBucket());
} }
@Test @Test
@ -110,17 +137,9 @@ public class GoogleStorageTest
{ {
List<String> paths = ImmutableList.of("/path1", "/path2"); List<String> paths = ImmutableList.of("/path1", "/path2");
EasyMock.expect(mockStorage.delete((Iterable<BlobId>) EasyMock.anyObject())) EasyMock.expect(mockStorage.delete((Iterable<BlobId>) EasyMock.anyObject()))
.andReturn(ImmutableList.of(false, true)); .andThrow(STORAGE_EXCEPTION);
EasyMock.replay(mockStorage); EasyMock.replay(mockStorage);
boolean thrownIOException = false; Assert.assertThrows(StorageException.class, () -> googleStorage.batchDelete(BUCKET, paths));
try {
googleStorage.batchDelete(BUCKET, paths);
}
catch (IOException e) {
thrownIOException = true;
}
assertTrue(thrownIOException);
} }
@Test @Test

View File

@ -19,9 +19,8 @@
package org.apache.druid.storage.google; package org.apache.druid.storage.google;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.InputStreamContent; import com.google.api.client.http.InputStreamContent;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -58,8 +57,8 @@ public class GoogleTaskLogsTest extends EasyMockSupport
private static final long TIME_NOW = 2L; private static final long TIME_NOW = 2L;
private static final long TIME_FUTURE = 3L; private static final long TIME_FUTURE = 3L;
private static final int MAX_KEYS = 1; private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(429, "recoverable", new HttpHeaders()).build(); private static final Exception RECOVERABLE_EXCEPTION = new StorageException(429, "recoverable");
private static final Exception NON_RECOVERABLE_EXCEPTION = new HttpResponseException.Builder(404, "non recoverable", new HttpHeaders()).build(); private static final Exception NON_RECOVERABLE_EXCEPTION = new StorageException(404, "non-recoverable");
private GoogleStorage storage; private GoogleStorage storage;
private GoogleTaskLogs googleTaskLogs; private GoogleTaskLogs googleTaskLogs;

View File

@ -71,7 +71,7 @@ public class GoogleTestUtils extends EasyMockSupport
GoogleStorage storage, GoogleStorage storage,
List<GoogleStorageObjectMetadata> deleteObjectExpected, List<GoogleStorageObjectMetadata> deleteObjectExpected,
Map<GoogleStorageObjectMetadata, Exception> deleteObjectToException Map<GoogleStorageObjectMetadata, Exception> deleteObjectToException
) throws IOException )
{ {
Map<GoogleStorageObjectMetadata, IExpectationSetters<GoogleStorageObjectMetadata>> requestToResultExpectationSetter = new HashMap<>(); Map<GoogleStorageObjectMetadata, IExpectationSetters<GoogleStorageObjectMetadata>> requestToResultExpectationSetter = new HashMap<>();
for (Map.Entry<GoogleStorageObjectMetadata, Exception> deleteObjectAndException : deleteObjectToException.entrySet()) { for (Map.Entry<GoogleStorageObjectMetadata, Exception> deleteObjectAndException : deleteObjectToException.entrySet()) {

View File

@ -22,6 +22,7 @@ package org.apache.druid.storage.google;
import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponseException; import com.google.api.client.http.HttpResponseException;
import com.google.cloud.storage.StorageException;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -78,5 +79,30 @@ public class GoogleUtilsTest
new IOException("generic io exception") new IOException("generic io exception")
) )
); );
Assert.assertFalse(
GoogleUtils.isRetryable(
new StorageException(404, "ignored")
)
);
Assert.assertTrue(
GoogleUtils.isRetryable(
new StorageException(429, "ignored")
)
);
Assert.assertTrue(
GoogleUtils.isRetryable(
new StorageException(500, "ignored")
)
);
Assert.assertTrue(
GoogleUtils.isRetryable(
new StorageException(503, "ignored")
)
);
Assert.assertFalse(
GoogleUtils.isRetryable(
new StorageException(599, "ignored")
)
);
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.druid.storage.google.output; package org.apache.druid.storage.google.output;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -54,6 +55,8 @@ public class GoogleStorageConnectorTest
GoogleStorageConnector googleStorageConnector; GoogleStorageConnector googleStorageConnector;
private final GoogleStorage googleStorage = EasyMock.createMock(GoogleStorage.class); private final GoogleStorage googleStorage = EasyMock.createMock(GoogleStorage.class);
private static final Exception RECOVERABLE_EXCEPTION = new StorageException(429, "recoverable");
private static final Exception NON_RECOVERABLE_EXCEPTION = new StorageException(404, "non-recoverable");
@Before @Before
public void setUp() throws IOException public void setUp() throws IOException
@ -91,7 +94,7 @@ public class GoogleStorageConnectorTest
} }
@Test @Test
public void testDeleteFile() throws IOException public void testDeleteFileSuccess() throws IOException
{ {
Capture<String> bucketCapture = EasyMock.newCapture(); Capture<String> bucketCapture = EasyMock.newCapture();
Capture<String> pathCapture = EasyMock.newCapture(); Capture<String> pathCapture = EasyMock.newCapture();
@ -107,7 +110,40 @@ public class GoogleStorageConnectorTest
} }
@Test @Test
public void testDeleteFiles() throws IOException public void testDeleteFileRetrySuccess() throws IOException
{
Capture<String> bucketCapture = EasyMock.newCapture();
Capture<String> pathCapture = EasyMock.newCapture();
googleStorage.delete(
EasyMock.capture(bucketCapture),
EasyMock.capture(pathCapture)
);
EasyMock.expectLastCall().andThrow(RECOVERABLE_EXCEPTION).once().andVoid().once();
EasyMock.replay(googleStorage);
googleStorageConnector.deleteFile(TEST_FILE);
Assert.assertEquals(BUCKET, bucketCapture.getValue());
Assert.assertEquals(PREFIX + "/" + TEST_FILE, pathCapture.getValue());
}
@Test
public void testDeleteFileFailure()
{
Capture<String> bucketCapture = EasyMock.newCapture();
Capture<String> pathCapture = EasyMock.newCapture();
googleStorage.delete(
EasyMock.capture(bucketCapture),
EasyMock.capture(pathCapture)
);
EasyMock.expectLastCall().andThrow(NON_RECOVERABLE_EXCEPTION).once().andVoid().once();
EasyMock.replay(googleStorage);
Assert.assertThrows(IOException.class, () -> googleStorageConnector.deleteFile(TEST_FILE));
Assert.assertEquals(BUCKET, bucketCapture.getValue());
Assert.assertEquals(PREFIX + "/" + TEST_FILE, pathCapture.getValue());
}
@Test
public void testDeleteFilesSuccess() throws IOException
{ {
Capture<String> containerCapture = EasyMock.newCapture(); Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture(); Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
@ -125,6 +161,191 @@ public class GoogleStorageConnectorTest
EasyMock.reset(googleStorage); EasyMock.reset(googleStorage);
} }
@Test
public void testDeleteFilesRetrySuccess() throws IOException
{
Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture));
EasyMock.expectLastCall().andThrow(RECOVERABLE_EXCEPTION).once().andVoid().once();
EasyMock.replay(googleStorage);
googleStorageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.json"));
Assert.assertEquals(BUCKET, containerCapture.getValue());
Assert.assertEquals(
ImmutableList.of(
PREFIX + "/" + TEST_FILE + "_1.part",
PREFIX + "/" + TEST_FILE + "_2.json"
),
Lists.newArrayList(pathsCapture.getValue())
);
EasyMock.reset(googleStorage);
}
@Test
public void testDeleteFilesFailure()
{
Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture));
EasyMock.expectLastCall().andThrow(NON_RECOVERABLE_EXCEPTION).once().andVoid().once();
EasyMock.replay(googleStorage);
Assert.assertThrows(
IOException.class,
() -> googleStorageConnector.deleteFiles(ImmutableList.of(
TEST_FILE + "_1.part",
TEST_FILE + "_2.json"
))
);
Assert.assertEquals(BUCKET, containerCapture.getValue());
Assert.assertEquals(
ImmutableList.of(
PREFIX + "/" + TEST_FILE + "_1.part",
PREFIX + "/" + TEST_FILE + "_2.json"
),
Lists.newArrayList(pathsCapture.getValue())
);
EasyMock.reset(googleStorage);
}
@Test
public void testDeleteFilesRecursivelySuccess() throws IOException
{
GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata(
BUCKET,
PREFIX + "/x/y/" + TEST_FILE,
(long) 3,
null
);
GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata(
BUCKET,
PREFIX + "/p/q/r/" + TEST_FILE,
(long) 4,
null
);
Capture<Long> maxListingCapture = EasyMock.newCapture();
Capture<String> pageTokenCapture = EasyMock.newCapture();
EasyMock.expect(googleStorage.list(
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.capture(maxListingCapture),
EasyMock.capture(pageTokenCapture)
))
.andReturn(new GoogleStorageObjectPage(ImmutableList.of(objectMetadata1, objectMetadata2), null));
Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture));
EasyMock.expectLastCall().andVoid().once();
EasyMock.replay(googleStorage);
googleStorageConnector.deleteRecursively("");
Assert.assertEquals(BUCKET, containerCapture.getValue());
Assert.assertEquals(
ImmutableList.of(
PREFIX + "/x/y/" + TEST_FILE,
PREFIX + "/p/q/r/" + TEST_FILE
),
Lists.newArrayList(pathsCapture.getValue())
);
EasyMock.reset(googleStorage);
}
@Test
public void testDeleteFilesRecursivelyRetrySuccess() throws IOException
{
GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata(
BUCKET,
PREFIX + "/x/y/" + TEST_FILE,
(long) 3,
null
);
GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata(
BUCKET,
PREFIX + "/p/q/r/" + TEST_FILE,
(long) 4,
null
);
Capture<Long> maxListingCapture = EasyMock.newCapture();
Capture<String> pageTokenCapture = EasyMock.newCapture();
EasyMock.expect(googleStorage.list(
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.capture(maxListingCapture),
EasyMock.capture(pageTokenCapture)
))
.andReturn(new GoogleStorageObjectPage(ImmutableList.of(objectMetadata1, objectMetadata2), null));
Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture));
EasyMock.expectLastCall().andThrow(RECOVERABLE_EXCEPTION).once().andVoid().once();
EasyMock.replay(googleStorage);
googleStorageConnector.deleteRecursively("");
Assert.assertEquals(BUCKET, containerCapture.getValue());
Assert.assertEquals(
ImmutableList.of(
PREFIX + "/x/y/" + TEST_FILE,
PREFIX + "/p/q/r/" + TEST_FILE
),
Lists.newArrayList(pathsCapture.getValue())
);
}
@Test
public void testDeleteFilesRecursivelyFailure() throws IOException
{
GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata(
BUCKET,
PREFIX + "/x/y/" + TEST_FILE,
(long) 3,
null
);
GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata(
BUCKET,
PREFIX + "/p/q/r/" + TEST_FILE,
(long) 4,
null
);
Capture<Long> maxListingCapture = EasyMock.newCapture();
Capture<String> pageTokenCapture = EasyMock.newCapture();
EasyMock.expect(googleStorage.list(
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.capture(maxListingCapture),
EasyMock.capture(pageTokenCapture)
))
.andReturn(new GoogleStorageObjectPage(ImmutableList.of(objectMetadata1, objectMetadata2), null));
Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture));
EasyMock.expectLastCall().andThrow(NON_RECOVERABLE_EXCEPTION).once().andVoid().once();
EasyMock.replay(googleStorage);
Assert.assertThrows(IOException.class, () -> googleStorageConnector.deleteRecursively(""));
Assert.assertEquals(BUCKET, containerCapture.getValue());
Assert.assertEquals(
ImmutableList.of(
PREFIX + "/x/y/" + TEST_FILE,
PREFIX + "/p/q/r/" + TEST_FILE
),
Lists.newArrayList(pathsCapture.getValue())
);
}
@Test @Test
public void testListDir() throws IOException public void testListDir() throws IOException
{ {

View File

@ -99,7 +99,7 @@ public class GcsTestUtil
); );
} }
public void deleteFileFromGcs(String gcsObjectName) throws IOException public void deleteFileFromGcs(String gcsObjectName)
{ {
LOG.info("Deleting object %s at path %s in bucket %s", gcsObjectName, GOOGLE_PREFIX, GOOGLE_BUCKET); LOG.info("Deleting object %s at path %s in bucket %s", gcsObjectName, GOOGLE_PREFIX, GOOGLE_BUCKET);
googleStorageClient.delete(GOOGLE_BUCKET, GOOGLE_PREFIX + "/" + gcsObjectName); googleStorageClient.delete(GOOGLE_BUCKET, GOOGLE_PREFIX + "/" + gcsObjectName);