Kafka Index Task that supports Incremental handoffs (#4815)

* Kafka Index Task that supports Incremental handoffs
- Incrementally handoff segments when they hit maxRowsPerSegment limit
- Decouple segment partitioning from Kafka partitioning, all records from consumed partitions go to a single druid segment
- Support for restoring task on middle manager restarts by check pointing end offsets for segments

* take care of review comments

* make getCurrentOffsets call async, keep track of publishing sequence, review comments

* fix setEndoffset duplicate request handling, formatting

* fix unit test

* backward compatibility

* make AppenderatorDriverMetadata backwards compatible

* add unit test

* fix deadlock between persist and push executors in AppenderatorImpl

* fix formatting

* use persist dir instead of work dir

* review comments

* fix deadlock

* actually fix deadlock
This commit is contained in:
Parag Jain 2017-11-17 16:05:20 -06:00 committed by Himanshu
parent 590633c595
commit cb03efeb14
31 changed files with 2599 additions and 486 deletions

View File

@ -121,7 +121,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. This option is deprecated. Use `completionTimeout` of KafkaSupervisorIOConfig instead.|no (default == 0)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)|
|`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))|
|`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))|

View File

@ -34,7 +34,6 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
@ -45,6 +44,7 @@ import io.druid.java.util.common.IAE;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
@ -58,6 +58,7 @@ import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
public class KafkaIndexTaskClient
@ -84,6 +85,7 @@ public class KafkaIndexTaskClient
private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class);
private static final String BASE_PATH = "/druid/worker/v1/chat";
private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
private static final TreeMap EMPTY_TREE_MAP = new TreeMap();
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
@ -270,6 +272,33 @@ public class KafkaIndexTaskClient
}
}
public TreeMap<Integer, Map<Integer, Long>> getCheckpoints(final String id, final boolean retry)
{
log.debug("GetCheckpoints task[%s] retry[%s]", id, retry);
try {
final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "checkpoints", null, retry);
return jsonMapper.readValue(response.getContent(), new TypeReference<TreeMap<Integer, TreeMap<Integer, Long>>>()
{
});
}
catch (NoTaskLocationException e) {
return EMPTY_TREE_MAP;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> getCheckpointsAsync(
final String id,
final boolean retry
)
{
return executorService.submit(
() -> getCheckpoints(id, retry)
);
}
public Map<Integer, Long> getEndOffsets(final String id)
{
log.debug("GetEndOffsets task[%s]", id);
@ -288,21 +317,21 @@ public class KafkaIndexTaskClient
}
}
public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets)
public boolean setEndOffsets(
final String id,
final Map<Integer, Long> endOffsets,
final boolean resume,
final boolean finalize
)
{
return setEndOffsets(id, endOffsets, false);
}
public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets, final boolean resume)
{
log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s]", id, endOffsets, resume);
log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize);
try {
final FullResponseHolder response = submitRequest(
id,
HttpMethod.POST,
"offsets/end",
resume ? "resume=true" : null,
StringUtils.format("resume=%s&finish=%s", resume, finalize),
jsonMapper.writeValueAsBytes(endOffsets),
true
);
@ -419,13 +448,8 @@ public class KafkaIndexTaskClient
);
}
public ListenableFuture<Boolean> setEndOffsetsAsync(final String id, final Map<Integer, Long> endOffsets)
{
return setEndOffsetsAsync(id, endOffsets, false);
}
public ListenableFuture<Boolean> setEndOffsetsAsync(
final String id, final Map<Integer, Long> endOffsets, final boolean resume
final String id, final Map<Integer, Long> endOffsets, final boolean resume, final boolean finalize
)
{
return executorService.submit(
@ -434,7 +458,7 @@ public class KafkaIndexTaskClient
@Override
public Boolean call() throws Exception
{
return setEndOffsets(id, endOffsets, resume);
return setEndOffsets(id, endOffsets, resume, finalize);
}
}
);
@ -483,7 +507,10 @@ public class KafkaIndexTaskClient
Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(id);
if (!status.isPresent() || !status.get().isRunnable()) {
throw new TaskNotRunnableException(StringUtils.format("Aborting request because task [%s] is not runnable", id));
throw new TaskNotRunnableException(StringUtils.format(
"Aborting request because task [%s] is not runnable",
id
));
}
String host = location.getHost();

View File

@ -38,10 +38,10 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final int maxRowsPerSegment;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
@Deprecated
private final int maxPendingPersists;
private final IndexSpec indexSpec;
private final boolean reportParseExceptions;
@Deprecated
private final long handoffConditionTimeout;
private final boolean resetOffsetAutomatically;
@ -69,7 +69,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
this.basePersistDirectory = defaults.getBasePersistDirectory();
this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists;
this.maxPendingPersists = 0;
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
this.reportParseExceptions = reportParseExceptions == null
? defaults.isReportParseExceptions()
@ -127,6 +127,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
@Override
@JsonProperty
@Deprecated
public int getMaxPendingPersists()
{
return maxPendingPersists;
@ -156,7 +157,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return reportParseExceptions;
}
@Deprecated
@JsonProperty
public long getHandoffConditionTimeout()
{

View File

@ -20,6 +20,7 @@
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@ -31,10 +32,13 @@ import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -42,7 +46,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
@ -68,7 +71,9 @@ import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.metadata.EntryExistsException;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.commons.codec.digest.DigestUtils;
@ -80,14 +85,18 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -97,6 +106,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -118,6 +128,9 @@ public class KafkaSupervisor implements Supervisor
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
private static final CopyOnWriteArrayList EMPTY_LIST = Lists.newCopyOnWriteArrayList();
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
// Internal data structures
// --------------------------------------------------------
@ -143,12 +156,24 @@ public class KafkaSupervisor implements Supervisor
final Optional<DateTime> minimumMessageTime;
final Optional<DateTime> maximumMessageTime;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
public TaskGroup(ImmutableMap<Integer, Long> partitionOffsets, Optional<DateTime> minimumMessageTime, Optional<DateTime> maximumMessageTime)
public TaskGroup(
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
this.partitionOffsets = partitionOffsets;
this.minimumMessageTime = minimumMessageTime;
this.maximumMessageTime = maximumMessageTime;
this.sequenceOffsets.put(0, partitionOffsets);
}
public int addNewCheckpoint(Map<Integer, Long> checkpoint)
{
sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
return sequenceOffsets.lastKey();
}
Set<String> taskIds()
@ -186,6 +211,9 @@ public class KafkaSupervisor implements Supervisor
private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();
// --------------------------------------------------------
// BaseSequenceName -> TaskGroup
private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>();
private final TaskStorage taskStorage;
private final TaskMaster taskMaster;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@ -464,6 +492,32 @@ public class KafkaSupervisor implements Supervisor
notices.add(new ResetNotice(dataSourceMetadata));
}
@Override
public void checkpoint(
String sequenceName,
DataSourceMetadata previousCheckpoint,
DataSourceMetadata currentCheckpoint
)
{
Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
Preconditions.checkArgument(
ioConfig.getTopic()
.equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
.getTopic()),
"Supervisor topic [%s] and topic in checkpoint [%s] does not match",
ioConfig.getTopic(),
((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
);
log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName);
notices.add(new CheckpointNotice(
sequenceName,
(KafkaDataSourceMetadata) previousCheckpoint,
(KafkaDataSourceMetadata) currentCheckpoint
));
}
public void possiblyRegisterListener()
{
// getTaskRunner() sometimes fails if the task queue is still being initialized so retry later until we succeed
@ -503,13 +557,13 @@ public class KafkaSupervisor implements Supervisor
private interface Notice
{
void handle() throws ExecutionException, InterruptedException, TimeoutException;
void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException;
}
private class RunNotice implements Notice
{
@Override
public void handle() throws ExecutionException, InterruptedException, TimeoutException
public void handle() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException
{
long nowTime = System.currentTimeMillis();
if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
@ -557,11 +611,83 @@ public class KafkaSupervisor implements Supervisor
@Override
public void handle()
{
log.makeAlert("Resetting dataSource [%s]", dataSource).emit();
resetInternal(dataSourceMetadata);
}
}
private class CheckpointNotice implements Notice
{
final String sequenceName;
final KafkaDataSourceMetadata previousCheckpoint;
final KafkaDataSourceMetadata currentCheckpoint;
CheckpointNotice(
String sequenceName,
KafkaDataSourceMetadata previousCheckpoint,
KafkaDataSourceMetadata currentCheckpoint
)
{
this.sequenceName = sequenceName;
this.previousCheckpoint = previousCheckpoint;
this.currentCheckpoint = currentCheckpoint;
}
@Override
public void handle() throws ExecutionException, InterruptedException, TimeoutException
{
// check for consistency
// if already received request for this sequenceName and dataSourceMetadata combination then return
Preconditions.checkNotNull(
sequenceTaskGroup.get(sequenceName),
"WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
sequenceName,
sequenceTaskGroup,
taskGroups
);
final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
// check validity of previousCheckpoint if it is not null
if (previousCheckpoint != null) {
int index = checkpoints.size();
for (int sequenceId : checkpoints.descendingKeySet()) {
Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
if (checkpoint.equals(previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
break;
}
index--;
}
if (index == 0) {
throw new ISE("No such previous checkpoint [%s] found", previousCheckpoint);
} else if (index < checkpoints.size()) {
// if the found checkpoint is not the latest one then already checkpointed by a replica
Preconditions.checkState(index == checkpoints.size() - 1, "checkpoint consistency failure");
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
} else {
// There cannot be more than one checkpoint when previous checkpoint is null
// as when the task starts they are sent existing checkpoints
Preconditions.checkState(
checkpoints.size() <= 1,
"Got checkpoint request with null as previous check point, however found more than one checkpoints in metadata store"
);
if (checkpoints.size() == 1) {
log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
return;
}
}
final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
.getPartitionOffsetMap()
.keySet()
.iterator()
.next());
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
}
}
@VisibleForTesting
void resetInternal(DataSourceMetadata dataSourceMetadata)
{
@ -569,7 +695,10 @@ public class KafkaSupervisor implements Supervisor
// Reset everything
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
killTaskGroupForPartitions(taskGroups.keySet());
taskGroups.values().forEach(this::killTasksInGroup);
taskGroups.clear();
partitionGroups.clear();
sequenceTaskGroup.clear();
} else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
} else {
@ -608,6 +737,7 @@ public class KafkaSupervisor implements Supervisor
}
if (!doReset) {
log.info("Ignoring duplicate reset request [%s]", dataSourceMetadata);
return;
}
@ -625,7 +755,13 @@ public class KafkaSupervisor implements Supervisor
}
}
if (metadataUpdateSuccess) {
killTaskGroupForPartitions(resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet());
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
final int groupId = getTaskGroupIdForPartition(partition);
killTaskGroupForPartitions(ImmutableSet.of(partition));
sequenceTaskGroup.remove(generateSequenceName(groupId));
taskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
});
} else {
throw new ISE("Unable to reset metadata");
}
@ -642,16 +778,17 @@ public class KafkaSupervisor implements Supervisor
private void killTaskGroupForPartitions(Set<Integer> partitions)
{
for (Integer partition : partitions) {
TaskGroup taskGroup = taskGroups.get(getTaskGroupIdForPartition(partition));
if (taskGroup != null) {
// kill all tasks in this task group
for (String taskId : taskGroup.tasks.keySet()) {
log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
killTask(taskId);
}
killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)));
}
}
private void killTasksInGroup(TaskGroup taskGroup)
{
if (taskGroup != null) {
for (String taskId : taskGroup.tasks.keySet()) {
log.info("Killing task [%s] in the task group", taskId);
killTask(taskId);
}
partitionGroups.remove(getTaskGroupIdForPartition(partition));
taskGroups.remove(getTaskGroupIdForPartition(partition));
}
}
@ -676,7 +813,7 @@ public class KafkaSupervisor implements Supervisor
}
@VisibleForTesting
void runInternal() throws ExecutionException, InterruptedException, TimeoutException
void runInternal() throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException
{
possiblyRegisterListener();
updatePartitionDataFromKafka();
@ -694,19 +831,19 @@ public class KafkaSupervisor implements Supervisor
}
}
@VisibleForTesting
String generateSequenceName(int groupId)
String generateSequenceName(
Map<Integer, Long> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
StringBuilder sb = new StringBuilder();
Map<Integer, Long> startPartitions = taskGroups.get(groupId).partitionOffsets;
for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
sb.append(StringUtils.format("+%d(%d)", entry.getKey(), entry.getValue()));
}
String partitionOffsetStr = sb.toString().substring(1);
Optional<DateTime> minimumMessageTime = taskGroups.get(groupId).minimumMessageTime;
Optional<DateTime> maximumMessageTime = taskGroups.get(groupId).maximumMessageTime;
String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : "");
String maxMsgTimeStr = (maximumMessageTime.isPresent() ? String.valueOf(maximumMessageTime.get().getMillis()) : "");
@ -719,12 +856,26 @@ public class KafkaSupervisor implements Supervisor
throw Throwables.propagate(e);
}
String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr + maxMsgTimeStr)
String hashCode = DigestUtils.sha1Hex(dataSchema
+ tuningConfig
+ partitionOffsetStr
+ minMsgTimeStr
+ maxMsgTimeStr)
.substring(0, 15);
return Joiner.on("_").join("index_kafka", dataSource, hashCode);
}
@VisibleForTesting
String generateSequenceName(int groupId)
{
return generateSequenceName(
taskGroups.get(groupId).partitionOffsets,
taskGroups.get(groupId).minimumMessageTime,
taskGroups.get(groupId).maximumMessageTime
);
}
private static String getRandomId()
{
final StringBuilder suffix = new StringBuilder(8);
@ -813,6 +964,7 @@ public class KafkaSupervisor implements Supervisor
List<String> futureTaskIds = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
List<Task> tasks = taskStorage.getActiveTasks();
final Set<Integer> taskGroupsToVerify = new HashSet<>();
for (Task task : tasks) {
if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
@ -847,44 +999,63 @@ public class KafkaSupervisor implements Supervisor
@Override
public Boolean apply(KafkaIndexTask.Status status)
{
if (status == KafkaIndexTask.Status.PUBLISHING) {
addDiscoveredTaskToPendingCompletionTaskGroups(
taskGroupId,
taskId,
kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
);
// update partitionGroups with the publishing task's offsets (if they are greater than what is
// existing) so that the next tasks will start reading from where this task left off
Map<Integer, Long> publishingTaskCurrentOffsets = taskClient.getCurrentOffsets(taskId, true);
for (Map.Entry<Integer, Long> entry : publishingTaskCurrentOffsets.entrySet()) {
Integer partition = entry.getKey();
Long offset = entry.getValue();
ConcurrentHashMap<Integer, Long> partitionOffsets = partitionGroups.get(
getTaskGroupIdForPartition(partition)
try {
log.debug("Task [%s], status [%s]", taskId, status);
if (status == KafkaIndexTask.Status.PUBLISHING) {
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().forEach(
partition -> addDiscoveredTaskToPendingCompletionTaskGroups(
getTaskGroupIdForPartition(partition),
taskId,
kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
)
);
boolean succeeded;
do {
succeeded = true;
Long previousOffset = partitionOffsets.putIfAbsent(partition, offset);
if (previousOffset != null && previousOffset < offset) {
succeeded = partitionOffsets.replace(partition, previousOffset, offset);
}
} while (!succeeded);
}
// update partitionGroups with the publishing task's offsets (if they are greater than what is
// existing) so that the next tasks will start reading from where this task left off
Map<Integer, Long> publishingTaskEndOffsets = taskClient.getEndOffsets(taskId);
} else {
for (Integer partition : kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
.keySet()) {
if (!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
log.warn(
"Stopping task [%s] which does not match the expected partition allocation",
for (Map.Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
Integer partition = entry.getKey();
Long offset = entry.getValue();
ConcurrentHashMap<Integer, Long> partitionOffsets = partitionGroups.get(
getTaskGroupIdForPartition(partition)
);
boolean succeeded;
do {
succeeded = true;
Long previousOffset = partitionOffsets.putIfAbsent(partition, offset);
if (previousOffset != null && previousOffset < offset) {
succeeded = partitionOffsets.replace(partition, previousOffset, offset);
}
} while (!succeeded);
}
} else {
for (Integer partition : kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
.keySet()) {
if (!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
log.warn(
"Stopping task [%s] which does not match the expected partition allocation",
taskId
);
try {
stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task");
}
return false;
}
}
// make sure the task's io and tuning configs match with the supervisor config
// if it is current then only create corresponding taskGroup if it does not exist
if (!isTaskCurrent(taskGroupId, taskId)) {
log.info(
"Stopping task [%s] which does not match the expected parameters and ingestion spec",
taskId
);
try {
@ -894,41 +1065,31 @@ public class KafkaSupervisor implements Supervisor
log.warn(e, "Exception while stopping task");
}
return false;
} else {
if (taskGroups.putIfAbsent(
taskGroupId,
new TaskGroup(
ImmutableMap.copyOf(
kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
), kafkaTask.getIOConfig().getMinimumMessageTime(),
kafkaTask.getIOConfig().getMaximumMessageTime()
)
) == null) {
sequenceTaskGroup.put(generateSequenceName(taskGroupId), taskGroups.get(taskGroupId));
log.info("Created new task group [%d]", taskGroupId);
}
taskGroupsToVerify.add(taskGroupId);
taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
}
}
if (taskGroups.putIfAbsent(
taskGroupId,
new TaskGroup(
ImmutableMap.copyOf(
kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
),
kafkaTask.getIOConfig().getMinimumMessageTime(),
kafkaTask.getIOConfig().getMaximumMessageTime()
)
) == null) {
log.debug("Created new task group [%d]", taskGroupId);
}
if (!isTaskCurrent(taskGroupId, taskId)) {
log.info(
"Stopping task [%s] which does not match the expected parameters and ingestion spec",
taskId
);
try {
stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task");
}
return false;
} else {
taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
}
return true;
}
catch (Throwable t) {
log.error(t, "Something bad while discovering task [%s]", taskId);
return null;
}
return true;
}
}, workerExec
)
@ -946,6 +1107,150 @@ public class KafkaSupervisor implements Supervisor
}
}
log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);
// make sure the checkpoints are consistent with each other and with the metadata store
taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
}
/**
* This method does two things -
* 1. Makes sure the checkpoints information in the taskGroup is consistent with that of the tasks, if not kill
* inconsistent tasks.
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
* created tasks for the taskGroup start indexing from after the latest published offsets.
*/
private void verifyAndMergeCheckpoints(final Integer groupId)
{
final TaskGroup taskGroup = taskGroups.get(groupId);
// List<TaskId, Map -> {SequenceId, Checkpoints}>
final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
for (String taskId : taskGroup.taskIds()) {
final ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> checkpointsFuture = taskClient.getCheckpointsAsync(
taskId,
true
);
futures.add(checkpointsFuture);
Futures.addCallback(
checkpointsFuture,
new FutureCallback<TreeMap<Integer, Map<Integer, Long>>>()
{
@Override
public void onSuccess(TreeMap<Integer, Map<Integer, Long>> checkpoints)
{
if (!checkpoints.isEmpty()) {
taskSequences.add(new Pair<>(taskId, checkpoints));
} else {
log.warn("Ignoring task [%s], as probably it is not started running yet", taskId);
}
}
@Override
public void onFailure(Throwable t)
{
log.error(t, "Problem while getting checkpoints for task [%s], killing the task", taskId);
killTask(taskId);
taskGroup.tasks.remove(taskId);
}
}
);
}
try {
Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (Exception e) {
Throwables.propagate(e);
}
final KafkaDataSourceMetadata latestDataSourceMetadata = (KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
.getDataSourceMetadata(dataSource);
final Map<Integer, Long> latestOffsetsFromDb = (latestDataSourceMetadata == null
|| latestDataSourceMetadata.getKafkaPartitions() == null) ? null
: latestDataSourceMetadata
.getKafkaPartitions()
.getPartitionOffsetMap();
// order tasks of this taskGroup by the latest sequenceId
taskSequences.sort((o1, o2) -> o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
final Set<String> tasksToKill = new HashSet<>();
final AtomicInteger earliestConsistentSequenceId = new AtomicInteger(-1);
int taskIndex = 0;
while (taskIndex < taskSequences.size()) {
if (earliestConsistentSequenceId.get() == -1) {
// find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
sequenceCheckpoint -> sequenceCheckpoint.getValue().entrySet().stream().allMatch(
partitionOffset -> Longs.compare(
partitionOffset.getValue(),
latestOffsetsFromDb == null
?
partitionOffset.getValue()
: latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
) == 0) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey())) || (
pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() > 0
&& earliestConsistentSequenceId.compareAndSet(-1, taskSequences.get(taskIndex).rhs.firstKey()))) {
final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(taskSequences.get(taskIndex).rhs
.tailMap(
earliestConsistentSequenceId
.get()));
log.info("Setting taskGroup sequences to [%s] for group [%d]", latestCheckpoints, groupId);
taskGroup.sequenceOffsets.clear();
taskGroup.sequenceOffsets.putAll(latestCheckpoints);
} else {
log.debug(
"Adding task [%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]",
taskSequences.get(taskIndex).lhs,
taskSequences.get(taskIndex).rhs,
latestOffsetsFromDb
);
tasksToKill.add(taskSequences.get(taskIndex).lhs);
}
} else {
// check consistency with taskGroup sequences
if (taskSequences.get(taskIndex).rhs.get(taskGroup.sequenceOffsets.firstKey()) == null
|| !(taskSequences.get(taskIndex).rhs.get(taskGroup.sequenceOffsets.firstKey())
.equals(taskGroup.sequenceOffsets.firstEntry().getValue()))
|| taskSequences.get(taskIndex).rhs.tailMap(taskGroup.sequenceOffsets.firstKey()).size()
!= taskGroup.sequenceOffsets.size()) {
log.debug(
"Adding task [%s] to kill list, checkpoints[%s], taskgroup checkpoints [%s]",
taskSequences.get(taskIndex).lhs,
taskSequences.get(taskIndex).rhs,
taskGroup.sequenceOffsets
);
tasksToKill.add(taskSequences.get(taskIndex).lhs);
}
}
taskIndex++;
}
if ((tasksToKill.size() > 0 && tasksToKill.size() == taskGroup.tasks.size()) ||
(taskGroup.tasks.size() == 0 && pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() == 0)) {
// killing all tasks or no task left in the group ?
// clear state about the taskgroup so that get latest offset information is fetched from metadata store
log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
sequenceTaskGroup.remove(generateSequenceName(groupId));
taskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
sequenceCheckpoint -> {
log.warn(
"Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]",
sequenceCheckpoint.lhs,
sequenceCheckpoint.rhs,
taskGroup.sequenceOffsets,
latestOffsetsFromDb
);
killTask(sequenceCheckpoint.lhs);
taskGroup.tasks.remove(sequenceCheckpoint.lhs);
});
}
private void addDiscoveredTaskToPendingCompletionTaskGroups(
@ -960,17 +1265,21 @@ public class KafkaSupervisor implements Supervisor
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.partitionOffsets.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
log.info("Added discovered task [%s] to existing pending task group", taskId);
log.info("Added discovered task [%s] to existing pending task group [%s]", taskId, groupId);
}
return;
}
}
log.info("Creating new pending completion task group for discovered task [%s]", taskId);
log.info("Creating new pending completion task group [%s] for discovered task [%s]", groupId, taskId);
// reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events
TaskGroup newTaskGroup = new TaskGroup(ImmutableMap.copyOf(startingPartitions), Optional.<DateTime>absent(), Optional.<DateTime>absent());
TaskGroup newTaskGroup = new TaskGroup(
ImmutableMap.copyOf(startingPartitions),
Optional.<DateTime>absent(),
Optional.<DateTime>absent()
);
newTaskGroup.tasks.put(taskId, new TaskData());
newTaskGroup.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
@ -1067,7 +1376,7 @@ public class KafkaSupervisor implements Supervisor
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(signalTasksToFinish(groupId));
futures.add(checkpointTaskGroup(groupId, true));
}
}
@ -1096,47 +1405,53 @@ public class KafkaSupervisor implements Supervisor
for (String id : group.taskIds()) {
killTask(id);
}
// clear partitionGroups, so that latest offsets from db is used as start offsets not the stale ones
// if tasks did some successful incremental handoffs
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
sequenceTaskGroup.remove(generateSequenceName(groupId));
// remove this task group from the list of current task groups now that it has been handled
taskGroups.remove(groupId);
}
}
private ListenableFuture<Map<Integer, Long>> signalTasksToFinish(final int groupId)
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
{
final TaskGroup taskGroup = taskGroups.get(groupId);
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<String, TaskData> taskEntry = i.next();
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();
if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<String, TaskData> taskEntry = i.next();
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();
if (task.status.isSuccess()) {
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
// failed and we need to re-ingest)
return Futures.transform(
stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
{
@Nullable
@Override
public Map<Integer, Long> apply(@Nullable Object input)
if (task.status.isSuccess()) {
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
// failed and we need to re-ingest)
return Futures.transform(
stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
{
return null;
@Nullable
@Override
public Map<Integer, Long> apply(@Nullable Object input)
{
return null;
}
}
}
);
}
);
}
if (task.status.isRunnable()) {
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
killTask(taskId);
i.remove();
if (task.status.isRunnable()) {
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
killTask(taskId);
i.remove();
}
}
}
}
@ -1186,12 +1501,23 @@ public class KafkaSupervisor implements Supervisor
return null;
}
log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true));
}
try {
if (endOffsets.equals(taskGroup.sequenceOffsets.lastEntry().getValue())) {
log.warn(
"Not adding checkpoint [%s] as its same as the start offsets [%s] of latest sequence for the task group [%d]",
endOffsets,
taskGroup.sequenceOffsets.lastEntry().getValue(),
groupId
);
return endOffsets;
}
log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize));
}
List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures)
.get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) {
@ -1204,6 +1530,7 @@ public class KafkaSupervisor implements Supervisor
}
}
catch (Exception e) {
log.error("Something bad happened [%s]", e.getMessage());
Throwables.propagate(e);
}
@ -1284,17 +1611,16 @@ public class KafkaSupervisor implements Supervisor
}
// reset partitions offsets for this task group so that they will be re-read from metadata storage
partitionGroups.remove(groupId);
// stop all the tasks in this pending completion group
futures.add(stopTasksInGroup(group));
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
sequenceTaskGroup.remove(generateSequenceName(groupId));
// kill all the tasks in this pending completion group
killTasksInGroup(group);
// set a flag so the other pending completion groups for this set of partitions will also stop
stopTasksInTaskGroup = true;
// stop all the tasks in the currently reading task group and remove the bad task group
futures.add(stopTasksInGroup(taskGroups.remove(groupId)));
// kill all the tasks in the currently reading task group and remove the bad task group
killTasksInGroup(taskGroups.remove(groupId));
toRemove.add(group);
}
}
@ -1347,6 +1673,7 @@ public class KafkaSupervisor implements Supervisor
// be recreated with the next set of offsets
if (taskData.status.isSuccess()) {
futures.add(stopTasksInGroup(taskGroup));
sequenceTaskGroup.remove(generateSequenceName(groupId));
iTaskGroups.remove();
break;
}
@ -1358,8 +1685,14 @@ public class KafkaSupervisor implements Supervisor
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
void createNewTasks()
void createNewTasks() throws JsonProcessingException
{
// update the checkpoints in the taskGroup to latest ones so that new tasks do not read what is already published
taskGroups.entrySet()
.stream()
.filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
// check that there is a current task group for each group of partitions in [partitionGroups]
for (Integer groupId : partitionGroups.keySet()) {
if (!taskGroups.containsKey(groupId)) {
@ -1373,7 +1706,15 @@ public class KafkaSupervisor implements Supervisor
DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
) : Optional.<DateTime>absent());
taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime, maximumMessageTime));
taskGroups.put(
groupId,
new TaskGroup(
generateStartingOffsetsForPartitionGroup(groupId),
minimumMessageTime,
maximumMessageTime
)
);
sequenceTaskGroup.put(generateSequenceName(groupId), taskGroups.get(groupId));
}
}
@ -1400,10 +1741,11 @@ public class KafkaSupervisor implements Supervisor
}
}
private void createKafkaTasksForGroup(int groupId, int replicas)
private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProcessingException
{
Map<Integer, Long> startPartitions = taskGroups.get(groupId).partitionOffsets;
Map<Integer, Long> endPartitions = new HashMap<>();
for (Integer partition : startPartitions.keySet()) {
endPartitions.put(partition, Long.MAX_VALUE);
}
@ -1426,6 +1768,16 @@ public class KafkaSupervisor implements Supervisor
ioConfig.isSkipOffsetGaps()
);
final String checkpoints = sortingMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
}).writeValueAsString(taskGroups.get(groupId).sequenceOffsets);
final Map<String, Object> context = spec.getContext() == null
? ImmutableMap.of("checkpoints", checkpoints, IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
: ImmutableMap.<String, Object>builder()
.put("checkpoints", checkpoints)
.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
.putAll(spec.getContext())
.build();
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(sequenceName, getRandomId());
KafkaIndexTask indexTask = new KafkaIndexTask(
@ -1434,7 +1786,7 @@ public class KafkaSupervisor implements Supervisor
spec.getDataSchema(),
taskTuningConfig,
kafkaIOConfig,
spec.getContext(),
context,
null,
null
);
@ -1558,8 +1910,17 @@ public class KafkaSupervisor implements Supervisor
}
String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
return generateSequenceName(taskGroupId).equals(taskSequenceName);
if (taskGroups.get(taskGroupId) != null) {
return generateSequenceName(taskGroupId).equals(taskSequenceName);
} else {
return generateSequenceName(
((KafkaIndexTask) taskOptional.get()).getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap(),
((KafkaIndexTask) taskOptional.get()).getIOConfig().getMinimumMessageTime(),
((KafkaIndexTask) taskOptional.get()).getIOConfig().getMaximumMessageTime()
).equals(taskSequenceName);
}
}
private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
@ -1607,7 +1968,7 @@ public class KafkaSupervisor implements Supervisor
}
}
private int getTaskGroupIdForPartition(int partition)
protected int getTaskGroupIdForPartition(int partition)
{
return partition % ioConfig.getTaskCount();
}
@ -1698,7 +2059,7 @@ public class KafkaSupervisor implements Supervisor
}
}
taskReports.stream().forEach(report::addTask);
taskReports.forEach(report::addTask);
}
catch (Exception e) {
log.warn(e, "Failed to generate status report");

View File

@ -46,7 +46,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("workerThreads") Integer workerThreads,
@JsonProperty("chatThreads") Integer chatThreads,
@ -65,8 +65,6 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
indexSpec,
true,
reportParseExceptions,
// Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of
// handoffConditionTimeout
handoffConditionTimeout,
resetOffsetAutomatically
);

View File

@ -108,7 +108,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
headers = createMock(HttpHeaders.class);
client = new TestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider);
expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT)).anyTimes();
expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT))
.anyTimes();
expect(taskInfoProvider.getTaskStatus(TEST_ID)).andReturn(Optional.of(TaskStatus.running(TEST_ID))).anyTimes();
for (String testId : TEST_IDS) {
@ -143,8 +144,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Assert.assertEquals(null, client.getStartTime(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true));
Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of()));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), true));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), false, true));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), true, true));
verifyAll();
}
@ -153,7 +154,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testTaskNotRunnableException() throws Exception
{
reset(taskInfoProvider);
expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT)).anyTimes();
expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT))
.anyTimes();
expect(taskInfoProvider.getTaskStatus(TEST_ID)).andReturn(Optional.of(TaskStatus.failure(TEST_ID))).anyTimes();
replayAll();
@ -542,13 +544,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
);
replayAll();
client.setEndOffsets(TEST_ID, endOffsets);
client.setEndOffsets(TEST_ID, endOffsets, false, true);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.POST, request.getMethod());
Assert.assertEquals(
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end"),
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@ -567,13 +569,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
);
replayAll();
client.setEndOffsets(TEST_ID, endOffsets, true);
client.setEndOffsets(TEST_ID, endOffsets, true, true);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.POST, request.getMethod());
Assert.assertEquals(
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true"),
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@ -902,8 +904,14 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
for (String testId : TEST_IDS) {
expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "offsets/end")));
futures.add(client.setEndOffsetsAsync(testId, endOffsets));
expectedUrls.add(new URL(StringUtils.format(
URL_FORMATTER,
TEST_HOST,
TEST_PORT,
testId,
StringUtils.format("offsets/end?resume=%s&finish=%s", false, true)
)));
futures.add(client.setEndOffsetsAsync(testId, endOffsets, false, true));
}
List<Boolean> responses = Futures.allAsList(futures).get();
@ -942,11 +950,11 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
TEST_HOST,
TEST_PORT,
testId,
"offsets/end?resume=true"
"offsets/end?resume=true&finish=true"
)
)
);
futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true));
}
List<Boolean> responses = Futures.allAsList(futures).get();

View File

@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -62,7 +63,9 @@ import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
import io.druid.indexing.kafka.test.TestBroker;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
@ -121,6 +124,7 @@ import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.appenderator.AppenderatorImpl;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
@ -141,12 +145,18 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@ -154,6 +164,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class KafkaIndexTaskTest
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
@ -170,7 +181,9 @@ public class KafkaIndexTaskTest
private long handoffConditionTimeout = 0;
private boolean reportParseExceptions = false;
private boolean resetOffsetAutomatically = false;
private boolean doHandoff = true;
private Integer maxRowsPerSegment = null;
private TaskToolboxFactory toolboxFactory;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@ -179,6 +192,21 @@ public class KafkaIndexTaskTest
private File directory;
private String topic;
private List<ProducerRecord<byte[], byte[]>> records;
private final boolean isIncrementalHandoffSupported;
private final Set<Integer> checkpointRequestsHash = Sets.newHashSet();
// This should be removed in versions greater that 0.11.1
// isIncrementalHandoffSupported should always be set to true in those later versions
@Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{true}, new Object[]{false});
}
public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
{
this.isIncrementalHandoffSupported = isIncrementalHandoffSupported;
}
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
@ -322,8 +350,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -365,8 +392,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -405,6 +431,96 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
public void testIncrementalHandOff() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 1;
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
kafkaProducer.send(record).get();
}
}
Map<String, String> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
// Checkpointing will happen at either checkpoint1 or checkpoint2 depending on ordering
// of events fetched across two partitions from Kafka
final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 0L));
final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 4L, 1, 2L));
final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L));
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
baseSequenceName,
startPartitions,
endPartitions,
consumerProps,
true,
false,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap()
.equals(currentOffsets));
task.setEndOffsets(currentOffsets, true, false);
Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
Assert.assertTrue(checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
baseSequenceName,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
)
));
// Check metrics
Assert.assertEquals(8, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3));
Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4))
&& ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) ||
(ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4))
&& ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5))));
Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6));
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
}
@Test(timeout = 60_000L)
public void testRunWithMinimumMessageTime() throws Exception
{
@ -420,8 +536,7 @@ public class KafkaIndexTaskTest
DateTimes.of("2010"),
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -475,8 +590,7 @@ public class KafkaIndexTaskTest
null,
DateTimes.of("2010"),
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -540,8 +654,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -601,8 +714,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -643,8 +755,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -696,14 +807,16 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
Assert.assertEquals(TaskStatus.Status.FAILED, future.get().getStatusCode());
Assert.assertEquals(
isIncrementalHandoffSupported ? TaskStatus.Status.SUCCESS : TaskStatus.Status.FAILED,
future.get().getStatusCode()
);
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
@ -748,8 +861,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -782,8 +894,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final KafkaIndexTask task2 = createTask(
null,
@ -797,8 +908,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future1 = runTask(task1);
@ -852,8 +962,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final KafkaIndexTask task2 = createTask(
null,
@ -867,8 +976,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
// Insert data
@ -923,8 +1031,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final KafkaIndexTask task2 = createTask(
null,
@ -938,8 +1045,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
// Insert data
@ -999,8 +1105,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -1024,9 +1129,12 @@ public class KafkaIndexTaskTest
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
// desc3 will not be created in KafkaIndexTask (0.11.1) as it does not create per Kafka partition Druid segments
SegmentDescriptor desc3 = SD(task, "2011/P1D", 1);
SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(isIncrementalHandoffSupported
? ImmutableSet.of(desc1, desc2, desc4)
: ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
@ -1038,8 +1146,12 @@ public class KafkaIndexTaskTest
// Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
Assert.assertEquals(
ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3))
isIncrementalHandoffSupported
? ImmutableSet.of(ImmutableList.of("d", "e", "h"))
: ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
isIncrementalHandoffSupported
? ImmutableSet.of(readSegmentColumn("dim1", desc2))
: ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3))
);
}
@ -1058,8 +1170,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final KafkaIndexTask task2 = createTask(
null,
@ -1073,8 +1184,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future1 = runTask(task1);
@ -1130,8 +1240,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future1 = runTask(task1);
@ -1151,6 +1260,8 @@ public class KafkaIndexTaskTest
// Stop without publishing segment
task1.stopGracefully();
unlockAppenderatorBasePersistDirForTask(task1);
Assert.assertEquals(TaskStatus.Status.SUCCESS, future1.get().getStatusCode());
// Start a new task
@ -1166,8 +1277,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future2 = runTask(task2);
@ -1219,8 +1329,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -1304,8 +1413,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
@ -1326,7 +1434,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets());
Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
task.setEndOffsets(newEndOffsets, false);
task.setEndOffsets(newEndOffsets, false, true);
Assert.assertEquals(newEndOffsets, task.getEndOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
task.resume();
@ -1341,7 +1449,7 @@ public class KafkaIndexTaskTest
// try again but with resume flag == true
newEndOffsets = ImmutableMap.of(0, 6L);
task.setEndOffsets(newEndOffsets, true);
task.setEndOffsets(newEndOffsets, true, true);
Assert.assertEquals(newEndOffsets, task.getEndOffsets());
Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
@ -1392,8 +1500,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
null
)
);
runTask(task);
@ -1412,6 +1519,7 @@ public class KafkaIndexTaskTest
@Test(timeout = 30_000L)
public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAvailable() throws Exception
{
resetOffsetAutomatically = true;
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
@ -1431,8 +1539,7 @@ public class KafkaIndexTaskTest
null,
null,
false
),
true
)
);
runTask(task);
@ -1500,23 +1607,21 @@ public class KafkaIndexTaskTest
private KafkaIndexTask createTask(
final String taskId,
final KafkaIOConfig ioConfig,
final Boolean resetOffsetAutomatically
final KafkaIOConfig ioConfig
)
{
return createTask(taskId, DATA_SCHEMA, ioConfig, resetOffsetAutomatically);
return createTask(taskId, DATA_SCHEMA, ioConfig);
}
private KafkaIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KafkaIOConfig ioConfig,
final Boolean resetOffsetAutomatically
final KafkaIOConfig ioConfig
)
{
final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
1000,
null,
maxRowsPerSegment,
new Period("P1Y"),
null,
null,
@ -1526,13 +1631,16 @@ public class KafkaIndexTaskTest
handoffConditionTimeout,
resetOffsetAutomatically
);
final Map<String, Object> context = isIncrementalHandoffSupported
? ImmutableMap.of(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
: null;
final KafkaIndexTask task = new KafkaIndexTask(
taskId,
null,
cloneDataSchema(dataSchema),
tuningConfig,
ioConfig,
null,
context,
null,
null
);
@ -1633,6 +1741,25 @@ public class KafkaIndexTaskTest
metadataStorageCoordinator,
emitter,
new SupervisorManager(null)
{
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
)
{
log.info("Adding checkpoint hash to the set");
checkpointRequestsHash.add(Objects.hash(
supervisorId,
sequenceName,
previousDataSourceMetadata,
currentDataSourceMetadata
));
return true;
}
}
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
@ -1738,6 +1865,16 @@ public class KafkaIndexTaskTest
).toSet();
}
private void unlockAppenderatorBasePersistDirForTask(KafkaIndexTask task)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException
{
Method unlockBasePersistDir = ((AppenderatorImpl) task.getAppenderator()).getClass()
.getDeclaredMethod(
"unlockBasePersistDirectory");
unlockBasePersistDir.setAccessible(true);
unlockBasePersistDir.invoke(task.getAppenderator());
}
private File getSegmentDirectory()
{
return new File(directory, "segments");

View File

@ -93,7 +93,7 @@ public class KafkaTuningConfigTest
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
}
@ -119,7 +119,7 @@ public class KafkaTuningConfigTest
Assert.assertEquals(2, copy.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
Assert.assertEquals(4, copy.getMaxPendingPersists());
Assert.assertEquals(0, copy.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
Assert.assertEquals(true, copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());

View File

@ -96,8 +96,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.capture;
@ -620,6 +622,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskQueue.shutdown("id3");
expect(taskQueue.add(anyObject(Task.class))).andReturn(true);
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
replayAll();
supervisor.start();
@ -693,8 +700,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(taskStorage.getTask("id4")).andReturn(Optional.of(id3)).anyTimes();
expect(taskStorage.getTask("id5")).andReturn(Optional.of(id3)).anyTimes();
expect(taskStorage.getTask("id4")).andReturn(Optional.of(id4)).anyTimes();
expect(taskStorage.getTask("id5")).andReturn(Optional.of(id5)).anyTimes();
expect(taskClient.getStatusAsync(anyString())).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED))
.anyTimes();
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
@ -706,6 +713,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(true));
expect(taskClient.stopAsync("id4", false)).andReturn(Futures.immediateFuture(false));
expect(taskClient.stopAsync("id5", false)).andReturn(Futures.immediateFuture((Boolean) null));
TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
taskQueue.shutdown("id4");
taskQueue.shutdown("id5");
@ -736,6 +751,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
)
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true).times(4);
TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.anyTimes();
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -820,6 +845,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -837,6 +867,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
reset(taskStorage);
reset(taskQueue);
reset(taskClient);
// for the newly created replica task
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
expect(taskStorage.getStatus(runningTaskId)).andReturn(Optional.of(TaskStatus.running(runningTaskId))).anyTimes();
@ -888,6 +924,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true).times(4);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
@ -897,12 +934,28 @@ public class KafkaSupervisorTest extends EasyMockSupport
List<Task> tasks = captured.getValues();
reset(taskStorage);
reset(taskClient);
expect(taskClient.getStatusAsync(anyString())).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED))
.anyTimes();
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
// there would be 4 tasks, 2 for each task group
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
expect(taskStorage.getStatus(task.getId())).andReturn(Optional.of(TaskStatus.running(task.getId()))).anyTimes();
expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
replay(taskStorage);
replay(taskClient);
supervisor.runInternal();
verifyAll();
@ -992,11 +1045,21 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
EasyMock.eq(true),
EasyMock.eq(true)
)
).andReturn(Futures.immediateFuture(true)).times(2);
expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
replay(taskStorage, taskRunner, taskClient, taskQueue);
supervisor.runInternal();
@ -1055,9 +1118,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
expect(taskClient.getCurrentOffsetsAsync("id1", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskQueue.add(capture(captured))).andReturn(true);
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
expect(taskClient.getCheckpoints(anyString(), anyBoolean())).andReturn(checkpoints).anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -1146,7 +1213,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
expect(taskClient.getCurrentOffsetsAsync("id1", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 2, 30L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 2, 30L));
expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 2, 30L));
expect(taskQueue.add(capture(captured))).andReturn(true);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
@ -1253,11 +1320,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getCurrentOffsetsAsync("id1", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
expect(taskClient.getCurrentOffsetsAsync("id2", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 4L, 1, 5L, 2, 6L)));
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
// since id1 is publishing, so getCheckpoints wouldn't be called for it
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
replayAll();
supervisor.start();
@ -1327,6 +1400,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
List<Task> tasks = captured.getValues();
reset(taskStorage, taskClient, taskQueue);
TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
expect(taskStorage.getStatus(task.getId())).andReturn(Optional.of(TaskStatus.running(task.getId()))).anyTimes();
@ -1376,6 +1459,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
reset(taskStorage, taskRunner, taskClient, taskQueue);
TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@ -1443,6 +1536,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
reset(taskStorage, taskRunner, taskClient, taskQueue);
TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@ -1466,6 +1569,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
EasyMock.eq(true),
EasyMock.eq(true)
)
).andReturn(Futures.<Boolean>immediateFailedFuture(new RuntimeException())).times(2);
@ -1571,7 +1675,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
// getCheckpoints will not be called for id1 as it is in publishing state
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -1584,7 +1694,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true))
.andReturn(Futures.immediateFuture(true));
taskQueue.shutdown("id3");
expectLastCall().times(2);
@ -1660,7 +1770,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
)).andReturn(true);
replay(indexerMetadataStorageCoordinator);
supervisor.resetInternal(resetMetadata);
try {
supervisor.resetInternal(resetMetadata);
}
catch (NullPointerException npe) {
// Expected as there will be an attempt to reset partitionGroups offsets to NOT_SET
// however there would be no entries in the map as we have not put nay data in kafka
Assert.assertTrue(npe.getCause() == null);
}
verifyAll();
Assert.assertEquals(captureDataSource.getValue(), DATASOURCE);
@ -1760,7 +1877,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -1969,5 +2091,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
return StringUtils.format("sequenceName-%d", groupId);
}
@Override
protected String generateSequenceName(
Map<Integer, Long> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
return generateSequenceName(getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()));
}
}
}

View File

@ -107,7 +107,7 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(12, (int) config.getWorkerThreads());

View File

@ -46,7 +46,8 @@ public class ActionBasedSegmentAllocator implements SegmentAllocator
public SegmentIdentifier allocate(
final InputRow row,
final String sequenceName,
final String previousSegmentId
final String previousSegmentId,
final boolean skipSegmentLineageCheck
) throws IOException
{
return taskActionClient.submit(
@ -56,7 +57,8 @@ public class ActionBasedSegmentAllocator implements SegmentAllocator
dataSchema.getGranularitySpec().getQueryGranularity(),
dataSchema.getGranularitySpec().getSegmentGranularity(),
sequenceName,
previousSegmentId
previousSegmentId,
skipSegmentLineageCheck
)
);
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.DataSourceMetadata;
import java.io.IOException;
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String supervisorId;
private final String sequenceName;
private final DataSourceMetadata previousCheckPoint;
private final DataSourceMetadata currentCheckPoint;
public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
@JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
)
{
this.supervisorId = supervisorId;
this.sequenceName = sequenceName;
this.previousCheckPoint = previousCheckPoint;
this.currentCheckPoint = currentCheckPoint;
}
@JsonProperty
public String getSupervisorId()
{
return supervisorId;
}
@JsonProperty
public String getSequenceName()
{
return sequenceName;
}
@JsonProperty
public DataSourceMetadata getPreviousCheckPoint()
{
return previousCheckPoint;
}
@JsonProperty
public DataSourceMetadata getCurrentCheckPoint()
{
return currentCheckPoint;
}
@Override
public TypeReference<Boolean> getReturnTypeReference()
{
return new TypeReference<Boolean>()
{
};
}
@Override
public Boolean perform(
Task task, TaskActionToolbox toolbox
) throws IOException
{
return toolbox.getSupervisorManager()
.checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{
return "CheckPointDataSourceMetadataAction{" +
"supervisorId='" + supervisorId + '\'' +
", sequenceName='" + sequenceName + '\'' +
", previousCheckPoint=" + previousCheckPoint +
", currentCheckPoint=" + currentCheckPoint +
'}';
}
}

View File

@ -66,6 +66,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
private final Granularity preferredSegmentGranularity;
private final String sequenceName;
private final String previousSegmentId;
private final boolean skipSegmentLineageCheck;
public SegmentAllocateAction(
@JsonProperty("dataSource") String dataSource,
@ -73,7 +74,8 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
@JsonProperty("queryGranularity") Granularity queryGranularity,
@JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("previousSegmentId") String previousSegmentId
@JsonProperty("previousSegmentId") String previousSegmentId,
@JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
@ -85,6 +87,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
);
this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
this.previousSegmentId = previousSegmentId;
this.skipSegmentLineageCheck = skipSegmentLineageCheck;
}
@JsonProperty
@ -123,6 +126,12 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
return previousSegmentId;
}
@JsonProperty
public boolean isSkipSegmentLineageCheck()
{
return skipSegmentLineageCheck;
}
@Override
public TypeReference<SegmentIdentifier> getReturnTypeReference()
{
@ -157,12 +166,13 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
);
final SegmentIdentifier identifier = usedSegmentsForRow.isEmpty() ?
tryAllocateFirstSegment(toolbox, task, rowInterval) :
tryAllocateFirstSegment(toolbox, task, rowInterval, skipSegmentLineageCheck) :
tryAllocateSubsequentSegment(
toolbox,
task,
rowInterval,
usedSegmentsForRow.iterator().next()
usedSegmentsForRow.iterator().next(),
skipSegmentLineageCheck
);
if (identifier != null) {
return identifier;
@ -205,7 +215,8 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
private SegmentIdentifier tryAllocateFirstSegment(
TaskActionToolbox toolbox,
Task task,
Interval rowInterval
Interval rowInterval,
boolean skipSegmentLineageCheck
) throws IOException
{
// No existing segments for this row, but there might still be nearby ones that conflict with our preferred
@ -216,7 +227,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
.collect(Collectors.toList());
for (Interval tryInterval : tryIntervals) {
if (tryInterval.contains(rowInterval)) {
final SegmentIdentifier identifier = tryAllocate(toolbox, task, tryInterval, rowInterval, false);
final SegmentIdentifier identifier = tryAllocate(toolbox, task, tryInterval, rowInterval, false, skipSegmentLineageCheck);
if (identifier != null) {
return identifier;
}
@ -229,7 +240,8 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
TaskActionToolbox toolbox,
Task task,
Interval rowInterval,
DataSegment usedSegment
DataSegment usedSegment,
boolean skipSegmentLineageCheck
) throws IOException
{
// Existing segment(s) exist for this row; use the interval of the first one.
@ -239,7 +251,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
} else {
// If segment allocation failed here, it is highly likely an unrecoverable error. We log here for easier
// debugging.
return tryAllocate(toolbox, task, usedSegment.getInterval(), rowInterval, true);
return tryAllocate(toolbox, task, usedSegment.getInterval(), rowInterval, true, skipSegmentLineageCheck);
}
}
@ -248,7 +260,8 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
Task task,
Interval tryInterval,
Interval rowInterval,
boolean logOnFail
boolean logOnFail,
boolean skipSegmentLineageCheck
) throws IOException
{
log.debug(
@ -268,7 +281,8 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
sequenceName,
previousSegmentId,
tryInterval,
lockResult.getTaskLock().getVersion()
lockResult.getTaskLock().getVersion(),
skipSegmentLineageCheck
);
if (identifier != null) {
return identifier;
@ -316,6 +330,7 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
", preferredSegmentGranularity=" + preferredSegmentGranularity +
", sequenceName='" + sequenceName + '\'' +
", previousSegmentId='" + previousSegmentId + '\'' +
", skipSegmentLineageCheck='" + skipSegmentLineageCheck + '\'' +
'}';
}
}

View File

@ -39,7 +39,8 @@ import java.io.IOException;
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class),
@JsonSubTypes.Type(name = "resetDataSourceMetadata", value = ResetDataSourceMetadataAction.class)
@JsonSubTypes.Type(name = "resetDataSourceMetadata", value = ResetDataSourceMetadataAction.class),
@JsonSubTypes.Type(name = "checkPointDataSourceMetadata", value = CheckPointDataSourceMetadataAction.class)
})
public interface TaskAction<RetType>
{

View File

@ -622,7 +622,7 @@ public class IndexTask extends AbstractTask
}
}
segmentAllocator = (row, sequenceName, previousSegmentId) -> lookup.get(sequenceName);
segmentAllocator = (row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> lookup.get(sequenceName);
} else if (ioConfig.isAppendToExisting()) {
// Append mode: Allocate segments as needed using Overlord APIs.
segmentAllocator = new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema);
@ -630,7 +630,7 @@ public class IndexTask extends AbstractTask
// Overwrite mode, non-guaranteed rollup: We can make up our own segment ids but we don't know them in advance.
final Map<Interval, AtomicInteger> counters = new HashMap<>();
segmentAllocator = (row, sequenceName, previousSegmentId) -> {
segmentAllocator = (row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> {
final DateTime timestamp = row.getTimestamp();
Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
if (!maybeInterval.isPresent()) {

View File

@ -156,6 +156,31 @@ public class SupervisorManager
return true;
}
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
)
{
try {
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
return true;
}
catch (Exception e) {
log.error(e, "Checkpoint request failed");
}
return false;
}
/**
* Stops a supervisor with a given id and then removes it from the list.
* <p/>

View File

@ -690,7 +690,8 @@ public class SegmentAllocateActionTest
Granularities.MINUTE,
Granularities.HOUR,
"s1",
"prev"
"prev",
false
);
final ObjectMapper objectMapper = new DefaultObjectMapper();
@ -722,7 +723,8 @@ public class SegmentAllocateActionTest
queryGranularity,
preferredSegmentGranularity,
sequenceName,
sequencePreviousId
sequencePreviousId,
false
);
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
}

View File

@ -116,7 +116,8 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
String maxVersion,
boolean skipSegmentLineageCheck
) throws IOException
{
throw new UnsupportedOperationException();

View File

@ -82,6 +82,8 @@ public interface IndexerMetadataStorageCoordinator
* @param interval interval for which to allocate a segment
* @param maxVersion use this version if we have no better version to use. The returned segment identifier may
* have a version lower than this one, but will not have one higher.
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
*
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
@ -90,7 +92,8 @@ public interface IndexerMetadataStorageCoordinator
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
String maxVersion,
boolean skipSegmentLineageCheck
) throws IOException;
/**

View File

@ -20,6 +20,8 @@
package io.druid.indexing.overlord.supervisor;
import io.druid.indexing.overlord.DataSourceMetadata;
import javax.annotation.Nullable;
import java.util.List;
/**
@ -52,6 +54,16 @@ public class NoopSupervisorSpec implements SupervisorSpec
@Override
public void reset(DataSourceMetadata dataSourceMetadata) {}
@Override
public void checkpoint(
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousCheckPoint,
@Nullable DataSourceMetadata currentCheckPoint
)
{
}
};
}

View File

@ -21,6 +21,8 @@ package io.druid.indexing.overlord.supervisor;
import io.druid.indexing.overlord.DataSourceMetadata;
import javax.annotation.Nullable;
public interface Supervisor
{
void start();
@ -36,4 +38,20 @@ public interface Supervisor
SupervisorReport getStatus();
void reset(DataSourceMetadata dataSourceMetadata);
/**
* The definition of checkpoint is not very strict as currently it does not affect data or control path
* On this call Supervisor can potentially checkpoint data processed so far to some durable storage
* for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
* represented by dataSourceMetadata
*
* @param sequenceName unique Identifier to figure out for which sequence to do check pointing
* @param previousCheckPoint DataSourceMetadata check pointed in previous call
* @param currentCheckPoint current DataSourceMetadata to be check pointed
*/
void checkpoint(
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousCheckPoint,
@Nullable DataSourceMetadata currentCheckPoint
);
}

View File

@ -385,7 +385,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final String sequenceName,
final String previousSegmentId,
final Interval interval,
final String maxVersion
final String maxVersion,
final boolean skipSegmentLineageCheck
) throws IOException
{
Preconditions.checkNotNull(dataSource, "dataSource");
@ -401,20 +402,40 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override
public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final List<byte[]> existingBytes = handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
).bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull)
.map(ByteArrayMapper.FIRST)
.list();
final List<byte[]> existingBytes;
if (!skipSegmentLineageCheck) {
existingBytes = handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
).bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull)
.map(ByteArrayMapper.FIRST)
.list();
} else {
existingBytes = handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "start = :start AND "
+ "%2$send%2$s = :end",
dbTables.getPendingSegmentsTable(), connector.getQuoteString()
)
).bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.list();
}
if (!existingBytes.isEmpty()) {
final SegmentIdentifier existingIdentifier = jsonMapper.readValue(
@ -744,6 +765,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
log.info("Not updating metadata, existing state is not the expected start state.");
log.debug("Existing database state [%s], request's start metadata [%s]", oldCommitMetadataFromDb, startMetadata);
return DataSourceMetadataUpdateResult.FAILURE;
}

View File

@ -104,7 +104,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if,
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first.
*
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
*/
@ -135,7 +135,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
*
* @param identifiers segment identifiers to be persisted
* @param committer a committer associated with all data that has been added to segments of the given identifiers so
* far
* far
*
* @return future that resolves when all pending data to segments of the identifiers has been persisted, contains
* commit metadata for this persist
@ -178,9 +178,18 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
ListenableFuture<SegmentsAndMetadata> push(Collection<SegmentIdentifier> identifiers, Committer committer);
/**
* Stop any currently-running processing and clean up after ourselves. This will not remove any on-disk persisted
* data, but it will drop any data that has not yet been persisted.
* Stop any currently-running processing and clean up after ourselves. This allows currently running persists and pushes
* to finish. This will not remove any on-disk persisted data, but it will drop any data that has not yet been persisted.
*/
@Override
void close();
/**
* Stop all processing, abandoning current pushes, currently running persist may be allowed to finish if they persist
* critical metadata otherwise shutdown immediately. This will not remove any on-disk persisted data,
* but it will drop any data that has not yet been persisted.
* Since this does not wait for pushes to finish, implementations have to make sure if any push is still happening
* in background thread then it does not cause any problems.
*/
void closeNow();
}

View File

@ -19,6 +19,8 @@
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@ -52,11 +54,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -65,11 +67,11 @@ import java.util.stream.Collectors;
/**
* A AppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you
* index unbounded streams. All handoff is done at the end of indexing.
*
* <p>
* This class helps with doing things that Appenderators don't, including deciding which segments to use (with a
* SegmentAllocator), publishing segments to the metadata store (with a SegmentPublisher), and monitoring handoff (with
* a SegmentHandoffNotifier).
*
* <p>
* Note that the commit metadata stored by this class via the underlying Appenderator is not the same metadata as
* you pass in. It's wrapped in some extra metadata needed by the driver.
*/
@ -84,16 +86,56 @@ public class AppenderatorDriver implements Closeable
private final ObjectMapper objectMapper;
private final FireDepartmentMetrics metrics;
// All access to "activeSegments", "publishPendingSegments", and "lastSegmentId" must be synchronized on
// "activeSegments".
enum SegmentState
{
ACTIVE,
INACTIVE,
PUBLISHING
}
// sequenceName -> start of segment interval -> segment we're currently adding data to
private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<>();
static class SegmentWithState
{
private SegmentIdentifier segmentIdentifier;
private SegmentState state;
// sequenceName -> list of identifiers of segments waiting for being published
// publishPendingSegments is always a super set of activeSegments because there can be some segments to which data
// are not added anymore, but not published yet.
private final Map<String, List<SegmentIdentifier>> publishPendingSegments = new HashMap<>();
@JsonCreator
SegmentWithState(
@JsonProperty("segmentIdentifier") SegmentIdentifier segmentIdentifier,
@JsonProperty("state") SegmentState state
)
{
this.segmentIdentifier = segmentIdentifier;
this.state = state;
}
@JsonProperty
public SegmentIdentifier getSegmentIdentifier()
{
return segmentIdentifier;
}
@JsonProperty
public SegmentState getState()
{
return state;
}
@Override
public String toString()
{
return "SegmentWithState{" +
"segmentIdentifier=" + segmentIdentifier +
", state=" + state +
'}';
}
}
// sequenceName -> {Interval Start millis -> List of Segments for this interval}
// there might be multiple segments for a start interval, for example one segment
// can be in ACTIVE state and others might be in PUBLISHING state
private final Map<String, NavigableMap<Long, LinkedList<SegmentWithState>>> segments = new TreeMap<>();
private final Set<String> publishingSequences = new HashSet<>();
// sequenceName -> most recently allocated segment
private final Map<String, String> lastSegmentIds = Maps.newHashMap();
@ -103,12 +145,12 @@ public class AppenderatorDriver implements Closeable
/**
* Create a driver.
*
* @param appenderator appenderator
* @param segmentAllocator segment allocator
* @param handoffNotifierFactory handoff notifier factory
* @param usedSegmentChecker used segment checker
* @param objectMapper object mapper, used for serde of commit metadata
* @param metrics Firedepartment metrics
* @param appenderator appenderator
* @param segmentAllocator segment allocator
* @param handoffNotifierFactory handoff notifier factory
* @param usedSegmentChecker used segment checker
* @param objectMapper object mapper, used for serde of commit metadata
* @param metrics Firedepartment metrics
*/
public AppenderatorDriver(
Appenderator appenderator,
@ -130,20 +172,14 @@ public class AppenderatorDriver implements Closeable
}
@VisibleForTesting
Map<String, NavigableMap<Long, SegmentIdentifier>> getActiveSegments()
Map<String, NavigableMap<Long, LinkedList<SegmentWithState>>> getSegments()
{
return activeSegments;
}
@VisibleForTesting
Map<String, List<SegmentIdentifier>> getPublishPendingSegments()
{
return publishPendingSegments;
return segments;
}
/**
* Perform any initial setup and return currently persisted commit metadata.
*
* <p>
* Note that this method returns the same metadata you've passed in with your Committers, even though this class
* stores extra metadata on disk.
*
@ -161,18 +197,29 @@ public class AppenderatorDriver implements Closeable
log.info("Restored metadata[%s].", metadata);
if (metadata != null) {
synchronized (activeSegments) {
for (Map.Entry<String, List<SegmentIdentifier>> entry : metadata.getActiveSegments().entrySet()) {
synchronized (segments) {
for (Map.Entry<String, List<SegmentWithState>> entry : metadata.getSegments().entrySet()) {
final String sequenceName = entry.getKey();
final TreeMap<Long, SegmentIdentifier> segmentMap = Maps.newTreeMap();
final TreeMap<Long, LinkedList<SegmentWithState>> segmentMap = Maps.newTreeMap();
activeSegments.put(sequenceName, segmentMap);
segments.put(sequenceName, segmentMap);
for (SegmentIdentifier identifier : entry.getValue()) {
segmentMap.put(identifier.getInterval().getStartMillis(), identifier);
for (SegmentWithState segmentWithState : entry.getValue()) {
segmentMap.computeIfAbsent(
segmentWithState.getSegmentIdentifier().getInterval().getStartMillis(),
k -> new LinkedList<>()
);
LinkedList<SegmentWithState> segmentList = segmentMap.get(segmentWithState.getSegmentIdentifier()
.getInterval()
.getStartMillis());
// always keep the ACTIVE segment for an interval start millis in the front
if (segmentWithState.getState() == SegmentState.ACTIVE) {
segmentList.addFirst(segmentWithState);
} else {
segmentList.addLast(segmentWithState);
}
}
}
publishPendingSegments.putAll(metadata.getPublishPendingSegments());
lastSegmentIds.putAll(metadata.getLastSegmentIds());
}
@ -184,12 +231,10 @@ public class AppenderatorDriver implements Closeable
private void addSegment(String sequenceName, SegmentIdentifier identifier)
{
synchronized (activeSegments) {
activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>())
.putIfAbsent(identifier.getInterval().getStartMillis(), identifier);
publishPendingSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>())
.add(identifier);
synchronized (segments) {
segments.computeIfAbsent(sequenceName, k -> new TreeMap<>())
.computeIfAbsent(identifier.getInterval().getStartMillis(), k -> new LinkedList<>())
.addFirst(new SegmentWithState(identifier, SegmentState.ACTIVE));
lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString());
}
}
@ -199,8 +244,8 @@ public class AppenderatorDriver implements Closeable
*/
public void clear() throws InterruptedException
{
synchronized (activeSegments) {
activeSegments.clear();
synchronized (segments) {
segments.clear();
}
appenderator.clear();
}
@ -221,12 +266,22 @@ public class AppenderatorDriver implements Closeable
final String sequenceName,
final Supplier<Committer> committerSupplier
) throws IOException
{
return add(row, sequenceName, committerSupplier, false);
}
public AppenderatorDriverAddResult add(
final InputRow row,
final String sequenceName,
final Supplier<Committer> committerSupplier,
final boolean skipSegmentLineageCheck
) throws IOException
{
Preconditions.checkNotNull(row, "row");
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(committerSupplier, "committerSupplier");
final SegmentIdentifier identifier = getSegment(row, sequenceName);
final SegmentIdentifier identifier = getSegment(row, sequenceName, skipSegmentLineageCheck);
if (identifier != null) {
try {
@ -243,7 +298,7 @@ public class AppenderatorDriver implements Closeable
/**
* Persist all data indexed through this driver so far. Blocks until complete.
*
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}.
*
* @param committer committer representing all data that has been added so far
@ -275,8 +330,8 @@ public class AppenderatorDriver implements Closeable
* {@link #publish(TransactionalSegmentPublisher, Committer, Collection)}
*
* @return null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture} for the submitted task
* which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata
* of the caller of {@link AppenderatorDriverMetadata}
* which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata
* of the caller of {@link AppenderatorDriverMetadata}
*/
public ListenableFuture<SegmentsAndMetadata> registerHandoff(SegmentsAndMetadata segmentsAndMetadata)
{
@ -324,7 +379,7 @@ public class AppenderatorDriver implements Closeable
public void onSuccess(Object result)
{
if (numRemainingHandoffSegments.decrementAndGet() == 0) {
log.info("All segments handed off.");
log.info("Successfully handed off [%d] segments.", segmentsAndMetadata.getSegments().size());
resultFuture.set(
new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
@ -364,16 +419,18 @@ public class AppenderatorDriver implements Closeable
private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName)
{
synchronized (activeSegments) {
final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
synchronized (segments) {
final NavigableMap<Long, LinkedList<SegmentWithState>> segmentsForSequence = segments.get(sequenceName);
if (activeSegmentsForSequence == null) {
if (segmentsForSequence == null) {
return null;
}
final Map.Entry<Long, SegmentIdentifier> candidateEntry = activeSegmentsForSequence.floorEntry(timestamp.getMillis());
if (candidateEntry != null && candidateEntry.getValue().getInterval().contains(timestamp)) {
return candidateEntry.getValue();
final Map.Entry<Long, LinkedList<SegmentWithState>> candidateEntry = segmentsForSequence.floorEntry(timestamp.getMillis());
if (candidateEntry != null
&& candidateEntry.getValue().getFirst().getSegmentIdentifier().getInterval().contains(timestamp)
&& candidateEntry.getValue().getFirst().getState().equals(SegmentState.ACTIVE)) {
return candidateEntry.getValue().getFirst().getSegmentIdentifier();
} else {
return null;
}
@ -390,9 +447,13 @@ public class AppenderatorDriver implements Closeable
*
* @throws IOException if an exception occurs while allocating a segment
*/
private SegmentIdentifier getSegment(final InputRow row, final String sequenceName) throws IOException
private SegmentIdentifier getSegment(
final InputRow row,
final String sequenceName,
final boolean skipSegmentLineageCheck
) throws IOException
{
synchronized (activeSegments) {
synchronized (segments) {
final DateTime timestamp = row.getTimestamp();
final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName);
if (existing != null) {
@ -402,7 +463,11 @@ public class AppenderatorDriver implements Closeable
final SegmentIdentifier newSegment = segmentAllocator.allocate(
row,
sequenceName,
lastSegmentIds.get(sequenceName)
lastSegmentIds.get(sequenceName),
// send lastSegmentId irrespective of skipSegmentLineageCheck so that
// unique constraint for sequence_name_prev_id_sha1 does not fail for
// allocatePendingSegment in IndexerSQLMetadataStorageCoordinator
skipSegmentLineageCheck
);
if (newSegment != null) {
@ -433,8 +498,8 @@ public class AppenderatorDriver implements Closeable
*/
public void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
{
synchronized (activeSegments) {
final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
synchronized (segments) {
final NavigableMap<Long, LinkedList<SegmentWithState>> activeSegmentsForSequence = segments.get(sequenceName);
if (activeSegmentsForSequence == null) {
throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
}
@ -442,7 +507,16 @@ public class AppenderatorDriver implements Closeable
for (final SegmentIdentifier identifier : identifiers) {
log.info("Moving segment[%s] out of active list.", identifier);
final long key = identifier.getInterval().getStartMillis();
if (!activeSegmentsForSequence.remove(key).equals(identifier)) {
if (activeSegmentsForSequence.get(key) == null || activeSegmentsForSequence.get(key).stream().noneMatch(
segmentWithState -> {
if (segmentWithState.getSegmentIdentifier().equals(identifier)) {
segmentWithState.state = SegmentState.INACTIVE;
return true;
} else {
return false;
}
}
)) {
throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
}
}
@ -456,36 +530,30 @@ public class AppenderatorDriver implements Closeable
* @param committer committer
*
* @return a {@link ListenableFuture} for the publish task which removes published {@code sequenceNames} from
* {@code activeSegments} and {@code publishPendingSegments}
* {@code activeSegments} and {@code publishPendingSegments}
*/
public ListenableFuture<SegmentsAndMetadata> publishAll(
final TransactionalSegmentPublisher publisher,
final Committer committer
)
{
final List<SegmentIdentifier> theSegments;
synchronized (activeSegments) {
final List<String> sequenceNames = ImmutableList.copyOf(publishPendingSegments.keySet());
theSegments = sequenceNames.stream()
.map(publishPendingSegments::remove)
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
sequenceNames.forEach(activeSegments::remove);
final List<String> theSequences;
synchronized (segments) {
theSequences = ImmutableList.copyOf(segments.keySet());
}
return publish(publisher, wrapCommitter(committer), theSegments);
return publish(publisher, wrapCommitter(committer), theSequences);
}
/**
* Execute a task in background to publish all segments corresponding to the given sequence names. The task
* internally pushes the segments to the deep storage first, and then publishes the metadata to the metadata storage.
*
* @param publisher segment publisher
* @param committer committer
* @param publisher segment publisher
* @param committer committer
* @param sequenceNames a collection of sequence names to be published
*
* @return a {@link ListenableFuture} for the submitted task which removes published {@code sequenceNames} from
* {@code activeSegments} and {@code publishPendingSegments}
* {@code activeSegments} and {@code publishPendingSegments}
*/
public ListenableFuture<SegmentsAndMetadata> publish(
final TransactionalSegmentPublisher publisher,
@ -493,26 +561,63 @@ public class AppenderatorDriver implements Closeable
final Collection<String> sequenceNames
)
{
final List<SegmentIdentifier> theSegments;
synchronized (activeSegments) {
theSegments = sequenceNames.stream()
.map(publishPendingSegments::remove)
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
sequenceNames.forEach(activeSegments::remove);
final List<SegmentIdentifier> theSegments = new ArrayList<>();
synchronized (segments) {
sequenceNames.stream()
.filter(sequenceName -> !publishingSequences.contains(sequenceName))
.forEach(sequenceName -> {
if (segments.containsKey(sequenceName)) {
publishingSequences.add(sequenceName);
segments.get(sequenceName)
.values()
.stream()
.flatMap(Collection::stream)
.forEach(segmentWithState -> {
segmentWithState.state = SegmentState.PUBLISHING;
theSegments.add(segmentWithState.getSegmentIdentifier());
});
}
});
}
return publish(publisher, wrapCommitter(committer), theSegments);
final ListenableFuture<SegmentsAndMetadata> publishFuture = publish(
publisher,
wrapCommitter(committer),
theSegments
);
Futures.addCallback(
publishFuture,
new FutureCallback<SegmentsAndMetadata>()
{
@Override
public void onSuccess(SegmentsAndMetadata result)
{
if (result != null) {
publishingSequences.removeAll(sequenceNames);
sequenceNames.forEach(segments::remove);
}
}
@Override
public void onFailure(Throwable t)
{
// Do nothing, caller should handle the exception
log.error("Error publishing sequences [%s]", sequenceNames);
}
}
);
return publishFuture;
}
/**
* Execute a task in background to publish the given segments. The task blocks until complete.
* Retries forever on transient failures, but may exit early on permanent failures.
*
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}.
*
* @param publisher publisher to use for this set of segments
* @param publisher publisher to use for this set of segments
* @param wrappedCommitter committer representing all data that has been added so far
*
* @return segments and metadata published if successful, or null if segments could not be handed off due to
@ -603,22 +708,15 @@ public class AppenderatorDriver implements Closeable
private WrappedCommitter wrapCommitter(final Committer committer)
{
final AppenderatorDriverMetadata wrappedMetadata;
synchronized (activeSegments) {
synchronized (segments) {
wrappedMetadata = new AppenderatorDriverMetadata(
ImmutableMap.copyOf(
Maps.transformValues(
activeSegments,
new Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>>()
{
@Override
public List<SegmentIdentifier> apply(NavigableMap<Long, SegmentIdentifier> input)
{
return ImmutableList.copyOf(input.values());
}
}
segments,
(Function<NavigableMap<Long, LinkedList<SegmentWithState>>, List<SegmentWithState>>) input -> ImmutableList
.copyOf(input.values().stream().flatMap(Collection::stream).collect(Collectors.toList()))
)
),
ImmutableMap.copyOf(publishPendingSegments),
ImmutableMap.copyOf(lastSegmentIds),
committer.getMetadata()
);

View File

@ -21,41 +21,96 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class AppenderatorDriverMetadata
{
private final Map<String, List<SegmentIdentifier>> activeSegments;
private final Map<String, List<SegmentIdentifier>> publishPendingSegments;
private final Map<String, List<AppenderatorDriver.SegmentWithState>> segments;
private final Map<String, String> lastSegmentIds;
private final Object callerMetadata;
@JsonCreator
public AppenderatorDriverMetadata(
@JsonProperty("activeSegments") Map<String, List<SegmentIdentifier>> activeSegments,
@JsonProperty("publishPendingSegments") Map<String, List<SegmentIdentifier>> publishPendingSegments,
@JsonProperty("segments") Map<String, List<AppenderatorDriver.SegmentWithState>> segments,
@JsonProperty("lastSegmentIds") Map<String, String> lastSegmentIds,
@JsonProperty("callerMetadata") Object callerMetadata
@JsonProperty("callerMetadata") Object callerMetadata,
// Next two properties are for backwards compatibility, should be removed on versions greater than 0.11.1
@JsonProperty("activeSegments") Map<String, List<SegmentIdentifier>> activeSegments,
@JsonProperty("publishPendingSegments") Map<String, List<SegmentIdentifier>> publishPendingSegments
)
{
this.activeSegments = activeSegments;
this.publishPendingSegments = publishPendingSegments;
Preconditions.checkState(
segments != null || (activeSegments != null && publishPendingSegments != null),
"Metadata should either have segments with state information or both active segments and publish pending segments information. "
+ "segments [%s], activeSegments [%s], publishPendingSegments [%s]",
segments,
activeSegments,
publishPendingSegments
);
if (segments == null) {
// convert old metadata to new one
final Map<String, List<AppenderatorDriver.SegmentWithState>> newMetadata = Maps.newHashMap();
final Set<String> activeSegmentsAlreadySeen = Sets.newHashSet(); // temp data structure
activeSegments.entrySet()
.forEach(sequenceSegments -> newMetadata.put(
sequenceSegments.getKey(),
sequenceSegments.getValue()
.stream()
.map(segmentIdentifier -> {
activeSegmentsAlreadySeen.add(segmentIdentifier.toString());
return new AppenderatorDriver.SegmentWithState(
segmentIdentifier,
AppenderatorDriver.SegmentState.ACTIVE
);
})
.collect(Collectors.toList())
));
// publishPendingSegments is a superset of activeSegments
publishPendingSegments.entrySet()
.forEach(sequenceSegments -> newMetadata.computeIfAbsent(
sequenceSegments.getKey(),
k -> new ArrayList<>()
).addAll(
sequenceSegments.getValue()
.stream()
.filter(segmentIdentifier -> !activeSegmentsAlreadySeen.contains(
segmentIdentifier.toString()))
.map(segmentIdentifier -> new AppenderatorDriver.SegmentWithState(
segmentIdentifier,
AppenderatorDriver.SegmentState.INACTIVE
))
.collect(Collectors.toList())
));
this.segments = newMetadata;
} else {
this.segments = segments;
}
this.lastSegmentIds = lastSegmentIds;
this.callerMetadata = callerMetadata;
}
@JsonProperty
public Map<String, List<SegmentIdentifier>> getActiveSegments()
public AppenderatorDriverMetadata(
Map<String, List<AppenderatorDriver.SegmentWithState>> segments,
Map<String, String> lastSegmentIds,
Object callerMetadata
)
{
return activeSegments;
this(segments, lastSegmentIds, callerMetadata, null, null);
}
@JsonProperty
public Map<String, List<SegmentIdentifier>> getPublishPendingSegments()
public Map<String, List<AppenderatorDriver.SegmentWithState>> getSegments()
{
return publishPendingSegments;
return segments;
}
@JsonProperty
@ -74,8 +129,7 @@ public class AppenderatorDriverMetadata
public String toString()
{
return "AppenderatorDriverMetadata{" +
"activeSegments=" + activeSegments +
", publishPendingSegments=" + publishPendingSegments +
"segments=" + segments +
", lastSegmentIds=" + lastSegmentIds +
", callerMetadata=" + callerMetadata +
'}';

View File

@ -42,7 +42,6 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.DateTimes;
@ -51,6 +50,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.io.Closer;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -94,7 +94,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
*/
@ -123,12 +127,20 @@ public class AppenderatorImpl implements Appenderator
// This variable updated in add(), persist(), and drop()
private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
private final AtomicInteger totalRows = new AtomicInteger();
// Synchronize persisting commitMetadata so that multiple persist threads (if present)
// and abandon threads do not step over each other
private final Lock commitLock = new ReentrantLock();
private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService pushExecutor = null;
// use intermediate executor so that deadlock conditions can be prevented
// where persist and push Executor try to put tasks in each other queues
// thus creating circular dependency
private volatile ListeningExecutorService intermediateTempExecutor = null;
private volatile long nextFlush;
private volatile FileLock basePersistDirLock = null;
private volatile FileChannel basePersistDirLockChannel = null;
private AtomicBoolean closed = new AtomicBoolean(false);
public AppenderatorImpl(
DataSchema schema,
@ -328,7 +340,13 @@ public class AppenderatorImpl implements Appenderator
@Override
public Object call() throws Exception
{
objectMapper.writeValue(computeCommitFile(), Committed.nil());
try {
commitLock.lock();
objectMapper.writeValue(computeCommitFile(), Committed.nil());
}
finally {
commitLock.unlock();
}
return null;
}
}
@ -365,7 +383,7 @@ public class AppenderatorImpl implements Appenderator
@Override
public ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, Committer committer)
{
final Map<SegmentIdentifier, Integer> commitHydrants = Maps.newHashMap();
final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
for (SegmentIdentifier identifier : identifiers) {
@ -374,7 +392,7 @@ public class AppenderatorImpl implements Appenderator
throw new ISE("No sink for identifier: %s", identifier);
}
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
commitHydrants.put(identifier, hydrants.size());
currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size());
numPersistedRows += sink.getNumRowsInMemory();
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
@ -410,22 +428,34 @@ public class AppenderatorImpl implements Appenderator
log.info(
"Committing metadata[%s] for sinks[%s].", commitMetadata, Joiner.on(", ").join(
Iterables.transform(
commitHydrants.entrySet(),
new Function<Map.Entry<SegmentIdentifier, Integer>, String>()
{
@Override
public String apply(Map.Entry<SegmentIdentifier, Integer> entry)
{
return StringUtils.format("%s:%d", entry.getKey().getIdentifierAsString(), entry.getValue());
}
}
)
currentHydrants.entrySet()
.stream()
.map(entry -> StringUtils.format(
"%s:%d",
entry.getKey(),
entry.getValue()
))
.collect(Collectors.toList())
)
);
committer.run();
objectMapper.writeValue(computeCommitFile(), Committed.create(commitHydrants, commitMetadata));
try {
commitLock.lock();
final File commitFile = computeCommitFile();
final Map<String, Integer> commitHydrants = Maps.newHashMap();
if (commitFile.exists()) {
// merge current hydrants with existing hydrants
final Committed oldCommitted = objectMapper.readValue(commitFile, Committed.class);
commitHydrants.putAll(oldCommitted.getHydrants());
}
commitHydrants.putAll(currentHydrants);
objectMapper.writeValue(commitFile, new Committed(commitHydrants, commitMetadata));
}
finally {
commitLock.unlock();
}
return commitMetadata;
}
@ -481,29 +511,24 @@ public class AppenderatorImpl implements Appenderator
return Futures.transform(
persist(identifiers, committer),
new Function<Object, SegmentsAndMetadata>()
{
@Override
public SegmentsAndMetadata apply(Object commitMetadata)
{
final List<DataSegment> dataSegments = Lists.newArrayList();
(Function<Object, SegmentsAndMetadata>) commitMetadata -> {
final List<DataSegment> dataSegments = Lists.newArrayList();
for (Map.Entry<SegmentIdentifier, Sink> entry : theSinks.entrySet()) {
if (droppingSinks.contains(entry.getKey())) {
log.info("Skipping push of currently-dropping sink[%s]", entry.getKey());
continue;
}
final DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue());
if (dataSegment != null) {
dataSegments.add(dataSegment);
} else {
log.warn("mergeAndPush[%s] returned null, skipping.", entry.getKey());
}
for (Map.Entry<SegmentIdentifier, Sink> entry : theSinks.entrySet()) {
if (droppingSinks.contains(entry.getKey())) {
log.info("Skipping push of currently-dropping sink[%s]", entry.getKey());
continue;
}
return new SegmentsAndMetadata(dataSegments, commitMetadata);
final DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue());
if (dataSegment != null) {
dataSegments.add(dataSegment);
} else {
log.warn("mergeAndPush[%s] returned null, skipping.", entry.getKey());
}
}
return new SegmentsAndMetadata(dataSegments, commitMetadata);
},
pushExecutor
);
@ -516,15 +541,8 @@ public class AppenderatorImpl implements Appenderator
*/
private ListenableFuture<?> pushBarrier()
{
return pushExecutor.submit(
new Runnable()
{
@Override
public void run()
{
// Do nothing
}
}
return intermediateTempExecutor.submit(
(Runnable) () -> pushExecutor.submit(() -> {})
);
}
@ -632,6 +650,11 @@ public class AppenderatorImpl implements Appenderator
@Override
public void close()
{
if (!closed.compareAndSet(false, true)) {
log.info("Appenderator already closed");
return;
}
log.info("Shutting down...");
final List<ListenableFuture<?>> futures = Lists.newArrayList();
@ -652,8 +675,18 @@ public class AppenderatorImpl implements Appenderator
try {
shutdownExecutors();
Preconditions.checkState(persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated");
Preconditions.checkState(pushExecutor.awaitTermination(365, TimeUnit.DAYS), "pushExecutor not terminated");
Preconditions.checkState(
persistExecutor == null || persistExecutor.awaitTermination(365, TimeUnit.DAYS),
"persistExecutor not terminated"
);
Preconditions.checkState(
pushExecutor == null || pushExecutor.awaitTermination(365, TimeUnit.DAYS),
"pushExecutor not terminated"
);
Preconditions.checkState(
intermediateTempExecutor == null || intermediateTempExecutor.awaitTermination(365, TimeUnit.DAYS),
"intermediateTempExecutor not terminated"
);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -664,6 +697,50 @@ public class AppenderatorImpl implements Appenderator
unlockBasePersistDirectory();
}
/**
* Unannounce the segments and wait for outstanding persists to finish.
* Do not unlock base persist dir as we are not waiting for push executor to shut down
* relying on current JVM to shutdown to not cause any locking problem if the task is restored.
* In case when task is restored and current task is still active because of push executor (which it shouldn't be
* since push executor starts daemon threads) then the locking should fail and new task should fail to start.
* This also means that this method should only be called when task is shutting down.
*/
@Override
public void closeNow()
{
if (!closed.compareAndSet(false, true)) {
log.info("Appenderator already closed");
return;
}
log.info("Shutting down immediately...");
for (Map.Entry<SegmentIdentifier, Sink> entry : sinks.entrySet()) {
try {
segmentAnnouncer.unannounceSegment(entry.getValue().getSegment());
}
catch (Exception e) {
log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource())
.addData("identifier", entry.getKey().getIdentifierAsString())
.emit();
}
}
try {
shutdownExecutors();
Preconditions.checkState(
persistExecutor == null || persistExecutor.awaitTermination(365, TimeUnit.DAYS),
"persistExecutor not terminated"
);
Preconditions.checkState(
intermediateTempExecutor == null || intermediateTempExecutor.awaitTermination(365, TimeUnit.DAYS),
"intermediateTempExecutor not terminated"
);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ISE("Failed to shutdown executors during close()");
}
}
private void lockBasePersistDirectory()
{
if (basePersistDirLock == null) {
@ -719,12 +796,27 @@ public class AppenderatorImpl implements Appenderator
)
);
}
if (intermediateTempExecutor == null) {
// use single threaded executor with SynchronousQueue so that all abandon operations occur sequentially
intermediateTempExecutor = MoreExecutors.listeningDecorator(
Execs.newBlockingSingleThreaded(
"appenderator_abandon_%d", 0
)
);
}
}
private void shutdownExecutors()
{
persistExecutor.shutdownNow();
pushExecutor.shutdownNow();
if (persistExecutor != null) {
persistExecutor.shutdownNow();
}
if (pushExecutor != null) {
pushExecutor.shutdownNow();
}
if (intermediateTempExecutor != null) {
intermediateTempExecutor.shutdownNow();
}
}
private void resetNextFlush()
@ -751,9 +843,12 @@ public class AppenderatorImpl implements Appenderator
return null;
}
final File commitFile = computeCommitFile();
final Committed committed;
File commitFile = null;
try {
commitLock.lock();
commitFile = computeCommitFile();
if (commitFile.exists()) {
committed = objectMapper.readValue(commitFile, Committed.class);
} else {
@ -763,6 +858,9 @@ public class AppenderatorImpl implements Appenderator
catch (Exception e) {
throw new ISE(e, "Failed to read commitFile: %s", commitFile);
}
finally {
commitLock.unlock();
}
log.info("Loading sinks from[%s]: %s", baseDir, committed.getHydrants().keySet());
@ -923,6 +1021,7 @@ public class AppenderatorImpl implements Appenderator
// Remove this segment from the committed list. This must be done from the persist thread.
log.info("Removing commit metadata for segment[%s].", identifier);
try {
commitLock.lock();
final File commitFile = computeCommitFile();
if (commitFile.exists()) {
final Committed oldCommitted = objectMapper.readValue(commitFile, Committed.class);
@ -935,6 +1034,9 @@ public class AppenderatorImpl implements Appenderator
.emit();
throw Throwables.propagate(e);
}
finally {
commitLock.unlock();
}
}
// Unannounce the segment.
@ -970,6 +1072,8 @@ public class AppenderatorImpl implements Appenderator
return null;
}
},
// use persistExecutor to make sure that all the pending persists completes before
// starting to abandon segments
persistExecutor
);
}

View File

@ -83,6 +83,14 @@ public class Committed
return new Committed(newHydrants, metadata);
}
public Committed with(final Map<String, Integer> hydrantsToAdd)
{
final Map<String, Integer> newHydrants = Maps.newHashMap();
newHydrants.putAll(hydrants);
newHydrants.putAll(hydrantsToAdd);
return new Committed(newHydrants, metadata);
}
@Override
public boolean equals(Object o)
{

View File

@ -37,6 +37,7 @@ public interface SegmentAllocator
SegmentIdentifier allocate(
InputRow row,
String sequenceName,
String previousSegmentId
String previousSegmentId,
boolean skipSegmentLineageCheck
) throws IOException;
}

View File

@ -419,6 +419,12 @@ public class AppenderatorDriverFailTest
}
@Override
public void closeNow()
{
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(
Query<T> query, Iterable<Interval> intervals

View File

@ -122,7 +122,7 @@ public class AppenderatorDriverTest
driver.close();
}
@Test
@Test(timeout = 2000L)
public void testSimple() throws Exception
{
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
@ -139,8 +139,11 @@ public class AppenderatorDriverTest
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
@ -186,8 +189,11 @@ public class AppenderatorDriverTest
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
@ -212,8 +218,11 @@ public class AppenderatorDriverTest
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
while (driver.getSegments().containsKey("dummy")) {
Thread.sleep(100);
}
driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
}
@ -414,7 +423,8 @@ public class AppenderatorDriverTest
public SegmentIdentifier allocate(
final InputRow row,
final String sequenceName,
final String previousSegmentId
final String previousSegmentId,
final boolean skipSegmentLineageCheck
) throws IOException
{
synchronized (counters) {