Respect reportParseExceptions option in IndexTask.determineShardSpecs() (#4467)

* Respect reportParseExceptions option in IndexTask.determineShardSpecs()

* Fix typo
Jihoon Son 2017-06-28 02:28:22 +09:00 committed by Gian Merlino
parent 7a261c8311
commit e3c13c246a
3 changed files with 184 additions and 35 deletions
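At its core the change wraps the row-reading loop in IndexTask.determineShardSpecs() in a try/catch, so that a ParseException is either rethrown (reportParseExceptions = true) or counted and skipped (reportParseExceptions = false), matching the handling that generateAndPublishSegments() already had. The following is a minimal, self-contained sketch of that decision; Long.parseLong and the class/method names here are stand-ins for illustration, not Druid APIs.

import java.util.Arrays;
import java.util.List;

// Stand-alone illustration of the two behaviors wired into determineShardSpecs():
// reportParseExceptions == true  -> the first unparseable row aborts the pass,
// reportParseExceptions == false -> unparseable rows are counted, logged, and skipped.
// Long.parseLong stands in for Druid's row/timestamp parsing; nothing here is Druid API.
public class ReportParseExceptionsSketch
{
  static long sumRows(List<String> rows, boolean reportParseExceptions)
  {
    long total = 0;
    int unparseable = 0;
    for (String row : rows) {
      try {
        total += Long.parseLong(row);
      }
      catch (NumberFormatException e) {
        if (reportParseExceptions) {
          throw e;        // fail fast, surfacing the bad row to the caller
        }
        unparseable++;    // otherwise keep going and report at the end
      }
    }
    if (unparseable > 0) {
      System.out.printf("Unable to parse [%,d] events%n", unparseable);
    }
    return total;
  }

  public static void main(String[] args)
  {
    List<String> rows = Arrays.asList("1", "unparseable", "2");
    System.out.println(sumRows(rows, false)); // logs one unparseable row, prints 3
    sumRows(rows, true);                      // throws on "unparseable"
  }
}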

View File

@@ -23,11 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.java.util.common.parsers.ParseException;
 import org.joda.time.DateTime;
 import java.util.List;

View File

@@ -261,6 +261,7 @@ public class IndexTask extends AbstractTask
     // determine intervals containing data and prime HLL collectors
     final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
     int thrownAway = 0;
+    int unparseable = 0;

     log.info("Determining intervals and shardSpecs");
     long determineShardSpecsStartMillis = System.currentTimeMillis();
@@ -269,48 +270,61 @@
             firehoseTempDir)
     ) {
       while (firehose.hasMore()) {
-        final InputRow inputRow = firehose.nextRow();
-
-        // The null inputRow means the caller must skip this row.
-        if (inputRow == null) {
-          continue;
-        }
-
-        final Interval interval;
-        if (determineIntervals) {
-          interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
-        } else {
-          final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-          if (!optInterval.isPresent()) {
-            thrownAway++;
-            continue;
-          }
-          interval = optInterval.get();
-        }
-
-        if (!determineNumPartitions) {
-          // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
-          // for the interval and don't instantiate a HLL collector
-          if (!hllCollectors.containsKey(interval)) {
-            hllCollectors.put(interval, Optional.<HyperLogLogCollector>absent());
-          }
-          continue;
-        }
-
-        if (!hllCollectors.containsKey(interval)) {
-          hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
-        }
-
-        List<Object> groupKey = Rows.toGroupKey(
-            queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
-            inputRow
-        );
-        hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+        try {
+          final InputRow inputRow = firehose.nextRow();
+
+          // The null inputRow means the caller must skip this row.
+          if (inputRow == null) {
+            continue;
+          }
+
+          final Interval interval;
+          if (determineIntervals) {
+            interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+          } else {
+            final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+            if (!optInterval.isPresent()) {
+              thrownAway++;
+              continue;
+            }
+            interval = optInterval.get();
+          }
+
+          if (!determineNumPartitions) {
+            // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
+            // for the interval and don't instantiate a HLL collector
+            if (!hllCollectors.containsKey(interval)) {
+              hllCollectors.put(interval, Optional.<HyperLogLogCollector>absent());
+            }
+            continue;
+          }
+
+          if (!hllCollectors.containsKey(interval)) {
+            hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
+          }
+
+          List<Object> groupKey = Rows.toGroupKey(
+              queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
+              inputRow
+          );
+          hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+        }
+        catch (ParseException e) {
+          if (ingestionSchema.getTuningConfig().isReportParseExceptions()) {
+            throw e;
+          } else {
+            unparseable++;
+          }
+        }
       }
     }

+    // These metrics are reported in generateAndPublishSegments()
     if (thrownAway > 0) {
-      log.warn("Unable to to find a matching interval for [%,d] events", thrownAway);
+      log.warn("Unable to find a matching interval for [%,d] events", thrownAway);
     }
+    if (unparseable > 0) {
+      log.warn("Unable to parse [%,d] events", unparseable);
+    }

     final ImmutableSortedMap<Interval, Optional<HyperLogLogCollector>> sortedMap = ImmutableSortedMap.copyOf(
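The flag consulted in the catch block above is read from the task's tuning config through isReportParseExceptions(), the only accessor the diff depends on. Below is a reduced sketch of how such a Jackson-bound option is typically wired, assuming a JSON property named reportParseExceptions that defaults to false; the real IndexTask.IndexTuningConfig carries many more fields.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Reduced sketch of the tuning-config wiring behind isReportParseExceptions().
// Only the getter name appears in the diff; the property name and the false default
// are assumptions made for this illustration, not a copy of Druid's IndexTuningConfig.
public class TuningConfigSketch
{
  private final boolean reportParseExceptions;

  @JsonCreator
  public TuningConfigSketch(@JsonProperty("reportParseExceptions") Boolean reportParseExceptions)
  {
    this.reportParseExceptions = reportParseExceptions == null ? false : reportParseExceptions;
  }

  @JsonProperty
  public boolean isReportParseExceptions()
  {
    return reportParseExceptions;
  }
}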

View File

@@ -37,8 +37,10 @@ import io.druid.indexing.common.actions.SegmentAllocateAction;
 import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.IndexIO;
@@ -61,6 +63,7 @@ import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;

 import java.io.BufferedWriter;
@@ -78,6 +81,9 @@ public class IndexTaskTest
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();

+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
       new TimestampSpec(
           "ts",
@@ -443,7 +449,6 @@ public class IndexTaskTest
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }

     IndexTask indexTask = new IndexTask(
         null,
         null,
@@ -484,6 +489,115 @@ public class IndexTaskTest
     Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
   }

+  @Test
+  public void testIgnoreParseException() throws Exception
+  {
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in
+    // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments()
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.<String>newArrayList(),
+                Lists.<SpatialDimensionSchema>newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        false // ignore parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    final List<DataSegment> segments = runTask(indexTask);
+
+    Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions());
+    Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics());
+    Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
+  }
+
+  @Test
+  public void testReportParseException() throws Exception
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Unparseable timestamp found!");
+
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.<String>newArrayList(),
+                Lists.<SpatialDimensionSchema>newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        true // report parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    runTask(indexTask);
+  }
+
   private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
   {
     final List<DataSegment> segments = Lists.newArrayList();
@@ -574,6 +688,29 @@ public class IndexTaskTest
       boolean forceExtendableShardSpecs,
       boolean appendToExisting
   )
+  {
+    return createIngestionSpec(
+        baseDir,
+        parseSpec,
+        granularitySpec,
+        targetPartitionSize,
+        numShards,
+        forceExtendableShardSpecs,
+        appendToExisting,
+        true
+    );
+  }
+
+  private IndexTask.IndexIngestionSpec createIngestionSpec(
+      File baseDir,
+      ParseSpec parseSpec,
+      GranularitySpec granularitySpec,
+      Integer targetPartitionSize,
+      Integer numShards,
+      boolean forceExtendableShardSpecs,
+      boolean appendToExisting,
+      boolean reportParseException
+  )
   {
     return new IndexTask.IndexIngestionSpec(
         new DataSchema(
@@ -611,7 +748,7 @@ public class IndexTaskTest
             null,
             true,
             forceExtendableShardSpecs,
-            true,
+            reportParseException,
             null
         )
     );