GCS lookup support (#11026)

* GCS lookup support

* checkstyle fix

* review comments

* review comments

* remove unused import
This commit is contained in:
Parag Jain 2021-03-30 01:40:41 +05:30 committed by GitHub
parent 74ae2eb71a
commit 2fdc313e4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 192 additions and 13 deletions

View File

@ -214,12 +214,12 @@ The remapping values for each globally cached lookup can be specified by a JSON
|Property|Description|Required|Default| |Property|Description|Required|Default|
|--------|-----------|--------|-------| |--------|-----------|--------|-------|
|`pollPeriod`|Period between polling for updates|No|0 (only once)| |`pollPeriod`|Period between polling for updates|No|0 (only once)|
|`uri`|URI for the file of interest, specified as a file, hdfs, or s3 path|No|Use `uriPrefix`| |`uri`|URI for the file of interest, specified as a file, hdfs, s3 or gs path|No|Use `uriPrefix`|
|`uriPrefix`|A URI that specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`| |`uriPrefix`|A URI that specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`|
|`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`| |`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`|
|`namespaceParseSpec`|How to interpret the data at the URI|Yes|| |`namespaceParseSpec`|How to interpret the data at the URI|Yes||
One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), or S3 (s3://) location. HTTP location is not currently supported. One of either `uri` or `uriPrefix` must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GCS (gs://) location. HTTP location is not currently supported.
The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever an poll occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data set, replacing the local cache of the lookup data. The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever an poll occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data set, replacing the local cache of the lookup data.

View File

@ -25,6 +25,7 @@ import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.storage.google.GoogleUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -47,7 +48,7 @@ public class GoogleCloudStorageEntity extends RetryingInputEntity
@Override @Override
public URI getUri() public URI getUri()
{ {
return location.toUri(GoogleCloudStorageInputSource.SCHEME); return location.toUri(GoogleStorageDruidModule.SCHEME_GS);
} }
@Override @Override

View File

@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.google.GoogleInputDataConfig; import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.storage.google.GoogleUtils;
import org.apache.druid.utils.Streams; import org.apache.druid.utils.Streams;
@ -47,8 +48,6 @@ import java.util.stream.Stream;
public class GoogleCloudStorageInputSource extends CloudObjectInputSource public class GoogleCloudStorageInputSource extends CloudObjectInputSource
{ {
public static final String SCHEME = "gs";
private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class); private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);
private final GoogleStorage storage; private final GoogleStorage storage;
@ -63,7 +62,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
) )
{ {
super(SCHEME, uris, prefixes, objects); super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects);
this.storage = storage; this.storage = storage;
this.inputDataConfig = inputDataConfig; this.inputDataConfig = inputDataConfig;
} }

View File

@ -37,7 +37,7 @@ public class GoogleDataSegmentPuller implements URIDataPuller
{ {
private static final Logger LOG = new Logger(GoogleDataSegmentPuller.class); private static final Logger LOG = new Logger(GoogleDataSegmentPuller.class);
private final GoogleStorage storage; protected final GoogleStorage storage;
@Inject @Inject
public GoogleDataSegmentPuller(final GoogleStorage storage) public GoogleDataSegmentPuller(final GoogleStorage storage)

View File

@ -30,6 +30,8 @@ import com.google.api.services.storage.Storage;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
import org.apache.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory; import org.apache.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory;
import org.apache.druid.guice.Binders; import org.apache.druid.guice.Binders;
@ -42,7 +44,8 @@ import java.util.List;
public class GoogleStorageDruidModule implements DruidModule public class GoogleStorageDruidModule implements DruidModule
{ {
static final String SCHEME = "google"; public static final String SCHEME = "google";
public static final String SCHEME_GS = "gs";
private static final Logger LOG = new Logger(GoogleStorageDruidModule.class); private static final Logger LOG = new Logger(GoogleStorageDruidModule.class);
private static final String APPLICATION_NAME = "druid-google-extensions"; private static final String APPLICATION_NAME = "druid-google-extensions";
@ -94,6 +97,10 @@ public class GoogleStorageDruidModule implements DruidModule
Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class); Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", GoogleTaskLogsConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.logs", GoogleTaskLogsConfig.class);
binder.bind(GoogleTaskLogs.class).in(LazySingleton.class); binder.bind(GoogleTaskLogs.class).in(LazySingleton.class);
MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class)
.addBinding(SCHEME_GS)
.to(GoogleTimestampVersionedDataFinder.class)
.in(LazySingleton.class);
} }
@Provides @Provides

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.inject.Inject;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.regex.Pattern;
public class GoogleTimestampVersionedDataFinder extends GoogleDataSegmentPuller
implements SearchableVersionedDataFinder<URI>
{
private static final long MAX_LISTING_KEYS = 1000;
@Inject
public GoogleTimestampVersionedDataFinder(final GoogleStorage storage)
{
super(storage);
}
@Override
public URI getLatestVersion(URI descriptorBase, @Nullable Pattern pattern)
{
try {
long mostRecent = Long.MIN_VALUE;
URI latest = null;
final CloudObjectLocation baseLocation = new CloudObjectLocation(descriptorBase);
final Objects objects = storage.list(baseLocation.getBucket()).setPrefix(baseLocation.getPath()).setMaxResults(MAX_LISTING_KEYS).execute();
for (StorageObject storageObject : objects.getItems()) {
if (GoogleUtils.isDirectoryPlaceholder(storageObject)) {
continue;
}
// remove path prefix from file name
final CloudObjectLocation objectLocation = new CloudObjectLocation(storageObject.getBucket(),
storageObject.getName()
);
final String keyString = StringUtils
.maybeRemoveLeadingSlash(storageObject.getName().substring(baseLocation.getPath().length()));
if (pattern != null && !pattern.matcher(keyString).matches()) {
continue;
}
final long latestModified = storageObject.getUpdated().getValue();
if (latestModified >= mostRecent) {
mostRecent = latestModified;
latest = objectLocation.toUri(GoogleStorageDruidModule.SCHEME_GS);
}
}
return latest;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -23,7 +23,6 @@ import com.google.api.client.http.HttpResponseException;
import com.google.api.services.storage.model.StorageObject; import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -53,7 +52,7 @@ public class GoogleUtils
public static URI objectToUri(StorageObject object) public static URI objectToUri(StorageObject object)
{ {
return objectToCloudObjectLocation(object).toUri(GoogleCloudStorageInputSource.SCHEME); return objectToCloudObjectLocation(object).toUri(GoogleStorageDruidModule.SCHEME_GS);
} }
public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object) public static CloudObjectLocation objectToCloudObjectLocation(StorageObject object)
@ -91,7 +90,7 @@ public class GoogleUtils
{ {
final Iterator<StorageObject> iterator = lazyFetchingStorageObjectsIterator( final Iterator<StorageObject> iterator = lazyFetchingStorageObjectsIterator(
storage, storage,
ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("gs")).iterator(), ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS)).iterator(),
config.getMaxListingLength() config.getMaxListingLength()
); );
@ -105,4 +104,18 @@ public class GoogleUtils
} }
} }
} }
/**
* Similar to {@link org.apache.druid.storage.s3.ObjectSummaryIterator#isDirectoryPlaceholder}
* Copied to avoid creating dependency on s3 extensions
*/
public static boolean isDirectoryPlaceholder(final StorageObject storageObject)
{
// Recognize "standard" directory place-holder indications
if (storageObject.getName().endsWith("/") && storageObject.getSize().intValue() == 0) {
return true;
}
// Recognize place-holder objects created by the Google Storage console or S3 Organizer Firefox extension.
return storageObject.getName().endsWith("_$folder$") && storageObject.getSize().intValue() == 0;
}
} }

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google;
import com.google.api.client.util.DateTime;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.util.regex.Pattern;
public class GoogleTimestampVersionedDataFinderTest
{
@Test
public void getLatestVersion()
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
// object for directory prefix/dir/0/
final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "//", 0);
storageObject1.setUpdated(new DateTime(System.currentTimeMillis()));
final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v1", 1);
storageObject2.setUpdated(new DateTime(System.currentTimeMillis()));
final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/v2", 1);
storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100));
final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/other", 4);
storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100));
final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4));
final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage);
Pattern pattern = Pattern.compile("v.*");
URI latest = finder.getLatestVersion(URI.create(StringUtils.format("gs://%s/%s", bucket, keyPrefix)), pattern);
URI expected = URI.create(StringUtils.format("gs://%s/%s", bucket, storageObject3.getName()));
Assert.assertEquals(expected, latest);
}
@Test
public void getLatestVersionTrailingSlashKeyPrefix()
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0/";
// object for directory prefix/dir/0/
final StorageObject storageObject1 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "/", 0);
storageObject1.setUpdated(new DateTime(System.currentTimeMillis()));
final StorageObject storageObject2 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v1", 1);
storageObject2.setUpdated(new DateTime(System.currentTimeMillis()));
final StorageObject storageObject3 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "v2", 1);
storageObject3.setUpdated(new DateTime(System.currentTimeMillis() + 100));
final StorageObject storageObject4 = ObjectStorageIteratorTest.makeStorageObject(bucket, keyPrefix + "other", 4);
storageObject4.setUpdated(new DateTime(System.currentTimeMillis() + 100));
final GoogleStorage storage = ObjectStorageIteratorTest.makeMockClient(ImmutableList.of(storageObject1, storageObject2, storageObject3, storageObject4));
final GoogleTimestampVersionedDataFinder finder = new GoogleTimestampVersionedDataFinder(storage);
Pattern pattern = Pattern.compile("v.*");
URI latest = finder.getLatestVersion(URI.create(StringUtils.format("gs://%s/%s", bucket, keyPrefix)), pattern);
URI expected = URI.create(StringUtils.format("gs://%s/%s", bucket, storageObject3.getName()));
Assert.assertEquals(expected, latest);
}
}

View File

@ -194,7 +194,7 @@ public class ObjectStorageIteratorTest
* Makes a mock Google Storage client that handles enough of "List" to test the functionality of the * Makes a mock Google Storage client that handles enough of "List" to test the functionality of the
* {@link ObjectStorageIterator} class. * {@link ObjectStorageIterator} class.
*/ */
private static GoogleStorage makeMockClient(final List<StorageObject> storageObjects) static GoogleStorage makeMockClient(final List<StorageObject> storageObjects)
{ {
return new GoogleStorage(null) return new GoogleStorage(null)
{ {
@ -286,7 +286,7 @@ public class ObjectStorageIteratorTest
return new MockStorage().mockList(bucket, storageObjects); return new MockStorage().mockList(bucket, storageObjects);
} }
private static StorageObject makeStorageObject(final String bucket, final String key, final long size) static StorageObject makeStorageObject(final String bucket, final String key, final long size)
{ {
final StorageObject summary = new StorageObject(); final StorageObject summary = new StorageObject();
summary.setBucket(bucket); summary.setBucket(bucket);