Always use file sizes when determining batch ingest splits (#13955)

* Always use file sizes when determining batch ingest splits.

Main changes:

1) Update CloudObjectInputSource and its subclasses (S3, GCS,
   Azure, Aliyun OSS) to use SplitHintSpecs in all cases. Previously, split
   hint specs were only applied to prefixes, not to URIs or objects. (A short
   sketch follows this section.)

2) Update ExternalInputSpecSlicer (MSQ) to consider file size. Previously,
   file size was ignored; all files were given equal weight when
   determining splits.

A side effect of these changes is that we'll make additional network
calls to find the sizes of objects when users specify URIs or objects
as opposed to prefixes. IMO, this is worth it because it's the only way
to respect the user's split hint and task assignment settings.
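
A minimal sketch (not part of the patch) of what change 1 enables: a split hint applied to a
cloud input source defined by explicit URIs. It mirrors the updated OssInputSource, GCS, and
Azure input source tests; the class and method names here are made up for illustration.

import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;

class SplitHintSketch
{
  // "inputSource" is assumed to be any CloudObjectInputSource subclass (S3, GCS, Azure, Aliyun OSS)
  // configured with explicit "uris" or "objects" rather than "prefixes".
  static long countSplits(final CloudObjectInputSource inputSource) throws IOException
  {
    final Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
        null,                              // null input format, as in the updated AzureInputSourceTest
        new MaxSizeSplitHintSpec(null, 5)  // at most 5 objects per split
    );

    // Before this patch the hint was ignored for uris/objects (one split per object). Now object
    // sizes are fetched, at the cost of one extra metadata call per object, so the hint is honored.
    return splits.count();
  }
}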

Secondary changes:

1) S3, Aliyun OSS: Use getObjectMetadata instead of listObjects to get
   metadata for a single object. This is a simpler call that is also
   expected to be less expensive.

2) Azure: Fix a bug where getBlobLength did not populate blob
   reference attributes, and therefore would not actually retrieve the
   blob length. (A short sketch of the fix follows this list.)

3) MSQ: Align dynamic slicing logic between ExternalInputSpecSlicer and
   TableInputSpecSlicer.

4) MSQ: Adjust WorkerInputs to ensure there is always at least one
   worker, even if it has a nil slice.
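
To make secondary change 2 concrete, here is an illustrative sketch (not part of the patch) of
why the old getBlobLength could not return the real length: with the Azure storage SDK used here,
blob properties are only populated after an explicit attributes fetch, which is what the new
AzureStorage#getBlobReferenceWithAttributes helper now does.

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

import java.net.URISyntaxException;

class AzureBlobLengthSketch
{
  static long blobLength(final CloudBlobContainer container, final String blobPath)
      throws URISyntaxException, StorageException
  {
    final CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
    // The old code skipped this call, so getProperties() held only unpopulated local metadata
    // rather than the actual blob length.
    blob.downloadAttributes();
    return blob.getProperties().getLength();
  }
}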

* Add msqCompatible to testGroupByWithImpossibleTimeFilter.

* Fix tests.

* Add additional tests.

* Remove unused stuff.

* Remove more unused stuff.

* Adjust thresholds.

* Remove irrelevant test.

* Fix comments.

* Fix bug.

* Updates.
Author: Gian Merlino
Date: 2023-04-05 08:54:01 -07:00 (committed via GitHub)
Commit: 319f99db05 (parent e6a11707cb)
42 changed files with 1438 additions and 671 deletions


@ -21,6 +21,7 @@ package org.apache.druid.data.input.aliyun;
import com.aliyun.oss.OSS;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -30,18 +31,15 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.storage.aliyun.OssInputDataConfig;
import org.apache.druid.storage.aliyun.OssStorageDruidModule;
import org.apache.druid.storage.aliyun.OssUtils;
import org.apache.druid.utils.Streams;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.nio.file.FileSystems;
@ -50,8 +48,6 @@ import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class OssInputSource extends CloudObjectInputSource
{
@ -113,18 +109,37 @@ public class OssInputSource extends CloudObjectInputSource
}
@Override
protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
protected CloudObjectSplitWidget getSplitWidget()
{
final Iterator<List<OSSObjectSummary>> splitIterator = splitHintSpec.split(
getIterableObjectsFromPrefixes().iterator(),
object -> new InputFileAttribute(object.getSize())
);
class SplitWidget implements CloudObjectSplitWidget
{
@Override
public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
return Iterators.transform(
OssUtils.objectSummaryIterator(
clientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength()
),
object -> new LocationWithSize(object.getBucketName(), object.getKey(), object.getSize())
);
}
return Streams.sequentialStreamFrom(splitIterator)
.map(objects -> objects.stream()
.map(OssUtils::summaryToCloudObjectLocation)
.collect(Collectors.toList()))
.map(InputSplit::new);
@Override
public long getObjectSize(CloudObjectLocation location)
{
final ObjectMetadata objectMetadata = OssUtils.getSingleObjectMetadata(
clientSupplier.get(),
location.getBucket(),
location.getPath()
);
return objectMetadata.getContentLength();
}
}
return new SplitWidget();
}
@Override
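
For readers of this excerpt: the CloudObjectSplitWidget type returned by getSplitWidget() is
defined elsewhere in the patch (in org.apache.druid.data.input.impl, per the import above) and is
not shown here. Inferred from its usage, it looks roughly like the sketch below; the names and
details are a best guess, not authoritative.

import org.apache.druid.data.input.impl.CloudObjectLocation;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;

public interface CloudObjectSplitWidget
{
  // Streams (bucket, path, size) descriptors for the given prefixes, so split hints can weigh
  // objects by size while listing lazily.
  Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes);

  // Fetches the size of a single, explicitly listed object; typically one metadata call.
  long getObjectSize(CloudObjectLocation location) throws IOException;

  // Simple holder pairing an object location with its size.
  class LocationWithSize
  {
    private final CloudObjectLocation location;
    private final long size;

    public LocationWithSize(String bucket, String key, long size)
    {
      this.location = new CloudObjectLocation(bucket, key);
      this.size = size;
    }

    public CloudObjectLocation getLocation()
    {
      return location;
    }

    public long getSize()
    {
      return size;
    }
  }
}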


@ -22,7 +22,7 @@ package org.apache.druid.storage.aliyun;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.io.ByteSource;
@ -151,8 +151,8 @@ public class OssDataSegmentPuller implements URIDataPuller
private FileObject buildFileObject(final URI uri) throws OSSException
{
final CloudObjectLocation coords = new CloudObjectLocation(OssUtils.checkURI(uri));
final OSSObjectSummary objectSummary =
OssUtils.getSingleObjectSummary(client, coords.getBucket(), coords.getPath());
final ObjectMetadata objectMetadata =
OssUtils.getSingleObjectMetadata(client, coords.getBucket(), coords.getPath());
final String path = uri.getPath();
return new FileObject()
@ -181,7 +181,7 @@ public class OssDataSegmentPuller implements URIDataPuller
try {
if (ossObject == null) {
// lazily promote to full GET
ossObject = client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
ossObject = client.getObject(coords.getBucket(), coords.getPath());
}
final InputStream in = ossObject.getObjectContent();
@ -230,7 +230,7 @@ public class OssDataSegmentPuller implements URIDataPuller
@Override
public long getLastModified()
{
return objectSummary.getLastModified().getTime();
return objectMetadata.getLastModified().getTime();
}
@Override
@ -277,9 +277,9 @@ public class OssDataSegmentPuller implements URIDataPuller
{
try {
final CloudObjectLocation coords = new CloudObjectLocation(OssUtils.checkURI(uri));
final OSSObjectSummary objectSummary =
OssUtils.getSingleObjectSummary(client, coords.getBucket(), coords.getPath());
return StringUtils.format("%d", objectSummary.getLastModified().getTime());
final ObjectMetadata objectMetadata =
OssUtils.getSingleObjectMetadata(client, coords.getBucket(), coords.getPath());
return StringUtils.format("%d", objectMetadata.getLastModified().getTime());
}
catch (OSSException e) {
if (OssUtils.isServiceExceptionRecoverable(e)) {


@ -22,15 +22,13 @@ package org.apache.druid.storage.aliyun;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PutObjectRequest;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
@ -161,34 +159,20 @@ public class OssUtils
}
/**
* Gets a single {@link OSSObjectSummary} from aliyun OSS. Since this method might return a wrong object if there are multiple
* objects that match the given key, this method should be used only when it's guaranteed that the given key is unique
* in the given bucket.
* Gets a single {@link ObjectMetadata} from Aliyun OSS.
*
* @param client aliyun OSS client
* @param bucket aliyun OSS bucket
* @param key unique key for the object to be retrieved
*/
public static OSSObjectSummary getSingleObjectSummary(OSS client, String bucket, String key)
public static ObjectMetadata getSingleObjectMetadata(OSS client, String bucket, String key)
{
final ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucket);
request.setPrefix(key);
request.setMaxKeys(1);
final ObjectListing result = client.listObjects(request);
// Using getObjectSummaries().size() instead of getKeyCount as, in some cases
// it is observed that even though the getObjectSummaries returns some data
// keyCount is still zero.
if (result.getObjectSummaries().size() == 0) {
throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
try {
return retry(() -> client.getObjectMetadata(bucket, key));
}
final OSSObjectSummary objectSummary = result.getObjectSummaries().get(0);
if (!objectSummary.getBucketName().equals(bucket) || !objectSummary.getKey().equals(key)) {
throw new ISE("Wrong object[%s] for bucket[%s] and key[%s]", objectSummary, bucket, key);
catch (Exception e) {
throw new RuntimeException(e);
}
return objectSummary;
}
/**


@ -27,6 +27,7 @@ import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.Module;
@ -101,9 +102,14 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
URI.create("oss://bar/foo/file2.csv.gz")
);
private static final List<List<CloudObjectLocation>> EXPECTED_COORDS =
private static final List<CloudObjectLocation> EXPECTED_OBJECTS =
EXPECTED_URIS.stream()
.map(uri -> Collections.singletonList(new CloudObjectLocation(uri)))
.map(CloudObjectLocation::new)
.collect(Collectors.toList());
private static final List<List<CloudObjectLocation>> EXPECTED_COORDS =
EXPECTED_OBJECTS.stream()
.map(Collections::singletonList)
.collect(Collectors.toList());
private static final List<URI> PREFIXES = Arrays.asList(
@ -318,6 +324,13 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
@Test
public void testWithUrisSplit()
{
EasyMock.reset(OSSCLIENT);
EasyMock.expect(OSSCLIENT.getObjectMetadata(EXPECTED_OBJECTS.get(0).getBucket(), EXPECTED_OBJECTS.get(0).getPath()))
.andReturn(objectMetadataWithSize(CONTENT.length));
EasyMock.expect(OSSCLIENT.getObjectMetadata(EXPECTED_OBJECTS.get(1).getBucket(), EXPECTED_OBJECTS.get(1).getPath()))
.andReturn(objectMetadataWithSize(CONTENT.length));
EasyMock.replay(OSSCLIENT);
OssInputSource inputSource = new OssInputSource(
OSSCLIENT,
INPUT_DATA_CONFIG,
@ -330,10 +343,11 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
new MaxSizeSplitHintSpec(10, null)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(OSSCLIENT);
}
@Test
@ -683,4 +697,11 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
throw new UnsupportedOperationException();
}
}
private static ObjectMetadata objectMetadataWithSize(final long size)
{
final ObjectMetadata retVal = new ObjectMetadata();
retVal.setContentLength(size);
return retVal;
}
}


@ -21,10 +21,9 @@ package org.apache.druid.storage.aliyun;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
@ -58,24 +57,28 @@ public class OssDataSegmentPullerTest
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
String expectedKey = keyPrefix + "/renames-0.gz";
OSS ossClient = EasyMock.createStrictMock(OSS.class);
final OSSObjectSummary objectSummary = new OSSObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setLastModified(new Date(0));
final ObjectListing result = new ObjectListing();
result.getObjectSummaries().add(objectSummary);
EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
.andReturn(result)
EasyMock.expect(ossClient.getObjectMetadata(bucket, expectedKey))
.andReturn(objectMetadata)
.once();
OssDataSegmentPuller puller = new OssDataSegmentPuller(ossClient);
EasyMock.replay(ossClient);
String version = puller.getVersion(URI.create(StringUtils.format(OssStorageDruidModule.SCHEME + "://%s/%s", bucket, objectSummary.getKey())));
String version = puller.getVersion(
URI.create(
StringUtils.format(
OssStorageDruidModule.SCHEME + "://%s/%s",
bucket,
expectedKey
)
)
);
EasyMock.verify(ossClient);
@ -107,16 +110,16 @@ public class OssDataSegmentPullerTest
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));
final ObjectListing listObjectsResult = new ObjectListing();
listObjectsResult.getObjectSummaries().add(objectSummary);
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setLastModified(new Date(1));
final File tmpDir = temporaryFolder.newFolder("gzTestDir");
EasyMock.expect(ossClient.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(true)
.once();
EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
.andReturn(listObjectsResult)
EasyMock.expect(ossClient.getObjectMetadata(object0.getBucketName(), object0.getKey()))
.andReturn(objectMetadata)
.once();
EasyMock.expect(ossClient.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(object0)
@ -159,13 +162,8 @@ public class OssDataSegmentPullerTest
object0.getObjectMetadata().setLastModified(new Date(0));
object0.setObjectContent(new FileInputStream(tmpFile));
final OSSObjectSummary objectSummary = new OSSObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));
final ObjectListing listObjectsResult = new ObjectListing();
listObjectsResult.getObjectSummaries().add(objectSummary);
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setLastModified(new Date(0));
File tmpDir = temporaryFolder.newFolder("gzTestDir");
@ -173,14 +171,14 @@ public class OssDataSegmentPullerTest
EasyMock.expect(ossClient.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(true)
.once();
EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
.andReturn(listObjectsResult)
EasyMock.expect(ossClient.getObjectMetadata(bucket, object0.getKey()))
.andReturn(objectMetadata)
.once();
EasyMock.expect(ossClient.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andThrow(exception)
.once();
EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
.andReturn(listObjectsResult)
EasyMock.expect(ossClient.getObjectMetadata(bucket, object0.getKey()))
.andReturn(objectMetadata)
.once();
EasyMock.expect(ossClient.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andReturn(object0)


@ -172,27 +172,27 @@
<limit>
<counter>INSTRUCTION</counter>
<value>COVEREDRATIO</value>
<minimum>0.86</minimum>
<minimum>0.8</minimum>
</limit>
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>0.85</minimum>
<minimum>0.8</minimum>
</limit>
<limit>
<counter>BRANCH</counter>
<value>COVEREDRATIO</value>
<minimum>0.88</minimum>
<minimum>0.8</minimum>
</limit>
<limit>
<counter>COMPLEXITY</counter>
<value>COVEREDRATIO</value>
<minimum>0.80</minimum>
<minimum>0.7</minimum>
</limit>
<limit>
<counter>METHOD</counter>
<value>COVEREDRATIO</value>
<minimum>0.78</minimum>
<minimum>0.7</minimum>
</limit>
<limit>
<counter>CLASS</counter>


@ -24,31 +24,23 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputFileAttribute;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.utils.Streams;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Abstracts the Azure storage system where input data is stored. Allows users to retrieve entities in
@ -61,7 +53,6 @@ public class AzureInputSource extends CloudObjectInputSource
private final AzureStorage storage;
private final AzureEntityFactory entityFactory;
private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private final AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
private final AzureInputDataConfig inputDataConfig;
@JsonCreator
@ -69,7 +60,6 @@ public class AzureInputSource extends CloudObjectInputSource
@JacksonInject AzureStorage storage,
@JacksonInject AzureEntityFactory entityFactory,
@JacksonInject AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
@JacksonInject AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter,
@JacksonInject AzureInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@ -85,10 +75,6 @@ public class AzureInputSource extends CloudObjectInputSource
"AzureCloudBlobIterableFactory"
);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "AzureInputDataConfig");
this.azureCloudBlobToLocationConverter = Preconditions.checkNotNull(
azureCloudBlobToLocationConverter,
"AzureCloudBlobToLocationConverter"
);
}
@Override
@ -98,7 +84,6 @@ public class AzureInputSource extends CloudObjectInputSource
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
null,
null,
@ -114,37 +99,48 @@ public class AzureInputSource extends CloudObjectInputSource
}
@Override
protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
protected CloudObjectSplitWidget getSplitWidget()
{
final Iterator<List<CloudBlobHolder>> splitIterator = splitHintSpec.split(
getIterableObjectsFromPrefixes().iterator(),
blobHolder -> new InputFileAttribute(blobHolder.getBlobLength())
);
return Streams.sequentialStreamFrom(splitIterator)
.map(objects -> objects.stream()
.map(azureCloudBlobToLocationConverter::createCloudObjectLocation)
.collect(Collectors.toList()))
.map(InputSplit::new);
}
private Iterable<CloudBlobHolder> getIterableObjectsFromPrefixes()
{
return () -> {
Iterator<CloudBlobHolder> iterator = azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength()).iterator();
// Skip files that didn't match glob filter.
if (StringUtils.isNotBlank(getObjectGlob())) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
iterator = Iterators.filter(
iterator,
object -> m.matches(Paths.get(object.getName()))
class SplitWidget implements CloudObjectSplitWidget
{
@Override
public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
return Iterators.transform(
azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength()).iterator(),
blob -> {
try {
return new LocationWithSize(
blob.getContainerName(),
blob.getName(),
blob.getBlobLength()
);
}
catch (URISyntaxException | StorageException e) {
throw new RuntimeException(e);
}
}
);
}
return iterator;
};
@Override
public long getObjectSize(CloudObjectLocation location)
{
try {
final CloudBlob blobWithAttributes = storage.getBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);
return blobWithAttributes.getProperties().getLength();
}
catch (URISyntaxException | StorageException e) {
throw new RuntimeException(e);
}
}
}
return new SplitWidget();
}
@Override
@ -155,7 +151,6 @@ public class AzureInputSource extends CloudObjectInputSource
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig
);
}
@ -176,7 +171,6 @@ public class AzureInputSource extends CloudObjectInputSource
return storage.equals(that.storage) &&
entityFactory.equals(that.entityFactory) &&
azureCloudBlobIterableFactory.equals(that.azureCloudBlobIterableFactory) &&
azureCloudBlobToLocationConverter.equals(that.azureCloudBlobToLocationConverter) &&
inputDataConfig.equals(that.inputDataConfig);
}


@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
/**
* Converts a {@link CloudBlobHolder} object to a {@link CloudObjectLocation} object
*/
public class AzureCloudBlobHolderToCloudObjectLocationConverter
implements ICloudSpecificObjectToCloudObjectLocationConverter<CloudBlobHolder>
{
@Override
public CloudObjectLocation createCloudObjectLocation(CloudBlobHolder cloudBlob)
{
try {
return new CloudObjectLocation(cloudBlob.getContainerName(), cloudBlob.getName());
}
catch (Exception e) {
throw new RE(e);
}
}
}


@ -28,6 +28,7 @@ import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.java.util.common.logger.Logger;
@ -97,10 +98,18 @@ public class AzureStorage
}
}
public CloudBlob getBlobReferenceWithAttributes(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
blobReference.downloadAttributes();
return blobReference;
}
public long getBlobLength(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath).getProperties().getLength();
return getBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
}
public InputStream getBlobInputStream(final String containerName, final String blobPath)


@ -106,7 +106,6 @@ public class AzureStorageDruidModule implements DruidModule
Binders.taskLogsBinder(binder).addBinding(SCHEME).to(AzureTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", AzureTaskLogsConfig.class);
binder.bind(AzureTaskLogs.class).in(LazySingleton.class);
binder.bind(AzureCloudBlobHolderToCloudObjectLocationConverter.class).in(LazySingleton.class);
binder.install(new FactoryModuleBuilder()
.build(AzureByteSourceFactory.class));
binder.install(new FactoryModuleBuilder()


@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureDataSegmentConfig;
import org.apache.druid.storage.azure.AzureInputDataConfig;
@ -65,7 +64,6 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
private AzureStorage azureStorage;
private AzureEntityFactory entityFactory;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
private AzureInputDataConfig inputDataConfig;
static {
@ -88,7 +86,6 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
azureStorage = createMock(AzureStorage.class);
entityFactory = createMock(AzureEntityFactory.class);
azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class);
inputDataConfig = createMock(AzureInputDataConfig.class);
}
@ -153,10 +150,6 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
injectableValues.addValue(AzureStorage.class, azureStorage);
injectableValues.addValue(AzureEntityFactory.class, entityFactory);
injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
injectableValues.addValue(
AzureCloudBlobHolderToCloudObjectLocationConverter.class,
azureCloudBlobToLocationConverter
);
injectableValues.addValue(AzureInputDataConfig.class, inputDataConfig);
return injectableValues;
}


@ -27,7 +27,6 @@ import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
import org.apache.druid.storage.azure.AzureCloudBlobIterable;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
@ -65,7 +64,6 @@ public class AzureInputSourceTest extends EasyMockSupport
private AzureStorage storage;
private AzureEntityFactory entityFactory;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
private AzureInputDataConfig inputDataConfig;
private InputSplit<List<CloudObjectLocation>> inputSplit;
@ -92,7 +90,6 @@ public class AzureInputSourceTest extends EasyMockSupport
inputSplit = createMock(InputSplit.class);
azureEntity1 = createMock(AzureEntity.class);
azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class);
inputDataConfig = createMock(AzureInputDataConfig.class);
cloudBlobDruid1 = createMock(CloudBlobHolder.class);
azureCloudBlobIterable = createMock(AzureCloudBlobIterable.class);
@ -106,7 +103,6 @@ public class AzureInputSourceTest extends EasyMockSupport
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
@ -127,7 +123,6 @@ public class AzureInputSourceTest extends EasyMockSupport
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
@ -142,7 +137,7 @@ public class AzureInputSourceTest extends EasyMockSupport
}
@Test
public void test_getPrefixesSplitStream_successfullyCreatesCloudLocation_returnsExpectedLocations()
public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLocations() throws Exception
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
@ -152,8 +147,8 @@ public class AzureInputSourceTest extends EasyMockSupport
EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
.andReturn(CLOUD_OBJECT_LOCATION_1);
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
replayAll();
@ -161,7 +156,6 @@ public class AzureInputSourceTest extends EasyMockSupport
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
prefixes,
@ -169,7 +163,8 @@ public class AzureInputSourceTest extends EasyMockSupport
null
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
null,
new MaxSizeSplitHintSpec(null, 1)
);
@ -181,6 +176,7 @@ public class AzureInputSourceTest extends EasyMockSupport
@Test
public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudLocation_returnsExpectedLocations()
throws Exception
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
@ -199,9 +195,8 @@ public class AzureInputSourceTest extends EasyMockSupport
EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
.andReturn(CLOUD_OBJECT_LOCATION_1);
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
EasyMock.expect(cloudBlobDruid1.getContainerName()).andReturn(CONTAINER).anyTimes();
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
replayAll();
@ -210,7 +205,6 @@ public class AzureInputSourceTest extends EasyMockSupport
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
prefixes,
@ -218,7 +212,8 @@ public class AzureInputSourceTest extends EasyMockSupport
objectGlob
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
null,
new MaxSizeSplitHintSpec(null, 1)
);
@ -239,7 +234,6 @@ public class AzureInputSourceTest extends EasyMockSupport
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
prefixes,
@ -260,7 +254,6 @@ public class AzureInputSourceTest extends EasyMockSupport
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
prefixes,
@ -281,7 +274,6 @@ public class AzureInputSourceTest extends EasyMockSupport
.withNonnullFields("storage")
.withNonnullFields("entityFactory")
.withNonnullFields("azureCloudBlobIterableFactory")
.withNonnullFields("azureCloudBlobToLocationConverter")
.withNonnullFields("inputDataConfig")
.withNonnullFields("objectGlob")
.withNonnullFields("scheme")


@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AzureCloudBlobHolderToCloudObjectLocationConverterTest extends EasyMockSupport
{
private static final String CONTAINER1 = "container1";
private static final String BLOB1 = "blob1";
private CloudBlobHolder cloudBlob;
private AzureCloudBlobHolderToCloudObjectLocationConverter converter;
@Before
public void setup()
{
cloudBlob = createMock(CloudBlobHolder.class);
}
@Test
public void test_createCloudObjectLocation_returnsExpectedLocation() throws Exception
{
EasyMock.expect(cloudBlob.getContainerName()).andReturn(CONTAINER1);
EasyMock.expect(cloudBlob.getName()).andReturn(BLOB1);
replayAll();
CloudObjectLocation expectedLocation = new CloudObjectLocation(CONTAINER1, BLOB1);
converter = new AzureCloudBlobHolderToCloudObjectLocationConverter();
CloudObjectLocation actualLocation = converter.createCloudObjectLocation(cloudBlob);
Assert.assertEquals(expectedLocation, actualLocation);
verifyAll();
}
}


@ -80,7 +80,7 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
try {
AZURE_ACCOUNT_NAME = "azureAccount1";
AZURE_ACCOUNT_KEY = Base64.getUrlEncoder()
.encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8.toString()));
.encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8));
AZURE_SHARED_ACCESS_TOKEN = "dummyToken";
AZURE_CONTAINER = "azureContainer1";
AZURE_PREFIX = "azurePrefix1";
@ -193,17 +193,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
Assert.assertSame(cloudBlobClient.get(), azureStorage.getCloudBlobClient());
}
@Test
public void testGetAzureCloudBlobToLocationConverterExpectedConverted()
{
injector = makeInjectorWithProperties(PROPERTIES);
AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobLocationConverter1 = injector.getInstance(
AzureCloudBlobHolderToCloudObjectLocationConverter.class);
AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobLocationConverter2 = injector.getInstance(
AzureCloudBlobHolderToCloudObjectLocationConverter.class);
Assert.assertSame(azureCloudBlobLocationConverter1, azureCloudBlobLocationConverter2);
}
@Test
public void testGetAzureByteSourceFactoryCanCreateAzureByteSource()
{


@ -24,32 +24,24 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import org.apache.druid.storage.google.GoogleUtils;
import org.apache.druid.utils.Streams;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class GoogleCloudStorageInputSource extends CloudObjectInputSource
{
@ -79,73 +71,63 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
return new GoogleCloudStorageEntity(storage, location);
}
@Override
protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
{
final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
storageObjectIterable().iterator(),
storageObject -> {
final BigInteger sizeInBigInteger = storageObject.getSize();
long sizeInLong;
if (sizeInBigInteger == null) {
sizeInLong = Long.MAX_VALUE;
} else {
try {
sizeInLong = sizeInBigInteger.longValueExact();
}
catch (ArithmeticException e) {
LOG.warn(
e,
"The object [%s, %s] has a size [%s] out of the range of the long type. "
+ "The max long value will be used for its size instead.",
storageObject.getBucket(),
storageObject.getName(),
sizeInBigInteger
);
sizeInLong = Long.MAX_VALUE;
}
}
return new InputFileAttribute(sizeInLong);
}
);
return Streams.sequentialStreamFrom(splitIterator)
.map(objects -> objects.stream().map(this::byteSourceFromStorageObject).collect(Collectors.toList()))
.map(InputSplit::new);
}
@Override
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
{
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
}
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
@Override
protected CloudObjectSplitWidget getSplitWidget()
{
return GoogleUtils.objectToCloudObjectLocation(storageObject);
}
private Iterable<StorageObject> storageObjectIterable()
{
return () -> {
Iterator<StorageObject> iterator = GoogleUtils.lazyFetchingStorageObjectsIterator(
storage,
getPrefixes().iterator(),
inputDataConfig.getMaxListingLength()
);
// Skip files that didn't match glob filter.
if (StringUtils.isNotBlank(getObjectGlob())) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
iterator = Iterators.filter(
iterator,
object -> m.matches(Paths.get(object.getName()))
class SplitWidget implements CloudObjectSplitWidget
{
@Override
public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
return Iterators.transform(
GoogleUtils.lazyFetchingStorageObjectsIterator(
storage,
prefixes.iterator(),
inputDataConfig.getMaxListingLength()
),
object -> new LocationWithSize(object.getBucket(), object.getName(), getSize(object))
);
}
return iterator;
};
@Override
public long getObjectSize(CloudObjectLocation location) throws IOException
{
final StorageObject storageObject = storage.getMetadata(location.getBucket(), location.getPath());
return getSize(storageObject);
}
}
return new SplitWidget();
}
private static long getSize(final StorageObject object)
{
final BigInteger sizeInBigInteger = object.getSize();
if (sizeInBigInteger == null) {
return Long.MAX_VALUE;
} else {
try {
return sizeInBigInteger.longValueExact();
}
catch (ArithmeticException e) {
LOG.warn(
e,
"The object [%s, %s] has a size [%s] out of the range of the long type. "
+ "The max long value will be used for its size instead.",
object.getBucket(),
object.getName(),
sizeInBigInteger
);
return Long.MAX_VALUE;
}
}
}
@Override


@ -22,6 +22,7 @@ package org.apache.druid.storage.google;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Supplier;
import java.io.IOException;
@ -66,6 +67,11 @@ public class GoogleStorage
return inputStream;
}
public StorageObject getMetadata(final String bucket, final String path) throws IOException
{
return storage.get().objects().get(bucket, path).execute();
}
public void delete(final String bucket, final String path) throws IOException
{
storage.get().objects().delete(bucket, path).execute();


@ -77,6 +77,7 @@ public class GoogleUtils
* @param bucket Google Storage bucket
* @param prefix the file prefix
* @param filter function which returns true if the prefix file found should be deleted and false otherwise.
*
* @throws Exception
*/
public static void deleteObjectsInPath(


@ -154,22 +154,50 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
}
@Test
public void testWithUrisSplit()
public void testWithUrisSplit() throws Exception
{
EasyMock.reset(STORAGE);
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(0).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath())
)
).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(1).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath())
)
).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
EasyMock.replay(STORAGE);
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
null,
new MaxSizeSplitHintSpec(10, null)
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(STORAGE);
}
@Test
public void testWithUrisGlob()
public void testWithUrisGlob() throws Exception
{
EasyMock.reset(STORAGE);
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(0).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(0).getPath())
)
).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
EasyMock.expect(
STORAGE.getMetadata(
EXPECTED_URIS.get(1).getAuthority(),
StringUtils.maybeRemoveLeadingSlash(EXPECTED_URIS.get(1).getPath())
)
).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
EasyMock.replay(STORAGE);
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
@ -180,10 +208,11 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
null,
new MaxSizeSplitHintSpec(10, null)
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(STORAGE);
}
@Test


@ -45,6 +45,12 @@ public interface InputSpecSlicer
* If there is a conflict between maxNumSlices and maxFilesPerSlice or maxBytesPerSlice, then maxNumSlices wins.
* This means that for small values of maxNumSlices, slices may have more than maxFilesPerSlice files, or more
* than maxBytesPerSlice bytes.
*
* The design of this method assumes that the ideal number of {@link InputSlice} can be determined by looking at
* just one {@link InputSpec} at a time. This makes sense today, since there are no situations where a
* {@link org.apache.druid.msq.kernel.StageDefinition} would be created with two {@link InputSpec} other than
* {@link org.apache.druid.msq.input.stage.StageInputSpec} (which is not dynamically splittable, so would not
* use this method anyway). If this changes in the future, we'll want to revisit the design of this method.
*/
List<InputSlice> sliceDynamic(InputSpec inputSpec, int maxNumSlices, int maxFilesPerSlice, long maxBytesPerSlice);
}


@ -35,7 +35,7 @@ public class SlicerUtils
*
* Items are assigned round-robin.
*/
public static <T> List<List<T>> makeSlices(final Iterator<T> iterator, final int numSlices)
public static <T> List<List<T>> makeSlicesStatic(final Iterator<T> iterator, final int numSlices)
{
final List<List<T>> slicesList = new ArrayList<>(numSlices);
@ -57,10 +57,10 @@ public class SlicerUtils
* Creates "numSlices" lists from "iterator", trying to keep each one as evenly-weighted as possible. Some lists may
* be empty.
*
* Each item is assigned to the split list that has the lowest weight at the time that item is encountered, which
* Items are assigned to the slice that has the lowest weight at the time that item is encountered, which
* leads to pseudo-round-robin assignment.
*/
public static <T> List<List<T>> makeSlices(
public static <T> List<List<T>> makeSlicesStatic(
final Iterator<T> iterator,
final ToLongFunction<T> weightFunction,
final int numSlices
@ -98,6 +98,69 @@ public class SlicerUtils
return slicesList;
}
/**
* Creates up to "maxNumSlices" lists from "iterator".
*
* This method creates as few slices as possible, while keeping each slice under the provided limits.
*
* This function uses a greedy algorithm that starts out with a single empty slice. Items are assigned to the slice
* that has the lowest weight at the time that item is encountered. New slices are created, up to maxNumSlices, when
* the lightest existing slice exceeds either maxFilesPerSlice or maxWeightPerSlice.
*
* Slices may have more than maxFilesPerSlice files, or more than maxWeightPerSlice weight, if necessary to
* keep the total number of slices at or below maxNumSlices.
*/
public static <T> List<List<T>> makeSlicesDynamic(
final Iterator<T> iterator,
final ToLongFunction<T> weightFunction,
final int maxNumSlices,
final int maxFilesPerSlice,
final long maxWeightPerSlice
)
{
final List<List<T>> slicesList = new ArrayList<>();
final PriorityQueue<ListWithWeight<T>> pq = new PriorityQueue<>(
// Break ties with position, so earlier slices fill first.
Comparator.<ListWithWeight<T>, Long>comparing(ListWithWeight::getWeight)
.thenComparing(ListWithWeight::getPosition)
);
while (iterator.hasNext()) {
final T item = iterator.next();
final long itemWeight = weightFunction.applyAsLong(item);
// Get lightest-weight list.
ListWithWeight<T> lightestList = pq.poll();
if (lightestList == null) {
// Populate the initial list.
lightestList = new ListWithWeight<>(new ArrayList<>(), 0, 0);
slicesList.add(lightestList.getList());
}
if (!lightestList.getList().isEmpty()
&& slicesList.size() < maxNumSlices
&& (lightestList.getWeight() + itemWeight > maxWeightPerSlice
|| lightestList.getList().size() >= maxFilesPerSlice)) {
// Lightest list can't hold this item without overflowing. Add it back and make another one.
pq.add(lightestList);
lightestList = new ListWithWeight<>(new ArrayList<>(), slicesList.size(), 0);
slicesList.add(lightestList.getList());
}
lightestList.getList().add(item);
pq.add(
new ListWithWeight<>(
lightestList.getList(),
lightestList.getPosition(),
lightestList.getWeight() + itemWeight
)
);
}
return slicesList;
}
private static class ListWithWeight<T>
{
private final List<T> list;
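
A usage sketch (not part of the patch) of the new SlicerUtils.makeSlicesDynamic, using the
signature from the hunk above. The expected groupings follow from the greedy algorithm described
in its javadoc: items go to the lightest slice, and a new slice is opened only when a limit would
be exceeded and maxNumSlices allows it.

import org.apache.druid.msq.input.SlicerUtils;

import java.util.Arrays;
import java.util.List;

class MakeSlicesDynamicSketch
{
  public static void main(String[] args)
  {
    final List<Long> fileSizes = Arrays.asList(400L, 300L, 200L, 100L, 100L);

    // Room for up to 10 slices: the limits of 2 files / 500 bytes per slice are respected.
    // Expected: [[400, 100], [300, 200], [100]]
    final List<List<Long>> roomy =
        SlicerUtils.makeSlicesDynamic(fileSizes.iterator(), size -> size, 10, 2, 500);

    // Capped at 2 slices: maxNumSlices wins, so slices may exceed the per-slice limits.
    // Expected: [[400, 100, 100], [300, 200]]
    final List<List<Long>> capped =
        SlicerUtils.makeSlicesDynamic(fileSizes.iterator(), size -> size, 2, 2, 500);

    System.out.println(roomy);
    System.out.println(capped);
  }
}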


@ -22,27 +22,23 @@ package org.apache.druid.msq.input.external;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.input.SlicerUtils;
import org.apache.druid.segment.column.RowSignature;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Slices {@link ExternalInputSpec} into {@link ExternalInputSlice} or {@link NilInputSlice}.
@ -59,91 +55,193 @@ public class ExternalInputSpecSlicer implements InputSpecSlicer
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
final InputSource inputSource = externalInputSpec.getInputSource();
final InputFormat inputFormat = externalInputSpec.getInputFormat();
final RowSignature signature = externalInputSpec.getSignature();
// Worker number -> input source for that worker.
final List<List<InputSource>> workerInputSourcess;
// Figure out input splits for each worker.
if (inputSource.isSplittable()) {
//noinspection unchecked
final SplittableInputSource<Object> splittableInputSource = (SplittableInputSource<Object>) inputSource;
try {
workerInputSourcess = SlicerUtils.makeSlices(
splittableInputSource.createSplits(inputFormat, FilePerSplitHintSpec.INSTANCE)
.map(splittableInputSource::withSplit)
.iterator(),
maxNumSlices
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
if (externalInputSpec.getInputSource().isSplittable()) {
return sliceSplittableInputSource(
externalInputSpec,
new StaticSplitHintSpec(maxNumSlices),
maxNumSlices
);
} else {
workerInputSourcess = Collections.singletonList(Collections.singletonList(inputSource));
return sliceUnsplittableInputSource(externalInputSpec);
}
// Sanity check. It is a bug in this method if this exception is ever thrown.
if (workerInputSourcess.size() > maxNumSlices) {
throw new ISE("Generated too many slices [%d > %d]", workerInputSourcess.size(), maxNumSlices);
}
return IntStream.range(0, maxNumSlices)
.mapToObj(
workerNumber -> {
final List<InputSource> workerInputSources;
if (workerNumber < workerInputSourcess.size()) {
workerInputSources = workerInputSourcess.get(workerNumber);
} else {
workerInputSources = Collections.emptyList();
}
if (workerInputSources.isEmpty()) {
return NilInputSlice.INSTANCE;
} else {
return new ExternalInputSlice(workerInputSources, inputFormat, signature);
}
}
)
.collect(Collectors.toList());
}
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
final InputSpec inputSpec,
final int maxNumSlices,
final int maxFilesPerSlice,
final long maxBytesPerSlice
)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
if (!externalInputSpec.getInputSource().isSplittable()) {
return sliceStatic(inputSpec, 1);
if (externalInputSpec.getInputSource().isSplittable()) {
return sliceSplittableInputSource(
externalInputSpec,
new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice, maxBytesPerSlice),
maxNumSlices
);
} else {
return sliceUnsplittableInputSource(externalInputSpec);
}
}
final SplittableInputSource<?> inputSource = (SplittableInputSource<?>) externalInputSpec.getInputSource();
final MaxSizeSplitHintSpec maxSizeSplitHintSpec = new MaxSizeSplitHintSpec(
new HumanReadableBytes(maxBytesPerSlice),
maxFilesPerSlice
/**
* "Slice" an unsplittable input source into a single slice.
*/
private static List<InputSlice> sliceUnsplittableInputSource(final ExternalInputSpec inputSpec)
{
return Collections.singletonList(
new ExternalInputSlice(
Collections.singletonList(inputSpec.getInputSource()),
inputSpec.getInputFormat(),
inputSpec.getSignature()
)
);
}
final long numSlices;
/**
* Slice a {@link SplittableInputSource} using a {@link SplitHintSpec}.
*/
private static List<InputSlice> sliceSplittableInputSource(
final ExternalInputSpec inputSpec,
final SplitHintSpec splitHintSpec,
final int maxNumSlices
)
{
final SplittableInputSource<Object> splittableInputSource =
(SplittableInputSource<Object>) inputSpec.getInputSource();
try {
numSlices = inputSource.createSplits(externalInputSpec.getInputFormat(), maxSizeSplitHintSpec).count();
final List<InputSplit<Object>> splitList =
splittableInputSource.createSplits(inputSpec.getInputFormat(), splitHintSpec).collect(Collectors.toList());
final List<InputSlice> assignments = new ArrayList<>();
if (splitList.size() <= maxNumSlices) {
for (final InputSplit<Object> split : splitList) {
assignments.add(splitsToSlice(inputSpec, Collections.singletonList(split)));
}
} else {
// In some cases (for example, HttpInputSource) "createSplits" ignores our splitHintSpec. If this happens,
// the number of splits may be larger than maxNumSlices. Remix the splits ourselves.
final List<List<InputSplit<Object>>> splitsList =
SlicerUtils.makeSlicesStatic(splitList.iterator(), maxNumSlices);
for (List<InputSplit<Object>> splits : splitsList) {
//noinspection rawtypes, unchecked
assignments.add(splitsToSlice(inputSpec, (List) splits));
}
}
return assignments;
}
catch (IOException e) {
throw new RuntimeException(e);
}
return sliceStatic(inputSpec, (int) Math.min(numSlices, maxNumSlices));
}
/**
* Convert {@link InputSplit} (from {@link SplittableInputSource#createSplits}) into an {@link InputSlice}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
private static InputSlice splitsToSlice(
final ExternalInputSpec spec,
final List<InputSplit<?>> splits
)
{
try {
final SplittableInputSource<?> splittableInputSource = (SplittableInputSource) spec.getInputSource();
final List<InputSource> subSources = new ArrayList<>();
for (final InputSplit<?> split : splits) {
// Use FilePerSplitHintSpec to create an InputSource per file. This allows us to track progress at
// the level of reading individual files.
((SplittableInputSource<?>) splittableInputSource.withSplit((InputSplit) split))
.createSplits(spec.getInputFormat(), FilePerSplitHintSpec.INSTANCE)
.map(subSplit -> splittableInputSource.withSplit((InputSplit) subSplit))
.forEach(subSources::add);
}
if (subSources.isEmpty()) {
return NilInputSlice.INSTANCE;
} else {
return new ExternalInputSlice(subSources, spec.getInputFormat(), spec.getSignature());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Split hint spec used by {@link #sliceStatic(InputSpec, int)}.
*/
static class StaticSplitHintSpec implements SplitHintSpec
{
private final int maxNumSlices;
public StaticSplitHintSpec(final int maxNumSlices)
{
this.maxNumSlices = maxNumSlices;
}
@Override
public <T> Iterator<List<T>> split(
final Iterator<T> inputIterator,
final Function<T, InputFileAttribute> inputAttributeExtractor
)
{
return Iterators.filter(
SlicerUtils.makeSlicesStatic(
inputIterator,
item -> inputAttributeExtractor.apply(item).getSize(),
maxNumSlices
).iterator(),
xs -> !xs.isEmpty()
);
}
}
/**
* Split hint spec used by {@link #sliceDynamic(InputSpec, int, int, long)}.
*/
static class DynamicSplitHintSpec implements SplitHintSpec
{
private final int maxNumSlices;
private final int maxFilesPerSlice;
private final long maxBytesPerSlice;
public DynamicSplitHintSpec(final int maxNumSlices, final int maxFilesPerSlice, final long maxBytesPerSlice)
{
this.maxNumSlices = maxNumSlices;
this.maxFilesPerSlice = maxFilesPerSlice;
this.maxBytesPerSlice = maxBytesPerSlice;
}
@Override
public <T> Iterator<List<T>> split(
final Iterator<T> inputIterator,
final Function<T, InputFileAttribute> inputAttributeExtractor
)
{
return Iterators.filter(
SlicerUtils.makeSlicesDynamic(
inputIterator,
item -> inputAttributeExtractor.apply(item).getSize(),
maxNumSlices,
maxFilesPerSlice,
maxBytesPerSlice
).iterator(),
xs -> !xs.isEmpty()
);
}
}
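The dynamic variant can be sketched the same way, assuming the straightforward reading of the parameters: fill a slice until adding another file would exceed either the file or byte limit, but never create more than maxNumSlices slices. Again, only the intent is guaranteed to match SlicerUtils.makeSlicesDynamic.

import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

class DynamicSliceSketch
{
  static <T> List<List<T>> slice(
      List<T> items,
      ToLongFunction<T> sizeFn,
      int maxNumSlices,
      int maxFilesPerSlice,
      long maxBytesPerSlice
  )
  {
    final List<List<T>> slices = new ArrayList<>();
    List<T> current = new ArrayList<>();
    long currentBytes = 0;
    for (T item : items) {
      final long size = sizeFn.applyAsLong(item);
      final boolean wouldOverflow =
          !current.isEmpty()
          && (current.size() >= maxFilesPerSlice || currentBytes + size > maxBytesPerSlice);
      if (wouldOverflow && slices.size() < maxNumSlices - 1) {
        // Close the current slice and start a new one, as long as we are under the slice cap.
        slices.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(item);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      slices.add(current);
    }
    return slices; // never more than maxNumSlices slices
  }
}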
/**
* Assigns each input file to its own split.
*/
@VisibleForTesting
static class FilePerSplitHintSpec implements SplitHintSpec
{

View File

@ -68,7 +68,7 @@ public class CollectedReadablePartitions implements ReadablePartitions
@Override
public List<ReadablePartitions> split(int maxNumSplits)
{
return SlicerUtils.makeSlices(partitionToWorkerMap.int2IntEntrySet().iterator(), maxNumSplits)
return SlicerUtils.makeSlicesStatic(partitionToWorkerMap.int2IntEntrySet().iterator(), maxNumSplits)
.stream()
.map(
entries -> {

View File

@ -72,7 +72,7 @@ public class StripedReadablePartitions implements ReadablePartitions
{
final List<ReadablePartitions> retVal = new ArrayList<>();
for (List<Integer> entries : SlicerUtils.makeSlices(partitionNumbers.iterator(), maxNumSplits)) {
for (List<Integer> entries : SlicerUtils.makeSlicesStatic(partitionNumbers.iterator(), maxNumSplits)) {
if (!entries.isEmpty()) {
retVal.add(new StripedReadablePartitions(stageNumber, numWorkers, new IntAVLTreeSet(entries)));
}

View File

@ -20,8 +20,6 @@
package org.apache.druid.msq.input.table;
import com.google.common.base.Preconditions;
import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
@ -33,7 +31,6 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.joda.time.Interval;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -63,8 +60,13 @@ public class TableInputSpecSlicer implements InputSpecSlicer
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final Set<DataSegmentWithInterval> prunedSegmentSet = getPrunedSegmentSet(tableInputSpec);
return makeSlices(tableInputSpec, prunedSegmentSet, maxNumSlices);
final List<List<DataSegmentWithInterval>> assignments =
SlicerUtils.makeSlicesStatic(
getPrunedSegmentSet(tableInputSpec).iterator(),
segment -> segment.getSegment().getSize(),
maxNumSlices
);
return makeSlices(tableInputSpec, assignments);
}
@Override
@ -76,30 +78,15 @@ public class TableInputSpecSlicer implements InputSpecSlicer
)
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final Set<DataSegmentWithInterval> prunedSegmentSet = getPrunedSegmentSet(tableInputSpec);
if (prunedSegmentSet.isEmpty()) {
return Collections.emptyList();
}
int totalFiles = 0;
long totalBytes = 0;
for (DataSegmentWithInterval segmentWithInterval : prunedSegmentSet) {
totalFiles++;
totalBytes += segmentWithInterval.getSegment().getSize();
}
final int numSlices =
Math.min(
final List<List<DataSegmentWithInterval>> assignments =
SlicerUtils.makeSlicesDynamic(
getPrunedSegmentSet(tableInputSpec).iterator(),
segment -> segment.getSegment().getSize(),
maxNumSlices,
Math.max(
Ints.checkedCast(LongMath.divide(totalFiles, maxFilesPerSlice, RoundingMode.CEILING)),
Ints.checkedCast(LongMath.divide(totalBytes, maxBytesPerSlice, RoundingMode.CEILING))
)
maxFilesPerSlice,
maxBytesPerSlice
);
return makeSlices(tableInputSpec, prunedSegmentSet, numSlices);
return makeSlices(tableInputSpec, assignments);
}
private Set<DataSegmentWithInterval> getPrunedSegmentSet(final TableInputSpec tableInputSpec)
@ -133,27 +120,16 @@ public class TableInputSpecSlicer implements InputSpecSlicer
}
}
private List<InputSlice> makeSlices(
private static List<InputSlice> makeSlices(
final TableInputSpec tableInputSpec,
final Set<DataSegmentWithInterval> prunedSegmentSet,
final int maxNumSlices
final List<List<DataSegmentWithInterval>> assignments
)
{
if (prunedSegmentSet.isEmpty()) {
return Collections.emptyList();
}
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
final List<List<DataSegmentWithInterval>> assignments = SlicerUtils.makeSlices(
prunedSegmentSet.iterator(),
segment -> segment.getSegment().getSize(),
maxNumSlices
);
final List<InputSlice> retVal = new ArrayList<>();
for (final List<DataSegmentWithInterval> dataSegmentWithIntervals : assignments) {
for (final List<DataSegmentWithInterval> assignment : assignments) {
final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
for (final DataSegmentWithInterval dataSegmentWithInterval : dataSegmentWithIntervals) {
for (final DataSegmentWithInterval dataSegmentWithInterval : assignment) {
descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
}

View File

@ -91,7 +91,12 @@ public class WorkerInputs
}
} else {
// Non-broadcast case: split slices across workers.
final List<InputSlice> slices = assignmentStrategy.assign(stageDef, inputSpec, stageWorkerCountMap, slicer);
List<InputSlice> slices = assignmentStrategy.assign(stageDef, inputSpec, stageWorkerCountMap, slicer);
if (slices.isEmpty()) {
// Need at least one slice, so we can have at least one worker. It's OK if it has nothing to read.
slices = Collections.singletonList(NilInputSlice.INSTANCE);
}
// Flip the slices, so it's worker number -> slices for that worker.
for (int workerNumber = 0; workerNumber < slices.size(); workerNumber++) {
@ -152,6 +157,11 @@ public class WorkerInputs
return assignmentsMap.size();
}
public Int2ObjectMap<List<InputSlice>> assignmentsMap()
{
return assignmentsMap;
}
@Override
public boolean equals(Object o)
{

View File

@ -25,7 +25,9 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.indexing.ColumnMapping;
@ -243,6 +245,44 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
@Test
public void testSelectOnFooWhereMatchesNoSegments()
{
RowSignature resultSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.build();
// Filter [__time >= timestamp '3000-01-01 00:00:00'] matches no segments at all.
testSelectQuery()
.setSql("select cnt,dim1 from foo where __time >= timestamp '3000-01-01 00:00:00'")
.setExpectedMSQSpec(
MSQSpec.builder()
.query(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(
querySegmentSpec(
Intervals.utc(
DateTimes.of("3000").getMillis(),
Intervals.ETERNITY.getEndMillis()
)
)
)
.columns("cnt", "dim1")
.context(defaultScanQueryContext(context, resultSignature))
.build()
)
.columnMappings(ColumnMappings.identity(resultSignature))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build()
)
.setQueryContext(context)
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
}
@Test
public void testGroupByOnFoo()
{
@ -1429,7 +1469,8 @@ public class MSQSelectTest extends MSQTestBase
+ "FROM kttm_data "
+ "GROUP BY 1")
.setExpectedValidationErrorMatcher(
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("LATEST() aggregator depends on __time column"))
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"LATEST() aggregator depends on __time column"))
)
.setExpectedRowSignature(rowSignature)
.verifyPlanningErrors();

View File

@ -29,7 +29,6 @@ import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.utils.Streams;
@ -66,6 +65,12 @@ public class ExternalInputSpecSlicerTest
Assert.assertTrue(slicer.canSliceDynamic(splittableSpec()));
}
@Test
public void test_canSliceDynamic_splittableThatIgnoresSplitHints()
{
Assert.assertTrue(slicer.canSliceDynamic(splittableSpecThatIgnoresSplitHints()));
}
@Test
public void test_canSliceDynamic_unsplittable()
{
@ -76,14 +81,20 @@ public class ExternalInputSpecSlicerTest
public void test_sliceStatic_unsplittable()
{
Assert.assertEquals(
ImmutableList.of(
unsplittableSlice("foo", "bar", "baz"),
NilInputSlice.INSTANCE
),
ImmutableList.of(unsplittableSlice("foo", "bar", "baz")),
slicer.sliceStatic(unsplittableSpec("foo", "bar", "baz"), 2)
);
}
@Test
public void test_sliceStatic_unsplittable_empty()
{
Assert.assertEquals(
ImmutableList.of(unsplittableSlice()),
slicer.sliceStatic(unsplittableSpec(), 2)
);
}
@Test
public void test_sliceStatic_splittable()
{
@ -96,6 +107,40 @@ public class ExternalInputSpecSlicerTest
);
}
@Test
public void test_sliceStatic_splittable_someWorkersEmpty()
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo"),
splittableSlice("bar"),
splittableSlice("baz")
),
slicer.sliceStatic(splittableSpec("foo", "bar", "baz"), 5)
);
}
@Test
public void test_sliceStatic_splittable_empty()
{
Assert.assertEquals(
ImmutableList.of(),
slicer.sliceStatic(splittableSpec(), 2)
);
}
@Test
public void test_sliceStatic_splittableThatIgnoresSplitHints()
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo", "baz"),
splittableSlice("bar")
),
slicer.sliceStatic(splittableSpecThatIgnoresSplitHints("foo", "bar", "baz"), 2)
);
}
@Test
public void test_sliceDynamic_unsplittable()
{
@ -123,8 +168,8 @@ public class ExternalInputSpecSlicerTest
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo", "baz"),
splittableSlice("bar")
splittableSlice("foo", "bar"),
splittableSlice("baz")
),
slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 2, Long.MAX_VALUE)
);
@ -132,20 +177,65 @@ public class ExternalInputSpecSlicerTest
@Test
public void test_sliceDynamic_splittable_needTwoDueToBytes()
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo", "bar"),
splittableSlice("baz")
),
slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 5, 7)
);
}
@Test
public void test_sliceDynamic_splittableThatIgnoresSplitHints_oneHundredMax()
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo"),
splittableSlice("bar"),
splittableSlice("baz")
),
slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", "baz"), 100, 5, 7)
);
}
@Test
public void test_sliceDynamic_splittableThatIgnoresSplitHints_twoMax()
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo", "baz"),
splittableSlice("bar")
),
slicer.sliceDynamic(splittableSpec("foo", "bar", "baz"), 100, 5, 7)
slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", "baz"), 2, 2, Long.MAX_VALUE)
);
}
@Test
public void test_sliceDynamic_splittableThatIgnoresSplitHints_oneMax()
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo", "bar", "baz")
),
slicer.sliceDynamic(splittableSpecThatIgnoresSplitHints("foo", "bar", "baz"), 1, 5, Long.MAX_VALUE)
);
}
static ExternalInputSpec splittableSpec(final String... strings)
{
return new ExternalInputSpec(
new TestSplittableInputSource(Arrays.asList(strings)),
new TestSplittableInputSource(Arrays.asList(strings), true),
INPUT_FORMAT,
SIGNATURE
);
}
static ExternalInputSpec splittableSpecThatIgnoresSplitHints(final String... strings)
{
return new ExternalInputSpec(
new TestSplittableInputSource(Arrays.asList(strings), false),
INPUT_FORMAT,
SIGNATURE
);
@ -164,7 +254,7 @@ public class ExternalInputSpecSlicerTest
{
return new ExternalInputSlice(
Stream.of(strings)
.map(s -> new TestSplittableInputSource(Collections.singletonList(s)))
.map(s -> new TestSplittableInputSource(Collections.singletonList(s), false))
.collect(Collectors.toList()),
INPUT_FORMAT,
SIGNATURE
@ -242,10 +332,12 @@ public class ExternalInputSpecSlicerTest
private static class TestSplittableInputSource implements SplittableInputSource<List<String>>
{
private final List<String> strings;
private final boolean useSplitHintSpec;
public TestSplittableInputSource(final List<String> strings)
public TestSplittableInputSource(final List<String> strings, final boolean useSplitHintSpec)
{
this.strings = strings;
this.useSplitHintSpec = useSplitHintSpec;
}
@Override
@ -270,10 +362,17 @@ public class ExternalInputSpecSlicerTest
@Nullable SplitHintSpec splitHintSpec
)
{
final Iterator<List<String>> splits = splitHintSpec.split(
strings.iterator(),
s -> new InputFileAttribute(s.length())
);
final Iterator<List<String>> splits;
if (useSplitHintSpec) {
splits = splitHintSpec.split(
strings.iterator(),
s -> new InputFileAttribute(s.length())
);
} else {
// Ignore splitHintSpec, return one element per split. Similar to HttpInputSource, for example.
return strings.stream().map(s -> new InputSplit<>(Collections.singletonList(s)));
}
return Streams.sequentialStreamFrom(splits).map(InputSplit::new);
}
@ -287,7 +386,7 @@ public class ExternalInputSpecSlicerTest
@Override
public InputSource withSplit(InputSplit<List<String>> split)
{
return new TestSplittableInputSource(split.get());
return new TestSplittableInputSource(split.get(), useSplitHintSpec);
}
@Override

View File

@ -106,7 +106,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
public void test_sliceStatic_noDataSource()
{
final TableInputSpec spec = new TableInputSpec("no such datasource", null, null);
Assert.assertEquals(Collections.emptyList(), slicer.sliceStatic(spec, 2));
Assert.assertEquals(
ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
slicer.sliceStatic(spec, 2)
);
}
@Test
@ -166,7 +169,10 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
null
);
Assert.assertEquals(Collections.emptyList(), slicer.sliceStatic(spec, 2));
Assert.assertEquals(
ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE),
slicer.sliceStatic(spec, 2)
);
}
@Test

View File

@ -19,14 +19,396 @@
package org.apache.druid.msq.kernel.controller;
import com.google.common.collect.ImmutableMap;
import it.unimi.dsi.fastutil.ints.Int2IntMaps;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.input.SlicerUtils;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class WorkerInputsTest
{
private static final String QUERY_ID = "myQuery";
@Test
public void test_max_threeInputs_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec(1, 2, 3))
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.MAX
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(0, Collections.singletonList(new TestInputSlice(1)))
.put(1, Collections.singletonList(new TestInputSlice(2)))
.put(2, Collections.singletonList(new TestInputSlice(3)))
.put(3, Collections.singletonList(new TestInputSlice()))
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_max_zeroInputs_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec())
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.MAX
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(0, Collections.singletonList(new TestInputSlice()))
.put(1, Collections.singletonList(new TestInputSlice()))
.put(2, Collections.singletonList(new TestInputSlice()))
.put(3, Collections.singletonList(new TestInputSlice()))
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_zeroInputSpecs_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs()
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(0, Collections.singletonList(NilInputSlice.INSTANCE))
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_zeroInputSlices_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec())
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(0, Collections.singletonList(NilInputSlice.INSTANCE))
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_zeroInputSlices_broadcast_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec())
.broadcastInputs(IntSet.of(0))
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(0, Collections.singletonList(new TestInputSlice()))
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_threeInputs_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec(1, 2, 3))
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(0, Collections.singletonList(new TestInputSlice(1, 2, 3)))
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_threeBigInputs_fourWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec(4_000_000_000L, 4_000_000_001L, 4_000_000_002L))
.maxWorkerCount(4)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(0, Collections.singletonList(new TestInputSlice(4_000_000_000L, 4_000_000_001L)))
.put(1, Collections.singletonList(new TestInputSlice(4_000_000_002L)))
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_tenSmallAndOneBigInputs_twoWorkers()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec(1, 2, 30_000_000_000L, 4, 5, 6, 7, 8, 9, 10, 11))
.maxWorkerCount(2)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(
0,
Collections.singletonList(new TestInputSlice(1, 2, 4, 5, 6, 7, 8, 9, 10, 11))
)
.put(
1,
Collections.singletonList(new TestInputSlice(30_000_000_000L))
)
.build(),
inputs.assignmentsMap()
);
}
@Test
public void test_auto_threeBigInputs_oneWorker()
{
final StageDefinition stageDef =
StageDefinition.builder(0)
.inputs(new TestInputSpec(4_000_000_000L, 4_000_000_001L, 4_000_000_002L))
.maxWorkerCount(1)
.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
WorkerAssignmentStrategy.AUTO
);
Assert.assertEquals(
ImmutableMap.<Integer, List<InputSlice>>builder()
.put(
0,
Collections.singletonList(new TestInputSlice(4_000_000_000L, 4_000_000_001L, 4_000_000_002L))
)
.build(),
inputs.assignmentsMap()
);
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(WorkerInputs.class).usingGetClass().verify();
}
private static class TestInputSpec implements InputSpec
{
private final LongList values;
public TestInputSpec(long... values)
{
this.values = LongList.of(values);
}
}
private static class TestInputSlice implements InputSlice
{
private final LongList values;
public TestInputSlice(final LongList values)
{
this.values = values;
}
public TestInputSlice(long... values)
{
this.values = LongList.of(values);
}
@Override
public int fileCount()
{
return values.size();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestInputSlice that = (TestInputSlice) o;
return Objects.equals(values, that.values);
}
@Override
public int hashCode()
{
return Objects.hash(values);
}
@Override
public String toString()
{
return values.toString();
}
}
private static class TestInputSpecSlicer implements InputSpecSlicer
{
private final boolean canSliceDynamic;
public TestInputSpecSlicer(boolean canSliceDynamic)
{
this.canSliceDynamic = canSliceDynamic;
}
@Override
public boolean canSliceDynamic(InputSpec inputSpec)
{
return canSliceDynamic;
}
@Override
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final TestInputSpec testInputSpec = (TestInputSpec) inputSpec;
final List<List<Long>> assignments =
SlicerUtils.makeSlicesStatic(
testInputSpec.values.iterator(),
i -> i,
maxNumSlices
);
return makeSlices(assignments);
}
@Override
public List<InputSlice> sliceDynamic(
InputSpec inputSpec,
int maxNumSlices,
int maxFilesPerSlice,
long maxBytesPerSlice
)
{
final TestInputSpec testInputSpec = (TestInputSpec) inputSpec;
final List<List<Long>> assignments =
SlicerUtils.makeSlicesDynamic(
testInputSpec.values.iterator(),
i -> i,
maxNumSlices,
maxFilesPerSlice,
maxBytesPerSlice
);
return makeSlices(assignments);
}
private static List<InputSlice> makeSlices(
final List<List<Long>> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
for (final List<Long> assignment : assignments) {
retVal.add(new TestInputSlice(new LongArrayList(assignment)));
}
return retVal;
}
}
}

View File

@ -852,7 +852,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
public Builder setExpectedResultRows(List<Object[]> expectedResultRows)
{
Preconditions.checkArgument(expectedResultRows.size() > 0, "Results rows cannot be empty");
this.expectedResultRows = expectedResultRows;
return asBuilder();
}

View File

@ -25,7 +25,7 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.fasterxml.jackson.annotation.JacksonInject;
@ -41,11 +41,10 @@ import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSEndpointConfig;
import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
@ -53,20 +52,14 @@ import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.utils.Streams;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class S3InputSource extends CloudObjectInputSource
{
@ -318,18 +311,38 @@ public class S3InputSource extends CloudObjectInputSource
}
@Override
protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
protected CloudObjectSplitWidget getSplitWidget()
{
final Iterator<List<S3ObjectSummary>> splitIterator = splitHintSpec.split(
getIterableObjectsFromPrefixes().iterator(),
object -> new InputFileAttribute(object.getSize())
);
class SplitWidget implements CloudObjectSplitWidget
{
@Override
public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
return Iterators.transform(
S3Utils.objectSummaryIterator(
s3ClientSupplier.get(),
prefixes,
inputDataConfig.getMaxListingLength(),
maxRetries
),
object -> new LocationWithSize(object.getBucketName(), object.getKey(), object.getSize())
);
}
return Streams.sequentialStreamFrom(splitIterator)
.map(objects -> objects.stream()
.map(S3Utils::summaryToCloudObjectLocation)
.collect(Collectors.toList()))
.map(InputSplit::new);
@Override
public long getObjectSize(CloudObjectLocation location)
{
final ObjectMetadata objectMetadata = S3Utils.getSingleObjectMetadata(
s3ClientSupplier.get(),
location.getBucket(),
location.getPath()
);
return objectMetadata.getContentLength();
}
}
return new SplitWidget();
}
@Override
@ -390,28 +403,4 @@ public class S3InputSource extends CloudObjectInputSource
", awsClientConfig=" + getAwsClientConfig() +
'}';
}
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> {
Iterator<S3ObjectSummary> iterator = S3Utils.objectSummaryIterator(
s3ClientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength(),
maxRetries
);
// Skip files that didn't match filter.
if (org.apache.commons.lang.StringUtils.isNotBlank(getObjectGlob())) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
iterator = Iterators.filter(
iterator,
object -> m.matches(Paths.get(object.getKey()))
);
}
return iterator;
};
}
}

View File

@ -22,8 +22,8 @@ package org.apache.druid.storage.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.io.ByteSource;
@ -158,7 +158,7 @@ public class S3DataSegmentPuller implements URIDataPuller
return new FileObject()
{
S3Object s3Object = null;
S3ObjectSummary objectSummary = null;
ObjectMetadata objectMetadata = null;
@Override
public URI toUri()
@ -234,11 +234,10 @@ public class S3DataSegmentPuller implements URIDataPuller
if (s3Object != null) {
return s3Object.getObjectMetadata().getLastModified().getTime();
}
if (objectSummary == null) {
objectSummary =
S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath());
if (objectMetadata == null) {
objectMetadata = S3Utils.getSingleObjectMetadata(s3Client, coords.getBucket(), coords.getPath());
}
return objectSummary.getLastModified().getTime();
return objectMetadata.getLastModified().getTime();
}
@Override
@ -267,9 +266,9 @@ public class S3DataSegmentPuller implements URIDataPuller
{
try {
final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri));
final S3ObjectSummary objectSummary =
S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath());
return StringUtils.format("%d", objectSummary.getLastModified().getTime());
final ObjectMetadata objectMetadata =
S3Utils.getSingleObjectMetadata(s3Client, coords.getBucket(), coords.getPath());
return StringUtils.format("%d", objectMetadata.getLastModified().getTime());
}
catch (AmazonClientException e) {
if (AWSClientUtil.isClientExceptionRecoverable(e)) {

View File

@ -28,8 +28,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@ -42,7 +41,6 @@ import org.apache.druid.common.aws.AWSEndpointConfig;
import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
@ -209,34 +207,20 @@ public class S3Utils
}
/**
* Gets a single {@link S3ObjectSummary} from s3. Since this method might return a wrong object if there are multiple
* objects that match the given key, this method should be used only when it's guaranteed that the given key is unique
* in the given bucket.
* Gets a single {@link ObjectMetadata} from S3.
*
* @param s3Client s3 client
* @param bucket s3 bucket
* @param key unique key for the object to be retrieved
* @param key s3 object key
*/
public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS3 s3Client, String bucket, String key)
public static ObjectMetadata getSingleObjectMetadata(ServerSideEncryptingAmazonS3 s3Client, String bucket, String key)
{
final ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(key)
.withMaxKeys(1);
final ListObjectsV2Result result = s3Client.listObjectsV2(request);
// Using getObjectSummaries().size() instead of getKeyCount as, in some cases
// it is observed that even though the getObjectSummaries returns some data
// keyCount is still zero.
if (result.getObjectSummaries().size() == 0) {
throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
try {
return retryS3Operation(() -> s3Client.getObjectMetadata(bucket, key));
}
final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0);
if (!objectSummary.getBucketName().equals(bucket) || !objectSummary.getKey().equals(key)) {
throw new ISE("Wrong object[%s] for bucket[%s] and key[%s]", objectSummary, bucket, key);
catch (Exception e) {
throw new RuntimeException(e);
}
return objectSummary;
}
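A hypothetical usage fragment for the new helper (bucket and key are placeholders, and an existing s3Client is assumed); the content length is what the S3 split widget reports as the file size:

ObjectMetadata metadata = S3Utils.getSingleObjectMetadata(s3Client, "my-bucket", "path/to/object.json");
long sizeBytes = metadata.getContentLength();
long lastModifiedMillis = metadata.getLastModified().getTime();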
/**

View File

@ -26,9 +26,11 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@ -647,6 +649,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Test
public void testWithUrisSplit()
{
EasyMock.reset(S3_CLIENT);
expectGetMetadata(EXPECTED_URIS.get(0), CONTENT);
expectGetMetadata(EXPECTED_URIS.get(1), CONTENT);
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
@ -663,15 +670,21 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
new MaxSizeSplitHintSpec(5, null)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(S3_CLIENT);
}
@Test
public void testWithUrisObjectGlob()
{
EasyMock.reset(S3_CLIENT);
expectGetMetadata(EXPECTED_URIS.get(0), CONTENT);
expectGetMetadata(EXPECTED_URIS.get(1), CONTENT);
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
@ -688,15 +701,21 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
new MaxSizeSplitHintSpec(5, null)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(S3_CLIENT);
}
@Test
public void testWithObjectsGlob()
{
EasyMock.reset(S3_CLIENT);
expectGetMetadata(EXPECTED_URIS.get(0), CONTENT);
expectGetMetadata(EXPECTED_URIS.get(1), CONTENT);
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
@ -713,15 +732,21 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
new MaxSizeSplitHintSpec(5, null)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(S3_CLIENT);
}
@Test
public void testWithoutObjectsGlob()
{
EasyMock.reset(S3_CLIENT);
expectGetMetadata(EXPECTED_URIS.get(0), CONTENT);
expectGetMetadata(EXPECTED_URIS.get(1), CONTENT);
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
@ -738,10 +763,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
new MaxSizeSplitHintSpec(5, null)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(S3_CLIENT);
}
@Test
@ -1068,6 +1094,17 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
).andReturn(result).once();
}
private static void expectGetMetadata(URI uri, byte[] content)
{
final CloudObjectLocation location = new CloudObjectLocation(uri);
final ObjectMetadata result = new ObjectMetadata();
result.setContentLength(content.length);
EasyMock.expect(
S3_CLIENT.getObjectMetadata(matchGetMetadataRequest(location.getBucket(), location.getPath()))
).andReturn(result).once();
}
private static void expectListObjectsAndThrowAccessDenied(final URI prefix)
{
AmazonS3Exception boom = new AmazonS3Exception("oh dang, you can't list that bucket friend");
@ -1157,6 +1194,34 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
return null;
}
private static GetObjectMetadataRequest matchGetMetadataRequest(final String bucket, final String key)
{
// Use an IArgumentMatcher to verify that the request has the correct bucket and key.
EasyMock.reportMatcher(
new IArgumentMatcher()
{
@Override
public boolean matches(Object argument)
{
if (!(argument instanceof GetObjectMetadataRequest)) {
return false;
}
final GetObjectMetadataRequest request = (GetObjectMetadataRequest) argument;
return request.getBucketName().equals(bucket) && request.getKey().equals(key);
}
@Override
public void appendTo(StringBuffer buffer)
{
buffer.append("<request for bucket[").append(bucket).append("] key[").append(key).append("]>");
}
}
);
return null;
}
public static ObjectMapper createS3ObjectMapper()
{
DruidModule baseModule = new TestS3Module();

View File

@ -20,8 +20,8 @@
package org.apache.druid.storage.s3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.commons.io.IOUtils;
@ -43,6 +43,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Date;
import java.util.zip.GZIPOutputStream;
@ -59,25 +60,20 @@ public class S3DataSegmentPullerTest
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
String expectedKey = keyPrefix + "/renames-0.gz";
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setLastModified(new Date(0));
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setKeyCount(1);
result.getObjectSummaries().add(objectSummary);
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(result)
EasyMock.expect(s3Client.getObjectMetadata(bucket, expectedKey))
.andReturn(objectMetadata)
.once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
EasyMock.replay(s3Client);
String version = puller.getVersion(URI.create(StringUtils.format("s3://%s/%s", bucket, objectSummary.getKey())));
String version = puller.getVersion(URI.create(StringUtils.format("s3://%s/%s", bucket, expectedKey)));
EasyMock.verify(s3Client);
@ -236,7 +232,7 @@ public class S3DataSegmentPullerTest
final File tmpFile = temporaryFolder.newFile("testObjectFile");
try (OutputStream outputStream = new FileOutputStream(tmpFile)) {
try (OutputStream outputStream = Files.newOutputStream(tmpFile.toPath())) {
outputStream.write(value);
}
@ -244,19 +240,13 @@ public class S3DataSegmentPullerTest
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/test-object");
object0.getObjectMetadata().setLastModified(new Date(0));
object0.setObjectContent(new FileInputStream(tmpFile));
object0.setObjectContent(Files.newInputStream(tmpFile.toPath()));
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/test-object");
objectSummary.setLastModified(new Date(0));
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setLastModified(new Date(0));
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setKeyCount(1);
result.getObjectSummaries().add(objectSummary);
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(result)
EasyMock.expect(s3Client.getObjectMetadata(bucket, object0.getKey()))
.andReturn(objectMetadata)
.once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.HumanReadableBytes;
import javax.annotation.Nullable;
@ -98,10 +97,6 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
@Override
public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
{
final Iterator<T> nonEmptyFileOnlyIterator = Iterators.filter(
inputIterator,
input -> inputAttributeExtractor.apply(input).getSize() > 0
);
return new Iterator<List<T>>()
{
private final long maxSplitSizeBytes = maxSplitSize.getBytes();
@ -110,7 +105,7 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
@Override
public boolean hasNext()
{
return peeking != null || nonEmptyFileOnlyIterator.hasNext();
return peeking != null || inputIterator.hasNext();
}
@Override
@ -121,9 +116,9 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
}
final List<T> current = new ArrayList<>();
long splitSize = 0;
while (splitSize < maxSplitSizeBytes && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) {
while (splitSize < maxSplitSizeBytes && (peeking != null || inputIterator.hasNext())) {
if (peeking == null) {
peeking = nonEmptyFileOnlyIterator.next();
peeking = inputIterator.next();
}
final long size = inputAttributeExtractor.apply(peeking).getSize();
if (current.isEmpty() // each split should have at least one file even if the file is larger than maxSplitSize

View File

@ -48,7 +48,9 @@ public interface SplitHintSpec
* Returns an iterator of splits. A split has a list of files of the type {@link T}.
*
* @param inputIterator that returns input files.
* @param inputAttributeExtractor to create {@link InputFileAttribute} for each input file.
* @param inputAttributeExtractor to create {@link InputFileAttribute} for each input file. This may involve a
* network call, so implementations of SplitHintSpec should use it only if needed,
* and reuse results if appropriate.
*/
<T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor);
}
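For reference, a small usage sketch of this contract with MaxSizeSplitHintSpec and in-memory sizes (a fragment with imports omitted; it assumes the Druid classes appearing in this patch, and the exact grouping depends on MaxSizeSplitHintSpec's packing rules). Here the attribute extractor is trivially cheap; for cloud objects it may translate into a metadata request, which is why it should be called sparingly.

// Three in-memory "files" of 10, 20 and 5 bytes, with a 25-byte target per split.
SplitHintSpec hint = new MaxSizeSplitHintSpec(new HumanReadableBytes(25L), null);
Iterator<List<Integer>> splits = hint.split(Arrays.asList(10, 20, 5).iterator(), InputFileAttribute::new);
splits.forEachRemaining(System.out::println);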

View File

@ -21,26 +21,32 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.utils.CollectionUtils;
import org.apache.druid.utils.Streams;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class CloudObjectInputSource extends AbstractInputSource
@ -52,22 +58,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
private final List<CloudObjectLocation> objects;
private final String objectGlob;
public CloudObjectInputSource(
String scheme,
@Nullable List<URI> uris,
@Nullable List<URI> prefixes,
@Nullable List<CloudObjectLocation> objects
)
{
this.scheme = scheme;
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
this.objectGlob = null;
illegalArgsChecker();
}
public CloudObjectInputSource(
String scheme,
@Nullable List<URI> uris,
@ -122,15 +112,10 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
protected abstract InputEntity createEntity(CloudObjectLocation location);
/**
* Create a stream of {@link CloudObjectLocation} splits by listing objects that appear under {@link #prefixes} using
* this input sources backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits},
* only if {@link #prefixes} is set, otherwise the splits are created directly from {@link #uris} or {@link #objects}.
* Calling if {@link #prefixes} is not set is likely to either lead to an empty iterator or null pointer exception.
*
* If {@link #objectGlob} is set, the objectGlob will be applied on {@link #uris} or {@link #objects}.
* {@link #objectGlob} uses a glob notation, for example: "**.parquet".
* Returns {@link CloudObjectSplitWidget}, which is used to implement
* {@link #createSplits(InputFormat, SplitHintSpec)}.
*/
protected abstract Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(SplitHintSpec splitHintSpec);
protected abstract CloudObjectSplitWidget getSplitWidget();
@Override
public Stream<InputSplit<List<CloudObjectLocation>>> createSplits(
@ -139,42 +124,34 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
Stream<CloudObjectLocation> objectStream = objects.stream();
if (StringUtils.isNotBlank(objectGlob)) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
objectStream = objectStream.filter(object -> m.matches(Paths.get(object.getPath())));
}
return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object)));
return getSplitsForObjects(
getSplitWidget(),
getSplitHintSpecOrDefault(splitHintSpec),
objects,
objectGlob
);
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
return getSplitsForObjects(
getSplitWidget(),
getSplitHintSpecOrDefault(splitHintSpec),
Lists.transform(uris, CloudObjectLocation::new),
objectGlob
);
} else {
return getSplitsForPrefixes(
getSplitWidget(),
getSplitHintSpecOrDefault(splitHintSpec),
prefixes,
objectGlob
);
}
if (!CollectionUtils.isNullOrEmpty(uris)) {
Stream<URI> uriStream = uris.stream();
if (StringUtils.isNotBlank(objectGlob)) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
uriStream = uriStream.filter(uri -> m.matches(Paths.get(uri.toString())));
}
return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object)));
}
return getPrefixesSplitStream(getSplitHintSpecOrDefault(splitHintSpec));
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
return objects.size();
}
if (!CollectionUtils.isNullOrEmpty(uris)) {
return uris.size();
}
return Ints.checkedCast(getPrefixesSplitStream(getSplitHintSpecOrDefault(splitHintSpec)).count());
// We can't effectively estimate the number of splits without actually computing them.
return Ints.checkedCast(createSplits(inputFormat, splitHintSpec).count());
}
@Override
@ -241,4 +218,103 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
throw new IllegalArgumentException("Exactly one of uris, prefixes or objects must be specified");
}
}
/**
* Stream of {@link InputSplit} for situations where this object is based on {@link #getPrefixes()}.
*
* If {@link CloudObjectSplitWidget#getDescriptorIteratorForPrefixes} returns objects with known sizes (as most
* implementations do), this method filters out empty objects.
*/
private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForPrefixes(
final CloudObjectSplitWidget splitWidget,
final SplitHintSpec splitHintSpec,
final List<URI> prefixes,
@Nullable final String objectGlob
)
{
Iterator<CloudObjectSplitWidget.LocationWithSize> iterator =
splitWidget.getDescriptorIteratorForPrefixes(prefixes);
if (StringUtils.isNotBlank(objectGlob)) {
final PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + objectGlob);
iterator = Iterators.filter(
iterator,
location -> m.matches(Paths.get(location.getLocation().getPath()))
);
}
// Only consider nonempty objects. Note: size may be unknown; if so we allow it through, to avoid
// calling getObjectSize and triggering a network call.
return toSplitStream(
splitWidget,
splitHintSpec,
Iterators.filter(iterator, object -> object.getSize() != 0) // Allow UNKNOWN_SIZE through
);
}
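The objectGlob filtering above is plain java.nio glob matching; a tiny standalone illustration (the pattern and paths are made up for the example):

import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

class GlobFilterSketch
{
  public static void main(String[] args)
  {
    final PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:**.parquet");
    System.out.println(m.matches(Paths.get("db/date=2022-08-01/001.parquet"))); // true
    System.out.println(m.matches(Paths.get("db/date=2022-08-01/001.csv")));     // false
  }
}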
/**
* Stream of {@link InputSplit} for situations where this object is based
* on {@link #getUris()} or {@link #getObjects()}.
*
* This method does not make network calls. In particular, unlike {@link #getSplitsForPrefixes}, this method does
* not filter out empty objects, because doing so would require calling {@link CloudObjectSplitWidget#getObjectSize}.
* The hope, and assumption, here is that users won't specify empty objects explicitly. (They're more likely to
* come in through prefixes.)
*/
private static Stream<InputSplit<List<CloudObjectLocation>>> getSplitsForObjects(
final CloudObjectSplitWidget splitWidget,
final SplitHintSpec splitHintSpec,
final List<CloudObjectLocation> objectLocations,
@Nullable final String objectGlob
)
{
Iterator<CloudObjectLocation> iterator = objectLocations.iterator();
if (StringUtils.isNotBlank(objectGlob)) {
final PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + objectGlob);
iterator = Iterators.filter(
iterator,
location -> m.matches(Paths.get(location.getPath()))
);
}
return toSplitStream(
splitWidget,
splitHintSpec,
Iterators.transform(
iterator,
location -> new CloudObjectSplitWidget.LocationWithSize(location, CloudObjectSplitWidget.UNKNOWN_SIZE)
)
);
}
private static Stream<InputSplit<List<CloudObjectLocation>>> toSplitStream(
final CloudObjectSplitWidget splitWidget,
final SplitHintSpec splitHintSpec,
final Iterator<CloudObjectSplitWidget.LocationWithSize> objectIterator
)
{
return Streams.sequentialStreamFrom(
splitHintSpec.split(
objectIterator,
o -> {
try {
if (o.getSize() == CloudObjectSplitWidget.UNKNOWN_SIZE) {
return new InputFileAttribute(splitWidget.getObjectSize(o.getLocation()));
} else {
return new InputFileAttribute(o.getSize());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
)
).map(
locations -> new InputSplit<>(
locations.stream()
.map(CloudObjectSplitWidget.LocationWithSize::getLocation)
.collect(Collectors.toList()))
);
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
/**
* Helper used by {@link CloudObjectInputSource} to implement {@link SplittableInputSource#createSplits}.
*/
public interface CloudObjectSplitWidget
{
long UNKNOWN_SIZE = -1;
/**
* Iterator of descriptors that match a list of prefixes. Used if {@link CloudObjectInputSource#getPrefixes()}
* is set.
*
* Sizes in {@link LocationWithSize} are set if the information is naturally available as part of listing
* the cloud object prefix. Otherwise, they are set to {@link #UNKNOWN_SIZE} and filled in later.
*/
Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes);
/**
* Size of an object. May use a cached size, if available, or may issue a network call.
*/
long getObjectSize(CloudObjectLocation descriptor) throws IOException;
/**
* Returned by {@link #getDescriptorIteratorForPrefixes(List)}. A pair of {@link CloudObjectLocation} and its size,
* which may be {@link #UNKNOWN_SIZE} if not known.
*/
class LocationWithSize
{
private final CloudObjectLocation location;
private final long size;
public LocationWithSize(CloudObjectLocation location, long size)
{
this.location = location;
this.size = size;
}
public LocationWithSize(String bucket, String key, long size)
{
this(new CloudObjectLocation(bucket, key), size);
}
public CloudObjectLocation getLocation()
{
return location;
}
public long getSize()
{
return size;
}
}
}
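To make the contract concrete, here is a toy implementation backed by an in-memory map. The class name and backing map are assumptions for the example only; real widgets list and stat objects in cloud storage, as the S3 SplitWidget in this patch does.

import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class InMemorySplitWidget implements CloudObjectSplitWidget
{
  private final Map<CloudObjectLocation, Long> sizes;

  InMemorySplitWidget(Map<CloudObjectLocation, Long> sizes)
  {
    this.sizes = sizes;
  }

  @Override
  public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
  {
    // Report every known object with its size. A real widget would filter by prefix and
    // would typically learn sizes for free from the listing API.
    return sizes.entrySet()
                .stream()
                .map(e -> new LocationWithSize(e.getKey(), e.getValue()))
                .iterator();
  }

  @Override
  public long getObjectSize(CloudObjectLocation location)
  {
    // Real implementations may issue a network call here (for example, getObjectMetadata on S3).
    return sizes.getOrDefault(location, UNKNOWN_SIZE);
  }
}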

View File

@ -138,29 +138,6 @@ public class MaxSizeSplitHintSpecTest
Assert.assertEquals(1, splits.get(3).size());
}
@Test
public void testSplitSkippingEmptyInputs()
{
final int nonEmptyInputSize = 3;
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(new HumanReadableBytes(10L), null);
final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new;
final IntStream dataStream = IntStream.concat(
IntStream.concat(
IntStream.generate(() -> 0).limit(10),
IntStream.generate(() -> nonEmptyInputSize).limit(10)
),
IntStream.generate(() -> 0).limit(10)
);
final List<List<Integer>> splits = Lists.newArrayList(
splitHintSpec.split(dataStream.iterator(), inputAttributeExtractor)
);
Assert.assertEquals(4, splits.size());
Assert.assertEquals(3, splits.get(0).size());
Assert.assertEquals(3, splits.get(1).size());
Assert.assertEquals(3, splits.get(2).size());
Assert.assertEquals(1, splits.get(3).size());
}
@Test
public void testEquals()
{

View File

@ -34,6 +34,7 @@ import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -132,6 +133,7 @@ public class CloudObjectInputSourceTest
.useConstructor(SCHEME, URIS2, null, null, "**.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@ -153,6 +155,7 @@ public class CloudObjectInputSourceTest
.useConstructor(SCHEME, URIS, null, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@ -174,6 +177,7 @@ public class CloudObjectInputSourceTest
.useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, "**.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@ -195,6 +199,7 @@ public class CloudObjectInputSourceTest
.useConstructor(SCHEME, null, null, OBJECTS, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@ -218,4 +223,19 @@ public class CloudObjectInputSourceTest
Assert.assertTrue(m2.matches(Paths.get("db/date=2022-08-01/001.parquet")));
Assert.assertFalse(m2.matches(Paths.get("db/date=2022-08-01/_junk/0/001.parquet")));
}
private static class MockSplitWidget implements CloudObjectSplitWidget
{
@Override
public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
{
throw new UnsupportedOperationException();
}
@Override
public long getObjectSize(CloudObjectLocation descriptor)
{
return 0;
}
}
}

View File

@ -4237,6 +4237,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testGroupByWithImpossibleTimeFilter()
{
msqCompatible();
// this gets optimized into 'false'
testQuery(
"SELECT dim1, COUNT(*) FROM druid.foo\n"