Fixing failing compaction/parallel index jobs during upgrade due to new actions not being available on the overlord. (#15430)

* Fixing failing compaction/parallel index jobs during upgrade due to new actions not available on the overlord.

* Fixing build

* Removing extra space.

* Fixing json getter.

* Review comments.
Karan Kumar, 2023-11-25 13:50:29 +05:30 (committed by GitHub)
parent 67c7b6248c
commit a0188192de
11 changed files with 77 additions and 46 deletions

File: MaterializedViewSupervisor.java

@@ -57,6 +57,7 @@ import org.joda.time.Interval;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -371,7 +372,8 @@ public class MaterializedViewSupervisor implements Supervisor
     // Pair<interval -> max(created_date), interval -> list<DataSegment>>
     Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
         getMaxCreateDateAndBaseSegments(
-            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY)
+            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(),
+                                                                           Collections.singletonList(Intervals.ETERNITY))
         );
     // baseSegments are used to create HadoopIndexTask
     Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;

File: RetrieveSegmentsToReplaceAction.java

@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.druid.indexing.common.task.Task;
@@ -38,6 +37,7 @@ import org.joda.time.Interval;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -63,20 +63,18 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
 {
   private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class);
 
-  @JsonIgnore
   private final String dataSource;
 
-  @JsonIgnore
-  private final Interval interval;
+  private final List<Interval> intervals;
 
   @JsonCreator
   public RetrieveSegmentsToReplaceAction(
       @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval
+      @JsonProperty("intervals") List<Interval> intervals
   )
   {
     this.dataSource = dataSource;
-    this.interval = interval;
+    this.intervals = intervals;
   }
 
   @JsonProperty
@@ -86,9 +84,9 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
   }
 
   @JsonProperty
-  public Interval getInterval()
+  public List<Interval> getIntervals()
   {
-    return interval;
+    return intervals;
   }
 
   @Override
@@ -128,7 +126,7 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
     Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();
     for (Pair<DataSegment, String> segmentAndCreatedDate :
-        toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) {
+        toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) {
       final DataSegment segment = segmentAndCreatedDate.lhs;
       final String created = segmentAndCreatedDate.rhs;
       intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>())
@@ -165,7 +163,7 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
   private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox toolbox)
   {
     return toolbox.getIndexerMetadataStorageCoordinator()
-                  .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE);
+                  .retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE);
   }
 
   @Override
@@ -185,25 +183,20 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
     }
 
     RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;
-
-    if (!dataSource.equals(that.dataSource)) {
-      return false;
-    }
-    return interval.equals(that.interval);
+    return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(dataSource, interval);
+    return Objects.hash(dataSource, intervals);
   }
 
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + "{" +
+    return "RetrieveSegmentsToReplaceAction{" +
            "dataSource='" + dataSource + '\'' +
-           ", interval=" + interval +
+           ", intervals=" + intervals +
            '}';
   }
 }
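
Note that the constructor change above also changes the action's JSON wire format, which is the crux of the upgrade problem: a task running the new code sends {"intervals": [...]} while an overlord still running the old code only understands {"interval": ...} (or does not know the action type at all), so it cannot deserialize the request. A minimal, self-contained sketch of the payload change, using plain Jackson and a hypothetical stand-in class rather than Druid's actual action:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Collections;
    import java.util.List;

    public class ActionWireFormatSketch
    {
      // Hypothetical stand-in for RetrieveSegmentsToReplaceAction; intervals are
      // ISO-8601 strings instead of Joda Intervals to keep the sketch dependency-free.
      static class RetrieveAction
      {
        private final String dataSource;
        private final List<String> intervals;

        @JsonCreator
        RetrieveAction(
            @JsonProperty("dataSource") String dataSource,
            @JsonProperty("intervals") List<String> intervals
        )
        {
          this.dataSource = dataSource;
          this.intervals = intervals;
        }

        @JsonProperty
        public String getDataSource()
        {
          return dataSource;
        }

        @JsonProperty
        public List<String> getIntervals()
        {
          return intervals;
        }
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(
            new RetrieveAction("wiki", Collections.singletonList("2023-01-01/2024-01-01"))
        );
        // Prints {"dataSource":"wiki","intervals":["2023-01-01/2024-01-01"]}.
        // An overlord running the pre-upgrade code expects an "interval" field and
        // fails to deserialize this payload, which fails the task mid-upgrade.
        System.out.println(json);
      }
    }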

File: TaskConfig.java

@@ -79,6 +79,7 @@ public class TaskConfig
   private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
   private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true;
   private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1;
+  private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE = false;
 
   @JsonProperty
   private final String baseDir;
@@ -125,6 +126,9 @@ public class TaskConfig
   @JsonProperty
   private final long tmpStorageBytesPerTask;
 
+  @JsonProperty
+  private final boolean enableConcurrentAppendAndReplace;
+
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -142,7 +146,8 @@ public class TaskConfig
       @JsonProperty("batchProcessingMode") String batchProcessingMode,
       @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
       @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
-      @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask
+      @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask,
+      @JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean enableConcurrentAppendAndReplace
   )
   {
     this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir"));
@@ -193,6 +198,10 @@ public class TaskConfig
     this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS);
     this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
+    this.enableConcurrentAppendAndReplace = Configs.valueOrDefault(
+        enableConcurrentAppendAndReplace,
+        DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE
+    );
   }
 
   private TaskConfig(
@@ -210,7 +219,8 @@ public class TaskConfig
       BatchProcessingMode batchProcessingMode,
       boolean storeEmptyColumns,
       boolean encapsulatedTask,
-      long tmpStorageBytesPerTask
+      long tmpStorageBytesPerTask,
+      boolean enableConcurrentAppendAndReplace
   )
   {
     this.baseDir = baseDir;
@@ -228,6 +238,7 @@ public class TaskConfig
     this.storeEmptyColumns = storeEmptyColumns;
     this.encapsulatedTask = encapsulatedTask;
     this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
+    this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace;
   }
 
   @JsonProperty
@@ -344,6 +355,12 @@ public class TaskConfig
     return tmpStorageBytesPerTask;
   }
 
+  @JsonProperty("enableConcurrentAppendAndReplace")
+  public boolean isConcurrentAppendAndReplaceEnabled()
+  {
+    return enableConcurrentAppendAndReplace;
+  }
+
   private String defaultDir(@Nullable String configParameter, final String defaultVal)
   {
     if (configParameter == null) {
@@ -370,7 +387,8 @@ public class TaskConfig
         batchProcessingMode,
         storeEmptyColumns,
         encapsulatedTask,
-        tmpStorageBytesPerTask
+        tmpStorageBytesPerTask,
+        enableConcurrentAppendAndReplace
     );
   }
@@ -391,7 +409,8 @@ public class TaskConfig
         batchProcessingMode,
         storeEmptyColumns,
         encapsulatedTask,
-        tmpStorageBytesPerTask
+        tmpStorageBytesPerTask,
+        enableConcurrentAppendAndReplace
     );
   }
 }
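
The new flag defaults to false, so a cluster whose configuration says nothing keeps the pre-upgrade behavior. The @Nullable Boolean constructor parameter is what makes that work: Jackson passes null when the property is absent, and the valueOrDefault idiom falls back to the default. A minimal sketch of that idiom (the class name here is hypothetical; the helper mirrors Configs.valueOrDefault):

    public class NullableFlagSketch
    {
      // Same contract as Druid's Configs.valueOrDefault: a missing (null) config
      // value falls back to the supplied default.
      static boolean valueOrDefault(Boolean value, boolean defaultValue)
      {
        return value == null ? defaultValue : value;
      }

      public static void main(String[] args)
      {
        // "enableConcurrentAppendAndReplace" absent from the config -> null -> false,
        // so upgraded nodes stay on the old code path until the flag is set.
        System.out.println(valueOrDefault(null, false)); // false
        System.out.println(valueOrDefault(true, false)); // true
      }
    }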

File: DruidInputSource.java

@@ -546,7 +546,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableInputSource
     Preconditions.checkNotNull(interval);
 
     final Collection<DataSegment> usedSegments;
-    if (toolbox == null) {
+    if (toolbox == null || !toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) {
       usedSegments = FutureUtils.getUnchecked(
           coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)),
           true
@@ -554,7 +554,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableInputSource
     } else {
       try {
         usedSegments = toolbox.getTaskActionClient()
-                              .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval));
+                              .submit(new RetrieveSegmentsToReplaceAction(dataSource, Collections.singletonList(interval)));
       }
       catch (IOException e) {
         LOG.error(e, "Error retrieving the used segments for interval[%s].", interval);
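
This guard is the heart of the upgrade fix: the task submits the new RetrieveSegmentsToReplaceAction only when the operator has asserted, via the new flag, that the overlord understands it; otherwise it falls back to the coordinator's used-segments endpoint, which predates this change. A self-contained sketch of the pattern, with hypothetical names standing in for the Druid types:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.function.Supplier;

    public class VersionGatedFallbackSketch
    {
      // Prefer the new code path only when the cluster is known to support it;
      // otherwise use the call that existed before the upgrade.
      static <T> Collection<T> fetch(
          boolean newPathSupported,
          Supplier<Collection<T>> newAction,    // e.g. the new task action
          Supplier<Collection<T>> legacyFetch   // e.g. coordinatorClient.fetchUsedSegments
      )
      {
        return newPathSupported ? newAction.get() : legacyFetch.get();
      }

      public static void main(String[] args)
      {
        Collection<String> segments = fetch(
            false, // mid-upgrade: flag is off, so the legacy endpoint is used
            () -> { throw new IllegalStateException("old overlord: unknown action"); },
            () -> Collections.singletonList("wiki_2023-01-01_2024-01-01_v1")
        );
        System.out.println(segments);
      }
    }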

File: TaskConfigBuilder.java

@@ -41,6 +41,7 @@ public class TaskConfigBuilder
   private Boolean storeEmptyColumns;
   private boolean enableTaskLevelLogPush;
   private Long tmpStorageBytesPerTask;
+  private Boolean enableConcurrentAppendAndReplace;
 
   public TaskConfigBuilder setBaseDir(String baseDir)
   {
@@ -132,6 +133,18 @@ public class TaskConfigBuilder
     return this;
   }
 
+  public TaskConfigBuilder enableConcurrentAppendAndReplace()
+  {
+    this.enableConcurrentAppendAndReplace = true;
+    return this;
+  }
+
+  public TaskConfigBuilder disableConcurrentAppendAndReplace()
+  {
+    this.enableConcurrentAppendAndReplace = false;
+    return this;
+  }
+
   public TaskConfig build()
   {
     return new TaskConfig(
@@ -149,7 +162,8 @@ public class TaskConfigBuilder
         batchProcessingMode,
         storeEmptyColumns,
         enableTaskLevelLogPush,
-        tmpStorageBytesPerTask
+        tmpStorageBytesPerTask,
+        enableConcurrentAppendAndReplace
     );
   }
 }
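
Test code opts in through the builder, as the TaskLifecycleTest change below does. A usage sketch, assuming the Druid classes above are on the classpath (the base directory is illustrative):

    TaskConfig taskConfig = new TaskConfigBuilder()
        .setBaseDir("/tmp/task") // hypothetical directory
        .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
        .enableConcurrentAppendAndReplace() // turn on the new action path
        .build();
    // Leaving the tri-state Boolean unset (neither enable nor disable) defers to
    // TaskConfig's default, which is false.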

File: ConcurrentReplaceAndAppendTest.java

@@ -957,7 +957,7 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
     Collection<DataSegment> allUsedSegments = taskActionClient.submit(
         new RetrieveSegmentsToReplaceAction(
             WIKI,
-            interval
+            Collections.singletonList(interval)
         )
     );
     Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));

File: TaskLifecycleTest.java

@@ -615,6 +615,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
         .setDefaultRowFlushBoundary(50000)
         .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
         .setTmpStorageBytesPerTask(-1L)
+        .enableConcurrentAppendAndReplace()
         .build();
 
     return new TaskToolboxFactory(

File: TestIndexerMetadataStorageCoordinator.java

@@ -89,7 +89,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
   }
 
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, List<Interval> intervals)
   {
     return ImmutableList.of();
   }

File: IndexerMetadataStorageCoordinator.java

@@ -84,7 +84,7 @@ public interface IndexerMetadataStorageCoordinator
   /**
    *
    * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the
-   * given data source and interval from the metadata store.
+   * given data source and list of intervals from the metadata store.
    *
    * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility"
    * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link
@@ -92,11 +92,11 @@ public interface IndexerMetadataStorageCoordinator
    * if needed.
    *
    * @param dataSource The data source to query
-   * @param interval   The interval to query
+   * @param intervals  The list of intervals to query
    *
    * @return The DataSegments and the related created_date of segments
    */
-  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval);
+  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, List<Interval> intervals);
 
   /**
    * Retrieve all published segments which may include any data in the given intervals and are marked as used from the
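
A hedged usage sketch of the widened signature (assumes the Druid indexing classes are on the classpath; the data source name and interval are illustrative):

    Collection<Pair<DataSegment, String>> segmentsAndDates =
        coordinator.retrieveUsedSegmentsAndCreatedDates(
            "wiki",
            Collections.singletonList(Intervals.of("2023-01-01/2024-01-01"))
        );
    for (Pair<DataSegment, String> pair : segmentsAndDates) {
      DataSegment segment = pair.lhs; // a used (possibly overshadowed) segment
      String createdDate = pair.rhs;  // its created_date from the metadata store
      // Per the javadoc above, filter out overshadowed segments here if needed.
    }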

File: IndexerSQLMetadataStorageCoordinator.java

@@ -174,21 +174,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
   }
 
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, List<Interval> intervals)
   {
     StringBuilder queryBuilder = new StringBuilder(
         "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
     );
 
-    final List<Interval> intervals = new ArrayList<>();
-    // Do not need an interval condition if the interval is ETERNITY
-    if (!Intervals.isEternity(interval)) {
-      intervals.add(interval);
+    boolean hasEternityInterval = false;
+    for (Interval interval : intervals) {
+      if (Intervals.isEternity(interval)) {
+        hasEternityInterval = true;
+        break;
+      }
     }
 
     SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
         queryBuilder,
-        intervals,
+        hasEternityInterval ? Collections.emptyList() : intervals,
         SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS,
         connector
     );
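
The loop generalizes the pre-existing single-interval optimization to a list: an eternity interval overlaps every segment, so if any requested interval is eternity the SQL needs no interval condition at all. A self-contained restatement of that decision (hypothetical helper class; plain strings stand in for Joda Intervals and Intervals.isEternity):

    import java.util.Collections;
    import java.util.List;

    public class EternityShortCircuitSketch
    {
      // Stand-in for Intervals.isEternity(interval).
      static boolean isEternity(String interval)
      {
        return "ETERNITY".equals(interval);
      }

      // Mirrors the logic above: an all-time interval overlaps everything, so no
      // interval condition needs to be appended to the SQL in that case.
      static List<String> effectiveConditions(List<String> requested)
      {
        for (String interval : requested) {
          if (isEternity(interval)) {
            return Collections.emptyList();
          }
        }
        return requested;
      }

      public static void main(String[] args)
      {
        System.out.println(effectiveConditions(List.of("2023/2024")));             // [2023/2024]
        System.out.println(effectiveConditions(List.of("2023/2024", "ETERNITY"))); // []
      }
    }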

File: IndexerSQLMetadataStorageCoordinatorTest.java

@@ -2870,28 +2870,28 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     insertUsedSegments(ImmutableSet.of(defaultSegment));
 
     List<Pair<DataSegment, String>> resultForIntervalOnTheLeft =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001"));
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2001")));
     Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty());
 
     List<Pair<DataSegment, String>> resultForIntervalOnTheRight =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001"));
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("3000/3001")));
     Assert.assertTrue(resultForIntervalOnTheRight.isEmpty());
 
     List<Pair<DataSegment, String>> resultForExactInterval =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval()));
     Assert.assertEquals(1, resultForExactInterval.size());
     Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs);
 
     List<Pair<DataSegment, String>> resultForIntervalWithLeftOverlap =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02"));
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2015-01-02")));
     Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap);
 
     List<Pair<DataSegment, String>> resultForIntervalWithRightOverlap =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000"));
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2015-01-01/3000")));
     Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap);
 
     List<Pair<DataSegment, String>> resultForEternity =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY);
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.ETERNITY));
     Assert.assertEquals(resultForExactInterval, resultForEternity);
   }
 
@@ -2902,11 +2902,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment));
 
     List<Pair<DataSegment, String>> resultForRandomInterval =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval()));
     Assert.assertEquals(3, resultForRandomInterval.size());
 
     List<Pair<DataSegment, String>> resultForEternity =
-        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval());
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(eternitySegment.getInterval()));
     Assert.assertEquals(3, resultForEternity.size());
   }