Fix backwards compatibility with centralized schema config in partial_index_merge tasks (#16556)

* Handle null values of centralized schema config in PartialMergeTask

* Fix checkstyle

* Do not pass centralized schema config from supervisor task to sub-tasks

* Do not pass ObjectMapper in constructor of task

* Fix logs

* Fix tests
Kashif Faraz 2024-06-06 01:14:04 -07:00 committed by GitHub
parent 277006446d
commit e4f59e00b2
6 changed files with 20 additions and 57 deletions
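
Why the old behavior broke compatibility: in a rolling upgrade, a task spec written by an older supervisor has no "centralizedDatasourceSchemaConfig" property, so on an upgraded worker the @JsonCreator constructor receives null and the first centralizedDatasourceSchemaConfig.isEnabled() call throws a NullPointerException. Rather than null-guard the field, the commit stops round-tripping the config through the spec entirely and has the sub-task read it from its own TaskToolbox. A minimal sketch of the failure mode, using plain Jackson and hypothetical stand-in classes (not the Druid types):

// Hypothetical stand-ins (not the Druid classes); runnable with only
// jackson-databind on the classpath.
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OldSpecRepro
{
  static class SchemaConfig
  {
    @JsonProperty
    boolean enabled;
  }

  static class MergeTaskSpec
  {
    final SchemaConfig schemaConfig;

    @JsonCreator
    MergeTaskSpec(@JsonProperty("centralizedDatasourceSchemaConfig") SchemaConfig schemaConfig)
    {
      this.schemaConfig = schemaConfig;
    }
  }

  public static void main(String[] args) throws Exception
  {
    // A spec serialized by an older supervisor simply lacks the property.
    MergeTaskSpec spec = new ObjectMapper().readValue("{}", MergeTaskSpec.class);

    // schemaConfig is null, so this unguarded access throws NullPointerException,
    // which is the crash the commit avoids by not serializing the config at all.
    System.out.println(spec.schemaConfig.enabled);
  }
}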

ParallelIndexSupervisorTask.java

@@ -441,9 +441,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
         ingestionSchema.getDataSchema(),
         ioConfigs,
         ingestionSchema.getTuningConfig(),
-        getContext(),
-        toolbox.getJsonMapper(),
-        toolbox.getCentralizedTableSchemaConfig()
+        getContext()
     );
   }
@@ -1620,7 +1618,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
     for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
       TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport();
       if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Received an empty report from subtask[%s]" + pushedSegmentsReport.getTaskId());
+        LOG.warn("Received an empty report from sub-task[%s].", pushedSegmentsReport.getTaskId());
         continue;
       }
       RowIngestionMetersTotals rowIngestionMetersTotals = getBuildSegmentsStatsFromTaskReport(
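
The one-line log change above (the "Fix logs" bullet) is easy to miss: the task ID was concatenated onto the format string instead of being passed as an argument, so there was nothing for the [%s] placeholder to consume. A stand-alone illustration, with String.format standing in for Druid's Logger (an assumption about equivalent %-style formatting, not a claim about the Logger's internals):

public class LogFormatSketch
{
  public static void main(String[] args)
  {
    String taskId = "partial_index_generic_merge_abc123";

    // Before: concatenation produces the final string before any formatting
    // can happen, so the placeholder is emitted literally with the ID glued on.
    String buggy = "Received an empty report from subtask[%s]" + taskId;

    // After: the ID is passed as a format argument and fills the placeholder.
    String fixed = String.format("Received an empty report from sub-task[%s].", taskId);

    System.out.println(buggy); // ...subtask[%s]partial_index_generic_merge_abc123
    System.out.println(fixed); // ...sub-task[partial_index_generic_merge_abc123].
  }
}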

PartialGenericSegmentMergeParallelIndexTaskRunner.java

@@ -19,12 +19,10 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 
 import java.util.Iterator;
 import java.util.List;
@@ -40,8 +38,6 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
   private final DataSchema dataSchema;
   private final List<PartialSegmentMergeIOConfig> mergeIOConfigs;
-  private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
-  private final ObjectMapper mapper;
 
   PartialGenericSegmentMergeParallelIndexTaskRunner(
       TaskToolbox toolbox,
@@ -51,17 +47,13 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
       DataSchema dataSchema,
       List<PartialSegmentMergeIOConfig> mergeIOConfigs,
       ParallelIndexTuningConfig tuningConfig,
-      Map<String, Object> context,
-      ObjectMapper mapper,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      Map<String, Object> context
   )
   {
     super(toolbox, taskId, groupId, baseSubtaskSpecName, tuningConfig, context);
     this.dataSchema = dataSchema;
     this.mergeIOConfigs = mergeIOConfigs;
-    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
-    this.mapper = mapper;
   }
 
   @Override
@@ -110,9 +102,7 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
           subtaskSpecId,
           numAttempts,
           ingestionSpec,
-          getContext(),
-          centralizedDatasourceSchemaConfig,
-          mapper
+          getContext()
       );
     }
   };

PartialGenericSegmentMergeTask.java

@@ -19,11 +19,9 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableSet;
@@ -31,7 +29,6 @@ import com.google.common.collect.Table;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
@@ -53,8 +50,6 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
   private final PartialSegmentMergeIngestionSpec ingestionSchema;
   private final Table<Interval, Integer, BuildingShardSpec<?>> intervalAndIntegerToShardSpec;
-  private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
 
   @JsonCreator
   public PartialGenericSegmentMergeTask(
       // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
@@ -66,9 +61,7 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
       @JsonProperty("subtaskSpecId") @Nullable final String subtaskSpecId,
       @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
       @JsonProperty("spec") final PartialSegmentMergeIngestionSpec ingestionSchema,
-      @JsonProperty("context") final Map<String, Object> context,
-      @JsonProperty("centralizedDatasourceSchemaConfig") CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
-      @JacksonInject ObjectMapper mapper
+      @JsonProperty("context") final Map<String, Object> context
   )
   {
     super(
@@ -81,12 +74,9 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
         ingestionSchema.getIOConfig(),
         ingestionSchema.getTuningConfig(),
         numAttempts,
-        context,
-        mapper,
-        centralizedDatasourceSchemaConfig
+        context
     );
 
-    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
     this.ingestionSchema = ingestionSchema;
     this.intervalAndIntegerToShardSpec = createIntervalAndIntegerToShardSpec(
         ingestionSchema.getIOConfig().getPartitionLocations()
@ -127,12 +117,6 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
return ingestionSchema;
}
@JsonProperty("centralizedDatasourceSchemaConfig")
private CentralizedDatasourceSchemaConfig getCentralizedDatasourceSchemaConfig()
{
return centralizedDatasourceSchemaConfig;
}
@Override
public String getType()
{

PartialSegmentMergeTask.java

@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Maps;
@@ -84,8 +83,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
   private final PartialSegmentMergeIOConfig ioConfig;
   private final int numAttempts;
   private final String subtaskSpecId;
-  private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
-  private final FingerprintGenerator fingerprintGenerator;
 
   PartialSegmentMergeTask(
       // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
@@ -98,9 +95,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
       PartialSegmentMergeIOConfig ioConfig,
       ParallelIndexTuningConfig tuningConfig,
       final int numAttempts, // zero-based counting
-      final Map<String, Object> context,
-      final ObjectMapper mapper,
-      final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      final Map<String, Object> context
   )
   {
     super(
@@ -120,8 +115,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
     this.subtaskSpecId = subtaskSpecId;
     this.ioConfig = ioConfig;
     this.numAttempts = numAttempts;
-    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
-    this.fingerprintGenerator = new FingerprintGenerator(mapper);
   }
 
   @JsonProperty
@@ -265,6 +258,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
     final Set<DataSegment> pushedSegments = new HashSet<>();
     final SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
+    final FingerprintGenerator fingerprintGenerator = new FingerprintGenerator(toolbox.getJsonMapper());
 
     for (Entry<Interval, Int2ObjectMap<List<File>>> entryPerInterval : intervalToUnzippedFiles.entrySet()) {
       final Interval interval = entryPerInterval.getKey();
       for (Int2ObjectMap.Entry<List<File>> entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) {
@@ -315,7 +309,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
           long pushFinishTime = System.nanoTime();
           pushedSegments.add(segment);
 
-          if (centralizedDatasourceSchemaConfig.isEnabled()) {
+          if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
             SchemaPayloadPlus schemaPayloadPlus =
                 TaskSegmentSchemaUtil.getSegmentSchema(mergedFileAndDimensionNames.lhs, toolbox.getIndexIO());
             segmentSchemaMapping.addSchema(
@ -341,7 +335,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
);
}
}
if (centralizedDatasourceSchemaConfig.isEnabled()) {
if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
LOG.info("SegmentSchema for the pushed segments is [%s]", segmentSchemaMapping);
}
return new DataSegmentsWithSchemas(pushedSegments, segmentSchemaMapping.isNonEmpty() ? segmentSchemaMapping : null);
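
The recurring pattern in the hunks above: CentralizedDatasourceSchemaConfig describes the server, not the job, so the task now asks its own TaskToolbox for it at run time (toolbox.getCentralizedTableSchemaConfig(), visible in the diff) instead of carrying a deserialized copy in its spec. Old specs that lack the field then work unchanged, and the field, constructor parameter, and @JsonProperty accessor can all be deleted. A hypothetical sketch of the shape, with stand-in classes rather than the real Druid types:

public class ToolboxConfigSketch
{
  static class SchemaConfig
  {
    private final boolean enabled;

    SchemaConfig(boolean enabled)
    {
      this.enabled = enabled;
    }

    boolean isEnabled()
    {
      return enabled;
    }
  }

  static class Toolbox
  {
    // Built from the worker's own runtime properties, so it is never null,
    // no matter which Druid version serialized the task spec.
    SchemaConfig getCentralizedTableSchemaConfig()
    {
      return new SchemaConfig(true);
    }
  }

  // The merge step consults the toolbox; the task spec carries no schema config.
  static void push(Toolbox toolbox)
  {
    if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
      System.out.println("Collecting segment schemas before publish");
    }
  }

  public static void main(String[] args)
  {
    push(new Toolbox());
  }
}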

PartialGenericSegmentMergeTaskTest.java

@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
@@ -104,9 +103,7 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
         ParallelIndexTestingFactory.SUBTASK_SPEC_ID,
         ParallelIndexTestingFactory.NUM_ATTEMPTS,
         ingestionSpec,
-        ParallelIndexTestingFactory.CONTEXT,
-        CentralizedDatasourceSchemaConfig.create(),
-        null
+        ParallelIndexTestingFactory.CONTEXT
     );
   }
@ -143,9 +140,7 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
.partitionsSpec(partitionsSpec)
.build()
),
ParallelIndexTestingFactory.CONTEXT,
CentralizedDatasourceSchemaConfig.create(),
null
ParallelIndexTestingFactory.CONTEXT
);
}

FingerprintGenerator.java

@@ -25,6 +25,7 @@ import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -71,13 +72,14 @@ public class FingerprintGenerator
     }
     catch (IOException e) {
       log.error(
-          "Exception generating fingerprint for payload [%s], datasource [%s], version [%s] with stacktrace [%s].",
-          schemaPayload,
-          dataSource,
-          version,
-          e
+          e,
+          "Exception generating schema fingerprint (version[%d]) for datasource[%s], payload[%s].",
+          version, dataSource, schemaPayload
       );
-      throw new RuntimeException(e);
+      throw DruidException.defensive(
+          "Could not generate schema fingerprint (version[%d]) for datasource[%s].",
+          version, dataSource
+      );
     }
   }
 }
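
Two things change in this catch block besides the wording: the exception now goes to log.error as the throwable argument rather than as a format argument (the old code also never formatted at all, since the single concatenated string carried no arguments for its placeholders), and the rethrow uses DruidException.defensive, Druid's structured error for "should never happen" conditions, instead of a bare RuntimeException. The cause still reaches operators through the log line even though it is not chained onto the new exception. A minimal usage sketch, assuming org.apache.druid.error.DruidException from the Druid codebase is on the classpath:

import org.apache.druid.error.DruidException;

public class DefensiveErrorExample
{
  public static void main(String[] args)
  {
    final int version = 1;
    final String dataSource = "wikipedia";

    try {
      // defensive() formats its message with the same %-style placeholders as
      // the log call above, so placeholders and arguments must stay aligned.
      throw DruidException.defensive(
          "Could not generate schema fingerprint (version[%d]) for datasource[%s].",
          version, dataSource
      );
    }
    catch (DruidException e) {
      // Prints: Could not generate schema fingerprint (version[1]) for datasource[wikipedia].
      System.out.println(e.getMessage());
    }
  }
}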