Data loader (sampler component) (#7531)

* sampler initial check-in
fix checkstyle issues
add sampler fix to process CSV files from cache properly
change to composition and rename some classes
add tests and report num rows read and indexed
remove excludedByFilter flag and don't send filtered out data
fix tests to handle both settings for druid.generic.useDefaultValueForNull

* wrap sampler firehose in TimedShutoffFirehoseFactory to support timeouts

* code review changes - add additional comments, limit maxRows
David Lim 2019-05-01 23:37:14 -06:00 committed by Jonathan Wei
parent ed2beb6ba5
commit ec8562c885
25 changed files with 2713 additions and 100 deletions


@ -20,6 +20,7 @@
package org.apache.druid.data.input;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
import java.io.Closeable;
@ -66,6 +67,24 @@ public interface Firehose extends Closeable
@Nullable
InputRow nextRow();
/**
* Returns an InputRowPlusRaw object containing the InputRow plus the raw, unparsed data corresponding to the next row
* available. Used by the sampler to provide the caller with information to assist in configuring a parse spec. If the
* parser throws a ParseException, it should be caught and returned in the InputRowPlusRaw so that the caller can be
* given information about the raw row which failed to parse. Should only be called if hasMore returns true.
*
* @return an InputRowPlusRaw which may contain any of: an InputRow, the raw data, or a ParseException
*/
default InputRowPlusRaw nextRowWithRaw()
{
try {
return InputRowPlusRaw.of(nextRow(), null);
}
catch (ParseException e) {
return InputRowPlusRaw.of(null, e);
}
}
/**
* Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
* often equivalent to everything that has been read since the last commit() call (or instantiation of the object),
@ -79,9 +98,9 @@ public interface Firehose extends Closeable
* The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
* been committed on the writer side of this interface protocol.
* <p>
* A simple implementation of this interface might do nothing when run() is called
* (in which case the same do-nothing instance can be returned every time), or
* a more complex implementation might clean up temporary resources that are no longer needed
* because of InputRows delivered by prior calls to {@link #nextRow()}.
* </p>
*/
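As a rough, illustrative sketch of how a caller is expected to drive the new nextRowWithRaw() method above (not part of the change itself; the firehose variable and the reportUnparseable()/handleRow() helpers are hypothetical names), the read loop looks much like the one FirehoseSampler uses further down in this patch:

while (firehose.hasMore()) {
  InputRowPlusRaw rowPlusRaw = firehose.nextRowWithRaw();
  if (rowPlusRaw.getParseException() != null) {
    // the raw line, when the firehose was able to capture it, can still be surfaced to the user
    reportUnparseable(rowPlusRaw.getRaw(), rowPlusRaw.getParseException());
  } else if (rowPlusRaw.getInputRow() != null) {
    handleRow(rowPlusRaw.getInputRow());
  }
}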


@ -73,6 +73,18 @@ public interface FirehoseFactory<T extends InputRowParser>
return connect(parser);
}
/**
* Initialization method that connects up the firehose. This method is intended for use by the sampler, and allows
* implementors to return a more efficient firehose, knowing that only a small number of rows will be read.
*
* @param parser an input row parser
* @param temporaryDirectory a directory where temporary files are stored
*/
default Firehose connectForSampler(T parser, @Nullable File temporaryDirectory) throws IOException, ParseException
{
return connect(parser, temporaryDirectory);
}
default boolean isSplittable()
{
return false;

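For illustration only, an implementation that wants cheaper sampling can override the new connectForSampler hook. This hedged sketch mirrors what PrefetchableTextFilesFirehoseFactory does later in this change; MyParser and connectWithoutPrefetch() are hypothetical names, not part of the patch:

@Override
public Firehose connectForSampler(MyParser parser, @Nullable File temporaryDirectory) throws IOException
{
  // the sampler only reads a handful of rows, so skip expensive setup such as prefetching entire files
  return connectWithoutPrefetch(parser, temporaryDirectory);
}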

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import org.apache.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
public class InputRowPlusRaw
{
@Nullable
private final InputRow inputRow;
@Nullable
private final byte[] raw;
@Nullable
private final ParseException parseException;
private InputRowPlusRaw(@Nullable InputRow inputRow, @Nullable byte[] raw, @Nullable ParseException parseException)
{
this.inputRow = inputRow;
this.raw = raw;
this.parseException = parseException;
}
@Nullable
public InputRow getInputRow()
{
return inputRow;
}
/**
* The raw, unparsed event (as opposed to an {@link InputRow} which is the output of a parser). The interface default
* for {@link Firehose#nextRowWithRaw()} sets this to null, so this will only be non-null if nextRowWithRaw() is
* overridden by an implementation, such as in
* {@link org.apache.druid.data.input.impl.FileIteratingFirehose#nextRowWithRaw()}. Note that returning the raw row
* does not make sense for some sources (e.g. non-row based types), so clients should be able to handle this field
* being unset.
*/
@Nullable
public byte[] getRaw()
{
return raw;
}
@Nullable
public ParseException getParseException()
{
return parseException;
}
public boolean isEmpty()
{
return inputRow == null && raw == null && parseException == null;
}
public static InputRowPlusRaw of(@Nullable InputRow inputRow, @Nullable byte[] raw)
{
return new InputRowPlusRaw(inputRow, raw, null);
}
public static InputRowPlusRaw of(@Nullable byte[] raw, @Nullable ParseException parseException)
{
return new InputRowPlusRaw(null, raw, parseException);
}
}


@ -22,6 +22,9 @@ package org.apache.druid.data.input.impl;
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
@ -30,8 +33,6 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
*/
public class FileIteratingFirehose implements Firehose
{
private final Iterator<LineIterator> lineIterators;
@ -81,6 +82,22 @@ public class FileIteratingFirehose implements Firehose
return parser.parse(lineIterator.next());
}
@Override
public InputRowPlusRaw nextRowWithRaw()
{
if (!hasMore()) {
throw new NoSuchElementException();
}
String raw = lineIterator.next();
try {
return InputRowPlusRaw.of(parser.parse(raw), StringUtils.toUtf8(raw));
}
catch (ParseException e) {
return InputRowPlusRaw.of(StringUtils.toUtf8(raw), e);
}
}
private LineIterator getNextLineIterator()
{
if (lineIterator != null) {


@ -47,7 +47,7 @@ public class TimestampSpec
private final String timestampColumn;
private final String timestampFormat;
// this value should never be set for production data; the data loader uses it before a timestamp column is chosen
private final DateTime missingValue;
/** This field is a derivative of {@link #timestampFormat}; not checked in {@link #equals} and {@link #hashCode} */
private final Function<Object, DateTime> timestampConverter;
@ -59,7 +59,7 @@ public class TimestampSpec
public TimestampSpec(
@JsonProperty("column") String timestampColumn,
@JsonProperty("format") String format,
// this value should never be set for production data; the data loader uses it before a timestamp column is chosen
@JsonProperty("missingValue") DateTime missingValue
)
{

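As a concrete illustration of that comment (a sketch, not part of the diff): the sampler's parser shim introduced later in this change constructs exactly this kind of spec, with no column chosen and a fixed placeholder value, so rows can be sampled before the user has picked a timestamp column:

// no timestamp column configured yet; every sampled row is stamped with the epoch placeholder
TimestampSpec placeholder = new TimestampSpec(null, null, DateTimes.EPOCH);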

@ -91,6 +91,9 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
{
private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class);
private static final CacheManager DISABLED_CACHE_MANAGER = new CacheManager(0);
private static final PrefetchConfig DISABLED_PREFETCH_CONFIG = new PrefetchConfig(0L, 0L, 0L, 0L);
public static final int DEFAULT_MAX_FETCH_RETRY = 3;
private final CacheManager<T> cacheManager;
@ -157,6 +160,22 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
@Override
public Firehose connect(StringInputRowParser firehoseParser, @Nullable File temporaryDirectory) throws IOException
{
return connectInternal(firehoseParser, temporaryDirectory, this.prefetchConfig, this.cacheManager);
}
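// Explanatory note (added for clarity): the sampler variant below connects with DISABLED_PREFETCH_CONFIG and
// DISABLED_CACHE_MANAGER because only a small number of rows will be read, so prefetching and caching whole
// files would be wasted work.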
@Override
public Firehose connectForSampler(StringInputRowParser parser, @Nullable File temporaryDirectory) throws IOException
{
return connectInternal(parser, temporaryDirectory, DISABLED_PREFETCH_CONFIG, DISABLED_CACHE_MANAGER);
}
private Firehose connectInternal(
StringInputRowParser firehoseParser,
@Nullable File temporaryDirectory,
PrefetchConfig prefetchConfig,
CacheManager cacheManager
) throws IOException
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects"));


@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.Parser;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class FirehoseSampler
{
private static final EmittingLogger log = new EmittingLogger(FirehoseSampler.class);
// These are convenience shims that let the data loader avoid providing a dummy parseSpec during the early
// stages, when the parameters for the parseSpec are still unknown and the caller is only interested in the unparsed rows.
// We need two of these because firehose factories based on AbstractTextFilesFirehoseFactory expect to be used with
// StringInputRowParser, while all the others expect InputRowParser.
// ---------------------------
private static final InputRowParser EMPTY_STRING_PARSER_SHIM = new StringInputRowParser(
new ParseSpec(new TimestampSpec(null, null, DateTimes.EPOCH), new DimensionsSpec(null))
{
@Override
public Parser<String, Object> makeParser()
{
return new Parser<String, Object>()
{
@Nullable
@Override
public Map<String, Object> parseToMap(String input)
{
throw new ParseException(null);
}
@Override
public void setFieldNames(Iterable<String> fieldNames)
{
}
@Override
public List<String> getFieldNames()
{
return ImmutableList.of();
}
};
}
}, null);
private static final InputRowParser EMPTY_PARSER_SHIM = new InputRowParser()
{
@Override
public ParseSpec getParseSpec()
{
return null;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return null;
}
@Override
public List<InputRow> parseBatch(Object input)
{
throw new ParseException(null);
}
@Override
public InputRow parse(Object input)
{
throw new ParseException(null);
}
};
// ---------------------------
// We want to be able to sort the list of processed results back into the same order that we read them from the
// firehose so that the rows in the data loader are not always changing. To do this, we add a temporary column to the
// InputRow (in SamplerInputRow) and tag each row with a sortKey. We use an aggregator so that it will not affect
// rollup, and we use a longMin aggregator so that as rows get rolled up, the earlier rows stay stable and later
// rows may get rolled into these rows. After getting the results back from the IncrementalIndex, we sort by this
// column and then exclude it from the response.
private static final AggregatorFactory INTERNAL_ORDERING_AGGREGATOR = new LongMinAggregatorFactory(
SamplerInputRow.SAMPLER_ORDERING_COLUMN,
SamplerInputRow.SAMPLER_ORDERING_COLUMN
);
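// Illustrative example of the ordering trick above: with rollup enabled, if the rows read at positions 0, 1 and 2
// are such that rows 0 and 2 roll up together, the longMin aggregator keeps sortKey 0 for the combined row, so after
// sorting on this column the merged row lands at the position of the earliest raw row it contains.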
private final ObjectMapper objectMapper;
private final SamplerCache samplerCache;
@Inject
public FirehoseSampler(ObjectMapper objectMapper, SamplerCache samplerCache)
{
this.objectMapper = objectMapper;
this.samplerCache = samplerCache;
}
public SamplerResponse sample(FirehoseFactory firehoseFactory, DataSchema dataSchema, SamplerConfig samplerConfig)
{
Preconditions.checkNotNull(firehoseFactory, "firehoseFactory required");
if (dataSchema == null) {
dataSchema = new DataSchema("sampler", null, null, null, null, objectMapper);
}
if (samplerConfig == null) {
samplerConfig = SamplerConfig.empty();
}
final InputRowParser parser = dataSchema.getParser() != null
? dataSchema.getParser()
: (firehoseFactory instanceof AbstractTextFilesFirehoseFactory
? EMPTY_STRING_PARSER_SHIM
: EMPTY_PARSER_SHIM);
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withTimestampSpec(parser)
.withQueryGranularity(dataSchema.getGranularitySpec().getQueryGranularity())
.withDimensionsSpec(parser)
.withMetrics(ArrayUtils.addAll(dataSchema.getAggregators(), INTERNAL_ORDERING_AGGREGATOR))
.withRollup(dataSchema.getGranularitySpec().isRollup())
.build();
FirehoseFactory myFirehoseFactory = null;
boolean usingCachedData = true;
if (!samplerConfig.isSkipCache() && samplerConfig.getCacheKey() != null) {
myFirehoseFactory = samplerCache.getAsFirehoseFactory(samplerConfig.getCacheKey(), parser);
}
if (myFirehoseFactory == null) {
myFirehoseFactory = firehoseFactory;
usingCachedData = false;
}
if (samplerConfig.getTimeoutMs() > 0) {
myFirehoseFactory = new TimedShutoffFirehoseFactory(
myFirehoseFactory,
DateTimes.nowUtc().plusMillis(samplerConfig.getTimeoutMs())
);
}
final File tempDir = Files.createTempDir();
try (final Firehose firehose = myFirehoseFactory.connectForSampler(parser, tempDir);
final IncrementalIndex index = new IncrementalIndex.Builder().setIndexSchema(indexSchema)
.setMaxRowCount(samplerConfig.getNumRows())
.buildOnheap()) {
List<byte[]> dataToCache = new ArrayList<>();
SamplerResponse.SamplerResponseRow[] responseRows = new SamplerResponse.SamplerResponseRow[samplerConfig.getNumRows()];
int counter = 0, numRowsIndexed = 0;
while (counter < responseRows.length && firehose.hasMore()) {
String raw = null;
try {
final InputRowPlusRaw row = firehose.nextRowWithRaw();
if (row == null || row.isEmpty()) {
continue;
}
if (row.getRaw() != null) {
raw = StringUtils.fromUtf8(row.getRaw());
if (!usingCachedData) {
dataToCache.add(row.getRaw());
}
}
if (row.getParseException() != null) {
throw row.getParseException();
}
if (row.getInputRow() == null) {
continue;
}
if (!Intervals.ETERNITY.contains(row.getInputRow().getTimestamp())) {
throw new ParseException("Timestamp cannot be represented as a long: [%s]", row.getInputRow());
}
IncrementalIndexAddResult result = index.add(new SamplerInputRow(row.getInputRow(), counter), true);
if (result.getParseException() != null) {
throw result.getParseException();
} else {
// store the raw value; will be merged with the data from the IncrementalIndex later
responseRows[counter] = new SamplerResponse.SamplerResponseRow(raw, null, null, null);
counter++;
numRowsIndexed++;
}
}
catch (ParseException e) {
responseRows[counter] = new SamplerResponse.SamplerResponseRow(raw, null, true, e.getMessage());
counter++;
}
}
final List<String> columnNames = index.getColumnNames();
columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
for (Row row : (Iterable<Row>) index) {
Map<String, Object> parsed = new HashMap<>();
columnNames.forEach(k -> {
if (row.getRaw(k) != null) {
parsed.put(k, row.getRaw(k));
}
});
parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
if (sortKey != null) {
responseRows[sortKey.intValue()] = responseRows[sortKey.intValue()].withParsed(parsed);
}
}
// cache raw data if available
String cacheKey = usingCachedData ? samplerConfig.getCacheKey() : null;
if (!samplerConfig.isSkipCache() && !dataToCache.isEmpty()) {
cacheKey = samplerCache.put(UUIDUtils.generateUuid(), dataToCache);
}
return new SamplerResponse(
cacheKey,
counter,
numRowsIndexed,
Arrays.stream(responseRows)
.filter(Objects::nonNull)
.filter(x -> x.getParsed() != null || x.isUnparseable() != null)
.collect(Collectors.toList())
);
}
catch (Exception e) {
throw new SamplerException(e, "Failed to sample data: %s", e.getMessage());
}
finally {
try {
FileUtils.deleteDirectory(tempDir);
}
catch (IOException e) {
log.warn(e, "Failed to cleanup temporary directory");
}
}
}
}


@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.segment.indexing.DataSchema;
public class IndexTaskSamplerSpec implements SamplerSpec
{
private final DataSchema dataSchema;
private final FirehoseFactory firehoseFactory;
private final SamplerConfig samplerConfig;
private final FirehoseSampler firehoseSampler;
@JsonCreator
public IndexTaskSamplerSpec(
@JsonProperty("spec") final IndexTask.IndexIngestionSpec ingestionSpec,
@JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
@JacksonInject FirehoseSampler firehoseSampler
)
{
this.dataSchema = Preconditions.checkNotNull(ingestionSpec, "[spec] is required").getDataSchema();
Preconditions.checkNotNull(ingestionSpec.getIOConfig(), "[spec.ioConfig] is required");
this.firehoseFactory = Preconditions.checkNotNull(
ingestionSpec.getIOConfig().getFirehoseFactory(),
"[spec.ioConfig.firehose] is required"
);
this.samplerConfig = samplerConfig;
this.firehoseSampler = firehoseSampler;
}
@Override
public SamplerResponse sample()
{
return firehoseSampler.sample(firehoseFactory, dataSchema, samplerConfig);
}
}


@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.data.input.ByteBufferInputRowParser;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
public class SamplerCache
{
private static final EmittingLogger log = new EmittingLogger(SamplerCache.class);
private static final String NAMESPACE = "sampler";
private final Cache cache;
@Inject
public SamplerCache(Cache cache)
{
this.cache = cache;
}
@Nullable
public String put(String key, Collection<byte[]> values)
{
if (values == null) {
return null;
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(new ArrayList<>(values));
cache.put(new Cache.NamedKey(NAMESPACE, StringUtils.toUtf8(key)), baos.toByteArray());
return key;
}
catch (IOException e) {
log.warn(e, "Exception while serializing to sampler cache");
return null;
}
}
@Nullable
public FirehoseFactory getAsFirehoseFactory(String key, InputRowParser parser)
{
if (!(parser instanceof ByteBufferInputRowParser)) {
log.warn("SamplerCache expects a ByteBufferInputRowParser");
return null;
}
Collection<byte[]> data = get(key);
if (data == null) {
return null;
}
return new FirehoseFactory<ByteBufferInputRowParser>()
{
@Override
public Firehose connect(ByteBufferInputRowParser parser, @Nullable File temporaryDirectory)
{
return new SamplerCacheFirehose(parser, data);
}
};
}
@Nullable
private Collection<byte[]> get(String key)
{
byte[] data = cache.get(new Cache.NamedKey(NAMESPACE, StringUtils.toUtf8(key)));
if (data == null) {
return null;
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais)) {
return (ArrayList) ois.readObject();
}
catch (Exception e) {
log.warn(e, "Exception while deserializing from sampler cache");
return null;
}
}
public static class SamplerCacheFirehose implements Firehose
{
private final ByteBufferInputRowParser parser;
private final Iterator<byte[]> it;
public SamplerCacheFirehose(ByteBufferInputRowParser parser, Collection<byte[]> data)
{
this.parser = parser;
this.it = data != null ? data.iterator() : Collections.emptyIterator();
if (parser instanceof StringInputRowParser) {
((StringInputRowParser) parser).startFileFromBeginning();
}
}
@Override
public boolean hasMore()
{
return it.hasNext();
}
@Nullable
@Override
public InputRow nextRow()
{
if (!hasMore()) {
throw new NoSuchElementException();
}
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(it.next()));
return rows.isEmpty() ? null : rows.get(0);
}
@Override
public InputRowPlusRaw nextRowWithRaw()
{
if (!hasMore()) {
throw new NoSuchElementException();
}
byte[] raw = it.next();
try {
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(raw));
return InputRowPlusRaw.of(rows.isEmpty() ? null : rows.get(0), raw);
}
catch (ParseException e) {
return InputRowPlusRaw.of(raw, e);
}
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close()
{
}
}
}
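A hedged sketch of the intended round trip through this cache (the parser variable is assumed to already exist and must be a ByteBufferInputRowParser such as StringInputRowParser; MapCache is used here only because the tests below do the same):

SamplerCache samplerCache = new SamplerCache(MapCache.create(100000));
String cacheKey = samplerCache.put(
    UUIDUtils.generateUuid(),
    ImmutableList.of(StringUtils.toUtf8("first raw row"), StringUtils.toUtf8("second raw row"))
);
// returns null unless the parser can consume raw bytes, i.e. implements ByteBufferInputRowParser
FirehoseFactory replayFactory = samplerCache.getAsFirehoseFactory(cacheKey, parser);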


@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
public class SamplerConfig
{
private static final int DEFAULT_NUM_ROWS = 200;
private static final int MAX_NUM_ROWS = 5000;
private static final boolean DEFAULT_SKIP_CACHE = false;
private static final int DEFAULT_TIMEOUT_MS = 10000;
private final int numRows;
private final String cacheKey;
private final boolean skipCache;
private final int timeoutMs;
@JsonCreator
public SamplerConfig(
@JsonProperty("numRows") Integer numRows,
@JsonProperty("cacheKey") String cacheKey,
@JsonProperty("skipCache") Boolean skipCache,
@JsonProperty("timeoutMs") Integer timeoutMs
)
{
this.numRows = numRows != null ? numRows : DEFAULT_NUM_ROWS;
this.cacheKey = cacheKey;
this.skipCache = skipCache != null ? skipCache : DEFAULT_SKIP_CACHE;
this.timeoutMs = timeoutMs != null ? timeoutMs : DEFAULT_TIMEOUT_MS;
Preconditions.checkArgument(this.numRows <= MAX_NUM_ROWS, "numRows must be <= %s", MAX_NUM_ROWS);
}
/**
* The maximum number of rows to return in a response. The actual number of returned rows may be smaller if:
* - The sampled source contains fewer rows.
* - We are reading from the cache ({@link SamplerConfig#cacheKey} is set and {@link SamplerConfig#isSkipCache()}
* is false) and the cache contains fewer rows.
* - {@link SamplerConfig#timeoutMs} elapses before this value is reached.
* - {@link org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is true and input rows get
* rolled-up into fewer indexed rows.
*
* @return maximum number of sampled rows to return
*/
public int getNumRows()
{
return numRows;
}
/**
* The sampler uses a best-effort system to attempt to cache the raw data so that future requests to the sampler
* can be answered without reading again from the source. In addition to responsiveness benefits, this also provides a
* better user experience for sources such as streams, where repeated calls to the sampler (which would happen as the
* user tweaks data schema configurations) would otherwise return a different set of sampled data every time. For the
* caching system to work, 1) the sampler must have access to the raw data (e.g. for {@link FirehoseSampler},
* {@link org.apache.druid.data.input.InputRowPlusRaw#getRaw()} must be non-null) and 2) the parser must be an
* implementation of {@link org.apache.druid.data.input.ByteBufferInputRowParser} since the data is cached as a byte
* array. If these conditions are not satisfied, the cache returns a miss and the sampler reads from the source again.
* <p>
* {@link SamplerResponse} returns a {@link SamplerResponse#cacheKey} which should be supplied here in
* {@link SamplerConfig} for future requests to prefer the cache if available. This field is ignored if
* {@link SamplerConfig#skipCache} is true.
*
* @return key to use for locating previously cached raw data
*/
public String getCacheKey()
{
return cacheKey;
}
/**
* Whether to read/write to the cache. See cache description in {@link SamplerConfig#getCacheKey()}.
*
* @return true if cache reads and writes should be skipped
*/
public boolean isSkipCache()
{
return skipCache;
}
/**
* Time to wait in milliseconds before closing the sampler and returning the data which has already been read.
* Particularly useful for handling streaming input sources where the rate of data is unknown, to prevent the sampler
* from taking an excessively long time trying to reach {@link SamplerConfig#numRows}.
*
* @return timeout in milliseconds
*/
public int getTimeoutMs()
{
return timeoutMs;
}
public static SamplerConfig empty()
{
return new SamplerConfig(null, null, null, null);
}
}
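To make the cacheKey/skipCache interplay concrete, here is a hedged usage sketch (the firehoseSampler, firehoseFactory and dataSchema variables are assumed to exist; the flow mirrors testNoDataSchemaNumRowsCacheReplay in FirehoseSamplerTest further down):

// the first request reads from the source and, when possible, caches the raw rows it saw
SamplerResponse first = firehoseSampler.sample(
    firehoseFactory,
    dataSchema,
    new SamplerConfig(200, null, false, 10000)
);
// a follow-up request passes the returned cacheKey so the captured raw rows are replayed instead of re-read
SamplerResponse replayed = firehoseSampler.sample(
    firehoseFactory,
    dataSchema,
    new SamplerConfig(200, first.getCacheKey(), false, 10000)
);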


@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import org.apache.druid.java.util.common.StringUtils;
public class SamplerException extends RuntimeException
{
public SamplerException(Throwable cause, String formatText, Object... arguments)
{
super(StringUtils.nonStrictFormat(formatText, arguments), cause);
}
}


@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.google.common.collect.ImmutableMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
@Provider
public class SamplerExceptionMapper implements ExceptionMapper<SamplerException>
{
@Override
public Response toResponse(SamplerException exception)
{
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of(
"error",
exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage()
))
.build();
}
}


@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
public class SamplerInputRow implements InputRow
{
public static final String SAMPLER_ORDERING_COLUMN = "__internal_sampler_order";
private final InputRow row;
private final int sortKey;
public SamplerInputRow(InputRow row, int sortKey)
{
this.row = row;
this.sortKey = sortKey;
}
@Override
public List<String> getDimensions()
{
return row.getDimensions();
}
@Override
public long getTimestampFromEpoch()
{
return row.getTimestampFromEpoch();
}
@Override
public DateTime getTimestamp()
{
return row.getTimestamp();
}
@Override
public List<String> getDimension(String dimension)
{
return row.getDimension(dimension);
}
@Nullable
@Override
public Object getRaw(String dimension)
{
return SAMPLER_ORDERING_COLUMN.equals(dimension) ? sortKey : row.getRaw(dimension);
}
@Nullable
@Override
public Number getMetric(String metric)
{
return SAMPLER_ORDERING_COLUMN.equals(metric) ? sortKey : row.getMetric(metric);
}
@Override
public int compareTo(Row o)
{
return row.compareTo(o);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SamplerInputRow that = (SamplerInputRow) o;
return sortKey == that.sortKey && Objects.equals(row, that.row);
}
@Override
public int hashCode()
{
return Objects.hash(row, sortKey);
}
}


@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Singleton;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.initialization.DruidModule;
import java.util.List;
public class SamplerModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(IndexTaskSamplerSpec.class, "index")
)
);
}
@Override
public void configure(Binder binder)
{
Jerseys.addResource(binder, SamplerResource.class);
binder.install(new CacheModule());
binder.bind(FirehoseSampler.class).in(Singleton.class);
binder.bind(SamplerExceptionMapper.class).in(Singleton.class);
binder.bind(SamplerCache.class).in(Singleton.class);
}
}


@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.google.common.base.Preconditions;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.server.http.security.StateResourceFilter;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/druid/indexer/v1/sampler")
public class SamplerResource
{
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public SamplerResponse post(final SamplerSpec sampler)
{
return Preconditions.checkNotNull(sampler, "Request body cannot be empty").sample();
}
}


@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SamplerResponse
{
private final String cacheKey;
private final Integer numRowsRead;
private final Integer numRowsIndexed;
private final List<SamplerResponseRow> data;
public SamplerResponse(String cacheKey, Integer numRowsRead, Integer numRowsIndexed, List<SamplerResponseRow> data)
{
this.cacheKey = cacheKey;
this.numRowsRead = numRowsRead;
this.numRowsIndexed = numRowsIndexed;
this.data = data;
}
@JsonProperty
public String getCacheKey()
{
return cacheKey;
}
@JsonProperty
public Integer getNumRowsRead()
{
return numRowsRead;
}
@JsonProperty
public Integer getNumRowsIndexed()
{
return numRowsIndexed;
}
@JsonProperty
public List<SamplerResponseRow> getData()
{
return data;
}
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class SamplerResponseRow
{
private final String raw;
private final Map<String, Object> parsed;
private final Boolean unparseable;
private final String error;
public SamplerResponseRow(
String raw,
Map<String, Object> parsed,
Boolean unparseable,
String error
)
{
this.raw = raw;
this.parsed = parsed;
this.unparseable = unparseable;
this.error = error;
}
@JsonProperty
public String getRaw()
{
return raw;
}
@JsonProperty
public Map<String, Object> getParsed()
{
return parsed;
}
@JsonProperty
public Boolean isUnparseable()
{
return unparseable;
}
@JsonProperty
public String getError()
{
return error;
}
public SamplerResponseRow withParsed(Map<String, Object> parsed)
{
return new SamplerResponseRow(raw, parsed, unparseable, error);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SamplerResponseRow that = (SamplerResponseRow) o;
return Objects.equals(raw, that.raw) &&
Objects.equals(parsed, that.parsed) &&
Objects.equals(unparseable, that.unparseable) &&
Objects.equals(error, that.error);
}
@Override
public int hashCode()
{
return Objects.hash(raw, parsed, unparseable, error);
}
@Override
public String toString()
{
return "SamplerResponseRow{" +
"raw='" + raw + '\'' +
", parsed=" + parsed +
", unparseable=" + unparseable +
", error='" + error + '\'' +
'}';
}
}
}


@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface SamplerSpec
{
SamplerResponse sample();
}


@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
public class TestFirehose implements Firehose
{
public static class TestFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private boolean waitForClose = true;
private List<Object> seedRows;
public TestFirehoseFactory() {}
public TestFirehoseFactory(boolean waitForClose, List<Object> seedRows)
{
this.waitForClose = waitForClose;
this.seedRows = seedRows;
}
@Override
@SuppressWarnings("unchecked")
public Firehose connect(InputRowParser parser, File temporaryDirectory) throws ParseException
{
return new TestFirehose(parser, waitForClose, seedRows);
}
}
public static class TestAbstractTextFilesFirehoseFactory extends AbstractTextFilesFirehoseFactory
{
private boolean waitForClose;
private List<Object> seedRows;
public TestAbstractTextFilesFirehoseFactory(boolean waitForClose, List<Object> seedRows)
{
this.waitForClose = waitForClose;
this.seedRows = seedRows;
}
@Override
@SuppressWarnings("unchecked")
public Firehose connect(StringInputRowParser parser, File temporaryDirectory) throws ParseException
{
return new TestFirehose(parser, waitForClose, seedRows);
}
@Override
protected Collection initObjects()
{
return null;
}
@Override
protected InputStream openObjectStream(Object object)
{
return null;
}
@Override
protected InputStream wrapObjectStream(Object object, InputStream stream)
{
return null;
}
@Override
public FiniteFirehoseFactory withSplit(InputSplit split)
{
return null;
}
}
public static final String FAIL_DIM = "__fail__";
private final Deque<Optional<Object>> queue = new ArrayDeque<>();
private InputRowParser parser;
private boolean closed;
private TestFirehose(InputRowParser parser, boolean waitForClose, List<Object> seedRows)
{
this.parser = parser;
this.closed = !waitForClose;
if (parser instanceof StringInputRowParser) {
((StringInputRowParser) parser).startFileFromBeginning();
}
if (seedRows != null) {
seedRows.stream().map(Optional::ofNullable).forEach(queue::add);
}
}
public void addRows(List<Object> rows)
{
synchronized (this) {
rows.stream().map(Optional::ofNullable).forEach(queue::add);
notifyAll();
}
}
@Override
public boolean hasMore()
{
try {
synchronized (this) {
while (queue.isEmpty() && !closed) {
wait();
}
return !queue.isEmpty();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public InputRow nextRow()
{
synchronized (this) {
final InputRow row = parser instanceof StringInputRowParser
? ((StringInputRowParser) parser).parse((String) queue.removeFirst().orElse(null))
: (InputRow) parser.parseBatch(queue.removeFirst().orElse(null)).get(0);
if (row != null && row.getRaw(FAIL_DIM) != null) {
throw new ParseException(FAIL_DIM);
}
return row;
}
}
@Override
public InputRowPlusRaw nextRowWithRaw()
{
synchronized (this) {
// dequeue inside the lock, consistent with nextRow()
Object next = queue.removeFirst().orElse(null);
try {
final InputRow row = parser instanceof StringInputRowParser
? ((StringInputRowParser) parser).parse((String) next)
: (InputRow) parser.parseBatch(next).get(0);
if (row != null && row.getRaw(FAIL_DIM) != null) {
throw new ParseException(FAIL_DIM);
}
return InputRowPlusRaw.of(row, next != null ? StringUtils.toUtf8(next.toString()) : null);
}
catch (ParseException e) {
return InputRowPlusRaw.of(next != null ? StringUtils.toUtf8(next.toString()) : null, e);
}
}
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close()
{
synchronized (this) {
closed = true;
notifyAll();
}
}
}


@ -31,11 +31,8 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
@ -47,6 +44,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestFirehose;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@ -117,7 +115,6 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.Runnables;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
@ -134,14 +131,11 @@ import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.nio.file.Files;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@ -156,86 +150,6 @@ public class RealtimeIndexTaskTest
new NoopEmitter()
);
private static final String FAIL_DIM = "__fail__";
private static class TestFirehose implements Firehose
{
private final InputRowParser<Map<String, Object>> parser;
private final Deque<Optional<Map<String, Object>>> queue = new ArrayDeque<>();
private boolean closed = false;
public TestFirehose(final InputRowParser<Map<String, Object>> parser)
{
this.parser = parser;
}
public void addRows(List<Map<String, Object>> rows)
{
synchronized (this) {
rows.stream().map(Optional::ofNullable).forEach(queue::add);
notifyAll();
}
}
@Override
public boolean hasMore()
{
try {
synchronized (this) {
while (queue.isEmpty() && !closed) {
wait();
}
return !queue.isEmpty();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public InputRow nextRow()
{
synchronized (this) {
final InputRow row = parser.parseBatch(queue.removeFirst().orElse(null)).get(0);
if (row != null && row.getRaw(FAIL_DIM) != null) {
throw new ParseException(FAIL_DIM);
}
return row;
}
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close()
{
synchronized (this) {
closed = true;
notifyAll();
}
}
}
private static class TestFirehoseFactory implements FirehoseFactory<InputRowParser>
{
public TestFirehoseFactory()
{
}
@Override
@SuppressWarnings("unchecked")
public Firehose connect(InputRowParser parser, File temporaryDirectory) throws ParseException
{
return new TestFirehose(parser);
}
}
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@ -521,7 +435,7 @@ public class RealtimeIndexTaskTest
ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
// Bad row- will be unparseable.
ImmutableMap.of("dim1", "foo", "met1", 2.0, TestFirehose.FAIL_DIM, "x"),
// Old row- will be thrownAway.
ImmutableMap.of("t", now.minus(Period.days(1)).getMillis(), "dim1", "foo", "met1", 2.0),
@ -907,7 +821,7 @@ public class RealtimeIndexTaskTest
objectMapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new TestFirehose.TestFirehoseFactory(),
null,
null
);


@ -0,0 +1,839 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.DelimitedParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TestFirehose;
import org.apache.druid.indexing.overlord.sampler.SamplerResponse.SamplerResponseRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class FirehoseSamplerTest
{
private enum ParserType
{
MAP, STR_JSON, STR_CSV
}
private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
private static final boolean USE_DEFAULT_VALUE_FOR_NULL = Boolean.valueOf(System.getProperty(
NullHandling.NULL_HANDLING_CONFIG_STRING,
"true"
));
private static final List<Object> MAP_ROWS = ImmutableList.of(
ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo", "met1", "1"),
ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo", "met1", "2"),
ImmutableMap.of("t", "2019-04-22T12:01", "dim1", "foo", "met1", "3"),
ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo2", "met1", "4"),
ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo", "dim2", "bar", "met1", "5"),
ImmutableMap.of("t", "bad_timestamp", "dim1", "foo", "met1", "6")
);
private static final List<Object> STR_JSON_ROWS = ImmutableList.of(
"{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo\", \"met1\": 1 }",
"{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo\", \"met1\": 2 }",
"{ \"t\": \"2019-04-22T12:01\", \"dim1\": \"foo\", \"met1\": 3 }",
"{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo2\", \"met1\": 4 }",
"{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo\", \"dim2\": \"bar\", \"met1\": 5 }",
"{ \"t\": \"bad_timestamp\", \"dim1\": \"foo\", \"met1\": 6 }"
);
private static final List<Object> STR_CSV_ROWS = ImmutableList.of(
"2019-04-22T12:00,foo,,1",
"2019-04-22T12:00,foo,,2",
"2019-04-22T12:01,foo,,3",
"2019-04-22T12:00,foo2,,4",
"2019-04-22T12:00,foo,bar,5",
"bad_timestamp,foo,,6"
);
private SamplerCache samplerCache;
private FirehoseSampler firehoseSampler;
private ParserType parserType;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Parameterized.Parameters(name = "parserType = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{ParserType.MAP},
new Object[]{ParserType.STR_JSON},
new Object[]{ParserType.STR_CSV}
);
}
public FirehoseSamplerTest(ParserType parserType)
{
this.parserType = parserType;
}
@Before
public void setupTest()
{
samplerCache = new SamplerCache(MapCache.create(100000));
firehoseSampler = new FirehoseSampler(objectMapper, samplerCache);
}
@Test
public void testNoParams()
{
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("firehoseFactory required");
firehoseSampler.sample(null, null, null);
}
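  // Without a DataSchema nothing can be parsed: every row comes back with its raw form only
  // (SamplerResponseRow fields are raw, parsed, unparseable, error) and numRowsIndexed stays 0.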
@Test
public void testNoDataSchema()
{
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
SamplerResponse response = firehoseSampler.sample(firehoseFactory, null, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(0, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(0).toString(), null, true, null), data.get(0));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(1).toString(), null, true, null), data.get(1));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(2).toString(), null, true, null), data.get(2));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(3).toString(), null, true, null), data.get(3));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(4).toString(), null, true, null), data.get(4));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(5).toString(), null, true, null), data.get(5));
}
@Test
public void testNoDataSchemaNumRows()
{
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
SamplerResponse response = firehoseSampler.sample(firehoseFactory, null, new SamplerConfig(3, null, true, null));
Assert.assertNull(response.getCacheKey());
Assert.assertEquals(3, (int) response.getNumRowsRead());
Assert.assertEquals(0, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(0).toString(), null, true, null), data.get(0));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(1).toString(), null, true, null), data.get(1));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(2).toString(), null, true, null), data.get(2));
}
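  // When skipCache is false the sampler returns a cacheKey; passing that key back in a follow-up
  // request yields the same rows again, replayed from the sampler cache.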
@Test
public void testNoDataSchemaNumRowsCacheReplay()
{
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
SamplerResponse response = firehoseSampler.sample(firehoseFactory, null, new SamplerConfig(3, null, false, null));
String cacheKey = response.getCacheKey();
Assert.assertNotNull(cacheKey);
Assert.assertEquals(3, (int) response.getNumRowsRead());
Assert.assertEquals(0, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(0).toString(), null, true, null), data.get(0));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(1).toString(), null, true, null), data.get(1));
Assert.assertEquals(new SamplerResponseRow(getTestRows().get(2).toString(), null, true, null), data.get(2));
response = firehoseSampler.sample(firehoseFactory, null, new SamplerConfig(3, cacheKey, false, null));
Assert.assertTrue(!isCacheable() || cacheKey.equals(response.getCacheKey()));
Assert.assertEquals(3, (int) response.getNumRowsRead());
Assert.assertEquals(0, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
Assert.assertEquals(data, response.getData());
}
@Test
public void testMissingValueTimestampSpec()
{
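    // No timestamp column is configured; the missingValue of 1970 maps every row (including the
    // bad_timestamp one) to __time = 0, so all six rows index successfully.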
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(new TimestampSpec(null, null, DateTimes.of("1970")), new DimensionsSpec(null));
DataSchema dataSchema = new DataSchema("sampler", getParser(parseSpec), null, null, null, objectMapper);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(6, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "met1", "1"),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "met1", "2"),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:01", "dim1", "foo", "met1", "3"),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo2", "met1", "4"),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "dim2", "bar", "met1", "5"),
null,
null
), data.get(4));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
ImmutableMap.of("__time", 0L, "t", "bad_timestamp", "dim1", "foo", "met1", "6"),
null,
null
), data.get(5));
}
@Test
public void testWithTimestampSpec()
{
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(new TimestampSpec("t", null, null), new DimensionsSpec(null));
DataSchema dataSchema = new DataSchema("sampler", getParser(parseSpec), null, null, null, objectMapper);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "1"),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "2"),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 1555934460000L, "dim1", "foo", "met1", "3"),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", "4"),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", "5"),
null,
null
), data.get(4));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(5));
}
@Test
public void testWithDimensionSpec()
{
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(
new TimestampSpec("t", null, null),
new DimensionsSpec(ImmutableList.of(
StringDimensionSchema.create("dim1"),
StringDimensionSchema.create("met1")
))
);
DataSchema dataSchema = new DataSchema("sampler", getParser(parseSpec), null, null, null, objectMapper);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "1"),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "2"),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 1555934460000L, "dim1", "foo", "met1", "3"),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", "4"),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "5"),
null,
null
), data.get(4));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(5));
}
@Test
public void testWithNoRollup()
{
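    // Rollup is disabled, so the five parseable rows are returned individually even though several
    // share the same truncated timestamp and dimension values.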
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(new TimestampSpec("t", null, null), new DimensionsSpec(null));
AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, false, null);
DataSchema dataSchema = new DataSchema(
"sampler",
getParser(parseSpec),
aggregatorFactories,
granularitySpec,
null,
objectMapper
);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 1L),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 2L),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 3L),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
null,
null
), data.get(4));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(5));
}
@Test
public void testWithRollup()
{
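    // With rollup at HOUR query granularity, rows 0-2 share (__time, dim1) and combine into a single
    // row with met1 = 1 + 2 + 3 = 6; the foo2 row and the dim2=bar row remain separate.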
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(new TimestampSpec("t", null, null), new DimensionsSpec(null));
AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
DataSchema dataSchema = new DataSchema(
"sampler",
getParser(parseSpec),
aggregatorFactories,
granularitySpec,
null,
objectMapper
);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(4, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(3));
}
@Test
public void testWithMoreRollup()
{
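    // Only dim1 is declared as a dimension, so dim2 is dropped and rows 0, 1, 2 and 4 all roll up
    // into one row with met1 = 1 + 2 + 3 + 5 = 11.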
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(
new TimestampSpec("t", null, null),
new DimensionsSpec(ImmutableList.of(StringDimensionSchema.create("dim1")))
);
AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
DataSchema dataSchema = new DataSchema(
"sampler",
getParser(parseSpec),
aggregatorFactories,
granularitySpec,
null,
objectMapper
);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 11L),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(2));
}
@Test
public void testWithMoreRollupCacheReplay()
{
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(
new TimestampSpec("t", null, null),
new DimensionsSpec(ImmutableList.of(StringDimensionSchema.create("dim1")))
);
AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
DataSchema dataSchema = new DataSchema(
"sampler",
getParser(parseSpec),
aggregatorFactories,
granularitySpec,
null,
objectMapper
);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
String cacheKey = response.getCacheKey();
response = firehoseSampler.sample(firehoseFactory, dataSchema, new SamplerConfig(null, cacheKey, false, null));
Assert.assertTrue(!isCacheable() || cacheKey.equals(response.getCacheKey()));
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 11L),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(2));
}
@Test
public void testWithTransformsAutoDimensions()
{
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(
new TimestampSpec("t", null, null),
new DimensionsSpec(null)
);
AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
TransformSpec transformSpec = new TransformSpec(
null,
ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
);
DataSchema dataSchema = new DataSchema(
"sampler",
getParser(parseSpec),
aggregatorFactories,
granularitySpec,
transformSpec,
objectMapper
);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(4, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(3));
}
@Test
public void testWithTransformsDimensionsSpec()
{
    // The CSV parser has a bug that prevents a column added by a transform from being used in the
    // dimensions list when the 'columns' field is specified (it complains that the dimension name is
    // not a valid column), so skip this case for the CSV parser type.
if (ParserType.STR_CSV.equals(parserType)) {
return;
}
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(
new TimestampSpec("t", null, null),
new DimensionsSpec(ImmutableList.of(StringDimensionSchema.create("dim1PlusBar")))
);
AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
TransformSpec transformSpec = new TransformSpec(
null,
ImmutableList.of(new ExpressionTransform("dim1PlusBar", "concat(dim1 + 'bar')", TestExprMacroTable.INSTANCE))
);
DataSchema dataSchema = new DataSchema(
"sampler",
getParser(parseSpec),
aggregatorFactories,
granularitySpec,
transformSpec,
objectMapper
);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(6, (int) response.getNumRowsRead());
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1PlusBar", "foobar", "met1", 11L),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1PlusBar", "foo2bar", "met1", 4L),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(2));
}
@Test
public void testWithFilter()
{
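    // The dim1=foo selector filter removes the foo2 row entirely, so it is neither counted as read
    // nor returned in the response (numRowsRead is 5 rather than 6).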
FirehoseFactory firehoseFactory = getFirehoseFactory(getTestRows());
ParseSpec parseSpec = getParseSpec(
new TimestampSpec("t", null, null),
new DimensionsSpec(null)
);
AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, true, null);
TransformSpec transformSpec = new TransformSpec(new SelectorDimFilter("dim1", "foo", null), null);
DataSchema dataSchema = new DataSchema(
"sampler",
getParser(parseSpec),
aggregatorFactories,
granularitySpec,
transformSpec,
objectMapper
);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(5, (int) response.getNumRowsRead());
Assert.assertEquals(4, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
null,
true,
getUnparseableTimestampString()
), data.get(2));
}
private Map<String, Object> getParser(ParseSpec parseSpec)
{
return objectMapper.convertValue(
ParserType.MAP.equals(parserType)
? new MapInputRowParser(parseSpec)
: new StringInputRowParser(parseSpec, StandardCharsets.UTF_8.name()),
new TypeReference<Map<String, Object>>()
{
}
);
}
private List<Object> getTestRows()
{
switch (parserType) {
case MAP:
return MAP_ROWS;
case STR_JSON:
return STR_JSON_ROWS;
case STR_CSV:
return STR_CSV_ROWS;
default:
throw new UnsupportedOperationException();
}
}
private FirehoseFactory<? extends InputRowParser> getFirehoseFactory(List<Object> seedRows)
{
return ParserType.MAP.equals(parserType)
? new TestFirehose.TestFirehoseFactory(false, seedRows)
: new TestFirehose.TestAbstractTextFilesFirehoseFactory(false, seedRows);
}
private boolean isCacheable()
{
return !ParserType.MAP.equals(parserType);
}
private ParseSpec getParseSpec(TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec)
{
return ParserType.STR_CSV.equals(parserType) ? new DelimitedParseSpec(
timestampSpec,
dimensionsSpec,
",",
null,
ImmutableList.of("t", "dim1", "dim2", "met1"),
false,
0
) : new JSONParseSpec(timestampSpec, dimensionsSpec, null, null);
}
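  // The CSV parse spec declares a dim2 column, so the unparseable-row message includes it: null under
  // default-value handling, an empty string under SQL-compatible handling. The JSON and map rows never
  // contain dim2 for the bad-timestamp row.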
private String getUnparseableTimestampString()
{
return ParserType.STR_CSV.equals(parserType)
? (USE_DEFAULT_VALUE_FOR_NULL
? "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
: "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, dim2=, met1=6}")
: "Unparseable timestamp found! Event: {t=bad_timestamp, dim1=foo, met1=6}";
}
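  // Under SQL-compatible null handling, empty CSV fields parse as empty strings rather than being
  // dropped; strip them so the same expected maps work for both null-handling settings.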
private List<SamplerResponseRow> removeEmptyColumns(List<SamplerResponseRow> rows)
{
return USE_DEFAULT_VALUE_FOR_NULL
? rows
: rows.stream().map(x -> x.withParsed(removeEmptyValues(x.getParsed()))).collect(Collectors.toList());
}
@Nullable
private Map<String, Object> removeEmptyValues(Map<String, Object> data)
{
return data == null
? null : data.entrySet()
.stream()
.filter(x -> !(x.getValue() instanceof String) || !((String) x.getValue()).isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class IndexTaskSamplerSpecTest extends EasyMockSupport
{
private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
private final FirehoseSampler firehoseSampler = createMock(FirehoseSampler.class);
@Rule
public ExpectedException expectedException = ExpectedException.none();
public IndexTaskSamplerSpecTest()
{
mapper.setInjectableValues(
new InjectableValues.Std()
.addValue(FirehoseSampler.class, firehoseSampler)
.addValue(ObjectMapper.class, mapper)
);
mapper.registerModules((Iterable<Module>) new SamplerModule().getJacksonModules());
mapper.registerModules((Iterable<Module>) new FirehoseModule().getJacksonModules());
}
@Test
public void testSerde() throws IOException
{
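    // An IndexTaskSamplerSpec pairs an index task spec with a samplerConfig block; verify that the JSON
    // deserializes and that the firehose factory, data schema, and sampler config captured by the mocked
    // FirehoseSampler match the spec.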
String json = "{\n"
+ " \"type\": \"index\",\n"
+ " \"samplerConfig\": {\n"
+ " \"numRows\": 123,\n"
+ " \"cacheKey\": \"eaebbfd87ec34bc6a9f8c03ecee4dd7a\",\n"
+ " \"skipCache\": false,\n"
+ " \"timeoutMs\": 2345\n"
+ " },\n"
+ " \"spec\": {\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"sampler\",\n"
+ " \"parser\": {\n"
+ " \"type\": \"string\",\n"
+ " \"parseSpec\": {\n"
+ " \"format\": \"json\",\n"
+ " \"dimensionsSpec\": {},\n"
+ " \"timestampSpec\": {\n"
+ " \"missingValue\": \"1970\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"index\",\n"
+ " \"firehose\": {\n"
+ " \"type\": \"local\",\n"
+ " \"baseDir\": \"/tmp\",\n"
+ " \"filter\": \"wikiticker-2015-09-12-sampled.json\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";
Capture<FirehoseFactory> capturedFirehoseFactory = EasyMock.newCapture();
Capture<DataSchema> capturedDataSchema = EasyMock.newCapture();
Capture<SamplerConfig> capturedSamplerConfig = EasyMock.newCapture();
IndexTaskSamplerSpec spec = mapper.readValue(json, IndexTaskSamplerSpec.class);
EasyMock.expect(firehoseSampler.sample(
EasyMock.capture(capturedFirehoseFactory),
EasyMock.capture(capturedDataSchema),
EasyMock.capture(capturedSamplerConfig)
)).andReturn(new SamplerResponse(null, null, null, null));
replayAll();
spec.sample();
verifyAll();
FirehoseFactory firehoseFactory = capturedFirehoseFactory.getValue();
Assert.assertEquals(new File("/tmp"), ((LocalFirehoseFactory) firehoseFactory).getBaseDir());
Assert.assertEquals("wikiticker-2015-09-12-sampled.json", ((LocalFirehoseFactory) firehoseFactory).getFilter());
DataSchema dataSchema = capturedDataSchema.getValue();
Assert.assertEquals("sampler", dataSchema.getDataSource());
Assert.assertEquals("json", ((Map) dataSchema.getParserMap().get("parseSpec")).get("format"));
SamplerConfig samplerConfig = capturedSamplerConfig.getValue();
Assert.assertEquals(123, samplerConfig.getNumRows());
Assert.assertEquals("eaebbfd87ec34bc6a9f8c03ecee4dd7a", samplerConfig.getCacheKey());
Assert.assertFalse(samplerConfig.isSkipCache());
Assert.assertEquals(2345, samplerConfig.getTimeoutMs());
}
}

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
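/**
 * Verifies that raw rows stored in the SamplerCache can be read back repeatedly through the firehose
 * returned by getAsFirehoseFactory(), both as parsed InputRows and as InputRowPlusRaw pairs.
 */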
public class SamplerCacheTest
{
private static final String KEY_1 = "abcdefghijklmnopqrstuvwxyz";
private static final String KEY_2 = "1234567890!@#$%^&*()";
private static final byte[] VALUE_1_1 = StringUtils.toUtf8("The quick");
private static final byte[] VALUE_1_2 = StringUtils.toUtf8("brown fox");
private static final byte[] VALUE_1_3 = StringUtils.toUtf8("jumps over");
private static final byte[] VALUE_2_1 = StringUtils.toUtf8("the lazy");
private static final byte[] VALUE_2_2 = StringUtils.toUtf8("Druid");
private static final StringInputRowParser PARSER = new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(null, null, DateTimes.of("1970")),
new DimensionsSpec(null),
null,
ImmutableList.of("col"),
false,
0
),
StandardCharsets.UTF_8.name()
);
private SamplerCache cache;
private File tempDir;
@Before
public void setupTest()
{
cache = new SamplerCache(MapCache.create(100000));
tempDir = Files.createTempDir();
}
@After
public void teardownTest() throws IOException
{
FileUtils.deleteDirectory(tempDir);
}
@Test
public void testOneEntryNextRowWithRaw() throws IOException
{
cache.put(KEY_1, ImmutableList.of(VALUE_1_1, VALUE_1_2, VALUE_1_3));
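    // Read the cached entry several times to confirm the firehose can be recreated and replayed repeatedly.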
for (int i = 0; i < 4; i++) {
Firehose firehose1 = cache.getAsFirehoseFactory(KEY_1, PARSER).connectForSampler(PARSER, tempDir);
Assert.assertTrue(firehose1.hasMore());
InputRowPlusRaw row = firehose1.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_1_1, row.getRaw());
Assert.assertEquals("The quick", row.getInputRow().getDimension("col").get(0));
row = firehose1.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_1_2, row.getRaw());
Assert.assertEquals("brown fox", row.getInputRow().getDimension("col").get(0));
row = firehose1.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_1_3, row.getRaw());
Assert.assertEquals("jumps over", row.getInputRow().getDimension("col").get(0));
Assert.assertFalse(firehose1.hasMore());
firehose1.close();
if (i % 2 == 1) {
FirehoseFactory firehoseFactory2 = cache.getAsFirehoseFactory(KEY_2, PARSER);
Assert.assertNull(firehoseFactory2);
}
}
}
@Test
public void testOneEntryNextRow() throws IOException
{
cache.put(KEY_1, ImmutableList.of(VALUE_1_1, VALUE_1_2, VALUE_1_3));
Firehose firehose = cache.getAsFirehoseFactory(KEY_1, PARSER).connectForSampler(PARSER, tempDir);
Assert.assertTrue(firehose.hasMore());
InputRow row = firehose.nextRow();
Assert.assertEquals("The quick", row.getDimension("col").get(0));
row = firehose.nextRow();
Assert.assertEquals("brown fox", row.getDimension("col").get(0));
row = firehose.nextRow();
Assert.assertEquals("jumps over", row.getDimension("col").get(0));
Assert.assertFalse(firehose.hasMore());
firehose.close();
}
@Test
public void testTwoEntriesNextRowWithRaw() throws IOException
{
cache.put(KEY_1, ImmutableList.of(VALUE_1_1, VALUE_1_2, VALUE_1_3));
cache.put(KEY_2, ImmutableList.of(VALUE_2_1, VALUE_2_2));
for (int i = 0; i < 4; i++) {
Firehose firehose1 = cache.getAsFirehoseFactory(KEY_1, PARSER).connectForSampler(PARSER, tempDir);
Assert.assertTrue(firehose1.hasMore());
InputRowPlusRaw row = firehose1.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_1_1, row.getRaw());
Assert.assertEquals("The quick", row.getInputRow().getDimension("col").get(0));
row = firehose1.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_1_2, row.getRaw());
Assert.assertEquals("brown fox", row.getInputRow().getDimension("col").get(0));
row = firehose1.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_1_3, row.getRaw());
Assert.assertEquals("jumps over", row.getInputRow().getDimension("col").get(0));
Assert.assertFalse(firehose1.hasMore());
firehose1.close();
Firehose firehose2 = cache.getAsFirehoseFactory(KEY_2, PARSER).connectForSampler(PARSER, tempDir);
Assert.assertTrue(firehose2.hasMore());
row = firehose2.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_2_1, row.getRaw());
Assert.assertEquals("the lazy", row.getInputRow().getDimension("col").get(0));
row = firehose2.nextRowWithRaw();
Assert.assertArrayEquals(VALUE_2_2, row.getRaw());
Assert.assertEquals("Druid", row.getInputRow().getDimension("col").get(0));
Assert.assertFalse(firehose2.hasMore());
firehose2.close();
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class SamplerResponseTest
{
private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
@Test
public void testSerde() throws IOException
{
List<SamplerResponse.SamplerResponseRow> data = ImmutableList.of(
new SamplerResponse.SamplerResponseRow(
"parsed1",
ImmutableMap.of("t", 123456, "dim1", "foo", "met1", 6),
null,
null
),
new SamplerResponse.SamplerResponseRow(
"parsed2",
ImmutableMap.of("t", 123457, "dim1", "foo2", "met1", 7),
null,
null
),
new SamplerResponse.SamplerResponseRow("unparsed", null, true, "Could not parse")
);
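    // Fields that are null (parsed, unparseable, error) should be omitted from the serialized JSON.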
String out = mapper.writeValueAsString(new SamplerResponse("eaebbfd87ec34bc6a9f8c03ecee4dd7a", 1123, 1112, data));
String expected = "{\"cacheKey\":\"eaebbfd87ec34bc6a9f8c03ecee4dd7a\",\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"data\":[{\"raw\":\"parsed1\",\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"raw\":\"parsed2\",\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"raw\":\"unparsed\",\"unparseable\":true,\"error\":\"Could not parse\"}]}";
Assert.assertEquals(expected, out);
}
}

View File

@ -25,6 +25,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -63,7 +64,13 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowParser>
@Override
public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException
{
return new TimedShutoffFirehose(parser, temporaryDirectory);
return new TimedShutoffFirehose(parser, temporaryDirectory, false);
}
@Override
public Firehose connectForSampler(InputRowParser parser, File temporaryDirectory) throws IOException
{
return new TimedShutoffFirehose(parser, temporaryDirectory, true);
}
class TimedShutoffFirehose implements Firehose
@ -73,9 +80,11 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowParser>
@GuardedBy("this")
private boolean closed = false;
TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory) throws IOException
TimedShutoffFirehose(InputRowParser parser, File temporaryDirectory, boolean sampling) throws IOException
{
firehose = delegateFactory.connect(parser, temporaryDirectory);
firehose = sampling
? delegateFactory.connectForSampler(parser, temporaryDirectory)
: delegateFactory.connect(parser, temporaryDirectory);
shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
@ -110,6 +119,12 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowParser>
return firehose.nextRow();
}
@Override
public InputRowPlusRaw nextRowWithRaw()
{
return firehose.nextRowWithRaw();
}
@Override
public Runnable commit()
{

View File

@ -87,6 +87,7 @@ import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerResource;
import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo;
import org.apache.druid.indexing.overlord.http.OverlordResource;
import org.apache.druid.indexing.overlord.sampler.SamplerModule;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
@ -331,7 +332,8 @@ public class CliOverlord extends ServerRunnable
}
},
new IndexingServiceFirehoseModule(),
new IndexingServiceTaskLogsModule()
new IndexingServiceTaskLogsModule(),
new SamplerModule()
);
}