From e43bb74c3af67a8ac2d52a17aee91a47d735c027 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Thu, 14 Dec 2023 07:34:49 +0530 Subject: [PATCH] Add MSQ Durable Storage Connector for Google Cloud Storage and change current Google Cloud Storage client library (#15398) The PR addresses 2 things: Add MSQ durable storage connector for GCS Change GCS client library from the old Google API Client Library to the recommended Google Cloud Client Library. Ref: https://cloud.google.com/apis/docs/client-libraries-explained --- distribution/bin/check-licenses.py | 1 + docs/multi-stage-query/reference.md | 8 +- docs/operations/durable-storage.md | 2 +- extensions-core/google-extensions/pom.xml | 22 +- .../google/GoogleCloudStorageInputSource.java | 15 +- .../storage/google/GoogleByteSource.java | 4 +- .../google/GoogleDataSegmentPuller.java | 2 +- .../druid/storage/google/GoogleStorage.java | 208 ++++++++++++-- .../google/GoogleStorageDruidModule.java | 18 +- .../google/GoogleStorageObjectMetadata.java | 96 +++++++ .../google/GoogleStorageObjectPage.java | 51 ++++ .../druid/storage/google/GoogleTaskLogs.java | 2 +- .../GoogleTimestampVersionedDataFinder.java | 24 +- .../druid/storage/google/GoogleUtils.java | 21 +- .../storage/google/ObjectStorageIterator.java | 66 ++--- .../google/output/GoogleInputRange.java | 91 ++++++ .../google/output/GoogleOutputConfig.java | 147 ++++++++++ .../google/output/GoogleStorageConnector.java | 216 ++++++++++++++ .../output/GoogleStorageConnectorModule.java | 44 +++ .../GoogleStorageConnectorProvider.java | 64 +++++ ...rg.apache.druid.initialization.DruidModule | 3 +- .../GoogleCloudStorageInputSourceTest.java | 79 ++++-- .../storage/google/GoogleByteSourceTest.java | 4 +- .../google/GoogleDataSegmentKillerTest.java | 50 +--- .../google/GoogleDataSegmentPullerTest.java | 4 +- .../google/GoogleStorageDruidModuleTest.java | 43 +-- .../storage/google/GoogleStorageTest.java | 264 ++++++++++++++---- .../storage/google/GoogleTaskLogsTest.java | 106 ++----- .../druid/storage/google/GoogleTestUtils.java | 71 ++--- ...oogleTimestampVersionedDataFinderTest.java | 34 ++- .../google/ObjectStorageIteratorTest.java | 105 ++----- .../google/output/GoogleInputRangeTest.java | 34 +++ .../google/output/GoogleOutputConfigTest.java | 49 ++++ .../GoogleStorageConnectorProviderTest.java | 152 ++++++++++ .../output/GoogleStorageConnectorTest.java | 210 ++++++++++++++ integration-tests-ex/cases/pom.xml | 29 +- .../druid/testsEx/utils/GcsTestUtil.java | 7 +- licenses.yaml | 203 ++++++++++++-- pom.xml | 2 +- 39 files changed, 2000 insertions(+), 551 deletions(-) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index cf6e7e35b66..70afd903136 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -258,6 +258,7 @@ def build_compatible_license_names(): compatible_licenses['The BSD 3-Clause License'] = 'BSD-3-Clause License' compatible_licenses['Revised BSD'] = 'BSD-3-Clause License' compatible_licenses['New BSD License'] = 'BSD-3-Clause License' + compatible_licenses['BSD New license'] = 'BSD-3-Clause License' compatible_licenses['3-Clause BSD License'] = 'BSD-3-Clause License' compatible_licenses['BSD 3-Clause'] = 'BSD-3-Clause License' compatible_licenses['BSD-3-Clause'] = 'BSD-3-Clause License' diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index d71c58abbd1..d74ef85f50e 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -356,7 +356,7 @@ SQL-based ingestion supports using durable storage to store intermediate files t ### Durable storage configurations -Durable storage is supported on Amazon S3 storage and Microsoft's Azure Blob Storage. +Durable storage is supported on Amazon S3 storage, Microsoft's Azure Blob Storage and Google Cloud Storage. There are common configurations that control the behavior regardless of which storage service you use. Apart from these common configurations, there are a few properties specific to S3 and to Azure. Common properties to configure the behavior of durable storage @@ -364,16 +364,16 @@ Common properties to configure the behavior of durable storage |Parameter | Required | Description | Default | |--|--|--| |`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false | -|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3 and `azure` for Azure | n/a | +|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure and `google` for Google | n/a | |`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a | |`druid.msq.intermediate.storage.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | |`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.| 100MiB | -To use S3 for durable storage, you also need to configure the following properties: +To use S3 or Google for durable storage, you also need to configure the following properties: |Parameter | Required | Description | Default | |-------------------|----------------------------------------|----------------------| --| -|`druid.msq.intermediate.storage.bucket` | Yes | The S3 bucket where the files are uploaded to and download from | n/a | +|`druid.msq.intermediate.storage.bucket` | Yes | The S3 or Google bucket where the files are uploaded to and download from | n/a | |`druid.msq.intermediate.storage.prefix` | Yes | Path prepended to all the paths uploaded to the bucket to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. | n/a | To use Azure for durable storage, you also need to configure the following properties: diff --git a/docs/operations/durable-storage.md b/docs/operations/durable-storage.md index b7a8ad1ef90..78c5fe3765a 100644 --- a/docs/operations/durable-storage.md +++ b/docs/operations/durable-storage.md @@ -25,7 +25,7 @@ sidebar_label: "Durable storage" You can use durable storage to improve querying from deep storage and SQL-based ingestion. -> Note that only S3 is supported as a durable storage location. +> Note that S3, Azure and Google are all supported as durable storage locations. Durable storage for queries from deep storage provides a location where you can write the results of deep storage queries to. Durable storage for SQL-based ingestion is used to temporarily house intermediate files, which can improve reliability. diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml index cb3c72d1b95..b13428242ea 100644 --- a/extensions-core/google-extensions/pom.xml +++ b/extensions-core/google-extensions/pom.xml @@ -48,15 +48,9 @@ - com.google.apis - google-api-services-storage - ${com.google.apis.storage.version} - - - com.google.api-client - google-api-client - - + com.google.cloud + google-cloud-storage + ${com.google.cloud.storage.version} commons-io @@ -125,6 +119,16 @@ 2.0.1 provided + + com.google.api + gax + 2.37.0 + + + com.google.cloud + google-cloud-core + 2.27.0 + org.apache.druid diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 5b9cea5d9c6..b99e8a36df9 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.Iterators; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputSplit; @@ -37,12 +36,12 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleStorageDruidModule; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; -import java.math.BigInteger; import java.net.URI; import java.util.Collections; import java.util.Iterator; @@ -139,7 +138,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource @Override public long getObjectSize(CloudObjectLocation location) throws IOException { - final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath()); + final GoogleStorageObjectMetadata storageObject = storage.getMetadata(location.getBucket(), location.getPath()); return getSize(storageObject); } } @@ -147,15 +146,15 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource return new SplitWidget(); } - private static long getSize(final StorageObject object) + private static long getSize(final GoogleStorageObjectMetadata object) { - final BigInteger sizeInBigInteger = object.getSize(); + final Long sizeInLong = object.getSize(); - if (sizeInBigInteger == null) { + if (sizeInLong == null) { return Long.MAX_VALUE; } else { try { - return sizeInBigInteger.longValueExact(); + return sizeInLong; } catch (ArithmeticException e) { LOG.warn( @@ -164,7 +163,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource + "The max long value will be used for its size instead.", object.getBucket(), object.getName(), - sizeInBigInteger + sizeInLong ); return Long.MAX_VALUE; } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java index 977353f9f20..03554fc5c63 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java @@ -51,12 +51,12 @@ public class GoogleByteSource extends ByteSource @Override public InputStream openStream() throws IOException { - return storage.get(bucket, path); + return storage.getInputStream(bucket, path); } public InputStream openStream(long start) throws IOException { - return storage.get(bucket, path, start); + return storage.getInputStream(bucket, path, start); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java index fc3f7d371f4..61595264fc6 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java @@ -83,7 +83,7 @@ public class GoogleDataSegmentPuller implements URIDataPuller public InputStream getInputStream(URI uri) throws IOException { String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath()); - return storage.get(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path); + return storage.getInputStream(uri.getHost() != null ? uri.getHost() : uri.getAuthority(), path); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java index f181d08f443..91d290b1785 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java @@ -20,13 +20,26 @@ package org.apache.druid.storage.google; import com.google.api.client.http.AbstractInputStreamContent; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.Storage.Objects.Get; -import com.google.api.services.storage.model.StorageObject; +import com.google.api.gax.paging.Page; +import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.IOE; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; public class GoogleStorage { @@ -36,69 +49,204 @@ public class GoogleStorage * if we have a Storage instead of a supplier of it, it can cause unnecessary config validation * against Google storage even when it's not used at all. To perform the config validation * only when it is actually used, we use a supplier. - * + *

* See OmniDataSegmentKiller for how DataSegmentKillers are initialized. */ private final Supplier storage; - public GoogleStorage(Supplier storage) + private final HumanReadableBytes DEFAULT_WRITE_CHUNK_SIZE = new HumanReadableBytes("4MiB"); + + public GoogleStorage(final Supplier storage) { this.storage = storage; } public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException { - Storage.Objects.Insert insertObject = storage.get().objects().insert(bucket, null, mediaContent); - insertObject.setName(path); - insertObject.getMediaHttpUploader().setDirectUploadEnabled(false); - insertObject.execute(); + storage.get().createFrom(getBlobInfo(bucket, path), mediaContent.getInputStream()); } - public InputStream get(final String bucket, final String path) throws IOException + public InputStream getInputStream(final String bucket, final String path) throws IOException { - return get(bucket, path, 0); + return getInputStream(bucket, path, 0, null, null); } - public InputStream get(final String bucket, final String path, long start) throws IOException + public InputStream getInputStream( + final String bucket, + final String path, + long start + ) throws IOException { - final Get get = storage.get().objects().get(bucket, path); - InputStream inputStream = get.executeMediaAsInputStream(); - inputStream.skip(start); - return inputStream; + return getInputStream(bucket, path, start, null, null); } - public StorageObject getMetadata(final String bucket, final String path) throws IOException + public InputStream getInputStream( + final String bucket, + final String path, + long start, + Long length + ) throws IOException { - return storage.get().objects().get(bucket, path).execute(); + return getInputStream(bucket, path, start, length, null); + } + + public InputStream getInputStream( + final String bucket, + final String path, + long start, + @Nullable Long length, + @Nullable final Integer chunkSize + ) + throws IOException + { + ReadChannel reader = storage.get().reader(bucket, path); + reader.seek(start); + if (length != null) { + reader.limit(start + length); + } + if (chunkSize != null) { + reader.setChunkSize(chunkSize); + } + // Using default read buffer size (2 MB) + return Channels.newInputStream(reader); + } + + public OutputStream getObjectOutputStream( + final String bucket, + final String path, + @Nullable final Integer chunkSize + ) + { + WriteChannel writer = storage.get().writer(getBlobInfo(bucket, path)); + // Limit GCS internal write buffer memory to prevent OOM errors + writer.setChunkSize(chunkSize == null ? DEFAULT_WRITE_CHUNK_SIZE.getBytesInInt() : chunkSize); + + return Channels.newOutputStream(writer); + } + + public GoogleStorageObjectMetadata getMetadata( + final String bucket, + final String path + ) throws IOException + { + Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values())); + if (blob == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); + } + return new GoogleStorageObjectMetadata( + blob.getBucket(), + blob.getName(), + blob.getSize(), + blob.getUpdateTimeOffsetDateTime() + .toEpochSecond() + ); } public void delete(final String bucket, final String path) throws IOException { - storage.get().objects().delete(bucket, path).execute(); + if (!storage.get().delete(bucket, path)) { + throw new IOE( + "Failed deleting google cloud storage object [bucket: %s path: %s]", + bucket, + path + ); + } + } + + /** + * Deletes a list of objects in a bucket + * + * @param bucket GCS bucket + * @param paths Iterable for absolute paths of objects to be deleted inside the bucket + */ + public void batchDelete(final String bucket, final Iterable paths) throws IOException + { + List statuses = storage.get().delete(Iterables.transform(paths, input -> BlobId.of(bucket, input))); + if (statuses.contains(false)) { + throw new IOE("Failed deleting google cloud storage object(s)"); + } } public boolean exists(final String bucket, final String path) { - try { - return storage.get().objects().get(bucket, path).executeUsingHead().isSuccessStatusCode(); - } - catch (Exception e) { - return false; - } + Blob blob = storage.get().get(bucket, path); + return blob != null; } - + public long size(final String bucket, final String path) throws IOException { - return storage.get().objects().get(bucket, path).execute().getSize().longValue(); + Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); + if (blob == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); + } + return blob.getSize(); } public String version(final String bucket, final String path) throws IOException { - return storage.get().objects().get(bucket, path).execute().getEtag(); + Blob blob = storage.get().get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.GENERATION)); + if (blob == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, path: %s]", bucket, path); + } + return blob.getGeneratedId(); } - public Storage.Objects.List list(final String bucket) throws IOException + /*** + * Provides a paged listing of objects for a given bucket and prefix + * @param bucket GCS bucket + * @param prefix Path prefix + * @param pageSize Number of objects per page + * @param pageToken Continuation token for the next page; use null for the first page + * or the nextPageToken from the previous {@link GoogleStorageObjectPage} + */ + public GoogleStorageObjectPage list( + final String bucket, + @Nullable final String prefix, + @Nullable final Long pageSize, + @Nullable final String pageToken + ) throws IOException { - return storage.get().objects().list(bucket); + List options = new ArrayList<>(); + + if (prefix != null) { + options.add(Storage.BlobListOption.prefix(prefix)); + } + + if (pageSize != null) { + options.add(Storage.BlobListOption.pageSize(pageSize)); + } + + if (pageToken != null) { + options.add(Storage.BlobListOption.pageToken(pageToken)); + } + + Page blobPage = storage.get().list(bucket, options.toArray(new Storage.BlobListOption[0])); + + if (blobPage == null) { + throw new IOE("Failed fetching google cloud storage object [bucket: %s, prefix: %s]", bucket, prefix); + } + + + List googleStorageObjectMetadataList = + blobPage.streamValues() + .map(blob -> new GoogleStorageObjectMetadata( + blob.getBucket(), + blob.getName(), + blob.getSize(), + blob.getUpdateTimeOffsetDateTime() + .toEpochSecond() + )) + .collect(Collectors.toList()); + + return new GoogleStorageObjectPage(googleStorageObjectMetadataList, blobPage.getNextPageToken()); + + } + + + private BlobInfo getBlobInfo(final String bucket, final String path) + { + BlobId blobId = BlobId.of(bucket, path); + return BlobInfo.newBuilder(blobId).build(); + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java index cb90cb51fb8..0467906a6ca 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java @@ -23,10 +23,8 @@ import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.storage.Storage; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provider; @@ -86,6 +84,7 @@ public class GoogleStorageDruidModule implements DruidModule { LOG.info("Configuring GoogleStorageDruidModule..."); + JsonConfigProvider.bind(binder, "druid.google", GoogleInputDataConfig.class); JsonConfigProvider.bind(binder, "druid.google", GoogleAccountConfig.class); Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentPusher.class) @@ -104,16 +103,9 @@ public class GoogleStorageDruidModule implements DruidModule @Provides @LazySingleton - public Storage getGcpStorage( - HttpTransport httpTransport, - JsonFactory jsonFactory, - HttpRequestInitializer requestInitializer - ) + public Storage getGcpStorage() { - return new Storage - .Builder(httpTransport, jsonFactory, requestInitializer) - .setApplicationName(APPLICATION_NAME) - .build(); + return StorageOptions.getDefaultInstance().getService(); } /** diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java new file mode 100644 index 00000000000..87feb774a5d --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectMetadata.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import java.util.Objects; + +public class GoogleStorageObjectMetadata +{ + final String bucket; + final String name; + final Long size; + Long lastUpdateTime; + + public GoogleStorageObjectMetadata(final String bucket, final String name, final Long size, final Long lastUpdateTime) + { + this.bucket = bucket; + this.name = name; + this.size = size; + this.lastUpdateTime = lastUpdateTime; + } + + public void setLastUpdateTime(Long lastUpdateTime) + { + this.lastUpdateTime = lastUpdateTime; + } + + + public String getBucket() + { + return bucket; + } + + public String getName() + { + return name; + } + + public Long getSize() + { + return size; + } + + public Long getLastUpdateTime() + { + return lastUpdateTime; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleStorageObjectMetadata that = (GoogleStorageObjectMetadata) o; + return Objects.equals(bucket, that.bucket) + && Objects.equals(name, that.name) + && Objects.equals(size, that.size); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, name, size); + } + + @Override + public String toString() + { + return "GoogleStorageObjectMetadata{" + + "bucket='" + bucket + '\'' + + ", name='" + name + '\'' + + ", size=" + size + + ", lastUpdateTime=" + lastUpdateTime + + '}'; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java new file mode 100644 index 00000000000..e58059125a1 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageObjectPage.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import javax.annotation.Nullable; +import java.util.List; + +public class GoogleStorageObjectPage +{ + final List objectList; + + @Nullable + final String nextPageToken; + + public GoogleStorageObjectPage( + List objectList, + String nextPageToken + ) + { + this.objectList = objectList; + this.nextPageToken = nextPageToken; + } + + public List getObjectList() + { + return objectList; + } + + @Nullable + public String getNextPageToken() + { + return nextPageToken; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index ae4024172a6..4f7444f8ea9 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -204,7 +204,7 @@ public class GoogleTaskLogs implements TaskLogs inputDataConfig, config.getBucket(), config.getPrefix(), - (object) -> object.getUpdated().getValue() < timestamp + (object) -> object.getLastUpdateTime() < timestamp ); } catch (Exception e) { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java index d1ed8a7ef6a..b93128cc2fa 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java @@ -19,8 +19,6 @@ package org.apache.druid.storage.google; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import com.google.inject.Inject; import org.apache.druid.data.SearchableVersionedDataFinder; import org.apache.druid.data.input.impl.CloudObjectLocation; @@ -49,21 +47,27 @@ public class GoogleTimestampVersionedDataFinder extends GoogleDataSegmentPuller long mostRecent = Long.MIN_VALUE; URI latest = null; final CloudObjectLocation baseLocation = new CloudObjectLocation(descriptorBase); - final Objects objects = storage.list(baseLocation.getBucket()).setPrefix(baseLocation.getPath()).setMaxResults(MAX_LISTING_KEYS).execute(); - for (StorageObject storageObject : objects.getItems()) { - if (GoogleUtils.isDirectoryPlaceholder(storageObject)) { + final GoogleStorageObjectPage googleStorageObjectPage = storage.list( + baseLocation.getBucket(), + baseLocation.getPath(), + MAX_LISTING_KEYS, + null + ); + for (GoogleStorageObjectMetadata objectMetadata : googleStorageObjectPage.getObjectList()) { + if (GoogleUtils.isDirectoryPlaceholder(objectMetadata)) { continue; } // remove path prefix from file name - final CloudObjectLocation objectLocation = new CloudObjectLocation(storageObject.getBucket(), - storageObject.getName() + final CloudObjectLocation objectLocation = new CloudObjectLocation( + objectMetadata.getBucket(), + objectMetadata.getName() ); final String keyString = StringUtils - .maybeRemoveLeadingSlash(storageObject.getName().substring(baseLocation.getPath().length())); + .maybeRemoveLeadingSlash(objectMetadata.getName().substring(baseLocation.getPath().length())); if (pattern != null && !pattern.matcher(keyString).matches()) { continue; } - final long latestModified = storageObject.getUpdated().getValue(); + final long latestModified = objectMetadata.getLastUpdateTime(); if (latestModified >= mostRecent) { mostRecent = latestModified; latest = objectLocation.toUri(GoogleStorageDruidModule.SCHEME_GS); @@ -72,7 +76,7 @@ public class GoogleTimestampVersionedDataFinder extends GoogleDataSegmentPuller return latest; } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException(); } } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 25b4f3286ea..a819442ef35 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -20,7 +20,6 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; -import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.impl.CloudObjectLocation; @@ -45,22 +44,22 @@ public class GoogleUtils return t instanceof IOException; } - static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception + public static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception { return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); } - public static URI objectToUri(StorageObject object) + public static URI objectToUri(GoogleStorageObjectMetadata object) { return objectToCloudObjectLocation(object).toUri(GoogleStorageDruidModule.SCHEME_GS); } - public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object) + public static CloudObjectLocation objectToCloudObjectLocation(GoogleStorageObjectMetadata object) { return new CloudObjectLocation(object.getBucket(), object.getName()); } - public static Iterator lazyFetchingStorageObjectsIterator( + public static Iterator lazyFetchingStorageObjectsIterator( final GoogleStorage storage, final Iterator uris, final long maxListingLength @@ -85,18 +84,18 @@ public class GoogleUtils GoogleInputDataConfig config, String bucket, String prefix, - Predicate filter + Predicate filter ) throws Exception { - final Iterator iterator = lazyFetchingStorageObjectsIterator( + final Iterator iterator = lazyFetchingStorageObjectsIterator( storage, ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), config.getMaxListingLength() ); while (iterator.hasNext()) { - final StorageObject nextObject = iterator.next(); + final GoogleStorageObjectMetadata nextObject = iterator.next(); if (filter.apply(nextObject)) { retryGoogleCloudStorageOperation(() -> { storage.delete(nextObject.getBucket(), nextObject.getName()); @@ -110,13 +109,13 @@ public class GoogleUtils * Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder} * Copied to avoid creating dependency on s3 extensions */ - public static boolean isDirectoryPlaceholder(final StorageObject storageObject) + public static boolean isDirectoryPlaceholder(final GoogleStorageObjectMetadata objectMetadata) { // Recognize "standard" directory place-holder indications - if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) { + if (objectMetadata.getName().endsWith("/") && objectMetadata.getSize().intValue() == 0) { return true; } // Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension. - return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0; + return objectMetadata.getName().endsWith("_$folder$") && objectMetadata.getSize().intValue() == 0; } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java index 10275112f6f..b1ad9871776 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/ObjectStorageIterator.java @@ -19,9 +19,6 @@ package org.apache.druid.storage.google; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import org.apache.druid.java.util.common.StringUtils; import java.io.IOException; @@ -29,61 +26,48 @@ import java.net.URI; import java.util.Iterator; import java.util.NoSuchElementException; -public class ObjectStorageIterator implements Iterator +public class ObjectStorageIterator implements Iterator { private final GoogleStorage storage; private final Iterator uris; private final long maxListingLength; - - private Storage.Objects.List listRequest; - private Objects results; + private GoogleStorageObjectPage googleStorageObjectPage; private URI currentUri; private String nextPageToken; - private Iterator storageObjectsIterator; - private StorageObject currentObject; + private Iterator blobIterator; + private GoogleStorageObjectMetadata currentObject; public ObjectStorageIterator(GoogleStorage storage, Iterator uris, long maxListingLength) { this.storage = storage; this.uris = uris; this.maxListingLength = maxListingLength; - this.nextPageToken = null; - prepareNextRequest(); - fetchNextBatch(); + advanceURI(); + fetchNextPage(); advanceStorageObject(); } - private void prepareNextRequest() + + private void advanceURI() + { + currentUri = uris.next(); + } + + private void fetchNextPage() { try { - currentUri = uris.next(); String currentBucket = currentUri.getAuthority(); String currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); - nextPageToken = null; - listRequest = storage.list(currentBucket) - .setPrefix(currentPrefix) - .setMaxResults(maxListingLength); - + googleStorageObjectPage = storage.list(currentBucket, currentPrefix, maxListingLength, nextPageToken); + blobIterator = googleStorageObjectPage.getObjectList().iterator(); + nextPageToken = googleStorageObjectPage.getNextPageToken(); } catch (IOException io) { throw new RuntimeException(io); } } - private void fetchNextBatch() - { - try { - listRequest.setPageToken(nextPageToken); - results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); - storageObjectsIterator = results.getItems().iterator(); - nextPageToken = results.getNextPageToken(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - @Override public boolean hasNext() { @@ -91,35 +75,35 @@ public class ObjectStorageIterator implements Iterator } @Override - public StorageObject next() + public GoogleStorageObjectMetadata next() { if (!hasNext()) { throw new NoSuchElementException(); } - final StorageObject retVal = currentObject; + final GoogleStorageObjectMetadata retVal = currentObject; advanceStorageObject(); return retVal; } private void advanceStorageObject() { - while (storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext()) { - while (storageObjectsIterator.hasNext()) { - final StorageObject next = storageObjectsIterator.next(); + while (blobIterator.hasNext() || nextPageToken != null || uris.hasNext()) { + while (blobIterator.hasNext()) { + final GoogleStorageObjectMetadata next = blobIterator.next(); // list with prefix can return directories, but they should always end with `/`, ignore them. // also skips empty objects. - if (!next.getName().endsWith("/") && next.getSize().signum() > 0) { + if (!next.getName().endsWith("/") && Long.signum(next.getSize()) > 0) { currentObject = next; return; } } if (nextPageToken != null) { - fetchNextBatch(); + fetchNextPage(); } else if (uris.hasNext()) { - prepareNextRequest(); - fetchNextBatch(); + advanceURI(); + fetchNextPage(); } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java new file mode 100644 index 00000000000..a3d1c863a75 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleInputRange.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import java.util.Objects; + +public class GoogleInputRange +{ + private final long start; + private final long size; + private final String bucket; + private final String path; + + public GoogleInputRange(long start, long size, String bucket, String path) + { + this.start = start; + this.size = size; + this.bucket = bucket; + this.path = path; + } + + public long getStart() + { + return start; + } + + public long getSize() + { + return size; + } + + public String getBucket() + { + return bucket; + } + + public String getPath() + { + return path; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleInputRange that = (GoogleInputRange) o; + return start == that.start + && size == that.size + && Objects.equals(bucket, that.bucket) + && Objects.equals(path, that.path); + } + + @Override + public int hashCode() + { + return Objects.hash(start, size, bucket, path); + } + + @Override + public String toString() + { + return "GoogleInputRange{" + + "start=" + start + + ", size=" + size + + ", bucket='" + bucket + '\'' + + ", path='" + path + '\'' + + '}'; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java new file mode 100644 index 00000000000..c9c78151ae9 --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.RetryUtils; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +public class GoogleOutputConfig +{ + + @JsonProperty + private final String bucket; + + @JsonProperty + private final String prefix; + + @JsonProperty + private final File tempDir; + + @JsonProperty + private HumanReadableBytes chunkSize; + + private static final HumanReadableBytes DEFAULT_CHUNK_SIZE = new HumanReadableBytes("4MiB"); + + // GCS imposed minimum chunk size + private static final long GOOGLE_MIN_CHUNK_SIZE_BYTES = new HumanReadableBytes("256KiB").getBytes(); + + // Self-imposed max chunk size since this size is allocated per open file consuming significant memory. + private static final long GOOGLE_MAX_CHUNK_SIZE_BYTES = new HumanReadableBytes("16MiB").getBytes(); + + + @JsonProperty + private int maxRetry; + + public GoogleOutputConfig( + final String bucket, + final String prefix, + final File tempDir, + @Nullable final HumanReadableBytes chunkSize, + @Nullable final Integer maxRetry + ) + { + this.bucket = bucket; + this.prefix = prefix; + this.tempDir = tempDir; + this.chunkSize = chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE; + this.maxRetry = maxRetry != null ? maxRetry : RetryUtils.DEFAULT_MAX_TRIES; + + validateFields(); + } + + public String getBucket() + { + return bucket; + } + + public String getPrefix() + { + return prefix; + } + + public File getTempDir() + { + return tempDir; + } + + public HumanReadableBytes getChunkSize() + { + return chunkSize; + } + + public Integer getMaxRetry() + { + return maxRetry; + } + + private void validateFields() + { + if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) { + throw InvalidInput.exception( + "'chunkSize' [%d] bytes to the GoogleConfig should be between [%d] bytes and [%d] bytes", + chunkSize.getBytes(), + GOOGLE_MIN_CHUNK_SIZE_BYTES, + GOOGLE_MAX_CHUNK_SIZE_BYTES + ); + } + } + + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoogleOutputConfig that = (GoogleOutputConfig) o; + return Objects.equals(bucket, that.bucket) + && Objects.equals(prefix, that.prefix) + && Objects.equals(tempDir, that.tempDir) + && Objects.equals(chunkSize, that.chunkSize) + && Objects.equals(maxRetry, that.maxRetry); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, prefix, tempDir, chunkSize, maxRetry); + } + + @Override + public String toString() + { + return "GoogleOutputConfig{" + + "container='" + bucket + '\'' + + ", prefix='" + prefix + '\'' + + ", tempDir=" + tempDir + + ", chunkSize=" + chunkSize + + ", maxRetry=" + maxRetry + + '}'; + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java new file mode 100644 index 00000000000..6edbad3beaf --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnector.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; +import org.apache.druid.storage.google.GoogleUtils; +import org.apache.druid.storage.remote.ChunkingStorageConnector; +import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; + +public class GoogleStorageConnector extends ChunkingStorageConnector +{ + + private static final String DELIM = "/"; + private static final Joiner JOINER = Joiner.on(DELIM).skipNulls(); + private static final Logger log = new Logger(GoogleStorageConnector.class); + + private final GoogleStorage storage; + private final GoogleOutputConfig config; + private final GoogleInputDataConfig inputDataConfig; + + public GoogleStorageConnector( + GoogleOutputConfig config, + GoogleStorage googleStorage, + GoogleInputDataConfig inputDataConfig + ) + { + this.storage = googleStorage; + this.config = config; + this.inputDataConfig = inputDataConfig; + + Preconditions.checkNotNull(config, "config is null"); + Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in google config"); + + try { + FileUtils.mkdirp(config.getTempDir()); + } + catch (IOException e) { + throw new RE( + e, + StringUtils.format("Cannot create tempDir : [%s] for google storage connector", config.getTempDir()) + ); + } + } + + @Override + public boolean pathExists(String path) + { + return storage.exists(config.getBucket(), objectPath(path)); + } + + @Override + public OutputStream write(String path) + { + return storage.getObjectOutputStream(config.getBucket(), objectPath(path), config.getChunkSize().getBytesInInt()); + } + + @Override + public void deleteFile(String path) throws IOException + { + try { + final String fullPath = objectPath(path); + log.debug("Deleting file at bucket: [%s], path: [%s]", config.getBucket(), fullPath); + + GoogleUtils.retryGoogleCloudStorageOperation( + () -> { + storage.delete(config.getBucket(), fullPath); + return null; + } + ); + } + catch (Exception e) { + log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage()); + throw new IOException(e); + } + } + + @Override + public void deleteFiles(Iterable paths) throws IOException + { + storage.batchDelete(config.getBucket(), Iterables.transform(paths, this::objectPath)); + } + + @Override + public void deleteRecursively(String path) throws IOException + { + final String fullPath = objectPath(path); + Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator( + storage, + ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath) + .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), + inputDataConfig.getMaxListingLength() + ); + + storage.batchDelete( + config.getBucket(), + () -> Iterators.transform(storageObjects, GoogleStorageObjectMetadata::getName) + ); + } + + @Override + public Iterator listDir(String dirName) + { + final String fullPath = objectPath(dirName); + Iterator storageObjects = GoogleUtils.lazyFetchingStorageObjectsIterator( + storage, + ImmutableList.of(new CloudObjectLocation(config.getBucket(), fullPath) + .toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), + inputDataConfig.getMaxListingLength() + ); + + return Iterators.transform( + storageObjects, + storageObject -> { + String[] split = storageObject.getName().split(fullPath, 2); + if (split.length > 1) { + return split[1]; + } else { + return ""; + } + } + ); + } + + @Override + public ChunkingStorageConnectorParameters buildInputParams(String path) throws IOException + { + long size = storage.size(config.getBucket(), objectPath(path)); + return buildInputParams(path, 0, size); + } + + @Override + public ChunkingStorageConnectorParameters buildInputParams(String path, long from, long size) + { + ChunkingStorageConnectorParameters.Builder builder = new ChunkingStorageConnectorParameters.Builder<>(); + builder.start(from); + builder.end(from + size); + builder.cloudStoragePath(objectPath(path)); + builder.tempDirSupplier(config::getTempDir); + builder.maxRetry(config.getMaxRetry()); + builder.retryCondition(GoogleUtils.GOOGLE_RETRY); + builder.objectSupplier(((start, end) -> new GoogleInputRange( + start, + end - start, + config.getBucket(), + objectPath(path) + ))); + builder.objectOpenFunction(new ObjectOpenFunction() + { + @Override + public InputStream open(GoogleInputRange googleInputRange) throws IOException + { + return storage.getInputStream( + googleInputRange.getBucket(), + googleInputRange.getPath(), + googleInputRange.getStart(), + googleInputRange.getSize() + ); + } + + @Override + public InputStream open(GoogleInputRange googleInputRange, long offset) throws IOException + { + long rangeStart = googleInputRange.getStart() + offset; + return storage.getInputStream( + googleInputRange.getBucket(), + googleInputRange.getPath(), + rangeStart, + googleInputRange.getSize() + ); + } + }); + + return builder.build(); + } + + private String objectPath(String path) + { + return JOINER.join(config.getPrefix(), path); + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java new file mode 100644 index 00000000000..cba33b5804c --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorModule.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class GoogleStorageConnectorModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule(this.getClass().getSimpleName()).registerSubtypes(GoogleStorageConnectorProvider.class)); + } + + @Override + public void configure(Binder binder) + { + + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java new file mode 100644 index 00000000000..f33a3b1f44d --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; + +import javax.annotation.Nullable; +import java.io.File; + +@JsonTypeName(GoogleStorageDruidModule.SCHEME) +public class GoogleStorageConnectorProvider extends GoogleOutputConfig implements StorageConnectorProvider +{ + + @JacksonInject + GoogleStorage googleStorage; + + @JacksonInject + GoogleInputDataConfig googleInputDataConfig; + + @JsonCreator + public GoogleStorageConnectorProvider( + @JsonProperty(value = "bucket", required = true) String bucket, + @JsonProperty(value = "prefix", required = true) String prefix, + @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, + @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry + ) + { + super(bucket, prefix, tempDir, chunkSize, maxRetry); + } + + @Override + public StorageConnector get() + { + return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig); + } + +} diff --git a/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule index 3d89b732031..92cb05897cc 100644 --- a/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ b/extensions-core/google-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.druid.storage.google.GoogleStorageDruidModule +org.apache.druid.storage.google.output.GoogleStorageConnectorModule +org.apache.druid.storage.google.GoogleStorageDruidModule \ No newline at end of file diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index 556eb840ea9..ae968aa3d6f 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -23,9 +23,6 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.guice.ObjectMapperModule; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Guice; @@ -56,6 +53,8 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; +import org.apache.druid.storage.google.GoogleStorageObjectPage; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.utils.CompressionUtils; import org.easymock.EasyMock; @@ -68,7 +67,6 @@ import org.junit.rules.ExpectedException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.math.BigInteger; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -114,6 +112,10 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe private static final byte[] CONTENT = StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); + private static final String BUCKET = "TEST_BUCKET"; + private static final String OBJECT_NAME = "TEST_NAME"; + private static final Long UPDATE_TIME = 111L; + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -207,21 +209,31 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe } @Test - public void testWithUrisSplit() throws Exception + public void testWithUrisSplit() throws IOException { EasyMock.reset(STORAGE); + + GoogleStorageObjectMetadata objectMetadata = new GoogleStorageObjectMetadata( + BUCKET, + OBJECT_NAME, + (long) CONTENT.length, + UPDATE_TIME + ); + EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(0).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); + EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(1).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); + EasyMock.replay(STORAGE); GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( @@ -243,21 +255,28 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe } @Test - public void testWithUrisGlob() throws Exception + public void testWithUrisGlob() throws IOException { + GoogleStorageObjectMetadata objectMetadata = new GoogleStorageObjectMetadata( + BUCKET, + OBJECT_NAME, + (long) CONTENT.length, + UPDATE_TIME + ); + EasyMock.reset(STORAGE); EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(0).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); EasyMock.expect( STORAGE.getMetadata( EXPECTED_URIS.get(1).getAuthority(), StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath()) ) - ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length))); + ).andReturn(objectMetadata); EasyMock.replay(STORAGE); GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( STORAGE, @@ -488,28 +507,30 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe { final String bucket = prefix.getAuthority(); - Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); - EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once(); - EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once(); - EasyMock.expect(listRequest.setMaxResults((long) MAX_LISTING_LENGTH)).andReturn(listRequest).once(); - EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())))) - .andReturn(listRequest) - .once(); + GoogleStorageObjectPage response = EasyMock.createMock(GoogleStorageObjectPage.class); - List mockObjects = new ArrayList<>(); + List mockObjects = new ArrayList<>(); for (URI uri : uris) { - StorageObject s = new StorageObject(); - s.setBucket(bucket); - s.setName(uri.getPath()); - s.setSize(BigInteger.valueOf(CONTENT.length)); + GoogleStorageObjectMetadata s = new GoogleStorageObjectMetadata( + bucket, + uri.getPath(), + (long) CONTENT.length, + UPDATE_TIME + ); mockObjects.add(s); } - Objects response = new Objects(); - response.setItems(mockObjects); - EasyMock.expect(listRequest.execute()).andReturn(response).once(); - EasyMock.expect(response.getItems()).andReturn(mockObjects).once(); - EasyMock.replay(listRequest); + EasyMock.expect(STORAGE.list( + EasyMock.eq(bucket), + EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())), + EasyMock.eq((long) MAX_LISTING_LENGTH), + EasyMock.eq(null) + )).andReturn(response).once(); + + EasyMock.expect(response.getObjectList()).andReturn(mockObjects).once(); + EasyMock.expect(response.getNextPageToken()).andReturn(null).once(); + + EasyMock.replay(response); } private static void addExpectedGetObjectMock(URI uri) throws IOException @@ -517,7 +538,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe CloudObjectLocation location = new CloudObjectLocation(uri); EasyMock.expect( - STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + STORAGE.getInputStream(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) ).andReturn(new ByteArrayInputStream(CONTENT)).once(); } @@ -529,7 +550,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped); EasyMock.expect( - STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + STORAGE.getInputStream(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) ).andReturn(new ByteArrayInputStream(gzipped.toByteArray())).once(); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java index a65f4750d3b..db5c0e724bd 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleByteSourceTest.java @@ -36,7 +36,7 @@ public class GoogleByteSourceTest extends EasyMockSupport GoogleStorage storage = createMock(GoogleStorage.class); InputStream stream = createMock(InputStream.class); - EasyMock.expect(storage.get(bucket, path)).andReturn(stream); + EasyMock.expect(storage.getInputStream(bucket, path)).andReturn(stream); replayAll(); @@ -54,7 +54,7 @@ public class GoogleByteSourceTest extends EasyMockSupport final String path = "/path/to/file"; GoogleStorage storage = createMock(GoogleStorage.class); - EasyMock.expect(storage.get(bucket, path)).andThrow(new IOException("")); + EasyMock.expect(storage.getInputStream(bucket, path)).andThrow(new IOException("")); replayAll(); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java index 97a0e61dce3..8d9612f4d8d 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentKillerTest.java @@ -24,8 +24,6 @@ import com.google.api.client.googleapis.testing.json.GoogleJsonResponseException import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpResponseException; import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.ISE; @@ -169,17 +167,10 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1, object2) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2)); GoogleTestUtils.expectDeleteObjects( storage, @@ -190,29 +181,22 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.replay(accountConfig, inputDataConfig, storage); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.killAll(); - EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.verify(accountConfig, inputDataConfig, storage); } @Test public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -224,30 +208,22 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.replay(accountConfig, inputDataConfig, storage); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.killAll(); - EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.verify(accountConfig, inputDataConfig, storage); } @Test public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - Storage.Objects.List listRequest = null; try { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -259,7 +235,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport EasyMock.expect(accountConfig.getPrefix()).andReturn(PREFIX).anyTimes(); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.replay(accountConfig, inputDataConfig, storage); GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage, accountConfig, inputDataConfig); killer.killAll(); @@ -270,6 +246,6 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(listRequest, accountConfig, inputDataConfig, storage); + EasyMock.verify(accountConfig, inputDataConfig, storage); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java index deb2383fd6c..059432ba818 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleDataSegmentPullerTest.java @@ -52,7 +52,7 @@ public class GoogleDataSegmentPullerTest extends EasyMockSupport 300, "test" ); - EasyMock.expect(storage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(exception); + EasyMock.expect(storage.getInputStream(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andThrow(exception); replayAll(); @@ -93,7 +93,7 @@ public class GoogleDataSegmentPullerTest extends EasyMockSupport String prefix = "prefix/"; GoogleStorage storage = createMock(GoogleStorage.class); - EasyMock.expect(storage.get(EasyMock.eq(bucket), EasyMock.eq(prefix))).andReturn(EasyMock.createMock(InputStream.class)); + EasyMock.expect(storage.getInputStream(EasyMock.eq(bucket), EasyMock.eq(prefix))).andReturn(EasyMock.createMock(InputStream.class)); EasyMock.replay(storage); GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java index 9ca384cf468..e45877c89f4 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageDruidModuleTest.java @@ -19,14 +19,9 @@ package org.apache.druid.storage.google; -import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.storage.Storage; +import com.google.cloud.storage.Storage; import com.google.common.collect.ImmutableList; import com.google.inject.Injector; -import org.apache.druid.common.gcp.GcpMockModule; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.segment.loading.OmniDataSegmentKiller; import org.junit.Assert; @@ -42,23 +37,7 @@ public class GoogleStorageDruidModuleTest // HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded // lazily, the exception should end up thrown. // 2. That the same object is returned. - Injector injector = GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - new GcpMockModule() - { - - @Override - public HttpRequestInitializer mockRequestInitializer( - HttpTransport transport, - JsonFactory factory - ) - { - return new MockGoogleCredential.Builder().setTransport(transport).setJsonFactory(factory).build(); - } - }, - new GoogleStorageDruidModule() - ) - ); + Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule())); OmniDataSegmentKiller killer = injector.getInstance(OmniDataSegmentKiller.class); Assert.assertTrue(killer.getKillers().containsKey(GoogleStorageDruidModule.SCHEME)); Assert.assertSame( @@ -78,23 +57,7 @@ public class GoogleStorageDruidModuleTest // HttpRquestInitializer, the test throws an exception from that method, meaning that if they are not loaded // lazily, the exception should end up thrown. // 2. That the same object is returned. - Injector injector = GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - new GcpMockModule() - { - - @Override - public HttpRequestInitializer mockRequestInitializer( - HttpTransport transport, - JsonFactory factory - ) - { - throw new UnsupportedOperationException("should not be called, because this should be lazy"); - } - }, - new GoogleStorageDruidModule() - ) - ); + Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new GoogleStorageDruidModule())); final GoogleStorage instance = injector.getInstance(GoogleStorage.class); Assert.assertSame(instance, injector.getInstance(GoogleStorage.class)); } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java index 848cf97fed1..d92339f53c7 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleStorageTest.java @@ -19,73 +19,243 @@ package org.apache.druid.storage.google; -import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; -import com.google.api.client.http.ByteArrayContent; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.services.storage.Storage; -import com.google.common.base.Suppliers; -import org.apache.druid.java.util.common.StringUtils; -import org.junit.Assert; +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.common.collect.ImmutableList; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.io.InputStream; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class GoogleStorageTest { - @Test - public void testGet() throws IOException + Storage mockStorage; + GoogleStorage googleStorage; + + Blob blob; + + static final String BUCKET = "bucket"; + static final String PATH = "/path"; + static final long SIZE = 100; + static final OffsetDateTime UPDATE_TIME = OffsetDateTime.MIN; + + @Before + public void setUp() { - String content = "abcdefghij"; - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContent(content); - GoogleStorage googleStorage = makeGoogleStorage(response); - InputStream is = googleStorage.get("bucket", "path"); - String actual = GoogleTestUtils.readAsString(is); - Assert.assertEquals(content, actual); + mockStorage = EasyMock.mock(Storage.class); + + googleStorage = new GoogleStorage(() -> mockStorage); + + blob = EasyMock.mock(Blob.class); } @Test - public void testGetWithOffset() throws IOException + public void testDeleteSuccess() throws IOException { - String content = "abcdefghij"; - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContent(content); - GoogleStorage googleStorage = makeGoogleStorage(response); - InputStream is = googleStorage.get("bucket", "path", 2); - String actual = GoogleTestUtils.readAsString(is); - Assert.assertEquals(content.substring(2), actual); + EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(true); + EasyMock.replay(mockStorage); + googleStorage.delete(BUCKET, PATH); } @Test - public void testInsert() throws IOException + public void testDeleteFailure() { - String content = "abcdefghij"; - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.addHeader("Location", "http://random-path"); - response.setContent("{}"); - MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build(); - GoogleStorage googleStorage = makeGoogleStorage(transport); - googleStorage.insert("bucket", "path", new ByteArrayContent("text/html", StringUtils.toUtf8(content))); - MockLowLevelHttpRequest request = transport.getLowLevelHttpRequest(); - String actual = request.getContentAsString(); - Assert.assertEquals(content, actual); + EasyMock.expect(mockStorage.delete(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(false); + EasyMock.replay(mockStorage); + boolean thrownIOException = false; + try { + googleStorage.delete(BUCKET, PATH); + + } + catch (IOException e) { + thrownIOException = true; + } + assertTrue(thrownIOException); } - private GoogleStorage makeGoogleStorage(MockLowLevelHttpResponse response) + @Test + public void testBatchDeleteSuccess() throws IOException { - MockHttpTransport transport = new MockHttpTransport.Builder().setLowLevelHttpResponse(response).build(); - return makeGoogleStorage(transport); + List paths = ImmutableList.of("/path1", "/path2"); + final Capture> pathIterable = Capture.newInstance(); + EasyMock.expect(mockStorage.delete(EasyMock.capture(pathIterable))).andReturn(ImmutableList.of(true, true)); + EasyMock.replay(mockStorage); + + googleStorage.batchDelete(BUCKET, paths); + + List recordedBlobIds = new ArrayList<>(); + pathIterable.getValue().iterator().forEachRemaining(recordedBlobIds::add); + + List recordedPaths = recordedBlobIds.stream().map(BlobId::getName).collect(Collectors.toList()); + + assertTrue(paths.size() == recordedPaths.size() && paths.containsAll(recordedPaths) && recordedPaths.containsAll( + paths)); + assertEquals(BUCKET, recordedBlobIds.get(0).getBucket()); } - private GoogleStorage makeGoogleStorage(MockHttpTransport transport) + @Test + public void testBatchDeleteFailure() { - HttpRequestInitializer initializer = new MockGoogleCredential.Builder().build(); - Storage storage = new Storage(transport, JacksonFactory.getDefaultInstance(), initializer); - return new GoogleStorage(Suppliers.ofInstance(storage)); + List paths = ImmutableList.of("/path1", "/path2"); + EasyMock.expect(mockStorage.delete((Iterable) EasyMock.anyObject())) + .andReturn(ImmutableList.of(false, true)); + EasyMock.replay(mockStorage); + boolean thrownIOException = false; + try { + googleStorage.batchDelete(BUCKET, paths); + + } + catch (IOException e) { + thrownIOException = true; + } + assertTrue(thrownIOException); + } + + @Test + public void testGetMetadata() throws IOException + { + EasyMock.expect(mockStorage.get( + EasyMock.eq(BUCKET), + EasyMock.eq(PATH), + EasyMock.anyObject(Storage.BlobGetOption.class) + )).andReturn(blob); + + EasyMock.expect(blob.getBucket()).andReturn(BUCKET); + EasyMock.expect(blob.getName()).andReturn(PATH); + EasyMock.expect(blob.getSize()).andReturn(SIZE); + EasyMock.expect(blob.getUpdateTimeOffsetDateTime()).andReturn(UPDATE_TIME); + + EasyMock.replay(mockStorage, blob); + + GoogleStorageObjectMetadata objectMetadata = googleStorage.getMetadata(BUCKET, PATH); + assertEquals(objectMetadata, new GoogleStorageObjectMetadata(BUCKET, PATH, SIZE, UPDATE_TIME.toEpochSecond())); + + } + + @Test + public void testExistsTrue() + { + EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(blob); + EasyMock.replay(mockStorage); + assertTrue(googleStorage.exists(BUCKET, PATH)); + } + + @Test + public void testExistsFalse() + { + EasyMock.expect(mockStorage.get(EasyMock.eq(BUCKET), EasyMock.eq(PATH))).andReturn(null); + EasyMock.replay(mockStorage); + assertFalse(googleStorage.exists(BUCKET, PATH)); + } + + @Test + public void testSize() throws IOException + { + EasyMock.expect(mockStorage.get( + EasyMock.eq(BUCKET), + EasyMock.eq(PATH), + EasyMock.anyObject(Storage.BlobGetOption.class) + )).andReturn(blob); + + EasyMock.expect(blob.getSize()).andReturn(SIZE); + + EasyMock.replay(mockStorage, blob); + + long size = googleStorage.size(BUCKET, PATH); + + assertEquals(size, SIZE); + } + + @Test + public void testVersion() throws IOException + { + final String version = "7"; + EasyMock.expect(mockStorage.get( + EasyMock.eq(BUCKET), + EasyMock.eq(PATH), + EasyMock.anyObject(Storage.BlobGetOption.class) + )).andReturn(blob); + + EasyMock.expect(blob.getGeneratedId()).andReturn(version); + + EasyMock.replay(mockStorage, blob); + + assertEquals(version, googleStorage.version(BUCKET, PATH)); + } + + @Test + public void testList() throws IOException + { + Page blobPage = EasyMock.mock(Page.class); + EasyMock.expect(mockStorage.list( + EasyMock.eq(BUCKET), + EasyMock.anyObject() + )).andReturn(blobPage); + + Blob blob1 = EasyMock.mock(Blob.class); + Blob blob2 = EasyMock.mock(Blob.class); + + final String bucket1 = "BUCKET_1"; + final String path1 = "PATH_1"; + final long size1 = 7; + final OffsetDateTime updateTime1 = OffsetDateTime.MIN; + + final String bucket2 = "BUCKET_2"; + final String path2 = "PATH_2"; + final long size2 = 9; + final OffsetDateTime updateTime2 = OffsetDateTime.MIN; + + final String nextPageToken = "TOKEN"; + + EasyMock.expect(blob1.getBucket()).andReturn(bucket1); + EasyMock.expect(blob1.getName()).andReturn(path1); + EasyMock.expect(blob1.getSize()).andReturn(size1); + EasyMock.expect(blob1.getUpdateTimeOffsetDateTime()).andReturn(updateTime1); + + EasyMock.expect(blob2.getBucket()).andReturn(bucket2); + EasyMock.expect(blob2.getName()).andReturn(path2); + EasyMock.expect(blob2.getSize()).andReturn(size2); + EasyMock.expect(blob2.getUpdateTimeOffsetDateTime()).andReturn(updateTime2); + + + List blobs = ImmutableList.of(blob1, blob2); + + EasyMock.expect(blobPage.streamValues()).andReturn(blobs.stream()); + + EasyMock.expect(blobPage.getNextPageToken()).andReturn(nextPageToken); + + + EasyMock.replay(mockStorage, blobPage, blob1, blob2); + + GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata( + bucket1, + path1, + size1, + updateTime1.toEpochSecond() + ); + GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata( + bucket2, + path2, + size2, + updateTime2.toEpochSecond() + ); + + GoogleStorageObjectPage objectPage = googleStorage.list(BUCKET, PATH, null, null); + + assertEquals(objectPage.getObjectList().get(0), objectMetadata1); + assertEquals(objectPage.getObjectList().get(1), objectMetadata2); + assertEquals(objectPage.getNextPageToken(), nextPageToken); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java index 9bfe2706f80..a0f17c97d91 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java @@ -22,8 +22,6 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpResponseException; import com.google.api.client.http.InputStreamContent; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -146,7 +144,7 @@ public class GoogleTaskLogsTest extends EasyMockSupport final String logPath = PREFIX + "/" + TASKID; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length()); - EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog))); + EasyMock.expect(storage.getInputStream(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(testLog))); replayAll(); @@ -168,7 +166,7 @@ public class GoogleTaskLogsTest extends EasyMockSupport final String logPath = PREFIX + "/" + TASKID; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length()); - EasyMock.expect(storage.get(BUCKET, logPath, offset)) + EasyMock.expect(storage.getInputStream(BUCKET, logPath, offset)) .andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog))); replayAll(); @@ -192,7 +190,7 @@ public class GoogleTaskLogsTest extends EasyMockSupport final String logPath = PREFIX + "/" + TASKID; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) testLog.length()); - EasyMock.expect(storage.get(BUCKET, logPath, internalOffset)) + EasyMock.expect(storage.getInputStream(BUCKET, logPath, internalOffset)) .andReturn(new ByteArrayInputStream(StringUtils.toUtf8(expectedLog))); replayAll(); @@ -214,7 +212,7 @@ public class GoogleTaskLogsTest extends EasyMockSupport final String logPath = PREFIX + "/" + TASKID + ".status.json"; EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) taskStatus.length()); - EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus))); + EasyMock.expect(storage.getInputStream(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus))); replayAll(); @@ -230,18 +228,11 @@ public class GoogleTaskLogsTest extends EasyMockSupport @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_1); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1, object2) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2)); GoogleTestUtils.expectDeleteObjects( storage, @@ -250,29 +241,22 @@ public class GoogleTaskLogsTest extends EasyMockSupport ); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.replay(inputDataConfig, storage, timeSupplier); googleTaskLogs.killAll(); - EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.verify(inputDataConfig, storage, timeSupplier); } @Test public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -282,31 +266,23 @@ public class GoogleTaskLogsTest extends EasyMockSupport EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.replay(inputDataConfig, storage, timeSupplier); googleTaskLogs.killAll(); - EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.verify(inputDataConfig, storage, timeSupplier); } @Test public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - Storage.Objects.List listRequest = null; try { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -316,7 +292,7 @@ public class GoogleTaskLogsTest extends EasyMockSupport EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.replay(inputDataConfig, storage, timeSupplier); googleTaskLogs.killAll(); } @@ -326,23 +302,16 @@ public class GoogleTaskLogsTest extends EasyMockSupport Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(listRequest, inputDataConfig, storage, timeSupplier); + EasyMock.verify(inputDataConfig, storage, timeSupplier); } @Test public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - StorageObject object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object2 = GoogleTestUtils.newStorageObject(BUCKET, KEY_2, TIME_FUTURE); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1, object2) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1, object2)); GoogleTestUtils.expectDeleteObjects( storage, @@ -351,25 +320,18 @@ public class GoogleTaskLogsTest extends EasyMockSupport ); EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage); + EasyMock.replay(inputDataConfig, storage); googleTaskLogs.killOlderThan(TIME_NOW); - EasyMock.verify(listRequest, inputDataConfig, storage); + EasyMock.verify(inputDataConfig, storage); } @Test public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - Storage.Objects.List listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -379,29 +341,21 @@ public class GoogleTaskLogsTest extends EasyMockSupport EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage); + EasyMock.replay(inputDataConfig, storage); googleTaskLogs.killOlderThan(TIME_NOW); - EasyMock.verify(listRequest, inputDataConfig, storage); + EasyMock.verify(inputDataConfig, storage); } @Test public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - Storage.Objects.List listRequest = null; try { - StorageObject object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); + GoogleStorageObjectMetadata object1 = GoogleTestUtils.newStorageObject(BUCKET, KEY_1, TIME_0); - listRequest = GoogleTestUtils.expectListRequest(storage, PREFIX_URI); - - GoogleTestUtils.expectListObjects( - listRequest, - PREFIX_URI, - MAX_KEYS, - ImmutableList.of(object1) - ); + GoogleTestUtils.expectListObjectsPageRequest(storage, PREFIX_URI, MAX_KEYS, ImmutableList.of(object1)); GoogleTestUtils.expectDeleteObjects( storage, @@ -411,7 +365,7 @@ public class GoogleTaskLogsTest extends EasyMockSupport EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); - EasyMock.replay(listRequest, inputDataConfig, storage); + EasyMock.replay(inputDataConfig, storage); googleTaskLogs.killOlderThan(TIME_NOW); } @@ -421,6 +375,6 @@ public class GoogleTaskLogsTest extends EasyMockSupport Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(listRequest, inputDataConfig, storage); + EasyMock.verify(inputDataConfig, storage); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java index 219d96c2166..c68911448e2 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTestUtils.java @@ -19,21 +19,17 @@ package org.apache.druid.storage.google; -import com.google.api.client.util.DateTime; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.easymock.IExpectationSetters; +import org.joda.time.DateTime; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; -import java.math.BigInteger; import java.net.URI; import java.util.HashMap; import java.util.List; @@ -41,79 +37,60 @@ import java.util.Map; public class GoogleTestUtils extends EasyMockSupport { - private static final org.joda.time.DateTime NOW = DateTimes.nowUtc(); + private static final DateTime NOW = DateTimes.nowUtc(); private static final byte[] CONTENT = StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); - public static StorageObject newStorageObject( + public static GoogleStorageObjectMetadata newStorageObject( String bucket, String key, long lastModifiedTimestamp ) { - StorageObject object = new StorageObject(); - object.setBucket(bucket); - object.setName(key); - object.setUpdated(new DateTime(lastModifiedTimestamp)); - object.setEtag("etag"); - object.setSize(BigInteger.valueOf(CONTENT.length)); + GoogleStorageObjectMetadata object = new GoogleStorageObjectMetadata(bucket, key, (long) CONTENT.length, + lastModifiedTimestamp + ); return object; } - public static Storage.Objects.List expectListRequest( + public static void expectListObjectsPageRequest( GoogleStorage storage, - URI prefix - ) throws IOException - { - Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); - String bucket = prefix.getAuthority(); - EasyMock.expect( - storage.list(bucket) - ).andReturn(listRequest).once(); - return listRequest; - } - - public static void expectListObjects( - Storage.Objects.List listRequest, URI prefix, long maxListingLength, - List objects + List objectMetadataList ) throws IOException { - EasyMock.expect(listRequest.setPrefix(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))).andReturn(listRequest); - EasyMock.expect(listRequest.setMaxResults(maxListingLength)).andReturn(listRequest); - EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).anyTimes(); - - Objects resultObjects = new Objects(); - resultObjects.setItems(objects); - - EasyMock.expect( - listRequest.execute() - ).andReturn(resultObjects).once(); + GoogleStorageObjectPage objectMetadataPage = new GoogleStorageObjectPage(objectMetadataList, null); + String bucket = prefix.getAuthority(); + EasyMock.expect(storage.list(bucket, StringUtils.maybeRemoveLeadingSlash(prefix.getPath()), maxListingLength, null)) + .andReturn(objectMetadataPage) + .once(); } public static void expectDeleteObjects( GoogleStorage storage, - List deleteObjectExpected, - Map deleteObjectToException + List deleteObjectExpected, + Map deleteObjectToException ) throws IOException { - Map> requestToResultExpectationSetter = new HashMap<>(); - for (Map.Entry deleteObjectAndException : deleteObjectToException.entrySet()) { - StorageObject deleteObject = deleteObjectAndException.getKey(); + Map> requestToResultExpectationSetter = new HashMap<>(); + for (Map.Entry deleteObjectAndException : deleteObjectToException.entrySet()) { + GoogleStorageObjectMetadata deleteObject = deleteObjectAndException.getKey(); Exception exception = deleteObjectAndException.getValue(); - IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get( + deleteObject); if (resultExpectationSetter == null) { storage.delete(deleteObject.getBucket(), deleteObject.getName()); - resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); + resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter); } else { resultExpectationSetter.andThrow(exception); } } - for (StorageObject deleteObject : deleteObjectExpected) { - IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject); + for (GoogleStorageObjectMetadata deleteObject : deleteObjectExpected) { + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get( + deleteObject); if (resultExpectationSetter == null) { storage.delete(deleteObject.getBucket(), deleteObject.getName()); resultExpectationSetter = EasyMock.expectLastCall(); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java index 408033db053..b9417b7f7f0 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java @@ -19,8 +19,6 @@ package org.apache.druid.storage.google; -import com.google.api.client.util.DateTime; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; @@ -38,14 +36,14 @@ public class GoogleTimestampVersionedDataFinderTest String keyPrefix = "prefix/dir/0"; // object for directory prefix/dir/0/ - final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0); - storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1); - storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1); - storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); - final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4); - storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final GoogleStorageObjectMetadata storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0); + storageObject1.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1); + storageObject2.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1); + storageObject3.setLastUpdateTime(System.currentTimeMillis() + 100); + final GoogleStorageObjectMetadata storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4); + storageObject4.setLastUpdateTime(System.currentTimeMillis() + 100); final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); @@ -62,14 +60,14 @@ public class GoogleTimestampVersionedDataFinderTest String keyPrefix = "prefix/dir/0/"; // object for directory prefix/dir/0/ - final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0); - storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1); - storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); - final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1); - storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); - final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4); - storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final GoogleStorageObjectMetadata storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0); + storageObject1.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1); + storageObject2.setLastUpdateTime(System.currentTimeMillis()); + final GoogleStorageObjectMetadata storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1); + storageObject3.setLastUpdateTime(System.currentTimeMillis() + 100); + final GoogleStorageObjectMetadata storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4); + storageObject4.setLastUpdateTime(System.currentTimeMillis() + 100); final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java index a0b2bd8f297..a1f227ab5d4 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -19,19 +19,11 @@ package org.apache.druid.storage.google; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import org.apache.druid.storage.google.ObjectStorageIteratorTest.MockStorage.MockObjects.MockList; -import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; -import java.math.BigInteger; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -39,7 +31,7 @@ import java.util.stream.Collectors; public class ObjectStorageIteratorTest { - private static final ImmutableList TEST_OBJECTS = + private static final ImmutableList TEST_OBJECTS = ImmutableList.of( makeStorageObject("b", "foo", 10L), makeStorageObject("b", "foo/", 0L), // directory @@ -163,11 +155,11 @@ public class ObjectStorageIteratorTest final int maxListingLength ) { - final List expectedObjects = new ArrayList<>(); + final List expectedObjects = new ArrayList<>(); // O(N^2) but who cares -- the list is short. for (final String uri : expectedUris) { - final List matches = TEST_OBJECTS + final List matches = TEST_OBJECTS .stream() .filter(storageObject -> GoogleUtils.objectToUri(storageObject).toString().equals(uri)) .collect(Collectors.toList()); @@ -175,7 +167,7 @@ public class ObjectStorageIteratorTest expectedObjects.add(Iterables.getOnlyElement(matches)); } - final List actualObjects = ImmutableList.copyOf( + final List actualObjects = ImmutableList.copyOf( GoogleUtils.lazyFetchingStorageObjectsIterator( makeMockClient(TEST_OBJECTS), prefixes.stream().map(URI::create).iterator(), @@ -194,70 +186,33 @@ public class ObjectStorageIteratorTest * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the * {@link ObjectStorageIterator} class. */ - static GoogleStorage makeMockClient(final List storageObjects) + static GoogleStorage makeMockClient(final List storageObjects) { return new GoogleStorage(null) { @Override - public Storage.Objects.List list(final String bucket) + public GoogleStorageObjectPage list( + final String bucket, + final String prefix, + final Long pageSize, + final String pageToken + ) { - return mockList(bucket, storageObjects); - } - }; - } - - @SuppressWarnings("UnnecessaryFullyQualifiedName") - static class MockStorage extends Storage - { - private MockStorage() - { - super( - EasyMock.niceMock(HttpTransport.class), - EasyMock.niceMock(JsonFactory.class), - EasyMock.niceMock(HttpRequestInitializer.class) - ); - } - - private MockList mockList(String bucket, java.util.List storageObjects) - { - return new MockObjects().mockList(bucket, storageObjects); - } - - class MockObjects extends Storage.Objects - { - private MockList mockList(String bucket, java.util.List storageObjects) - { - return new MockList(bucket, storageObjects); - } - - class MockList extends Objects.List - { - private final java.util.List storageObjects; - - private MockList(String bucket, java.util.List storageObjects) - { - super(bucket); - this.storageObjects = storageObjects; - } - - @Override - public com.google.api.services.storage.model.Objects execute() { // Continuation token is an index in the "objects" list. - final String continuationToken = getPageToken(); - final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken); + final int startIndex = pageToken == null ? 0 : Integer.parseInt(pageToken); // Find matching objects. - java.util.List objects = new ArrayList<>(); + List objects = new ArrayList<>(); int nextIndex = -1; for (int i = startIndex; i < storageObjects.size(); i++) { - final StorageObject storageObject = storageObjects.get(i); + final GoogleStorageObjectMetadata storageObject = storageObjects.get(i); - if (storageObject.getBucket().equals(getBucket()) - && storageObject.getName().startsWith(getPrefix())) { + if (storageObject.getBucket().equals(bucket) + && storageObject.getName().startsWith(prefix)) { - if (objects.size() == getMaxResults()) { + if (objects.size() == pageSize) { // We reached our max key limit; set nextIndex (which will lead to a result with truncated = true). nextIndex = i; break; @@ -268,30 +223,18 @@ public class ObjectStorageIteratorTest } } - com.google.api.services.storage.model.Objects retVal = new com.google.api.services.storage.model.Objects(); - retVal.setItems(objects); - if (nextIndex >= 0) { - retVal.setNextPageToken(String.valueOf(nextIndex)); - } else { - retVal.setNextPageToken(null); - } + GoogleStorageObjectPage retVal = new GoogleStorageObjectPage( + objects, + nextIndex >= 0 ? String.valueOf(nextIndex) : null + ); return retVal; } } - } + }; } - private static MockList mockList(String bucket, List storageObjects) + static GoogleStorageObjectMetadata makeStorageObject(final String bucket, final String key, final long size) { - return new MockStorage().mockList(bucket, storageObjects); - } - - static StorageObject makeStorageObject(final String bucket, final String key, final long size) - { - final StorageObject summary = new StorageObject(); - summary.setBucket(bucket); - summary.setName(key); - summary.setSize(BigInteger.valueOf(size)); - return summary; + return new GoogleStorageObjectMetadata(bucket, key, size, null); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java new file mode 100644 index 00000000000..1c3dabcf984 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleInputRangeTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class GoogleInputRangeTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(GoogleInputRange.class) + .usingGetClass() + .verify(); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java new file mode 100644 index 00000000000..59081d96149 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleOutputConfigTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class GoogleOutputConfigTest +{ + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final String BUCKET = "bucket"; + private static final String PREFIX = "prefix"; + private static final int MAX_RETRY_COUNT = 0; + + @Test + public void testTooLargeChunkSize() + { + HumanReadableBytes chunkSize = new HumanReadableBytes("17MiB"); + Assert.assertThrows( + DruidException.class, + () -> new GoogleOutputConfig(BUCKET, PREFIX, temporaryFolder.newFolder(), chunkSize, MAX_RETRY_COUNT) + ); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java new file mode 100644 index 00000000000..df6c66e84c3 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.ProvisionException; +import com.google.inject.name.Names; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorModule; +import org.apache.druid.storage.StorageConnectorProvider; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Properties; + +public class GoogleStorageConnectorProviderTest +{ + private static final String CUSTOM_NAMESPACE = "custom"; + + @Test + public void createGoogleStorageFactoryWithRequiredProperties() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "google"); + properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); + StorageConnectorProvider googleStorageConnectorProvider = getStorageConnectorProvider(properties); + + Assert.assertTrue(googleStorageConnectorProvider instanceof GoogleStorageConnectorProvider); + Assert.assertTrue(googleStorageConnectorProvider.get() instanceof GoogleStorageConnector); + Assert.assertEquals("bucket", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getBucket()); + Assert.assertEquals("prefix", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getPrefix()); + Assert.assertEquals(new File("/tmp"), ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getTempDir()); + + } + + @Test + public void createGoogleStorageFactoryWithMissingPrefix() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); + Assert.assertThrows( + "Missing required creator property 'prefix'", + ProvisionException.class, + () -> getStorageConnectorProvider(properties) + ); + } + + + @Test + public void createGoogleStorageFactoryWithMissingbucket() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "Google"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); + Assert.assertThrows( + "Missing required creator property 'bucket'", + ProvisionException.class, + () -> getStorageConnectorProvider(properties) + ); + } + + @Test + public void createGoogleStorageFactoryWithMissingTempDir() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "Google"); + properties.setProperty(CUSTOM_NAMESPACE + ".bucket", "bucket"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + + Assert.assertThrows( + "Missing required creator property 'tempDir'", + ProvisionException.class, + () -> getStorageConnectorProvider(properties) + ); + } + + private StorageConnectorProvider getStorageConnectorProvider(Properties properties) + { + StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add( + new GoogleStorageDruidModule(), + new StorageConnectorModule(), + new GoogleStorageConnectorModule(), + binder -> { + JsonConfigProvider.bind( + binder, + CUSTOM_NAMESPACE, + StorageConnectorProvider.class, + Names.named(CUSTOM_NAMESPACE) + ); + + binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) + .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) + .in(LazySingleton.class); + } + ).withProperties(properties); + + Injector injector = startupInjectorBuilder.build(); + injector.getInstance(ObjectMapper.class).registerModules(new GoogleStorageConnectorModule().getJacksonModules()); + injector.getInstance(ObjectMapper.class).setInjectableValues( + new InjectableValues.Std() + .addValue( + GoogleStorage.class, + EasyMock.mock(GoogleStorage.class) + ).addValue( + GoogleInputDataConfig.class, + EasyMock.mock(GoogleInputDataConfig.class) + )); + + + return injector.getInstance(Key.get( + StorageConnectorProvider.class, + Names.named(CUSTOM_NAMESPACE) + )); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java new file mode 100644 index 00000000000..7a5e6ba107b --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google.output; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.storage.google.GoogleInputDataConfig; +import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageObjectMetadata; +import org.apache.druid.storage.google.GoogleStorageObjectPage; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class GoogleStorageConnectorTest +{ + private static final String BUCKET = "BUCKET"; + private static final String PREFIX = "PREFIX"; + private static final String TEST_FILE = "TEST_FILE"; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final int MAX_LISTING_LEN = 10; + + private static final HumanReadableBytes CHUNK_SIZE = new HumanReadableBytes("4MiB"); + + GoogleStorageConnector googleStorageConnector; + private final GoogleStorage googleStorage = EasyMock.createMock(GoogleStorage.class); + + @Before + public void setUp() throws IOException + { + GoogleOutputConfig config = new GoogleOutputConfig(BUCKET, PREFIX, temporaryFolder.newFolder(), CHUNK_SIZE, null); + GoogleInputDataConfig inputDataConfig = new GoogleInputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_LISTING_LEN); + googleStorageConnector = new GoogleStorageConnector(config, googleStorage, inputDataConfig); + } + + @Test + public void testPathExistsSuccess() + { + final Capture bucket = Capture.newInstance(); + final Capture path = Capture.newInstance(); + EasyMock.expect(googleStorage.exists(EasyMock.capture(bucket), EasyMock.capture(path))).andReturn(true); + EasyMock.replay(googleStorage); + Assert.assertTrue(googleStorageConnector.pathExists(TEST_FILE)); + Assert.assertEquals(BUCKET, bucket.getValue()); + Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue()); + EasyMock.verify(googleStorage); + } + + @Test + public void testPathExistsFailure() + { + final Capture bucket = Capture.newInstance(); + final Capture path = Capture.newInstance(); + EasyMock.expect(googleStorage.exists(EasyMock.capture(bucket), EasyMock.capture(path))).andReturn(false); + EasyMock.replay(googleStorage); + Assert.assertFalse(googleStorageConnector.pathExists(TEST_FILE)); + Assert.assertEquals(BUCKET, bucket.getValue()); + Assert.assertEquals(PREFIX + "/" + TEST_FILE, path.getValue()); + EasyMock.verify(googleStorage); + } + + @Test + public void testDeleteFile() throws IOException + { + Capture bucketCapture = EasyMock.newCapture(); + Capture pathCapture = EasyMock.newCapture(); + googleStorage.delete( + EasyMock.capture(bucketCapture), + EasyMock.capture(pathCapture) + ); + + EasyMock.replay(googleStorage); + googleStorageConnector.deleteFile(TEST_FILE); + Assert.assertEquals(BUCKET, bucketCapture.getValue()); + Assert.assertEquals(PREFIX + "/" + TEST_FILE, pathCapture.getValue()); + } + + @Test + public void testDeleteFiles() throws IOException + { + Capture containerCapture = EasyMock.newCapture(); + Capture> pathsCapture = EasyMock.newCapture(); + googleStorage.batchDelete(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture)); + EasyMock.replay(googleStorage); + googleStorageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.json")); + Assert.assertEquals(BUCKET, containerCapture.getValue()); + Assert.assertEquals( + ImmutableList.of( + PREFIX + "/" + TEST_FILE + "_1.part", + PREFIX + "/" + TEST_FILE + "_2.json" + ), + Lists.newArrayList(pathsCapture.getValue()) + ); + EasyMock.reset(googleStorage); + } + + @Test + public void testListDir() throws IOException + { + GoogleStorageObjectMetadata objectMetadata1 = new GoogleStorageObjectMetadata( + BUCKET, + PREFIX + "/x/y" + TEST_FILE, + (long) 3, + null + ); + GoogleStorageObjectMetadata objectMetadata2 = new GoogleStorageObjectMetadata( + BUCKET, + PREFIX + "/p/q/r/" + TEST_FILE, + (long) 4, + null + ); + Capture maxListingCapture = EasyMock.newCapture(); + Capture pageTokenCapture = EasyMock.newCapture(); + EasyMock.expect(googleStorage.list( + EasyMock.anyString(), + EasyMock.anyString(), + EasyMock.capture(maxListingCapture), + EasyMock.capture(pageTokenCapture) + )) + .andReturn(new GoogleStorageObjectPage(ImmutableList.of(objectMetadata1, objectMetadata2), null)); + EasyMock.replay(googleStorage); + List ret = Lists.newArrayList(googleStorageConnector.listDir("")); + Assert.assertEquals(ImmutableList.of("x/y" + TEST_FILE, "p/q/r/" + TEST_FILE), ret); + Assert.assertEquals(MAX_LISTING_LEN, maxListingCapture.getValue().intValue()); + Assert.assertEquals(null, pageTokenCapture.getValue()); + + } + + @Test + public void testRead() throws IOException + { + String data = "test"; + EasyMock.expect(googleStorage.size(EasyMock.anyString(), EasyMock.anyString())) + .andReturn(4L); + EasyMock.expect( + googleStorage.getInputStream( + EasyMock.anyString(), + EasyMock.anyString(), + EasyMock.anyLong(), + EasyMock.anyLong() + ) + ).andReturn(IOUtils.toInputStream(data, StandardCharsets.UTF_8)); + + EasyMock.replay(googleStorage); + InputStream is = googleStorageConnector.read(TEST_FILE); + byte[] dataBytes = new byte[data.length()]; + Assert.assertEquals(data.length(), is.read(dataBytes)); + Assert.assertEquals(-1, is.read()); + Assert.assertEquals(data, new String(dataBytes, StandardCharsets.UTF_8)); + } + + @Test + public void testReadRange() throws IOException + { + String data = "test"; + + for (int start = 0; start < data.length(); ++start) { + for (long length = 1; length <= data.length() - start; ++length) { + String dataQueried = data.substring(start, start + ((Long) length).intValue()); + EasyMock.expect(googleStorage.getInputStream( + EasyMock.anyString(), + EasyMock.anyString(), + EasyMock.anyLong(), + EasyMock.anyLong() + )) + .andReturn(IOUtils.toInputStream(dataQueried, StandardCharsets.UTF_8)); + EasyMock.replay(googleStorage); + + InputStream is = googleStorageConnector.readRange(TEST_FILE, start, length); + byte[] dataBytes = new byte[((Long) length).intValue()]; + Assert.assertEquals(length, is.read(dataBytes)); + Assert.assertEquals(-1, is.read()); + Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8)); + EasyMock.reset(googleStorage); + } + } + + } +} diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml index a2b5e596671..4eb51f9b75d 100644 --- a/integration-tests-ex/cases/pom.xml +++ b/integration-tests-ex/cases/pom.xml @@ -145,23 +145,6 @@ aws-java-sdk-core ${aws.sdk.version} - - com.google.api-client - google-api-client - ${com.google.apis.client.version} - provided - - - com.google.apis - google-api-services-storage - ${com.google.apis.storage.version} - - - com.google.api-client - google-api-client - - - com.microsoft.azure azure-storage @@ -316,6 +299,18 @@ curator-client 5.5.0 + + com.google.cloud + google-cloud-storage + 2.29.1 + provided + + + com.google.api-client + google-api-client + 2.2.0 + provided + diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java index f1823bdcf87..9c44d5b1439 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/utils/GcsTestUtil.java @@ -25,7 +25,7 @@ import com.google.api.client.http.FileContent; import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.services.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.base.Predicates; import com.google.common.base.Suppliers; import org.apache.druid.java.util.common.logger.Logger; @@ -84,10 +84,7 @@ public class GcsTestUtil GoogleCredential finalCredential = credential; return new GoogleStorage( Suppliers.memoize( - () -> new Storage - .Builder(httpTransport, jsonFactory, finalCredential) - .setApplicationName("GcsTestUtil") - .build() + () -> StorageOptions.getDefaultInstance().getService() ) ); } diff --git a/licenses.yaml b/licenses.yaml index 0eea187bf6e..4367878cbd0 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3883,22 +3883,12 @@ name: Google Cloud Storage JSON API license_category: binary module: extensions/druid-google-extensions license_name: Apache License version 2.0 -version: v1-rev20230301-2.0.0 +version: v1-rev20231028-2.0.0 libraries: - com.google.apis: google-api-services-storage --- -name: Google Compute Engine API -license_category: binary -module: extensions/gce-extensions -license_name: Apache License version 2.0 -version: v1-rev20230606-2.0.0 -libraries: - - com.google.apis: google-api-services-compute - ---- - name: Google APIs Client Library For Java license_category: binary module: java-core @@ -3909,14 +3899,193 @@ libraries: --- -name: Google HTTP Client Library For Java +name: Google Storage Client Library For Java license_category: binary -module: java-core -license_name: Apache License version 2.0 -version: 1.42.3 +module: extensions/druid-google-extensions +license_name: Apache-2.0 +version: 2.29.1 libraries: - - com.google.http-client: google-http-client - - com.google.http-client: google-http-client-jackson2 + - com.google.cloud: google-cloud-storage + +--- + +name: Google Cloud Storage API +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 2.20.0 +libraries: + - com.google.cloud: google-cloud-storage + - com.google.api: api-common + +--- + +name: gax +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 2.37.0 +libraries: + - com.google.api: gax + - com.google.api: gax-grpc + - com.google.api: gax-httpjson + +--- + +name: grpc-api +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 2.29.1-alpha +libraries: + - com.google.api.grpc: gapic-google-cloud-storage-v2 + - com.google.api.grpc: grpc-google-cloud-storage-v2 + - com.google.api.grpc: proto-google-cloud-storage-v2 + +--- + +name: grpc-io +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.59.0 +libraries: + - io.grpc: grpc-alts + - io.grpc: grpc-api + - io.grpc: grpc-auth + - io.grpc: grpc-context + - io.grpc: grpc-core + - io.grpc: grpc-grpclib + - io.grpc: grpc-inprocess + - io.grpc: grpc-protobuf + - io.grpc: grpc-protobuf-lite + - io.grpc: grpc-stub + +--- + +name: proto-google-common-protos +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 2.28.0 +libraries: + - com.google.api.grpc: proto-google-common-protos + +--- + +name: proto-google-iam-v1 +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.23.0 +libraries: + - com.google.api.grpc: proto-google-iam-v1 + +--- + +name: google-auth +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 1.20.0 +libraries: + - com.google.auth: google-auth-library-credentials + - com.google.auth: google-auth-library-oauth2-http + +--- + +name: google-auto-value +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.10.4 +libraries: + - com.google.auto.value: auto-value-annotations + +--- + +name: google-cloud +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 2.27.0 +libraries: + - com.google.cloud: google-cloud-core + - com.google.cloud: google-cloud-core-grpc + - com.google.cloud: google-cloud-core-http + +--- + +name: listenablefuture +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 9999.0-empty-to-avoid-conflict-with-guava +libraries: + - com.google.guava: listenablefuture + +--- + +name: google-http-client +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.43.3 +libraries: + - com.google.http-client: google-http-client-apache-v2 + - com.google.http-client: google-http-client-appengine + - com.google.http-client: google-http-client-gson + +--- + +name: google-protobuf +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 3.24.4 +libraries: + - com.google.protobuf: protobuf-java-util + +--- + +name: google-grpc +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 1.59.0 +libraries: + - io.grpc: grpc-grpclb + +--- + +name: io-opencensus +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 0.31.1 +libraries: + - io.opencensus: opencensus-api + - io.opencensus: opencensus-contrib-http-util + +--- + +name: conscrypt-openjdk-uber +license_category: binary +module: extensions/druid-google-extensions +license_name: Apache License version 2.0 +version: 2.5.2 +libraries: + - org.conscrypt: conscrypt-openjdk-uber + +--- + +name: threetenbp +license_category: binary +module: extensions/druid-google-extensions +license_name: BSD-3-Clause License +version: 1.6.8 +libraries: + - org.threeten: threetenbp --- diff --git a/pom.xml b/pom.xml index 8c3b26fbaf4..59d8bacb639 100644 --- a/pom.xml +++ b/pom.xml @@ -129,7 +129,7 @@ 2.2.0 1.42.3 v1-rev20230606-2.0.0 - v1-rev20230301-2.0.0 + 2.29.1 maven.org