Harmonize local input sources; fix batch index integration test. (#11965)

* Make LocalInputSource.files a List instead of Set and adjust wikipedia_index_task to use file list.

Rationale: the behavior of wikipedia_index_task.json is order-dependent with regard to its input
files; some orders produce 4 segments and some produce 5 segments. Some integration tests, like
ITSystemTableBatchIndexTaskTest and ITAutoCompactionTest, are written assuming that the
4-segment case will always happen. Providing the file list in a specific order ensures that this
will happen as expected by the tests.

I didn't see a specific reason why the LocalInputSource.files parameter needed to be a Set, so
changing it to a List was the simplest way to achieve the consistent ordering. I think it will
also make the behavior make more sense if someone does specify the same input file multiple
times in a spec: I think they'd expect it to be loaded multiple times instead of deduped. This
is consistent with the behavior of other input sources like S3, GCS, HTTP.

* Sort files in LocalFirehoseFactory.
This commit is contained in:
Gian Merlino 2021-11-21 22:26:31 -08:00 committed by GitHub
parent cb0a2af644
commit b13f07a057
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 56 additions and 46 deletions

View File

@ -47,11 +47,9 @@ import org.apache.druid.utils.Streams;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -63,18 +61,18 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
private final File baseDir;
@Nullable
private final String filter;
private final Set<File> files;
private final List<File> files;
@JsonCreator
public LocalInputSource(
@JsonProperty("baseDir") @Nullable File baseDir,
@JsonProperty("filter") @Nullable String filter,
@JsonProperty("files") @Nullable Set<File> files
@JsonProperty("files") @Nullable List<File> files
)
{
this.baseDir = baseDir;
this.filter = baseDir != null ? Preconditions.checkNotNull(filter, "filter") : filter;
this.files = files == null ? Collections.emptySet() : files;
this.files = files == null ? Collections.emptyList() : files;
if (baseDir == null && CollectionUtils.isNullOrEmpty(files)) {
throw new IAE("At least one of baseDir or files should be specified");
@ -101,7 +99,7 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
}
@JsonProperty
public Set<File> getFiles()
public List<File> getFiles()
{
return files;
}
@ -180,7 +178,7 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
@Override
public SplittableInputSource<List<File>> withSplit(InputSplit<List<File>> split)
{
return new LocalInputSource(null, null, new HashSet<>(split.get()));
return new LocalInputSource(null, null, split.get());
}
@Override

View File

@ -71,7 +71,7 @@ public class LocalInputSourceTest
{
final long fileSize = 15;
final HumanReadableBytes maxSplitSize = new HumanReadableBytes(50L);
final Set<File> files = mockFiles(10, fileSize);
final List<File> files = mockFiles(10, fileSize);
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
final List<InputSplit<List<File>>> splits = inputSource
.createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize, null))
@ -88,7 +88,7 @@ public class LocalInputSourceTest
{
final long fileSize = 13;
final HumanReadableBytes maxSplitSize = new HumanReadableBytes(40L);
final Set<File> files = mockFiles(10, fileSize);
final List<File> files = mockFiles(10, fileSize);
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
Assert.assertEquals(
4,
@ -108,7 +108,7 @@ public class LocalInputSourceTest
}
filesInBaseDir.add(file);
}
Set<File> files = new HashSet<>(filesInBaseDir.subList(0, 5));
List<File> files = filesInBaseDir.subList(0, 5);
for (int i = 0; i < 3; i++) {
final File file = File.createTempFile("local-input-source", ".data", baseDir);
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
@ -145,7 +145,7 @@ public class LocalInputSourceTest
public void testGetFileIteratorWithOnlyFilesIteratingAllFiles() throws IOException
{
File baseDir = temporaryFolder.newFolder();
Set<File> filesInBaseDir = new HashSet<>();
List<File> filesInBaseDir = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final File file = File.createTempFile("local-input-source", ".data", baseDir);
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
@ -154,23 +154,23 @@ public class LocalInputSourceTest
filesInBaseDir.add(file);
}
Iterator<File> fileIterator = new LocalInputSource(null, null, filesInBaseDir).getFileIterator();
Set<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet());
List<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toList());
Assert.assertEquals(filesInBaseDir, actualFiles);
}
@Test
public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly()
{
final Set<File> files = new HashSet<>(mockFiles(10, 5));
final List<File> files = mockFiles(10, 5);
files.addAll(mockFiles(10, 0));
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
List<File> iteratedFiles = Lists.newArrayList(inputSource.getFileIterator());
Assert.assertTrue(iteratedFiles.stream().allMatch(file -> file.length() > 0));
}
private static Set<File> mockFiles(int numFiles, long fileSize)
private static List<File> mockFiles(int numFiles, long fileSize)
{
final Set<File> files = new HashSet<>();
final List<File> files = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
final File file = EasyMock.niceMock(File.class);
EasyMock.expect(file.length()).andReturn(fileSize).anyTimes();

View File

@ -3,6 +3,26 @@
"spec": {
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
},
"metricsSpec": [
{
"type": "count",
@ -43,39 +63,22 @@
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
}
}
}
},
"ioConfig": {
"type": "index",
"firehose": {
"inputSource": {
"type": "local",
"baseDir": "/resources/data/batch_index/json",
"filter": "wikipedia_index_data*"
}
"files": [
"/resources/data/batch_index/json/wikipedia_index_data1.json",
"/resources/data/batch_index/json/wikipedia_index_data2.json",
"/resources/data/batch_index/json/wikipedia_index_data3.json"
]
},
"inputFormat": {
"type": "json"
},
"appendToExisting": false
},
"tuningConfig": {
"type": "index",

View File

@ -35,7 +35,10 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
/**
* Firehose that reads data from files on local disk
@ -82,11 +85,16 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
@Override
protected Collection<File> initObjects()
{
return FileUtils.listFiles(
Preconditions.checkNotNull(baseDir, "baseDir").getAbsoluteFile(),
final Collection<File> files = FileUtils.listFiles(
Preconditions.checkNotNull(this.baseDir, "baseDir").getAbsoluteFile(),
new WildcardFileFilter(filter),
TrueFileFilter.INSTANCE
);
// Sort files for consistent ordering from run to run.
final List<File> fileList = files instanceof List ? (List<File>) files : new ArrayList<>(files);
fileList.sort(Comparator.naturalOrder());
return fileList;
}
@Override

View File

@ -25,6 +25,7 @@ import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -40,7 +41,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
public class LocalFirehoseFactoryTest
public class LocalFirehoseFactoryTest extends InitializedNullHandlingTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();