Fix two issues with Coordinator -> Overlord communication. (#7412)

* Fix two issues with Coordinator -> Overlord communication.

1) ClientCompactQuery needs to recognize the potential for 'intervals'
to be set instead of 'segments'. The lack of this led to a
NullPointerException on DruidCoordinatorSegmentCompactor.java:102.

2) In two locations (DruidCoordinatorSegmentCompactor,
DruidCoordinatorCleanupPendingSegments) tasks were being retrieved
using waiting/pending/running tasks in the wrong order: by checking
'running' first and then 'pending', tasks could be missed if they
moved from 'pending' to 'running' in between the two calls. Replaced
these methods with calls to 'getActiveTasks', a new method that does
the calls in the right order.

* Remove unused import.
This commit is contained in:
Gian Merlino 2019-04-04 13:25:18 -04:00 committed by Fangjin Yang
parent d29a32062f
commit 78745fea84
7 changed files with 101 additions and 82 deletions

View File

@ -22,15 +22,18 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
private final List<DataSegment> segments;
private final Interval interval;
private final boolean keepSegmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
@ -40,7 +43,8 @@ public class ClientCompactQuery implements ClientQuery
@JsonCreator
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@Nullable @JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@ -49,6 +53,7 @@ public class ClientCompactQuery implements ClientQuery
{
this.dataSource = dataSource;
this.segments = segments;
this.interval = interval;
this.keepSegmentGranularity = keepSegmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
@ -75,6 +80,12 @@ public class ClientCompactQuery implements ClientQuery
return segments;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty
public boolean isKeepSegmentGranularity()
{
@ -100,12 +111,46 @@ public class ClientCompactQuery implements ClientQuery
return context;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClientCompactQuery that = (ClientCompactQuery) o;
return keepSegmentGranularity == that.keepSegmentGranularity &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(segments, that.segments) &&
Objects.equals(interval, that.interval) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(context, that.context);
}
@Override
public int hashCode()
{
return Objects.hash(
dataSource,
segments,
interval,
keepSegmentGranularity,
targetCompactionSizeBytes,
tuningConfig,
context
);
}
@Override
public String toString()
{
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", interval=" + interval +
", keepSegmentGranularity=" + keepSegmentGranularity +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
@ -40,10 +41,13 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class HttpIndexingServiceClient implements IndexingServiceClient
{
@ -90,6 +94,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
return runTask(
new ClientCompactQuery(
dataSource,
null,
segments,
keepSegmentGranularity,
targetCompactionSizeBytes,
@ -195,21 +200,30 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
}
@Override
public List<TaskStatusPlus> getRunningTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return getTasks("runningTasks");
}
// Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between
// calls then we still catch them. (Tasks always go waiting -> pending -> running.)
//
// Consider switching to new-style /druid/indexer/v1/tasks API in the future.
final List<TaskStatusPlus> tasks = new ArrayList<>();
final Set<String> taskIdsSeen = new HashSet<>();
@Override
public List<TaskStatusPlus> getPendingTasks()
{
return getTasks("pendingTasks");
}
final Iterable<TaskStatusPlus> activeTasks = Iterables.concat(
getTasks("waitingTasks"),
getTasks("pendingTasks"),
getTasks("runningTasks")
);
@Override
public List<TaskStatusPlus> getWaitingTasks()
{
return getTasks("waitingTasks");
for (TaskStatusPlus task : activeTasks) {
// Use taskIdsSeen to prevent returning the same task ID more than once (if it hops from 'pending' to 'running',
// for example, and we see it twice.)
if (taskIdsSeen.add(task.getId())) {
tasks.add(task);
}
}
return tasks;
}
private List<TaskStatusPlus> getTasks(String endpointSuffix)

View File

@ -49,11 +49,10 @@ public interface IndexingServiceClient
String killTask(String taskId);
List<TaskStatusPlus> getRunningTasks();
List<TaskStatusPlus> getPendingTasks();
List<TaskStatusPlus> getWaitingTasks();
/**
* Gets all tasks that are waiting, pending, or running.
*/
List<TaskStatusPlus> getActiveTasks();
TaskStatusResponse getTaskStatus(String taskId);

View File

@ -52,27 +52,11 @@ public class DruidCoordinatorCleanupPendingSegments implements DruidCoordinatorH
final List<DateTime> createdTimes = new ArrayList<>();
createdTimes.add(
indexingServiceClient
.getRunningTasks()
.getActiveTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no running tasks, this returns the current time.
);
createdTimes.add(
indexingServiceClient
.getPendingTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no pending tasks, this returns the current time.
);
createdTimes.add(
indexingServiceClient
.getWaitingTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no waiting tasks, this returns the current time.
.orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time.
);
final TaskStatusPlus completeTaskStatus = indexingServiceClient.getLastCompleteTask();

View File

@ -41,7 +41,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -84,11 +83,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
indexingServiceClient.getRunningTasks(),
indexingServiceClient.getPendingTasks(),
indexingServiceClient.getWaitingTasks()
);
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks());
// dataSource -> list of intervals of compact tasks
final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
for (TaskStatusPlus status : compactTasks) {
@ -98,13 +93,22 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
final Interval interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
final Interval interval;
if (compactQuery.getSegments() != null) {
interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
} else if (compactQuery.getInterval() != null) {
interval = compactQuery.getInterval();
} else {
throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId());
}
compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
@ -146,13 +150,9 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
.build();
}
@SafeVarargs
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses)
{
final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
return allTaskStatusPlus
return taskStatuses
.stream()
.filter(status -> {
final String taskType = status.getType();

View File

@ -25,6 +25,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -74,21 +75,9 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
}
@Override
public List<TaskStatusPlus> getRunningTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return null;
}
@Override
public List<TaskStatusPlus> getPendingTasks()
{
return null;
}
@Override
public List<TaskStatusPlus> getWaitingTasks()
{
return null;
return Collections.emptyList();
}
@Override

View File

@ -107,19 +107,7 @@ public class DruidCoordinatorSegmentCompactorTest
}
@Override
public List<TaskStatusPlus> getRunningTasks()
{
return Collections.emptyList();
}
@Override
public List<TaskStatusPlus> getPendingTasks()
{
return Collections.emptyList();
}
@Override
public List<TaskStatusPlus> getWaitingTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return Collections.emptyList();
}