implement FiniteFirehoseFactory in InlineFirehose (#8682)

* implement FiniteFirehoseFactory in InlineFirehose

* override isSplittable in InlineFirehoseFactory & improve tests
This commit is contained in:
Aditya 2019-10-17 07:58:55 +05:30 committed by Fangjin Yang
parent 9f4e11df32
commit 75527f09cd
2 changed files with 49 additions and 2 deletions

View File

@ -22,19 +22,21 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Stream;
/** /**
* Creates firehose that produces data inlined in its own spec * Creates firehose that produces data inlined in its own spec
*/ */
public class InlineFirehoseFactory implements FirehoseFactory<StringInputRowParser> public class InlineFirehoseFactory implements FiniteFirehoseFactory<StringInputRowParser, String>
{ {
private final String data; private final String data;
@ -74,4 +76,28 @@ public class InlineFirehoseFactory implements FirehoseFactory<StringInputRowPars
{ {
return Objects.hash(data); return Objects.hash(data);
} }
@Override
public boolean isSplittable()
{
return false;
}
@Override
public Stream<InputSplit<String>> getSplits()
{
return Stream.of(new InputSplit<>(data));
}
@Override
public int getNumSplits()
{
return 1;
}
@Override
public FiniteFirehoseFactory<StringInputRowParser, String> withSplit(InputSplit<String> split)
{
return new InlineFirehoseFactory(split.get());
}
} }

View File

@ -20,8 +20,10 @@
package org.apache.druid.segment.realtime.firehose; package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser;
@ -37,6 +39,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
@SuppressWarnings({"NullableProblems", "ConstantConditions"}) @SuppressWarnings({"NullableProblems", "ConstantConditions"})
public class InlineFirehoseFactoryTest public class InlineFirehoseFactoryTest
@ -77,6 +80,14 @@ public class InlineFirehoseFactoryTest
target = new InlineFirehoseFactory(DATA); target = new InlineFirehoseFactory(DATA);
} }
@Test
public void testInterfaceImplementation()
{
Assert.assertTrue(target instanceof FiniteFirehoseFactory);
Assert.assertFalse(target.isSplittable());
Assert.assertEquals(1, target.getNumSplits());
}
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void testContstructorDataRequired() public void testContstructorDataRequired()
{ {
@ -101,6 +112,16 @@ public class InlineFirehoseFactoryTest
Assert.assertEquals(VALUE, values.get(0)); Assert.assertEquals(VALUE, values.get(0));
} }
@Test
public void testForcedSplitAndClone()
{
Optional<InputSplit<String>> inputSplitOptional = target.getSplits().findFirst();
Assert.assertTrue(inputSplitOptional.isPresent());
FiniteFirehoseFactory<StringInputRowParser, String> cloneWithSplit = target.withSplit(inputSplitOptional.get());
Assert.assertTrue(cloneWithSplit instanceof InlineFirehoseFactory);
Assert.assertEquals(DATA, ((InlineFirehoseFactory) cloneWithSplit).getData());
}
@Test @Test
public void testSerde() throws IOException public void testSerde() throws IOException
{ {