Clean up the core API required for Iceberg extension (#14614)

Changes:
- Replace `AbstractInputSourceBuilder` with `InputSourceFactory`
- Move Iceberg-specific logic to `IcebergInputSource`
Abhishek Agarwal 2023-07-21 13:01:33 +05:30 committed by GitHub
parent f5784e66d3
commit efb32810c4
12 changed files with 185 additions and 198 deletions
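
For orientation before the per-file diffs: the commit replaces the abstract builder base class with a one-method factory interface, so the delegate input source is constructed only once the concrete file paths are known. A minimal usage sketch, assuming the Druid core classes from this commit are on the classpath (the FactoryExample class name is illustrative only):

import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;

import java.util.Arrays;

public class FactoryExample
{
  public static void main(String[] args)
  {
    // The factory is deserialized from the ingestion spec up front; the
    // concrete input source is created only after the file paths are
    // resolved (for IcebergInputSource, after the snapshot scan).
    InputSourceFactory factory = new LocalInputSourceFactory();
    SplittableInputSource source = factory.create(Arrays.asList("foo.parquet", "bar.parquet"));
    System.out.println(source.isSplittable());
  }
}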

File: IcebergInputSource.java

@@ -22,19 +22,25 @@ package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
@@ -60,7 +66,7 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
private IcebergFilter icebergFilter;
@JsonProperty
private AbstractInputSourceBuilder warehouseSource;
private InputSourceFactory warehouseSource;
private boolean isLoaded = false;
@@ -72,7 +78,7 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
@JsonProperty("namespace") String namespace,
@JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
@JsonProperty("warehouseSource") AbstractInputSourceBuilder warehouseSource
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource
)
{
this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
@@ -170,7 +176,77 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
getTableName(),
getIcebergFilter()
);
delegateInputSource = warehouseSource.setupInputSource(snapshotDataFiles);
if (snapshotDataFiles.isEmpty()) {
delegateInputSource = new EmptyInputSource();
} else {
delegateInputSource = warehouseSource.create(snapshotDataFiles);
}
isLoaded = true;
}
/**
* This input source is used in place of the delegate input source when there are no input file paths.
* Certain input sources cannot be instantiated with an empty input file list, so composing input sources
* such as IcebergInputSource may use this input source as the delegate in such cases.
*/
private static class EmptyInputSource implements SplittableInputSource
{
@Override
public boolean needsFormat()
{
return false;
}
@Override
public boolean isSplittable()
{
return false;
}
@Override
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
)
{
return new InputSourceReader()
{
@Override
public CloseableIterator<InputRow> read(InputStats inputStats)
{
return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
});
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample()
{
return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
});
}
};
}
@Override
public Stream<InputSplit> createSplits(
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
)
{
return Stream.empty();
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return 0;
}
@Override
public InputSource withSplit(InputSplit split)
{
return null;
}
}
}
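
Note the behavioral detail here: the empty-list guard that previously lived in AbstractInputSourceBuilder.setupInputSource (the file deleted below) has moved into IcebergInputSource itself, so the bare factories now pass empty path lists straight through to the delegate's constructor. A hedged sketch of the consequence for a direct caller, assuming LocalInputSource keeps its usual precondition that at least one input be specified:

import org.apache.druid.data.input.impl.LocalInputSourceFactory;

import java.util.Collections;

public class EmptyPathsExample
{
  public static void main(String[] args)
  {
    try {
      // Before this commit, setupInputSource(emptyList) silently returned an
      // EmptyInputSource; now that guard is IcebergInputSource's responsibility.
      new LocalInputSourceFactory().create(Collections.emptyList());
    }
    catch (IllegalArgumentException e) {
      // Expected under the assumed precondition; the message may differ.
      System.out.println("rejected empty input: " + e.getMessage());
    }
  }
}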

File: IcebergInputSourceTest.java

@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.iceberg.DataFile;
@@ -87,7 +87,7 @@ public class IcebergInputSourceTest
NAMESPACE,
null,
testCatalog,
new LocalInputSourceBuilder()
new LocalInputSourceFactory()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -115,7 +115,7 @@ public class IcebergInputSourceTest
}
@Test
public void testInputSourceWithFilter() throws IOException
public void testInputSourceWithEmptySource() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
@@ -128,7 +128,28 @@ public class IcebergInputSourceTest
NAMESPACE,
new IcebergEqualsFilter("id", "0000"),
testCatalog,
new LocalInputSourceBuilder()
new LocalInputSourceFactory()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
Assert.assertEquals(0, splits.count());
dropTableFromCatalog(tableIdentifier);
}
@Test
public void testInputSourceWithFilter() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
createAndLoadTable(tableIdentifier);
IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
new IcebergEqualsFilter("id", "123988"),
testCatalog,
new LocalInputSourceFactory()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -137,7 +158,21 @@ public class IcebergInputSourceTest
.flatMap(List::stream)
.collect(Collectors.toList());
Assert.assertEquals(0, localInputSourceList.size());
Assert.assertEquals(1, inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null)));
Assert.assertEquals(1, localInputSourceList.size());
CloseableIterable<Record> datafileReader = Parquet.read(Files.localInput(localInputSourceList.get(0)))
.project(tableSchema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
tableSchema,
fileSchema
))
.build();
for (Record record : datafileReader) {
Assert.assertEquals(tableData.get("id"), record.get(0));
Assert.assertEquals(tableData.get("name"), record.get(1));
}
dropTableFromCatalog(tableIdentifier);
}

File: HdfsInputSourceFactory.java (renamed from HdfsInputSourceBuilder.java)

@@ -21,20 +21,20 @@ package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
public class HdfsInputSourceFactory implements InputSourceFactory
{
private final Configuration configuration;
private final HdfsInputSourceConfig inputSourceConfig;
@JsonCreator
public HdfsInputSourceBuilder(
public HdfsInputSourceFactory(
@JacksonInject @Hdfs Configuration configuration,
@JacksonInject HdfsInputSourceConfig inputSourceConfig
)
@@ -44,7 +44,7 @@ public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
}
@Override
public SplittableInputSource generateInputSource(List<String> inputFilePaths)
public SplittableInputSource create(List<String> inputFilePaths)
{
return new HdfsInputSource(inputFilePaths, configuration, inputSourceConfig);
}

File: HdfsStorageDruidModule.java

@@ -34,8 +34,8 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceFactory;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ public class HdfsStorageDruidModule implements DruidModule
new SimpleModule().registerSubtypes(
new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsInputSourceBuilder.class, HdfsStorageDruidModule.SCHEME)
new NamedType(HdfsInputSourceFactory.class, HdfsStorageDruidModule.SCHEME)
)
);
}

File: HdfsInputSourceAdapterTest.java

@@ -32,7 +32,7 @@ public class HdfsInputSourceAdapterTest
{
Configuration conf = new Configuration();
HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
HdfsInputSourceBuilder hdfsInputSourceAdapter = new HdfsInputSourceBuilder(conf, inputSourceConfig);
Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet", "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
HdfsInputSourceFactory hdfsInputSourceAdapter = new HdfsInputSourceFactory(conf, inputSourceConfig);
Assert.assertTrue(hdfsInputSourceAdapter.create(Arrays.asList("hdfs://localhost:7020/bar/def.parquet", "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
}
}

File: S3InputSourceDruidModule.java

@@ -40,7 +40,7 @@ public class S3InputSourceDruidModule implements DruidModule
return ImmutableList.of(
new SimpleModule().registerSubtypes(
new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME),
new NamedType(S3InputSourceBuilder.class, S3StorageDruidModule.SCHEME)
new NamedType(S3InputSourceFactory.class, S3StorageDruidModule.SCHEME)
)
);
}

File: S3InputSourceFactory.java (renamed from S3InputSourceBuilder.java)

@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSEndpointConfig;
import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
@@ -39,7 +39,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
public class S3InputSourceBuilder extends AbstractInputSourceBuilder
public class S3InputSourceFactory implements InputSourceFactory
{
private final ServerSideEncryptingAmazonS3 s3Client;
private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
@@ -51,7 +51,7 @@ public class S3InputSourceBuilder extends AbstractInputSourceBuilder
private final AWSEndpointConfig awsEndpointConfig;
@JsonCreator
public S3InputSourceBuilder(
public S3InputSourceFactory(
@JacksonInject ServerSideEncryptingAmazonS3 s3Client,
@JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
@JacksonInject S3InputDataConfig inputDataConfig,
@@ -73,7 +73,7 @@ public class S3InputSourceBuilder extends AbstractInputSourceBuilder
}
@Override
public SplittableInputSource generateInputSource(List<String> inputFilePaths)
public SplittableInputSource create(List<String> inputFilePaths)
{
return new S3InputSource(
s3Client,

File: S3InputSourceFactoryTest.java (renamed from S3InputSourceBuilderTest.java)

@@ -28,7 +28,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class S3InputSourceBuilderTest
public class S3InputSourceFactoryTest
{
@Test
public void testAdapterGet()
@@ -43,7 +43,7 @@ public class S3InputSourceBuilderTest
"s3://bar/foo/file3.txt"
);
S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
S3InputSourceFactory s3Builder = new S3InputSourceFactory(
service,
serverSides3Builder,
dataConfig,
@@ -53,6 +53,6 @@ public class S3InputSourceBuilderTest
null,
null
);
Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof S3InputSource);
Assert.assertTrue(s3Builder.create(fileUris) instanceof S3InputSource);
}
}

File: AbstractInputSourceBuilder.java (deleted)

@@ -1,123 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
/**
* A wrapper on top of {@link SplittableInputSource} that handles input source creation.
* For composing input sources such as IcebergInputSource, the delegate input source instantiation might fail upon deserialization since the input file paths
* are not available yet and this might fail the input source precondition checks.
* This adapter helps create the delegate input source once the input file paths are fully determined.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value = LocalInputSourceBuilder.class)
})
public abstract class AbstractInputSourceBuilder
{
public abstract SplittableInputSource generateInputSource(List<String> inputFilePaths);
public SplittableInputSource setupInputSource(List<String> inputFilePaths)
{
if (inputFilePaths.isEmpty()) {
return new EmptyInputSource();
} else {
return generateInputSource(inputFilePaths);
}
}
/**
* This input source is used in place of a delegate input source if there are no input file paths.
* Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource
* may use this input source as delegate in such cases.
*/
private static class EmptyInputSource implements SplittableInputSource
{
@Override
public boolean needsFormat()
{
return false;
}
@Override
public boolean isSplittable()
{
return false;
}
@Override
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
)
{
return new InputSourceReader()
{
@Override
public CloseableIterator<InputRow> read(InputStats inputStats)
{
return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
});
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample()
{
return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
});
}
};
}
@Override
public Stream<InputSplit> createSplits(
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
)
{
return Stream.empty();
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return 0;
}
@Override
public InputSource withSplit(InputSplit split)
{
return null;
}
}
}

File: InputSourceFactory.java (new)

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;
import java.util.List;
/**
* An interface for generating {@link SplittableInputSource} objects on the fly.
* For composing input sources such as IcebergInputSource, instantiating the delegate input source at
* deserialization time may fail because the input file paths are not yet available, tripping the
* delegate's precondition checks.
* This factory defers creation of the delegate input source until the input file paths are fully determined.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "local", value = LocalInputSourceFactory.class)
})
@UnstableApi
public interface InputSourceFactory
{
SplittableInputSource create(List<String> inputFilePaths);
}
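
Deserialization goes through Jackson's standard polymorphic typing: the "local" subtype is bound directly via the @JsonSubTypes annotation above, while the HDFS and S3 modules register theirs as NamedType subtypes (see the module diffs earlier). A small sketch, assuming jackson-databind and the Druid core classes from this commit:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;

public class DeserializeFactoryExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // The "type" property selects the implementation. Extension factories
    // (e.g. "hdfs", "s3") additionally need their @JacksonInject dependencies
    // supplied via InjectableValues, which the Druid server wires up.
    InputSourceFactory factory = mapper.readValue("{\"type\": \"local\"}", InputSourceFactory.class);
    System.out.println(factory instanceof LocalInputSourceFactory); // prints true
  }
}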

File: LocalInputSourceFactory.java (renamed from LocalInputSourceBuilder.java)

@@ -19,18 +19,17 @@
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.InputSourceFactory;
import java.io.File;
import java.util.List;
import java.util.stream.Collectors;
public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
public class LocalInputSourceFactory implements InputSourceFactory
{
public static final String TYPE_KEY = "local";
@Override
public LocalInputSource generateInputSource(List<String> inputFilePaths)
public LocalInputSource create(List<String> inputFilePaths)
{
return new LocalInputSource(
null,

File: LocalInputSourceAdapterTest.java

@@ -19,22 +19,12 @@
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class LocalInputSourceAdapterTest
{
@@ -44,44 +34,10 @@ public class LocalInputSourceAdapterTest
@Test
public void testAdapterGet()
{
LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
LocalInputSourceFactory localInputSourceAdapter = new LocalInputSourceFactory();
Assert.assertTrue(localInputSourceAdapter.create(Arrays.asList(
"foo.parquet",
"bar.parquet"
)) instanceof LocalInputSource);
}
@Test
public void testAdapterSetup()
{
LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
InputSource delegateInputSource = localInputSourceAdapter.setupInputSource(Arrays.asList(
"foo.parquet",
"bar.parquet"
));
Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
}
@Test
public void testEmptyInputSource() throws IOException
{
InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
SplittableInputSource<Object> emptyInputSource =
(SplittableInputSource<Object>) localInputSourceAdapter.setupInputSource(Collections.emptyList());
List<InputSplit<Object>> splitList = emptyInputSource
.createSplits(mockFormat, mockSplitHint)
.collect(Collectors.toList());
Assert.assertTrue(splitList.isEmpty());
Assert.assertFalse(emptyInputSource.isSplittable());
Assert.assertFalse(emptyInputSource.needsFormat());
Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat, mockSplitHint));
Assert.assertFalse(emptyInputSource.reader(
EasyMock.createMock(InputRowSchema.class),
mockFormat,
temporaryFolder.newFolder()
).read().hasNext());
}
}