Fix compatibility issues with SqlFirehose (#9365)

* Make SqlFirehose compatible with FiniteFirehose

* Fix build
This commit is contained in:
Atul Mohan 2020-02-14 19:45:12 -06:00 committed by GitHub
parent 2e54755a03
commit 043abd5529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 2 deletions

View File

@ -25,8 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.prefetch.CacheManager;
import org.apache.druid.data.input.impl.prefetch.FetchConfig;
@ -49,6 +51,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* PrefetchSqlFirehoseFactory is an abstract firehose factory for reading prefetched sql resultset data. Regardless
@ -89,7 +92,7 @@ import java.util.concurrent.TimeUnit;
* and the read will fail.
*/
public abstract class PrefetchSqlFirehoseFactory<T>
implements FirehoseFactory<InputRowParser<Map<String, Object>>>
implements FiniteFirehoseFactory<InputRowParser<Map<String, Object>>, T>
{
private static final Logger LOG = new Logger(PrefetchSqlFirehoseFactory.class);
@ -236,6 +239,32 @@ public abstract class PrefetchSqlFirehoseFactory<T>
);
}
protected void initializeObjectsIfNeeded()
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "initObjects"));
}
}
public List<T> getObjects()
{
return objects;
}
@Override
public Stream<InputSplit<T>> getSplits(@Nullable SplitHintSpec splitHintSpec)
{
initializeObjectsIfNeeded();
return getObjects().stream().map(InputSplit::new);
}
@Override
public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
{
initializeObjectsIfNeeded();
return getObjects().size();
}
/**
* Open an input stream from the given object. The object is fetched into the file and an input
* stream to the file is provided.

View File

@ -25,6 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
@ -43,6 +46,7 @@ import java.io.InputStream;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -170,4 +174,19 @@ public class SqlFirehoseFactory extends PrefetchSqlFirehoseFactory<String>
{
return sqls;
}
@Override
public FiniteFirehoseFactory<InputRowParser<Map<String, Object>>, String> withSplit(InputSplit<String> split)
{
return new SqlFirehoseFactory(
Collections.singletonList(split.get()),
getMaxCacheCapacityBytes(),
getMaxFetchCapacityBytes(),
getPrefetchTriggerBytes(),
getFetchTimeout(),
foldCase,
sqlFirehoseDatabaseConnector,
objectMapper
);
}
}