From 2fdc313e4d9dab5b005826497508db7f9fbd33a3 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Tue, 30 Mar 2021 01:40:41 +0530 Subject: [PATCH] GCS lookup support (#11026) * GCS lookup support * checkstyle fix * review comments * review comments * remove unused import --- .../extensions-core/lookups-cached-global.md | 4 +- .../google/GoogleCloudStorageEntity.java | 3 +- .../google/GoogleCloudStorageInputSource.java | 5 +- .../google/GoogleDataSegmentPuller.java | 2 +- .../google/GoogleStorageDruidModule.java | 9 ++- .../GoogleTimestampVersionedDataFinder.java | 78 ++++++++++++++++++ .../druid/storage/google/GoogleUtils.java | 19 ++++- ...oogleTimestampVersionedDataFinderTest.java | 81 +++++++++++++++++++ .../google/ObjectStorageIteratorTest.java | 4 +- 9 files changed, 192 insertions(+), 13 deletions(-) create mode 100644 extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java create mode 100644 extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java diff --git a/docs/development/extensions-core/lookups-cached-global.md b/docs/development/extensions-core/lookups-cached-global.md index 4872a0977c4..a97419806ef 100644 --- a/docs/development/extensions-core/lookups-cached-global.md +++ b/docs/development/extensions-core/lookups-cached-global.md @@ -214,12 +214,12 @@ The remapping values for each globally cached lookup can be specified by a JSON |Property|Description|Required|Default| |--------|-----------|--------|-------| |`pollPeriod`|Period between polling for updates|No|0 (only once)| -|`uri`|URI for the file of interest, specified as a file, hdfs, or s3 path|No|Use `uriPrefix`| +|`uri`|URI for the file of interest, specified as a file, hdfs, s3 or gs path|No|Use `uriPrefix`| |`uriPrefix`|A URI that specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`| |`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`| |`namespaceParseSpec`|How to interpret the data at the URI|Yes|| -One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), or S3 (s3://) location. HTTP location is not currently supported. +One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GCS (gs://) location. HTTP location is not currently supported. The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever an poll occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data set, replacing the local cache of the lookup data. diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 4c1b53abc2b..13583a12f9c 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -25,6 +25,7 @@ import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nullable; @@ -47,7 +48,7 @@ public class GoogleCloudStorageEntity extends RetryingInputEntity @Override public URI getUri() { - return location.toUri(GoogleCloudStorageInputSource.SCHEME); + return location.toUri(GoogleStorageDruidModule.SCHEME_GS); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 9d37b3f7f50..47ba4b7c6a0 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleStorageDruidModule; import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.utils.Streams; @@ -47,8 +48,6 @@ import java.util.stream.Stream; public class GoogleCloudStorageInputSource extends CloudObjectInputSource { - public static final String SCHEME = "gs"; - private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class); private final GoogleStorage storage; @@ -63,7 +62,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource @JsonProperty("objects") @Nullable List objects ) { - super(SCHEME, uris, prefixes, objects); + super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects); this.storage = storage; this.inputDataConfig = inputDataConfig; } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java index dbc22bc5d87..7ed270a4152 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java @@ -37,7 +37,7 @@ public class GoogleDataSegmentPuller implements URIDataPuller { private static final Logger LOG = new Logger(GoogleDataSegmentPuller.class); - private final GoogleStorage storage; + protected final GoogleStorage storage; @Inject public GoogleDataSegmentPuller(final GoogleStorage storage) diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java index c30ce073579..d6ed002b5bc 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java @@ -30,6 +30,8 @@ import com.google.api.services.storage.Storage; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; +import com.google.inject.multibindings.MapBinder; +import org.apache.druid.data.SearchableVersionedDataFinder; import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory; import org.apache.druid.guice.Binders; @@ -42,7 +44,8 @@ import java.util.List; public class GoogleStorageDruidModule implements DruidModule { - static final String SCHEME = "google"; + public static final String SCHEME = "google"; + public static final String SCHEME_GS = "gs"; private static final Logger LOG = new Logger(GoogleStorageDruidModule.class); private static final String APPLICATION_NAME = "druid-google-extensions"; @@ -94,6 +97,10 @@ public class GoogleStorageDruidModule implements DruidModule Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class); JsonConfigProvider.bind(binder, "druid.indexer.logs", GoogleTaskLogsConfig.class); binder.bind(GoogleTaskLogs.class).in(LazySingleton.class); + MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class) + .addBinding(SCHEME_GS) + .to(GoogleTimestampVersionedDataFinder.class) + .in(LazySingleton.class); } @Provides diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java new file mode 100644 index 00000000000..d1ed8a7ef6a --- /dev/null +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.inject.Inject; +import org.apache.druid.data.SearchableVersionedDataFinder; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; +import java.util.regex.Pattern; + +public class GoogleTimestampVersionedDataFinder extends GoogleDataSegmentPuller + implements SearchableVersionedDataFinder +{ + private static final long MAX_LISTING_KEYS = 1000; + + @Inject + public GoogleTimestampVersionedDataFinder(final GoogleStorage storage) + { + super(storage); + } + + @Override + public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern) + { + try { + long mostRecent = Long.MIN_VALUE; + URI latest = null; + final CloudObjectLocation baseLocation = new CloudObjectLocation(descriptorBase); + final Objects objects = storage.list(baseLocation.getBucket()).setPrefix(baseLocation.getPath()).setMaxResults(MAX_LISTING_KEYS).execute(); + for (StorageObject storageObject : objects.getItems()) { + if (GoogleUtils.isDirectoryPlaceholder(storageObject)) { + continue; + } + // remove path prefix from file name + final CloudObjectLocation objectLocation = new CloudObjectLocation(storageObject.getBucket(), + storageObject.getName() + ); + final String keyString = StringUtils + .maybeRemoveLeadingSlash(storageObject.getName().substring(baseLocation.getPath().length())); + if (pattern != null && !pattern.matcher(keyString).matches()) { + continue; + } + final long latestModified = storageObject.getUpdated().getValue(); + if (latestModified >= mostRecent) { + mostRecent = latestModified; + latest = objectLocation.toUri(GoogleStorageDruidModule.SCHEME_GS); + } + } + return latest; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 2ed80fedb27..c9e84582ec1 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -23,7 +23,6 @@ import com.google.api.client.http.HttpResponseException; import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -53,7 +52,7 @@ public class GoogleUtils public static URI objectToUri(StorageObject object) { - return objectToCloudObjectLocation(object).toUri(GoogleCloudStorageInputSource.SCHEME); + return objectToCloudObjectLocation(object).toUri(GoogleStorageDruidModule.SCHEME_GS); } public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object) @@ -91,7 +90,7 @@ public class GoogleUtils { final Iterator iterator = lazyFetchingStorageObjectsIterator( storage, - ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("gs")).iterator(), + ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(), config.getMaxListingLength() ); @@ -105,4 +104,18 @@ public class GoogleUtils } } } + + /** + * Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder} + * Copied to avoid creating dependency on s3 extensions + */ + public static boolean isDirectoryPlaceholder(final StorageObject storageObject) + { + // Recognize "standard" directory place-holder indications + if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) { + return true; + } + // Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension. + return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0; + } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java new file mode 100644 index 00000000000..408033db053 --- /dev/null +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTimestampVersionedDataFinderTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.google; + +import com.google.api.client.util.DateTime; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; +import java.util.regex.Pattern; + +public class GoogleTimestampVersionedDataFinderTest +{ + @Test + public void getLatestVersion() + { + String bucket = "bucket"; + String keyPrefix = "prefix/dir/0"; + + // object for directory prefix/dir/0/ + final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0); + storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1); + storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1); + storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4); + storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); + + final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); + Pattern pattern = Pattern.compile("v.*"); + URI latest = finder.getLatestVersion(URI.create(StringUtils.format("gs://%s/%s", bucket, keyPrefix)), pattern); + URI expected = URI.create(StringUtils.format("gs://%s/%s", bucket, storageObject3.getName())); + Assert.assertEquals(expected, latest); + } + + @Test + public void getLatestVersionTrailingSlashKeyPrefix() + { + String bucket = "bucket"; + String keyPrefix = "prefix/dir/0/"; + + // object for directory prefix/dir/0/ + final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0); + storageObject1.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1); + storageObject2.setUpdated(new DateTime(System.currentTimeMillis())); + final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1); + storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4); + storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100)); + final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4)); + + final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage); + Pattern pattern = Pattern.compile("v.*"); + URI latest = finder.getLatestVersion(URI.create(StringUtils.format("gs://%s/%s", bucket, keyPrefix)), pattern); + URI expected = URI.create(StringUtils.format("gs://%s/%s", bucket, storageObject3.getName())); + Assert.assertEquals(expected, latest); + } +} diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java index 4d1504f842c..a0b2bd8f297 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/ObjectStorageIteratorTest.java @@ -194,7 +194,7 @@ public class ObjectStorageIteratorTest * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the * {@link ObjectStorageIterator} class. */ - private static GoogleStorage makeMockClient(final List storageObjects) + static GoogleStorage makeMockClient(final List storageObjects) { return new GoogleStorage(null) { @@ -286,7 +286,7 @@ public class ObjectStorageIteratorTest return new MockStorage().mockList(bucket, storageObjects); } - private static StorageObject makeStorageObject(final String bucket, final String key, final long size) + static StorageObject makeStorageObject(final String bucket, final String key, final long size) { final StorageObject summary = new StorageObject(); summary.setBucket(bucket);