mirror of https://github.com/apache/druid.git
Support combining inputsource for parallel ingestion (#10387)
* Add combining inputsource * Fix documentation Co-authored-by: Atul Mohan <atulmohan@yahoo-inc.com>
This commit is contained in:
parent
8657b23ab2
commit
b6ad790dc7
|
@ -22,6 +22,7 @@ package org.apache.druid.data.input;
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
|
import org.apache.druid.data.input.impl.CombiningInputSource;
|
||||||
import org.apache.druid.data.input.impl.HttpInputSource;
|
import org.apache.druid.data.input.impl.HttpInputSource;
|
||||||
import org.apache.druid.data.input.impl.InlineInputSource;
|
import org.apache.druid.data.input.impl.InlineInputSource;
|
||||||
import org.apache.druid.data.input.impl.LocalInputSource;
|
import org.apache.druid.data.input.impl.LocalInputSource;
|
||||||
|
@ -50,7 +51,8 @@ import java.io.File;
|
||||||
@JsonSubTypes(value = {
|
@JsonSubTypes(value = {
|
||||||
@Type(name = "local", value = LocalInputSource.class),
|
@Type(name = "local", value = LocalInputSource.class),
|
||||||
@Type(name = "http", value = HttpInputSource.class),
|
@Type(name = "http", value = HttpInputSource.class),
|
||||||
@Type(name = "inline", value = InlineInputSource.class)
|
@Type(name = "inline", value = InlineInputSource.class),
|
||||||
|
@Type(name = "combining", value = CombiningInputSource.class)
|
||||||
})
|
})
|
||||||
public interface InputSource
|
public interface InputSource
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.data.input.impl;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.druid.data.input.AbstractInputSource;
|
||||||
|
import org.apache.druid.data.input.InputFormat;
|
||||||
|
import org.apache.druid.data.input.InputSource;
|
||||||
|
import org.apache.druid.data.input.InputSplit;
|
||||||
|
import org.apache.druid.data.input.SplitHintSpec;
|
||||||
|
import org.apache.druid.java.util.common.Pair;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* InputSource that combines data from multiple inputSources. The delegate inputSources must be splittable.
|
||||||
|
* The splits for this inputSource are created from the {@link SplittableInputSource#createSplits} of the delegate inputSources.
|
||||||
|
* Each inputSplit is paired up with its respective delegate inputSource so that during split,
|
||||||
|
* {@link SplittableInputSource#withSplit}is called against the correct inputSource for each inputSplit.
|
||||||
|
* This inputSource presently only supports a single {@link InputFormat}.
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class CombiningInputSource extends AbstractInputSource implements SplittableInputSource
|
||||||
|
{
|
||||||
|
private final List<SplittableInputSource> delegates;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public CombiningInputSource(
|
||||||
|
@JsonProperty("delegates") List<SplittableInputSource> delegates
|
||||||
|
)
|
||||||
|
{
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
delegates != null && !delegates.isEmpty(),
|
||||||
|
"Must specify atleast one delegate inputSource"
|
||||||
|
);
|
||||||
|
this.delegates = delegates;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public List<SplittableInputSource> getDelegates()
|
||||||
|
{
|
||||||
|
return delegates;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<InputSplit> createSplits(
|
||||||
|
InputFormat inputFormat,
|
||||||
|
@Nullable SplitHintSpec splitHintSpec
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return delegates.stream().flatMap(inputSource -> {
|
||||||
|
try {
|
||||||
|
return inputSource.createSplits(inputFormat, splitHintSpec)
|
||||||
|
.map(inputsplit -> new InputSplit(Pair.of(inputSource, inputsplit)));
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
|
||||||
|
{
|
||||||
|
return delegates.stream().mapToInt(inputSource -> {
|
||||||
|
try {
|
||||||
|
return inputSource.estimateNumSplits(inputFormat, splitHintSpec);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}).sum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputSource withSplit(InputSplit split)
|
||||||
|
{
|
||||||
|
Pair<SplittableInputSource, InputSplit> inputSourceWithSplit = (Pair) split.get();
|
||||||
|
return inputSourceWithSplit.lhs.withSplit(inputSourceWithSplit.rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean needsFormat()
|
||||||
|
{
|
||||||
|
// This is called only when ParallelIndexIngestionSpec needs to decide if either inputformat vs parserspec is required.
|
||||||
|
// So if at least one of the delegate inputSources needsFormat, we set this to true.
|
||||||
|
// All other needsFormat calls will be made against the delegate inputSources.
|
||||||
|
return delegates.stream().anyMatch(SplittableInputSource::needsFormat);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
CombiningInputSource that = (CombiningInputSource) o;
|
||||||
|
return delegates.equals(that.delegates);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return Objects.hash(delegates);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,313 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.data.input.impl;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||||
|
import org.apache.druid.data.input.AbstractInputSource;
|
||||||
|
import org.apache.druid.data.input.InputFormat;
|
||||||
|
import org.apache.druid.data.input.InputSource;
|
||||||
|
import org.apache.druid.data.input.InputSplit;
|
||||||
|
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
|
||||||
|
import org.apache.druid.data.input.SplitHintSpec;
|
||||||
|
import org.apache.druid.java.util.common.HumanReadableBytes;
|
||||||
|
import org.apache.druid.java.util.common.Pair;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class CombiningInputSourceTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testSerde() throws IOException
|
||||||
|
{
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
mapper.registerModule(new SimpleModule("test-module").registerSubtypes(TestFileInputSource.class, TestUriInputSource.class));
|
||||||
|
final TestFileInputSource fileSource = new TestFileInputSource(ImmutableList.of(new File("myFile").getAbsoluteFile()));
|
||||||
|
final TestUriInputSource uriInputSource = new TestUriInputSource(
|
||||||
|
ImmutableList.of(URI.create("http://test.com/http-test")));
|
||||||
|
final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
|
||||||
|
fileSource,
|
||||||
|
uriInputSource
|
||||||
|
));
|
||||||
|
final byte[] json = mapper.writeValueAsBytes(combiningInputSource);
|
||||||
|
final CombiningInputSource fromJson = (CombiningInputSource) mapper.readValue(json, InputSource.class);
|
||||||
|
Assert.assertEquals(combiningInputSource, fromJson);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEstimateNumSplits()
|
||||||
|
{
|
||||||
|
final File file = EasyMock.niceMock(File.class);
|
||||||
|
EasyMock.expect(file.length()).andReturn(5L).anyTimes();
|
||||||
|
EasyMock.replay(file);
|
||||||
|
final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
|
||||||
|
final TestUriInputSource uriInputSource = new TestUriInputSource(
|
||||||
|
ImmutableList.of(
|
||||||
|
URI.create("http://test.com/http-test1"),
|
||||||
|
URI.create("http://test.com/http-test2"),
|
||||||
|
URI.create("http://test.com/http-test3")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
|
||||||
|
fileSource,
|
||||||
|
uriInputSource
|
||||||
|
));
|
||||||
|
Assert.assertEquals(combiningInputSource.estimateNumSplits(
|
||||||
|
new NoopInputFormat(),
|
||||||
|
new MaxSizeSplitHintSpec(
|
||||||
|
new HumanReadableBytes(5L),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
), 6);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateSplits()
|
||||||
|
{
|
||||||
|
final File file = EasyMock.niceMock(File.class);
|
||||||
|
EasyMock.expect(file.length()).andReturn(30L).anyTimes();
|
||||||
|
EasyMock.replay(file);
|
||||||
|
final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
|
||||||
|
final TestUriInputSource uriInputSource = new TestUriInputSource(
|
||||||
|
ImmutableList.of(
|
||||||
|
URI.create("http://test.com/http-test3"),
|
||||||
|
URI.create("http://test.com/http-test4"),
|
||||||
|
URI.create("http://test.com/http-test5")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
|
||||||
|
fileSource,
|
||||||
|
uriInputSource
|
||||||
|
));
|
||||||
|
List<InputSplit> combinedInputSplits = combiningInputSource.createSplits(
|
||||||
|
new NoopInputFormat(),
|
||||||
|
new MaxSizeSplitHintSpec(
|
||||||
|
new HumanReadableBytes(5L),
|
||||||
|
null
|
||||||
|
)
|
||||||
|
).collect(Collectors.toList());
|
||||||
|
Assert.assertEquals(6, combinedInputSplits.size());
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
|
||||||
|
InputSplit<File> fileSplits = splitPair.rhs;
|
||||||
|
Assert.assertTrue(splitPair.lhs instanceof TestFileInputSource);
|
||||||
|
Assert.assertEquals(5, fileSplits.get().length());
|
||||||
|
}
|
||||||
|
for (int i = 3; i < combinedInputSplits.size(); i++) {
|
||||||
|
Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
|
||||||
|
InputSplit<URI> fileSplits = splitPair.rhs;
|
||||||
|
Assert.assertTrue(splitPair.lhs instanceof TestUriInputSource);
|
||||||
|
Assert.assertEquals(URI.create("http://test.com/http-test" + i), fileSplits.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithSplits()
|
||||||
|
{
|
||||||
|
final TestUriInputSource uriInputSource = new TestUriInputSource(
|
||||||
|
ImmutableList.of(
|
||||||
|
URI.create("http://test.com/http-test1"))
|
||||||
|
);
|
||||||
|
final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
|
||||||
|
uriInputSource
|
||||||
|
));
|
||||||
|
InputSplit<URI> testUriSplit = new InputSplit<>(URI.create("http://test.com/http-test1"));
|
||||||
|
TestUriInputSource urlInputSourceWithSplit = (TestUriInputSource) combiningInputSource.withSplit(new InputSplit(Pair.of(
|
||||||
|
uriInputSource,
|
||||||
|
testUriSplit)));
|
||||||
|
Assert.assertEquals(uriInputSource, urlInputSourceWithSplit);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNeedsFormat()
|
||||||
|
{
|
||||||
|
final TestUriInputSource uriInputSource = new TestUriInputSource(
|
||||||
|
ImmutableList.of(
|
||||||
|
URI.create("http://test.com/http-test1")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
|
||||||
|
|
||||||
|
final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
|
||||||
|
uriInputSource,
|
||||||
|
fileSource
|
||||||
|
));
|
||||||
|
Assert.assertTrue(combiningInputSource.needsFormat());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEquals()
|
||||||
|
{
|
||||||
|
EqualsVerifier.forClass(CombiningInputSource.class)
|
||||||
|
.withNonnullFields("delegates")
|
||||||
|
.usingGetClass()
|
||||||
|
.verify();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<File> generateFiles(int numFiles)
|
||||||
|
{
|
||||||
|
final List<File> files = new ArrayList<>();
|
||||||
|
for (int i = 0; i < numFiles; i++) {
|
||||||
|
final File file = EasyMock.niceMock(File.class);
|
||||||
|
EasyMock.expect(file.length()).andReturn(5L).anyTimes();
|
||||||
|
EasyMock.replay(file);
|
||||||
|
files.add(file);
|
||||||
|
}
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestFileInputSource extends AbstractInputSource implements SplittableInputSource<File>
|
||||||
|
{
|
||||||
|
private final List<File> files;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
private TestFileInputSource(@JsonProperty("files") List<File> fileList)
|
||||||
|
{
|
||||||
|
files = fileList;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public List<File> getFiles()
|
||||||
|
{
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
|
||||||
|
{
|
||||||
|
return files.stream().map(InputSplit::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
|
||||||
|
{
|
||||||
|
return files.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SplittableInputSource<File> withSplit(InputSplit<File> split)
|
||||||
|
{
|
||||||
|
return new TestFileInputSource(ImmutableList.of(split.get()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean needsFormat()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
TestFileInputSource that = (TestFileInputSource) o;
|
||||||
|
return Objects.equals(files, that.files);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return Objects.hash(files);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestUriInputSource extends AbstractInputSource implements SplittableInputSource<URI>
|
||||||
|
{
|
||||||
|
private final List<URI> uris;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
private TestUriInputSource(@JsonProperty("uris") List<URI> uriList)
|
||||||
|
{
|
||||||
|
uris = uriList;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public List<URI> getUris()
|
||||||
|
{
|
||||||
|
return uris;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
|
||||||
|
{
|
||||||
|
return uris.stream().map(InputSplit::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
|
||||||
|
{
|
||||||
|
return uris.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SplittableInputSource<URI> withSplit(InputSplit<URI> split)
|
||||||
|
{
|
||||||
|
return new TestUriInputSource(ImmutableList.of(split.get()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean needsFormat()
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
TestUriInputSource that = (TestUriInputSource) o;
|
||||||
|
return Objects.equals(uris, that.uris);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return Objects.hash(uris);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1368,6 +1368,48 @@ Compared to the other native batch InputSources, SQL InputSource behaves differe
|
||||||
* Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.
|
* Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.
|
||||||
|
|
||||||
|
|
||||||
|
### Combining Input Source
|
||||||
|
|
||||||
|
The Combining input source is used to read data from multiple InputSources. This input source should be only used if all the delegate input sources are
|
||||||
|
_splittable_ and can be used by the [Parallel task](#parallel-task). This input source will identify the splits from its delegates and each split will be processed by a worker task. Similar to other input sources, this input source supports a single `inputFormat`. Therefore, please note that delegate input sources requiring an `inputFormat` must have the same format for input data.
|
||||||
|
|
||||||
|
|property|description|required?|
|
||||||
|
|--------|-----------|---------|
|
||||||
|
|type|This should be "combining".|Yes|
|
||||||
|
|delegates|List of _splittable_ InputSources to read data from.|Yes|
|
||||||
|
|
||||||
|
Sample spec:
|
||||||
|
|
||||||
|
|
||||||
|
```json
|
||||||
|
...
|
||||||
|
"ioConfig": {
|
||||||
|
"type": "index_parallel",
|
||||||
|
"inputSource": {
|
||||||
|
"type": "combining",
|
||||||
|
"delegates" : [
|
||||||
|
{
|
||||||
|
"type": "local",
|
||||||
|
"filter" : "*.csv",
|
||||||
|
"baseDir": "/data/directory",
|
||||||
|
"files": ["/bar/foo", "/foo/bar"]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "druid",
|
||||||
|
"dataSource": "wikipedia",
|
||||||
|
"interval": "2013-01-01/2013-01-02"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"inputFormat": {
|
||||||
|
"type": "csv"
|
||||||
|
},
|
||||||
|
...
|
||||||
|
},
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
###
|
###
|
||||||
|
|
||||||
## Firehoses (Deprecated)
|
## Firehoses (Deprecated)
|
||||||
|
|
|
@ -1,58 +1,61 @@
|
||||||
{
|
{
|
||||||
"type" : "index",
|
"type": "index_parallel",
|
||||||
"spec" : {
|
"spec": {
|
||||||
"dataSchema" : {
|
"dataSchema": {
|
||||||
"dataSource" : "updates-tutorial",
|
"dataSource": "updates-tutorial",
|
||||||
"parser" : {
|
"dimensionsSpec": {
|
||||||
"type" : "string",
|
"dimensions": [
|
||||||
"parseSpec" : {
|
"animal"
|
||||||
"format" : "json",
|
]
|
||||||
"dimensionsSpec" : {
|
|
||||||
"dimensions" : [
|
|
||||||
"animal"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"timestampSpec": {
|
|
||||||
"column": "timestamp",
|
|
||||||
"format": "iso"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
"metricsSpec" : [
|
"timestampSpec": {
|
||||||
{ "type" : "count", "name" : "count" },
|
"column": "timestamp",
|
||||||
{ "type" : "longSum", "name" : "number", "fieldName" : "number" }
|
"format": "iso"
|
||||||
|
},
|
||||||
|
"metricsSpec": [
|
||||||
|
{ "type": "count", "name": "count"},
|
||||||
|
{ "type": "longSum", "name": "number", "fieldName": "number"}
|
||||||
],
|
],
|
||||||
"granularitySpec" : {
|
"granularitySpec": {
|
||||||
"type" : "uniform",
|
"type": "uniform",
|
||||||
"segmentGranularity" : "week",
|
"segmentGranularity": "week",
|
||||||
"queryGranularity" : "minute",
|
"queryGranularity": "minute",
|
||||||
"intervals" : ["2018-01-01/2018-01-03"],
|
"intervals": ["2018-01-01/2018-01-03"],
|
||||||
"rollup" : true
|
"rollup": true
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"ioConfig" : {
|
"ioConfig": {
|
||||||
"type" : "index",
|
"type": "index_parallel",
|
||||||
"firehose" : {
|
"inputSource": {
|
||||||
"type": "combining",
|
"type": "combining",
|
||||||
"delegates": [
|
"delegates": [
|
||||||
{
|
{
|
||||||
"type" : "ingestSegment",
|
"type": "druid",
|
||||||
"dataSource" : "updates-tutorial",
|
"dataSource": "updates-tutorial",
|
||||||
"interval" : "2018-01-01/2018-01-03"
|
"interval": "2018-01-01/2018-01-03"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"type" : "local",
|
"type": "local",
|
||||||
"baseDir" : "quickstart/tutorial",
|
"baseDir": "quickstart/tutorial",
|
||||||
"filter" : "updates-data3.json"
|
"filter": "updates-data3.json"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"appendToExisting" : false
|
"inputFormat": {
|
||||||
|
"type": "json"
|
||||||
|
},
|
||||||
|
"appendToExisting": false
|
||||||
},
|
},
|
||||||
"tuningConfig" : {
|
"tuningConfig": {
|
||||||
"type" : "index",
|
"type": "index_parallel",
|
||||||
"maxRowsPerSegment" : 5000000,
|
"maxRowsPerSegment": 5000000,
|
||||||
"maxRowsInMemory" : 25000
|
"maxRowsInMemory": 25000,
|
||||||
|
"maxNumConcurrentSubTasks": 2,
|
||||||
|
"forceGuaranteedRollup": true,
|
||||||
|
"partitionsSpec": {
|
||||||
|
"type": "hashed",
|
||||||
|
"numShards": 1
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.tests.indexer;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
|
||||||
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||||
|
import org.apache.druid.tests.TestNGGroup;
|
||||||
|
import org.testng.annotations.Guice;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
|
||||||
|
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||||
|
public class ITCombiningInputSourceParallelIndexTest extends AbstractITBatchIndexTest
|
||||||
|
{
|
||||||
|
private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json";
|
||||||
|
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
|
||||||
|
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
|
||||||
|
|
||||||
|
private static final String COMBINING_INDEX_TASK = "/indexer/wikipedia_combining_input_source_index_parallel_task.json";
|
||||||
|
private static final String COMBINING_QUERIES_RESOURCE = "/indexer/wikipedia_combining_firehose_index_queries.json";
|
||||||
|
private static final String COMBINING_INDEX_DATASOURCE = "wikipedia_comb_index_test";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIndexData() throws Exception
|
||||||
|
{
|
||||||
|
Map inputFormatMap = new ImmutableMap.Builder<String, Object>().put("type", "json")
|
||||||
|
.build();
|
||||||
|
try (
|
||||||
|
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
|
||||||
|
final Closeable ignored2 = unloader(COMBINING_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
|
||||||
|
) {
|
||||||
|
final Function<String, String> combiningInputSourceSpecTransform = spec -> {
|
||||||
|
try {
|
||||||
|
spec = StringUtils.replace(
|
||||||
|
spec,
|
||||||
|
"%%PARTITIONS_SPEC%%",
|
||||||
|
jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
|
||||||
|
);
|
||||||
|
spec = StringUtils.replace(
|
||||||
|
spec,
|
||||||
|
"%%INPUT_SOURCE_FILTER%%",
|
||||||
|
"wikipedia_index_data*"
|
||||||
|
);
|
||||||
|
spec = StringUtils.replace(
|
||||||
|
spec,
|
||||||
|
"%%INPUT_SOURCE_BASE_DIR%%",
|
||||||
|
"/resources/data/batch_index/json"
|
||||||
|
);
|
||||||
|
spec = StringUtils.replace(
|
||||||
|
spec,
|
||||||
|
"%%INPUT_FORMAT%%",
|
||||||
|
jsonMapper.writeValueAsString(inputFormatMap)
|
||||||
|
);
|
||||||
|
spec = StringUtils.replace(
|
||||||
|
spec,
|
||||||
|
"%%APPEND_TO_EXISTING%%",
|
||||||
|
jsonMapper.writeValueAsString(false)
|
||||||
|
);
|
||||||
|
spec = StringUtils.replace(
|
||||||
|
spec,
|
||||||
|
"%%FORCE_GUARANTEED_ROLLUP%%",
|
||||||
|
jsonMapper.writeValueAsString(false)
|
||||||
|
);
|
||||||
|
spec = StringUtils.replace(
|
||||||
|
spec,
|
||||||
|
"%%COMBINING_DATASOURCE%%",
|
||||||
|
INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()
|
||||||
|
);
|
||||||
|
return spec;
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
doIndexTest(
|
||||||
|
INDEX_DATASOURCE,
|
||||||
|
INDEX_TASK,
|
||||||
|
combiningInputSourceSpecTransform,
|
||||||
|
INDEX_QUERIES_RESOURCE,
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
doIndexTest(
|
||||||
|
COMBINING_INDEX_DATASOURCE,
|
||||||
|
COMBINING_INDEX_TASK,
|
||||||
|
combiningInputSourceSpecTransform,
|
||||||
|
COMBINING_QUERIES_RESOURCE,
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
{
|
||||||
|
"type": "index_parallel",
|
||||||
|
"spec": {
|
||||||
|
"dataSchema": {
|
||||||
|
"dataSource": "%%DATASOURCE%%",
|
||||||
|
"timestampSpec": {
|
||||||
|
"column": "timestamp"
|
||||||
|
},
|
||||||
|
"metricsSpec": [
|
||||||
|
{
|
||||||
|
"type": "count",
|
||||||
|
"name": "count"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "doubleSum",
|
||||||
|
"name": "added",
|
||||||
|
"fieldName": "added"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "doubleSum",
|
||||||
|
"name": "deleted",
|
||||||
|
"fieldName": "deleted"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "doubleSum",
|
||||||
|
"name": "delta",
|
||||||
|
"fieldName": "delta"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "thetaSketch",
|
||||||
|
"type": "thetaSketch",
|
||||||
|
"fieldName": "user"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "quantilesDoublesSketch",
|
||||||
|
"type": "quantilesDoublesSketch",
|
||||||
|
"fieldName": "delta"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "HLLSketchBuild",
|
||||||
|
"type": "HLLSketchBuild",
|
||||||
|
"fieldName": "user"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"granularitySpec": {
|
||||||
|
"segmentGranularity": "DAY",
|
||||||
|
"queryGranularity": "second",
|
||||||
|
"intervals" : [ "2013-08-31/2013-09-02" ]
|
||||||
|
},
|
||||||
|
"dimensionsSpec": {
|
||||||
|
"dimensions": [
|
||||||
|
"page",
|
||||||
|
{"type": "string", "name": "language", "createBitmapIndex": false},
|
||||||
|
"user",
|
||||||
|
"unpatrolled",
|
||||||
|
"newPage",
|
||||||
|
"robot",
|
||||||
|
"anonymous",
|
||||||
|
"namespace",
|
||||||
|
"continent",
|
||||||
|
"country",
|
||||||
|
"region",
|
||||||
|
"city"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"ioConfig": {
|
||||||
|
"type": "index_parallel",
|
||||||
|
"inputSource": {
|
||||||
|
"type": "combining",
|
||||||
|
"delegates": [
|
||||||
|
{
|
||||||
|
"type": "local",
|
||||||
|
"baseDir": "/resources/indexer",
|
||||||
|
"filter": "wikipedia_combining_index_data.json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "druid",
|
||||||
|
"dataSource": "%%COMBINING_DATASOURCE%%",
|
||||||
|
"interval": "2013-08-31/2013-09-02"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"appendToExisting": %%APPEND_TO_EXISTING%%,
|
||||||
|
"inputFormat": %%INPUT_FORMAT%%
|
||||||
|
},
|
||||||
|
"tuningConfig": {
|
||||||
|
"type": "index_parallel",
|
||||||
|
"maxNumConcurrentSubTasks": 4,
|
||||||
|
"splitHintSpec": {
|
||||||
|
"type": "maxSize",
|
||||||
|
"maxNumFiles": 1
|
||||||
|
},
|
||||||
|
"forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%,
|
||||||
|
"partitionsSpec": %%PARTITIONS_SPEC%%
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue