Tweak isInvalidRows behavior in HadoopTuningConfig (#6339)

* Tweak isInvalidRows behavior in HadoopTuningConfig

* Fix tests
Jonathan Wei 2018-09-24 16:13:13 -07:00 committed by GitHub
parent 93345064b5
commit 00b0a156e9
8 changed files with 21 additions and 28 deletions
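
In short: the deprecated ignoreInvalidRows option on HadoopTuningConfig becomes a nullable Boolean so that an explicitly configured maxParseExceptions takes precedence over it, the reportParseExceptions flag is no longer threaded through the mappers' innerMap(...), the unused isIgnoreInvalidRows() passthrough is removed from HadoopDruidIndexerConfig, and comments note that the remaining reportParseExceptions field is only consulted by OffHeapIncrementalIndex.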

DetermineHashedPartitionsJob.java

@@ -284,8 +284,7 @@ public class DetermineHashedPartitionsJob implements Jobby
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException
     {

DeterminePartitionsJob.java

@@ -313,8 +313,7 @@ public class DeterminePartitionsJob implements Jobby
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException, InterruptedException
     {
       final List<Object> groupKey = Rows.toGroupKey(
@@ -395,8 +394,7 @@ public class DeterminePartitionsJob implements Jobby
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException, InterruptedException
     {
       final Map<String, Iterable<String>> dims = Maps.newHashMap();

HadoopDruidIndexerConfig.java

@@ -308,11 +308,6 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().isOverwriteFiles();
   }
 
-  public boolean isIgnoreInvalidRows()
-  {
-    return schema.getTuningConfig().isIgnoreInvalidRows();
-  }
-
   public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
   {
     this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));

HadoopDruidIndexerMapper.java

@@ -45,7 +45,6 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
   protected HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;
-  private boolean reportParseExceptions;
 
   @Override
   protected void setup(Context context)
@@ -54,7 +53,6 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
     config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
     parser = config.getParser();
     granularitySpec = config.getGranularitySpec();
-    reportParseExceptions = !config.isIgnoreInvalidRows();
   }
 
   public HadoopDruidIndexerConfig getConfig()
@@ -88,7 +86,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
       if (!granularitySpec.bucketIntervals().isPresent()
           || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
                             .isPresent()) {
-        innerMap(inputRow, context, reportParseExceptions);
+        innerMap(inputRow, context);
       } else {
         context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
       }
@@ -147,7 +145,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
     }
   }
 
-  protected abstract void innerMap(InputRow inputRow, Context context, boolean reportParseExceptions)
+  protected abstract void innerMap(InputRow inputRow, Context context)
       throws IOException, InterruptedException;
 }

HadoopTuningConfig.java

@@ -85,7 +85,7 @@ public class HadoopTuningConfig implements TuningConfig
   private final boolean leaveIntermediate;
   private final Boolean cleanupOnFailure;
   private final boolean overwriteFiles;
-  private final boolean ignoreInvalidRows;
+  private final Boolean ignoreInvalidRows;
   private final Map<String, String> jobProperties;
   private final boolean combineText;
   private final boolean useCombiner;
@@ -108,7 +108,7 @@ public class HadoopTuningConfig implements TuningConfig
       final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
       final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
       final @JsonProperty("overwriteFiles") boolean overwriteFiles,
-      final @Deprecated @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
+      final @Deprecated @JsonProperty("ignoreInvalidRows") Boolean ignoreInvalidRows,
       final @JsonProperty("jobProperties") Map<String, String> jobProperties,
       final @JsonProperty("combineText") boolean combineText,
       final @JsonProperty("useCombiner") Boolean useCombiner,
@@ -138,7 +138,6 @@ public class HadoopTuningConfig implements TuningConfig
     this.leaveIntermediate = leaveIntermediate;
     this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
     this.overwriteFiles = overwriteFiles;
-    this.ignoreInvalidRows = ignoreInvalidRows;
     this.jobProperties = (jobProperties == null
                           ? ImmutableMap.of()
                           : ImmutableMap.copyOf(jobProperties));
@@ -152,10 +151,16 @@ public class HadoopTuningConfig implements TuningConfig
     this.useExplicitVersion = useExplicitVersion;
     this.allowedHadoopPrefix = allowedHadoopPrefix == null ? ImmutableList.of() : allowedHadoopPrefix;
 
-    if (!this.ignoreInvalidRows) {
-      this.maxParseExceptions = 0;
+    this.ignoreInvalidRows = ignoreInvalidRows;
+    if (maxParseExceptions != null) {
+      this.maxParseExceptions = maxParseExceptions;
     } else {
-      this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions;
+      if (ignoreInvalidRows == null || !ignoreInvalidRows) {
+        this.maxParseExceptions = 0;
+      } else {
+        this.maxParseExceptions = TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS;
+      }
     }
     this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
   }
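
The constructor change above encodes a clear precedence: previously an explicit maxParseExceptions was honored only when ignoreInvalidRows was true; now an explicit maxParseExceptions always wins, and the deprecated flag is consulted only as a fallback. A minimal standalone sketch of that decision order (plain Java, not Druid code; the class and method names here are illustrative):

    // Illustrative sketch only: mirrors the fallback order of the constructor
    // above, with defaultMax standing in for TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS.
    final class ParseExceptionPolicy
    {
      static int resolveMaxParseExceptions(Integer maxParseExceptions, Boolean ignoreInvalidRows, int defaultMax)
      {
        if (maxParseExceptions != null) {
          return maxParseExceptions; // an explicit setting always takes precedence
        }
        // Fall back to the deprecated flag: unset or false -> strict (fail on the
        // first unparseable row), true -> tolerate up to the default limit.
        return (ignoreInvalidRows == null || !ignoreInvalidRows) ? 0 : defaultMax;
      }
    }

Under this rule a job that sets neither property still gets maxParseExceptions = 0, i.e. it fails fast on the first unparseable row, matching the old default of ignoreInvalidRows = false.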
@@ -221,7 +226,7 @@ public class HadoopTuningConfig implements TuningConfig
   }
 
   @JsonProperty
-  public boolean isIgnoreInvalidRows()
+  public Boolean isIgnoreInvalidRows()
   {
     return ignoreInvalidRows;
   }
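
Since the getter now returns a nullable Boolean, call sites that negate its result directly (as IndexGeneratorJob does below) rely on Java auto-unboxing. A hedged caller-side sketch of a null-safe reading, consistent with the constructor's fallback where unset behaves like false (the helper name is illustrative, not Druid API):

    // Illustrative helper: derive the strict-reporting flag from the
    // now-nullable getter; an unset flag behaves like "false".
    static boolean shouldReportParseExceptions(Boolean ignoreInvalidRows)
    {
      return ignoreInvalidRows == null || !ignoreInvalidRows;
    }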

IndexGeneratorJob.java

@@ -286,7 +286,7 @@ public class IndexGeneratorJob implements Jobby
     IncrementalIndex newIndex = new IncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
-        .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows())
+        .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) // only used by OffHeapIncrementalIndex
         .setMaxRowCount(tuningConfig.getRowFlushBoundary())
         .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
         .buildOnheap();
@@ -334,8 +334,7 @@ public class IndexGeneratorJob implements Jobby
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException, InterruptedException
     {
       // Group by bucket, sort by timestamp

HadoopDruidIndexerMapperTest.java

@@ -262,8 +262,7 @@ public class HadoopDruidIndexerMapperTest
     @Override
     protected void innerMap(
         final InputRow inputRow,
-        final Context context,
-        final boolean reportParseExceptions
+        final Context context
     )
     {
       rows.add(inputRow);

IncrementalIndex.java

@@ -231,7 +231,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
   private final AggregatorFactory[] metrics;
   private final AggregatorType[] aggs;
   private final boolean deserializeComplexMetrics;
-  private final boolean reportParseExceptions;
+  private final boolean reportParseExceptions; // only used by OffHeapIncrementalIndex
   private final Metadata metadata;
   private final Map<String, MetricDesc> metricDescs;