mirror of https://github.com/apache/druid.git
return task status reported by peon (#14040)
* return task status reported by peon * Write TaskStatus to file in AbstractTask.cleanUp * Get TaskStatus from task log * Fix merge conflicts in AbstractTaskTest * Add unit tests for TaskLogPusher, TaskLogStreamer, NoopTaskLogs to satisfy code coverage * Add license headerss * Fix style * Remove unknown exception declarations
This commit is contained in:
parent
accd5536df
commit
9d4cc501f7
|
@ -37,6 +37,7 @@ import org.apache.druid.initialization.DruidModule;
|
||||||
import org.apache.druid.tasklogs.NoopTaskLogs;
|
import org.apache.druid.tasklogs.NoopTaskLogs;
|
||||||
import org.apache.druid.tasklogs.TaskLogKiller;
|
import org.apache.druid.tasklogs.TaskLogKiller;
|
||||||
import org.apache.druid.tasklogs.TaskLogPusher;
|
import org.apache.druid.tasklogs.TaskLogPusher;
|
||||||
|
import org.apache.druid.tasklogs.TaskLogStreamer;
|
||||||
import org.apache.druid.tasklogs.TaskLogs;
|
import org.apache.druid.tasklogs.TaskLogs;
|
||||||
|
|
||||||
@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
|
@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
|
||||||
|
@ -78,6 +79,7 @@ public class K8sOverlordModule implements DruidModule
|
||||||
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
|
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
|
||||||
|
|
||||||
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
|
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
|
||||||
|
binder.bind(TaskLogStreamer.class).to(TaskLogs.class);
|
||||||
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
|
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.k8s.overlord;
|
package org.apache.druid.k8s.overlord;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
|
@ -31,6 +32,7 @@ import io.fabric8.kubernetes.api.model.Pod;
|
||||||
import io.fabric8.kubernetes.api.model.batch.v1.Job;
|
import io.fabric8.kubernetes.api.model.batch.v1.Job;
|
||||||
import io.netty.util.SuppressForbidden;
|
import io.netty.util.SuppressForbidden;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.druid.indexer.RunnerTaskState;
|
import org.apache.druid.indexer.RunnerTaskState;
|
||||||
import org.apache.druid.indexer.TaskLocation;
|
import org.apache.druid.indexer.TaskLocation;
|
||||||
import org.apache.druid.indexer.TaskStatus;
|
import org.apache.druid.indexer.TaskStatus;
|
||||||
|
@ -44,6 +46,7 @@ import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.Pair;
|
import org.apache.druid.java.util.common.Pair;
|
||||||
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||||
import org.apache.druid.java.util.http.client.HttpClient;
|
import org.apache.druid.java.util.http.client.HttpClient;
|
||||||
|
@ -57,8 +60,8 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
|
||||||
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
|
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
|
||||||
import org.apache.druid.k8s.overlord.common.PeonPhase;
|
import org.apache.druid.k8s.overlord.common.PeonPhase;
|
||||||
import org.apache.druid.k8s.overlord.common.TaskAdapter;
|
import org.apache.druid.k8s.overlord.common.TaskAdapter;
|
||||||
import org.apache.druid.tasklogs.TaskLogPusher;
|
|
||||||
import org.apache.druid.tasklogs.TaskLogStreamer;
|
import org.apache.druid.tasklogs.TaskLogStreamer;
|
||||||
|
import org.apache.druid.tasklogs.TaskLogs;
|
||||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
@ -66,6 +69,7 @@ import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -73,6 +77,7 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -101,28 +106,31 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
||||||
|
|
||||||
protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
|
protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
|
||||||
protected final TaskAdapter adapter;
|
protected final TaskAdapter adapter;
|
||||||
|
protected final KubernetesPeonClient client;
|
||||||
|
|
||||||
|
private final ObjectMapper mapper;
|
||||||
private final KubernetesTaskRunnerConfig k8sConfig;
|
private final KubernetesTaskRunnerConfig k8sConfig;
|
||||||
private final TaskQueueConfig taskQueueConfig;
|
private final TaskQueueConfig taskQueueConfig;
|
||||||
private final TaskLogPusher taskLogPusher;
|
private final TaskLogs taskLogs;
|
||||||
private final ListeningExecutorService exec;
|
private final ListeningExecutorService exec;
|
||||||
private final KubernetesPeonClient client;
|
|
||||||
private final HttpClient httpClient;
|
private final HttpClient httpClient;
|
||||||
|
|
||||||
|
|
||||||
public KubernetesTaskRunner(
|
public KubernetesTaskRunner(
|
||||||
|
ObjectMapper mapper,
|
||||||
TaskAdapter adapter,
|
TaskAdapter adapter,
|
||||||
KubernetesTaskRunnerConfig k8sConfig,
|
KubernetesTaskRunnerConfig k8sConfig,
|
||||||
TaskQueueConfig taskQueueConfig,
|
TaskQueueConfig taskQueueConfig,
|
||||||
TaskLogPusher taskLogPusher,
|
TaskLogs taskLogs,
|
||||||
KubernetesPeonClient client,
|
KubernetesPeonClient client,
|
||||||
HttpClient httpClient
|
HttpClient httpClient
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
this.mapper = mapper;
|
||||||
this.adapter = adapter;
|
this.adapter = adapter;
|
||||||
this.k8sConfig = k8sConfig;
|
this.k8sConfig = k8sConfig;
|
||||||
this.taskQueueConfig = taskQueueConfig;
|
this.taskQueueConfig = taskQueueConfig;
|
||||||
this.taskLogPusher = taskLogPusher;
|
this.taskLogs = taskLogs;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.httpClient = httpClient;
|
this.httpClient = httpClient;
|
||||||
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
|
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
|
||||||
|
@ -178,20 +186,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
||||||
completedPhase = monitorJob(k8sTaskId);
|
completedPhase = monitorJob(k8sTaskId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TaskStatus status;
|
TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
|
||||||
if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) {
|
|
||||||
status = TaskStatus.success(task.getId());
|
|
||||||
} else if (completedPhase.getJob() == null) {
|
|
||||||
status = TaskStatus.failure(
|
|
||||||
task.getId(),
|
|
||||||
"K8s Job for task disappeared before completion: " + k8sTaskId
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
status = TaskStatus.failure(
|
|
||||||
task.getId(),
|
|
||||||
"Task failed: " + k8sTaskId
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if (completedPhase.getJobDuration().isPresent()) {
|
if (completedPhase.getJobDuration().isPresent()) {
|
||||||
status = status.withDuration(completedPhase.getJobDuration().get());
|
status = status.withDuration(completedPhase.getJobDuration().get());
|
||||||
}
|
}
|
||||||
|
@ -210,7 +205,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
||||||
if (logStream.isPresent()) {
|
if (logStream.isPresent()) {
|
||||||
FileUtils.copyInputStreamToFile(logStream.get(), log.toFile());
|
FileUtils.copyInputStreamToFile(logStream.get(), log.toFile());
|
||||||
}
|
}
|
||||||
taskLogPusher.pushTaskLog(task.getId(), log.toFile());
|
taskLogs.pushTaskLog(task.getId(), log.toFile());
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
Files.deleteIfExists(log);
|
Files.deleteIfExists(log);
|
||||||
|
@ -243,10 +238,31 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) throws IOException
|
||||||
|
{
|
||||||
|
Optional<InputStream> maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId());
|
||||||
|
if (maybeTaskStatusStream.isPresent()) {
|
||||||
|
String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8);
|
||||||
|
return mapper.readValue(taskStatus, TaskStatus.class);
|
||||||
|
} else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
|
||||||
|
// fallback to behavior before the introduction of task status streaming for backwards compatibility
|
||||||
|
return TaskStatus.success(task.getOriginalTaskId());
|
||||||
|
} else if (Objects.isNull(jobResponse.getJob())) {
|
||||||
|
return TaskStatus.failure(
|
||||||
|
task.getOriginalTaskId(),
|
||||||
|
StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId())
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
return TaskStatus.failure(
|
||||||
|
task.getOriginalTaskId(),
|
||||||
|
StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateStatus(Task task, TaskStatus status)
|
public void updateStatus(Task task, TaskStatus status)
|
||||||
{
|
{
|
||||||
log.info("Updating task: %s with status %s", task.getId(), status);
|
|
||||||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
|
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -508,8 +524,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
|
||||||
}
|
}
|
||||||
boolean tlsEnabled = Boolean.parseBoolean(
|
boolean tlsEnabled = Boolean.parseBoolean(
|
||||||
mainPod.getMetadata()
|
mainPod.getMetadata()
|
||||||
.getAnnotations()
|
.getAnnotations()
|
||||||
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
|
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
|
||||||
return TaskLocation.create(
|
return TaskLocation.create(
|
||||||
mainPod.getStatus().getPodIP(),
|
mainPod.getStatus().getPodIP(),
|
||||||
DruidK8sConstants.PORT,
|
DruidK8sConstants.PORT,
|
||||||
|
|
|
@ -41,7 +41,7 @@ import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
|
||||||
import org.apache.druid.k8s.overlord.common.TaskAdapter;
|
import org.apache.druid.k8s.overlord.common.TaskAdapter;
|
||||||
import org.apache.druid.server.DruidNode;
|
import org.apache.druid.server.DruidNode;
|
||||||
import org.apache.druid.server.log.StartupLoggingConfig;
|
import org.apache.druid.server.log.StartupLoggingConfig;
|
||||||
import org.apache.druid.tasklogs.TaskLogPusher;
|
import org.apache.druid.tasklogs.TaskLogs;
|
||||||
|
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
@ -54,7 +54,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
|
||||||
private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
|
private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
|
||||||
private final StartupLoggingConfig startupLoggingConfig;
|
private final StartupLoggingConfig startupLoggingConfig;
|
||||||
private final TaskQueueConfig taskQueueConfig;
|
private final TaskQueueConfig taskQueueConfig;
|
||||||
private final TaskLogPusher taskLogPusher;
|
private final TaskLogs taskLogs;
|
||||||
private final DruidNode druidNode;
|
private final DruidNode druidNode;
|
||||||
private final TaskConfig taskConfig;
|
private final TaskConfig taskConfig;
|
||||||
private final Properties properties;
|
private final Properties properties;
|
||||||
|
@ -68,7 +68,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
|
||||||
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
|
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
|
||||||
StartupLoggingConfig startupLoggingConfig,
|
StartupLoggingConfig startupLoggingConfig,
|
||||||
@JacksonInject TaskQueueConfig taskQueueConfig,
|
@JacksonInject TaskQueueConfig taskQueueConfig,
|
||||||
TaskLogPusher taskLogPusher,
|
TaskLogs taskLogs,
|
||||||
@Self DruidNode druidNode,
|
@Self DruidNode druidNode,
|
||||||
TaskConfig taskConfig,
|
TaskConfig taskConfig,
|
||||||
Properties properties
|
Properties properties
|
||||||
|
@ -80,7 +80,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
|
||||||
this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
|
this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
|
||||||
this.startupLoggingConfig = startupLoggingConfig;
|
this.startupLoggingConfig = startupLoggingConfig;
|
||||||
this.taskQueueConfig = taskQueueConfig;
|
this.taskQueueConfig = taskQueueConfig;
|
||||||
this.taskLogPusher = taskLogPusher;
|
this.taskLogs = taskLogs;
|
||||||
this.druidNode = druidNode;
|
this.druidNode = druidNode;
|
||||||
this.taskConfig = taskConfig;
|
this.taskConfig = taskConfig;
|
||||||
this.properties = properties;
|
this.properties = properties;
|
||||||
|
@ -100,10 +100,11 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
|
||||||
}
|
}
|
||||||
|
|
||||||
runner = new KubernetesTaskRunner(
|
runner = new KubernetesTaskRunner(
|
||||||
|
smileMapper,
|
||||||
buildTaskAdapter(client),
|
buildTaskAdapter(client),
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
|
new DruidKubernetesPeonClient(client, kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
|
||||||
httpClient
|
httpClient
|
||||||
);
|
);
|
||||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
|
||||||
import org.apache.druid.server.DruidNode;
|
import org.apache.druid.server.DruidNode;
|
||||||
import org.apache.druid.server.log.StartupLoggingConfig;
|
import org.apache.druid.server.log.StartupLoggingConfig;
|
||||||
import org.apache.druid.tasklogs.NoopTaskLogs;
|
import org.apache.druid.tasklogs.NoopTaskLogs;
|
||||||
import org.apache.druid.tasklogs.TaskLogPusher;
|
import org.apache.druid.tasklogs.TaskLogs;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -45,7 +45,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
|
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
|
||||||
private StartupLoggingConfig startupLoggingConfig;
|
private StartupLoggingConfig startupLoggingConfig;
|
||||||
private TaskQueueConfig taskQueueConfig;
|
private TaskQueueConfig taskQueueConfig;
|
||||||
private TaskLogPusher taskLogPusher;
|
private TaskLogs taskLogs;
|
||||||
private DruidNode druidNode;
|
private DruidNode druidNode;
|
||||||
private TaskConfig taskConfig;
|
private TaskConfig taskConfig;
|
||||||
private Properties properties;
|
private Properties properties;
|
||||||
|
@ -62,7 +62,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
null,
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
taskLogPusher = new NoopTaskLogs();
|
taskLogs = new NoopTaskLogs();
|
||||||
druidNode = new DruidNode(
|
druidNode = new DruidNode(
|
||||||
"test",
|
"test",
|
||||||
"",
|
"",
|
||||||
|
@ -85,7 +85,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
properties
|
properties
|
||||||
|
@ -106,7 +106,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
properties
|
properties
|
||||||
|
@ -129,7 +129,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
properties
|
properties
|
||||||
|
@ -153,7 +153,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
props
|
props
|
||||||
|
@ -179,7 +179,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
props
|
props
|
||||||
|
@ -206,7 +206,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
props
|
props
|
||||||
|
@ -230,7 +230,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
props
|
props
|
||||||
|
@ -257,7 +257,7 @@ public class KubernetesTaskRunnerFactoryTest
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
startupLoggingConfig,
|
startupLoggingConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
druidNode,
|
druidNode,
|
||||||
taskConfig,
|
taskConfig,
|
||||||
props
|
props
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
|
||||||
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
|
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.http.client.HttpClient;
|
import org.apache.druid.java.util.http.client.HttpClient;
|
||||||
import org.apache.druid.java.util.http.client.Request;
|
import org.apache.druid.java.util.http.client.Request;
|
||||||
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
|
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
|
||||||
|
@ -62,7 +63,7 @@ import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
|
||||||
import org.apache.druid.k8s.overlord.common.PeonPhase;
|
import org.apache.druid.k8s.overlord.common.PeonPhase;
|
||||||
import org.apache.druid.server.DruidNode;
|
import org.apache.druid.server.DruidNode;
|
||||||
import org.apache.druid.server.log.StartupLoggingConfig;
|
import org.apache.druid.server.log.StartupLoggingConfig;
|
||||||
import org.apache.druid.tasklogs.TaskLogPusher;
|
import org.apache.druid.tasklogs.TaskLogs;
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -96,7 +97,7 @@ public class KubernetesTaskRunnerTest
|
||||||
private StartupLoggingConfig startupLoggingConfig;
|
private StartupLoggingConfig startupLoggingConfig;
|
||||||
private ObjectMapper jsonMapper;
|
private ObjectMapper jsonMapper;
|
||||||
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
|
private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
|
||||||
private TaskLogPusher taskLogPusher;
|
private TaskLogs taskLogs;
|
||||||
private DruidNode druidNode;
|
private DruidNode druidNode;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -116,7 +117,7 @@ public class KubernetesTaskRunnerTest
|
||||||
kubernetesTaskRunnerConfig.javaOptsArray = Collections.singletonList("-Xmx2g");
|
kubernetesTaskRunnerConfig.javaOptsArray = Collections.singletonList("-Xmx2g");
|
||||||
taskQueueConfig = new TaskQueueConfig(1, Period.millis(1), Period.millis(1), Period.millis(1));
|
taskQueueConfig = new TaskQueueConfig(1, Period.millis(1), Period.millis(1), Period.millis(1));
|
||||||
startupLoggingConfig = new StartupLoggingConfig();
|
startupLoggingConfig = new StartupLoggingConfig();
|
||||||
taskLogPusher = mock(TaskLogPusher.class);
|
taskLogs = mock(TaskLogs.class);
|
||||||
druidNode = mock(DruidNode.class);
|
druidNode = mock(DruidNode.class);
|
||||||
when(druidNode.isEnableTlsPort()).thenReturn(false);
|
when(druidNode.isEnableTlsPort()).thenReturn(false);
|
||||||
}
|
}
|
||||||
|
@ -156,22 +157,30 @@ public class KubernetesTaskRunnerTest
|
||||||
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
||||||
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
||||||
|
|
||||||
|
TaskStatus taskStatus = TaskStatus.success(task.getId());
|
||||||
|
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
|
||||||
|
jsonMapper.writeValueAsString(taskStatus),
|
||||||
|
StandardCharsets.UTF_8))
|
||||||
|
);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
adapter,
|
adapter,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
||||||
|
|
||||||
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
||||||
future.get();
|
TaskStatus actualTaskStatus = future.get();
|
||||||
|
Assert.assertEquals(taskStatus, actualTaskStatus);
|
||||||
// we should never launch the job here, one exists
|
// we should never launch the job here, one exists
|
||||||
verify(peonClient, never()).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class));
|
verify(peonClient, never()).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class));
|
||||||
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
|
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
|
||||||
verify(spyRunner, times(1)).updateStatus(eq(task), eq(TaskStatus.success(task.getId())));
|
verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -208,22 +217,156 @@ public class KubernetesTaskRunnerTest
|
||||||
job,
|
job,
|
||||||
PeonPhase.SUCCEEDED
|
PeonPhase.SUCCEEDED
|
||||||
));
|
));
|
||||||
|
|
||||||
|
TaskStatus taskStatus = TaskStatus.success(task.getId());
|
||||||
|
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
|
||||||
|
jsonMapper.writeValueAsString(taskStatus),
|
||||||
|
StandardCharsets.UTF_8))
|
||||||
|
);
|
||||||
|
|
||||||
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
||||||
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
adapter,
|
adapter,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
||||||
|
|
||||||
|
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
||||||
|
TaskStatus actualTaskStatus = future.get();
|
||||||
|
Assert.assertEquals(taskStatus, actualTaskStatus);
|
||||||
|
// we should never launch the job here, one exists
|
||||||
|
verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class));
|
||||||
|
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
|
||||||
|
verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_run_withSuccessfulJobAndWithoutStatusFile_returnsSucessfulTask() throws Exception
|
||||||
|
{
|
||||||
|
Task task = makeTask();
|
||||||
|
K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
|
||||||
|
|
||||||
|
Job job = mock(Job.class);
|
||||||
|
ObjectMeta jobMetadata = mock(ObjectMeta.class);
|
||||||
|
when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId());
|
||||||
|
JobStatus status = mock(JobStatus.class);
|
||||||
|
when(status.getActive()).thenReturn(1).thenReturn(null);
|
||||||
|
when(job.getStatus()).thenReturn(status);
|
||||||
|
when(job.getMetadata()).thenReturn(jobMetadata);
|
||||||
|
|
||||||
|
Pod peonPod = mock(Pod.class);
|
||||||
|
ObjectMeta metadata = mock(ObjectMeta.class);
|
||||||
|
when(metadata.getName()).thenReturn("peonPodName");
|
||||||
|
when(peonPod.getMetadata()).thenReturn(metadata);
|
||||||
|
PodStatus podStatus = mock(PodStatus.class);
|
||||||
|
when(podStatus.getPodIP()).thenReturn("SomeIP");
|
||||||
|
when(peonPod.getStatus()).thenReturn(podStatus);
|
||||||
|
|
||||||
|
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
|
||||||
|
when(adapter.fromTask(eq(task))).thenReturn(job);
|
||||||
|
|
||||||
|
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
|
||||||
|
|
||||||
|
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
|
||||||
|
when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
|
||||||
|
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
|
||||||
|
when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(
|
||||||
|
job,
|
||||||
|
PeonPhase.SUCCEEDED
|
||||||
|
));
|
||||||
|
|
||||||
|
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
|
||||||
|
|
||||||
|
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
||||||
|
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
||||||
|
|
||||||
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
|
adapter,
|
||||||
|
kubernetesTaskRunnerConfig,
|
||||||
|
taskQueueConfig,
|
||||||
|
taskLogs,
|
||||||
|
peonClient,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
||||||
|
|
||||||
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
||||||
future.get();
|
TaskStatus actualTaskStatus = future.get();
|
||||||
|
Assert.assertTrue(actualTaskStatus.isSuccess());
|
||||||
|
|
||||||
|
// we should never launch the job here, one exists
|
||||||
|
verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class));
|
||||||
|
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
|
||||||
|
verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_run_withFailedJob_returnsFailedTask() throws Exception
|
||||||
|
{
|
||||||
|
Task task = makeTask();
|
||||||
|
K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
|
||||||
|
|
||||||
|
Job job = mock(Job.class);
|
||||||
|
ObjectMeta jobMetadata = mock(ObjectMeta.class);
|
||||||
|
when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId());
|
||||||
|
JobStatus status = mock(JobStatus.class);
|
||||||
|
when(status.getActive()).thenReturn(1).thenReturn(null);
|
||||||
|
when(job.getStatus()).thenReturn(status);
|
||||||
|
when(job.getMetadata()).thenReturn(jobMetadata);
|
||||||
|
|
||||||
|
Pod peonPod = mock(Pod.class);
|
||||||
|
ObjectMeta metadata = mock(ObjectMeta.class);
|
||||||
|
when(metadata.getName()).thenReturn("peonPodName");
|
||||||
|
when(peonPod.getMetadata()).thenReturn(metadata);
|
||||||
|
PodStatus podStatus = mock(PodStatus.class);
|
||||||
|
when(podStatus.getPodIP()).thenReturn("SomeIP");
|
||||||
|
when(peonPod.getStatus()).thenReturn(podStatus);
|
||||||
|
|
||||||
|
K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
|
||||||
|
when(adapter.fromTask(eq(task))).thenReturn(job);
|
||||||
|
|
||||||
|
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
|
||||||
|
|
||||||
|
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
|
||||||
|
when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod);
|
||||||
|
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
|
||||||
|
when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse(
|
||||||
|
job,
|
||||||
|
PeonPhase.FAILED
|
||||||
|
));
|
||||||
|
|
||||||
|
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
|
||||||
|
|
||||||
|
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
||||||
|
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
||||||
|
|
||||||
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
|
adapter,
|
||||||
|
kubernetesTaskRunnerConfig,
|
||||||
|
taskQueueConfig,
|
||||||
|
taskLogs,
|
||||||
|
peonClient,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
||||||
|
|
||||||
|
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
||||||
|
TaskStatus actualTaskStatus = future.get();
|
||||||
|
Assert.assertTrue(actualTaskStatus.isFailure());
|
||||||
|
Assert.assertEquals(
|
||||||
|
StringUtils.format("Task [%s] failed", task.getId()),
|
||||||
|
actualTaskStatus.getErrorMsg()
|
||||||
|
);
|
||||||
|
|
||||||
// we should never launch the job here, one exists
|
// we should never launch the job here, one exists
|
||||||
verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class));
|
verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class));
|
||||||
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
|
verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
|
||||||
|
@ -233,7 +376,7 @@ public class KubernetesTaskRunnerTest
|
||||||
DruidK8sConstants.TLS_PORT,
|
DruidK8sConstants.TLS_PORT,
|
||||||
druidNode.isEnableTlsPort()
|
druidNode.isEnableTlsPort()
|
||||||
);
|
);
|
||||||
verify(spyRunner, times(1)).updateStatus(eq(task), eq(TaskStatus.success(task.getId(), expectedTaskLocation)));
|
verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -274,14 +417,22 @@ public class KubernetesTaskRunnerTest
|
||||||
job,
|
job,
|
||||||
PeonPhase.SUCCEEDED
|
PeonPhase.SUCCEEDED
|
||||||
));
|
));
|
||||||
|
|
||||||
|
TaskStatus taskStatus = TaskStatus.success(task.getId());
|
||||||
|
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
|
||||||
|
jsonMapper.writeValueAsString(taskStatus),
|
||||||
|
StandardCharsets.UTF_8))
|
||||||
|
);
|
||||||
|
|
||||||
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
||||||
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
adapter,
|
adapter,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
@ -337,14 +488,22 @@ public class KubernetesTaskRunnerTest
|
||||||
job,
|
job,
|
||||||
PeonPhase.SUCCEEDED
|
PeonPhase.SUCCEEDED
|
||||||
));
|
));
|
||||||
|
|
||||||
|
TaskStatus taskStatus = TaskStatus.success(task.getId());
|
||||||
|
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
|
||||||
|
jsonMapper.writeValueAsString(taskStatus),
|
||||||
|
StandardCharsets.UTF_8))
|
||||||
|
);
|
||||||
|
|
||||||
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
|
||||||
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
adapter,
|
adapter,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
@ -411,10 +570,11 @@ public class KubernetesTaskRunnerTest
|
||||||
);
|
);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
adapter,
|
jsonMapper,
|
||||||
|
null,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
httpClient
|
httpClient
|
||||||
);
|
);
|
||||||
|
@ -432,10 +592,11 @@ public class KubernetesTaskRunnerTest
|
||||||
Task task = makeTask();
|
Task task = makeTask();
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
mock(K8sTaskAdapter.class),
|
mock(K8sTaskAdapter.class),
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
mock(DruidKubernetesPeonClient.class),
|
mock(DruidKubernetesPeonClient.class),
|
||||||
mock(HttpClient.class)
|
mock(HttpClient.class)
|
||||||
);
|
);
|
||||||
|
@ -484,10 +645,11 @@ public class KubernetesTaskRunnerTest
|
||||||
when(future.get()).thenThrow(InterruptedException.class);
|
when(future.get()).thenThrow(InterruptedException.class);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
adapter,
|
jsonMapper,
|
||||||
|
null,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
httpClient
|
httpClient
|
||||||
);
|
);
|
||||||
|
@ -539,10 +701,11 @@ public class KubernetesTaskRunnerTest
|
||||||
when(future.get()).thenThrow(InterruptedException.class);
|
when(future.get()).thenThrow(InterruptedException.class);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
adapter,
|
jsonMapper,
|
||||||
|
null,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
httpClient
|
httpClient
|
||||||
);
|
);
|
||||||
|
@ -594,10 +757,11 @@ public class KubernetesTaskRunnerTest
|
||||||
when(future.get()).thenThrow(ExecutionException.class);
|
when(future.get()).thenThrow(ExecutionException.class);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
adapter,
|
jsonMapper,
|
||||||
|
null,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
httpClient
|
httpClient
|
||||||
);
|
);
|
||||||
|
@ -619,10 +783,11 @@ public class KubernetesTaskRunnerTest
|
||||||
when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod);
|
when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
mock(K8sTaskAdapter.class),
|
mock(K8sTaskAdapter.class),
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
@ -647,10 +812,11 @@ public class KubernetesTaskRunnerTest
|
||||||
Period.millis(1)
|
Period.millis(1)
|
||||||
);
|
);
|
||||||
assertThrows(IllegalArgumentException.class, () -> new KubernetesTaskRunner(
|
assertThrows(IllegalArgumentException.class, () -> new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
mock(K8sTaskAdapter.class),
|
mock(K8sTaskAdapter.class),
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
mock(DruidKubernetesPeonClient.class),
|
mock(DruidKubernetesPeonClient.class),
|
||||||
null
|
null
|
||||||
));
|
));
|
||||||
|
@ -724,6 +890,8 @@ public class KubernetesTaskRunnerTest
|
||||||
|
|
||||||
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
|
DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class);
|
||||||
|
|
||||||
|
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
|
||||||
|
|
||||||
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
|
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
|
||||||
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
|
when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
|
||||||
|
|
||||||
|
@ -736,18 +904,19 @@ public class KubernetesTaskRunnerTest
|
||||||
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
|
||||||
|
|
||||||
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
|
||||||
|
jsonMapper,
|
||||||
adapter,
|
adapter,
|
||||||
kubernetesTaskRunnerConfig,
|
kubernetesTaskRunnerConfig,
|
||||||
taskQueueConfig,
|
taskQueueConfig,
|
||||||
taskLogPusher,
|
taskLogs,
|
||||||
peonClient,
|
peonClient,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
KubernetesTaskRunner spyRunner = spy(taskRunner);
|
||||||
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
ListenableFuture<TaskStatus> future = spyRunner.run(task);
|
||||||
TaskStatus taskStatus = future.get();
|
TaskStatus taskStatusResponse = future.get();
|
||||||
Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
|
Assert.assertEquals(TaskState.FAILED, taskStatusResponse.getStatusCode());
|
||||||
Assert.assertEquals("K8s Job for task disappeared before completion: [ k8sTaskId, k8staskid]", taskStatus.getErrorMsg());
|
Assert.assertEquals("Task [k8sTaskId] failed kubernetes job disappeared before completion", taskStatusResponse.getErrorMsg());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -762,9 +931,9 @@ public class KubernetesTaskRunnerTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
ImmutableMap.of("druid.indexer.runner.javaOpts", "abc",
|
ImmutableMap.of("druid.indexer.runner.javaOpts", "abc",
|
||||||
"druid.indexer.fork.property.druid.processing.buffer.sizeBytes", "2048",
|
"druid.indexer.fork.property.druid.processing.buffer.sizeBytes", "2048",
|
||||||
"druid.peon.pod.cpu", "1",
|
"druid.peon.pod.cpu", "1",
|
||||||
"druid.peon.pod.memory", "2G"
|
"druid.peon.pod.memory", "2G"
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,6 +82,14 @@ public class AzureTaskLogs implements TaskLogs
|
||||||
pushTaskFile(reportFile, taskKey);
|
pushTaskFile(reportFile, taskKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushTaskStatus(String taskid, File statusFile)
|
||||||
|
{
|
||||||
|
final String taskKey = getTaskStatusKey(taskid);
|
||||||
|
log.info("Pushing task status %s to: %s", statusFile, taskKey);
|
||||||
|
pushTaskFile(statusFile, taskKey);
|
||||||
|
}
|
||||||
|
|
||||||
private void pushTaskFile(final File logFile, String taskKey)
|
private void pushTaskFile(final File logFile, String taskKey)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
@ -110,6 +118,12 @@ public class AzureTaskLogs implements TaskLogs
|
||||||
return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
|
return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
|
||||||
|
{
|
||||||
|
return streamTaskFile(taskid, 0, getTaskStatusKey(taskid));
|
||||||
|
}
|
||||||
|
|
||||||
private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
|
private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
|
||||||
throws IOException
|
throws IOException
|
||||||
{
|
{
|
||||||
|
@ -154,6 +168,11 @@ public class AzureTaskLogs implements TaskLogs
|
||||||
return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid);
|
return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getTaskStatusKey(String taskid)
|
||||||
|
{
|
||||||
|
return StringUtils.format("%s/%s/status.json", config.getPrefix(), taskid);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void killAll() throws IOException
|
public void killAll() throws IOException
|
||||||
{
|
{
|
||||||
|
|
|
@ -155,6 +155,28 @@ public class AzureTaskLogsTest extends EasyMockSupport
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_PushTaskStatus_uploadsBlob() throws Exception
|
||||||
|
{
|
||||||
|
final File tmpDir = FileUtils.createTempDir();
|
||||||
|
|
||||||
|
try {
|
||||||
|
final File logFile = new File(tmpDir, "status.json");
|
||||||
|
|
||||||
|
azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json");
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
azureTaskLogs.pushTaskStatus(TASK_ID, logFile);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
FileUtils.deleteDirectory(tmpDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(expected = RuntimeException.class)
|
@Test(expected = RuntimeException.class)
|
||||||
public void test_PushTaskReports_exception_rethrowsException() throws Exception
|
public void test_PushTaskReports_exception_rethrowsException() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -323,6 +345,79 @@ public class AzureTaskLogsTest extends EasyMockSupport
|
||||||
verifyAll();
|
verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_streamTaskStatus_blobExists_succeeds() throws Exception
|
||||||
|
{
|
||||||
|
final String taskStatus = "{}";
|
||||||
|
|
||||||
|
final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
|
||||||
|
EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
|
||||||
|
EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
|
||||||
|
EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn(
|
||||||
|
new ByteArrayInputStream(taskStatus.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
final Optional<InputStream> stream = azureTaskLogs.streamTaskStatus(TASK_ID);
|
||||||
|
|
||||||
|
final StringWriter writer = new StringWriter();
|
||||||
|
IOUtils.copy(stream.get(), writer, "UTF-8");
|
||||||
|
Assert.assertEquals(writer.toString(), taskStatus);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_streamTaskStatus_blobDoesNotExist_returnsAbsent() throws Exception
|
||||||
|
{
|
||||||
|
final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/status.json";
|
||||||
|
EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false);
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
final Optional<InputStream> stream = azureTaskLogs.streamTaskStatus(TASK_ID_NOT_FOUND);
|
||||||
|
|
||||||
|
|
||||||
|
Assert.assertFalse(stream.isPresent());
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void test_streamTaskStatus_exceptionWhenGettingStream_throwsException() throws Exception
|
||||||
|
{
|
||||||
|
final String taskStatus = "{}";
|
||||||
|
|
||||||
|
final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
|
||||||
|
EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true);
|
||||||
|
EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
|
||||||
|
EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow(
|
||||||
|
new URISyntaxException("", ""));
|
||||||
|
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
final Optional<InputStream> stream = azureTaskLogs.streamTaskStatus(TASK_ID);
|
||||||
|
|
||||||
|
final StringWriter writer = new StringWriter();
|
||||||
|
IOUtils.copy(stream.get(), writer, "UTF-8");
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException() throws Exception
|
||||||
|
{
|
||||||
|
final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
|
||||||
|
EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
azureTaskLogs.streamTaskStatus(TASK_ID);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_killAll_noException_deletesAllTaskLogs() throws Exception
|
public void test_killAll_noException_deletesAllTaskLogs() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -74,6 +74,14 @@ public class GoogleTaskLogs implements TaskLogs
|
||||||
pushTaskFile(reportFile, taskKey);
|
pushTaskFile(reportFile, taskKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushTaskStatus(String taskid, File statusFile) throws IOException
|
||||||
|
{
|
||||||
|
final String taskKey = getTaskStatusKey(taskid);
|
||||||
|
LOG.info("Pushing task status %s to: %s", statusFile, taskKey);
|
||||||
|
pushTaskFile(statusFile, taskKey);
|
||||||
|
}
|
||||||
|
|
||||||
private void pushTaskFile(final File logFile, final String taskKey) throws IOException
|
private void pushTaskFile(final File logFile, final String taskKey) throws IOException
|
||||||
{
|
{
|
||||||
try (final InputStream fileStream = Files.newInputStream(logFile.toPath())) {
|
try (final InputStream fileStream = Files.newInputStream(logFile.toPath())) {
|
||||||
|
@ -115,6 +123,13 @@ public class GoogleTaskLogs implements TaskLogs
|
||||||
return streamTaskFile(taskid, 0, taskKey);
|
return streamTaskFile(taskid, 0, taskKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
|
||||||
|
{
|
||||||
|
final String taskKey = getTaskStatusKey(taskid);
|
||||||
|
return streamTaskFile(taskid, 0, taskKey);
|
||||||
|
}
|
||||||
|
|
||||||
private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
|
private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
|
||||||
throws IOException
|
throws IOException
|
||||||
{
|
{
|
||||||
|
@ -156,6 +171,11 @@ public class GoogleTaskLogs implements TaskLogs
|
||||||
return config.getPrefix() + "/" + taskid.replace(':', '_') + ".report.json";
|
return config.getPrefix() + "/" + taskid.replace(':', '_') + ".report.json";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getTaskStatusKey(String taskid)
|
||||||
|
{
|
||||||
|
return config.getPrefix() + "/" + taskid.replace(':', '_') + ".status.json";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void killAll() throws IOException
|
public void killAll() throws IOException
|
||||||
{
|
{
|
||||||
|
|
|
@ -109,6 +109,35 @@ public class GoogleTaskLogsTest extends EasyMockSupport
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPushTaskStatus() throws Exception
|
||||||
|
{
|
||||||
|
final File tmpDir = FileUtils.createTempDir();
|
||||||
|
|
||||||
|
try {
|
||||||
|
final File statusFile = new File(tmpDir, "status.json");
|
||||||
|
BufferedWriter output = Files.newBufferedWriter(statusFile.toPath(), StandardCharsets.UTF_8);
|
||||||
|
output.write("{}");
|
||||||
|
output.close();
|
||||||
|
|
||||||
|
storage.insert(
|
||||||
|
EasyMock.eq(BUCKET),
|
||||||
|
EasyMock.eq(PREFIX + "/" + TASKID),
|
||||||
|
EasyMock.anyObject(InputStreamContent.class)
|
||||||
|
);
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
googleTaskLogs.pushTaskLog(TASKID, statusFile);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
FileUtils.deleteDirectory(tmpDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStreamTaskLogWithoutOffset() throws Exception
|
public void testStreamTaskLogWithoutOffset() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -177,6 +206,27 @@ public class GoogleTaskLogsTest extends EasyMockSupport
|
||||||
verifyAll();
|
verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStreamTaskStatus() throws Exception
|
||||||
|
{
|
||||||
|
final String taskStatus = "{}";
|
||||||
|
|
||||||
|
final String logPath = PREFIX + "/" + TASKID + ".status.json";
|
||||||
|
EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
|
||||||
|
EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) taskStatus.length());
|
||||||
|
EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus)));
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
final Optional<InputStream> stream = googleTaskLogs.streamTaskStatus(TASKID);
|
||||||
|
|
||||||
|
final StringWriter writer = new StringWriter();
|
||||||
|
IOUtils.copy(stream.get(), writer, "UTF-8");
|
||||||
|
Assert.assertEquals(writer.toString(), taskStatus);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
|
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
|
||||||
{
|
{
|
||||||
|
|
|
@ -75,6 +75,15 @@ public class HdfsTaskLogs implements TaskLogs
|
||||||
log.info("Wrote task reports to: %s", path);
|
log.info("Wrote task reports to: %s", path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushTaskStatus(String taskId, File statusFile) throws IOException
|
||||||
|
{
|
||||||
|
final Path path = getTaskStatusFileFromId(taskId);
|
||||||
|
log.info("Writing task status to: %s", path);
|
||||||
|
pushTaskFile(path, statusFile);
|
||||||
|
log.info("Wrote task status to: %s", path);
|
||||||
|
}
|
||||||
|
|
||||||
private void pushTaskFile(Path path, File logFile) throws IOException
|
private void pushTaskFile(Path path, File logFile) throws IOException
|
||||||
{
|
{
|
||||||
final FileSystem fs = path.getFileSystem(hadoopConfig);
|
final FileSystem fs = path.getFileSystem(hadoopConfig);
|
||||||
|
@ -100,6 +109,13 @@ public class HdfsTaskLogs implements TaskLogs
|
||||||
return streamTaskFile(path, 0);
|
return streamTaskFile(path, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<InputStream> streamTaskStatus(String taskId) throws IOException
|
||||||
|
{
|
||||||
|
final Path path = getTaskStatusFileFromId(taskId);
|
||||||
|
return streamTaskFile(path, 0);
|
||||||
|
}
|
||||||
|
|
||||||
private Optional<InputStream> streamTaskFile(final Path path, final long offset) throws IOException
|
private Optional<InputStream> streamTaskFile(final Path path, final long offset) throws IOException
|
||||||
{
|
{
|
||||||
final FileSystem fs = path.getFileSystem(hadoopConfig);
|
final FileSystem fs = path.getFileSystem(hadoopConfig);
|
||||||
|
@ -139,6 +155,15 @@ public class HdfsTaskLogs implements TaskLogs
|
||||||
return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".reports.json"));
|
return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".reports.json"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
|
||||||
|
* path names. So we format paths differently for HDFS.
|
||||||
|
*/
|
||||||
|
private Path getTaskStatusFileFromId(String taskId)
|
||||||
|
{
|
||||||
|
return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".status.json"));
|
||||||
|
}
|
||||||
|
|
||||||
// some hadoop version Path.mergePaths does not exist
|
// some hadoop version Path.mergePaths does not exist
|
||||||
private static String mergePaths(String path1, String path2)
|
private static String mergePaths(String path1, String path2)
|
||||||
{
|
{
|
||||||
|
|
|
@ -78,6 +78,23 @@ public class HdfsTaskLogsTest
|
||||||
Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0));
|
Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_taskStatus() throws Exception
|
||||||
|
{
|
||||||
|
final File tmpDir = tempFolder.newFolder();
|
||||||
|
final File logDir = new File(tmpDir, "logs");
|
||||||
|
final File statusFile = new File(tmpDir, "status.json");
|
||||||
|
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
|
||||||
|
|
||||||
|
|
||||||
|
Files.write("{}", statusFile, StandardCharsets.UTF_8);
|
||||||
|
taskLogs.pushTaskStatus("id", statusFile);
|
||||||
|
Assert.assertEquals(
|
||||||
|
"{}",
|
||||||
|
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus("id").get()))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testKill() throws Exception
|
public void testKill() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -77,6 +77,13 @@ public class S3TaskLogs implements TaskLogs
|
||||||
return streamTaskFile(0, taskKey);
|
return streamTaskFile(0, taskKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
|
||||||
|
{
|
||||||
|
final String taskKey = getTaskLogKey(taskid, "status.json");
|
||||||
|
return streamTaskFile(0, taskKey);
|
||||||
|
}
|
||||||
|
|
||||||
private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
|
private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
@ -141,6 +148,14 @@ public class S3TaskLogs implements TaskLogs
|
||||||
pushTaskFile(reportFile, taskKey);
|
pushTaskFile(reportFile, taskKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushTaskStatus(String taskid, File statusFile) throws IOException
|
||||||
|
{
|
||||||
|
final String taskKey = getTaskLogKey(taskid, "status.json");
|
||||||
|
log.info("Pushing task status %s to: %s", statusFile, taskKey);
|
||||||
|
pushTaskFile(statusFile, taskKey);
|
||||||
|
}
|
||||||
|
|
||||||
private void pushTaskFile(final File logFile, String taskKey) throws IOException
|
private void pushTaskFile(final File logFile, String taskKey) throws IOException
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -76,6 +76,7 @@ public class S3TaskLogsTest extends EasyMockSupport
|
||||||
private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException());
|
private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException());
|
||||||
private static final String LOG_CONTENTS = "log_contents";
|
private static final String LOG_CONTENTS = "log_contents";
|
||||||
private static final String REPORT_CONTENTS = "report_contents";
|
private static final String REPORT_CONTENTS = "report_contents";
|
||||||
|
private static final String STATUS_CONTENTS = "status_contents";
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private CurrentTimeMillisSupplier timeSupplier;
|
private CurrentTimeMillisSupplier timeSupplier;
|
||||||
|
@ -115,6 +116,31 @@ public class S3TaskLogsTest extends EasyMockSupport
|
||||||
);
|
);
|
||||||
Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission());
|
Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_pushTaskStatus() throws IOException
|
||||||
|
{
|
||||||
|
EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class)))
|
||||||
|
.andReturn(new PutObjectResult())
|
||||||
|
.once();
|
||||||
|
|
||||||
|
EasyMock.replay(s3Client);
|
||||||
|
|
||||||
|
S3TaskLogsConfig config = new S3TaskLogsConfig();
|
||||||
|
config.setS3Bucket(TEST_BUCKET);
|
||||||
|
config.setDisableAcl(true);
|
||||||
|
|
||||||
|
CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
|
||||||
|
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
|
||||||
|
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
|
||||||
|
|
||||||
|
String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
|
||||||
|
File logFile = tempFolder.newFile("status.json");
|
||||||
|
|
||||||
|
s3TaskLogs.pushTaskLog(taskId, logFile);
|
||||||
|
|
||||||
|
EasyMock.verify(s3Client);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
|
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
|
||||||
|
@ -434,6 +460,32 @@ public class S3TaskLogsTest extends EasyMockSupport
|
||||||
Assert.assertEquals(REPORT_CONTENTS, report);
|
Assert.assertEquals(REPORT_CONTENTS, report);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_status_fetch() throws IOException
|
||||||
|
{
|
||||||
|
EasyMock.reset(s3Client);
|
||||||
|
String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
|
||||||
|
ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||||
|
objectMetadata.setContentLength(STATUS_CONTENTS.length());
|
||||||
|
EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
|
||||||
|
S3Object s3Object = new S3Object();
|
||||||
|
s3Object.setObjectContent(new ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
|
||||||
|
getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
|
||||||
|
getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
|
||||||
|
EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
|
||||||
|
EasyMock.replay(s3Client);
|
||||||
|
|
||||||
|
S3TaskLogs s3TaskLogs = getS3TaskLogs();
|
||||||
|
|
||||||
|
Optional<InputStream> inputStreamOptional = s3TaskLogs.streamTaskStatus(KEY_1);
|
||||||
|
String report = new BufferedReader(
|
||||||
|
new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))
|
||||||
|
.lines()
|
||||||
|
.collect(Collectors.joining("\n"));
|
||||||
|
|
||||||
|
Assert.assertEquals(STATUS_CONTENTS, report);
|
||||||
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
private S3TaskLogs getS3TaskLogs()
|
private S3TaskLogs getS3TaskLogs()
|
||||||
|
|
|
@ -50,6 +50,7 @@ import javax.annotation.Nullable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -94,7 +95,9 @@ public abstract class AbstractTask implements Task
|
||||||
private final String dataSource;
|
private final String dataSource;
|
||||||
|
|
||||||
private final Map<String, Object> context;
|
private final Map<String, Object> context;
|
||||||
|
|
||||||
private File reportsFile;
|
private File reportsFile;
|
||||||
|
private File statusFile;
|
||||||
|
|
||||||
private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
|
private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
|
||||||
|
|
||||||
|
@ -147,6 +150,7 @@ public abstract class AbstractTask implements Task
|
||||||
File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
|
File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
|
||||||
FileUtils.mkdirp(attemptDir);
|
FileUtils.mkdirp(attemptDir);
|
||||||
reportsFile = new File(attemptDir, "report.json");
|
reportsFile = new File(attemptDir, "report.json");
|
||||||
|
statusFile = new File(attemptDir, "status.json");
|
||||||
InetAddress hostName = InetAddress.getLocalHost();
|
InetAddress hostName = InetAddress.getLocalHost();
|
||||||
DruidNode node = toolbox.getTaskExecutorNode();
|
DruidNode node = toolbox.getTaskExecutorNode();
|
||||||
toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.create(
|
toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.create(
|
||||||
|
@ -160,48 +164,55 @@ public abstract class AbstractTask implements Task
|
||||||
@Override
|
@Override
|
||||||
public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
|
public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
|
||||||
{
|
{
|
||||||
boolean failure = false;
|
TaskStatus taskStatus = TaskStatus.running(getId());
|
||||||
try {
|
try {
|
||||||
String errorMessage = setup(taskToolbox);
|
String errorMessage = setup(taskToolbox);
|
||||||
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
|
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
|
||||||
return TaskStatus.failure(getId(), errorMessage);
|
return TaskStatus.failure(getId(), errorMessage);
|
||||||
}
|
}
|
||||||
TaskStatus taskStatus = runTask(taskToolbox);
|
taskStatus = runTask(taskToolbox);
|
||||||
if (taskStatus.isFailure()) {
|
|
||||||
failure = true;
|
|
||||||
}
|
|
||||||
return taskStatus;
|
return taskStatus;
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
failure = true;
|
taskStatus = TaskStatus.failure(getId(), e.toString());
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
cleanUp(taskToolbox, failure);
|
cleanUp(taskToolbox, taskStatus);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
|
public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
|
||||||
|
|
||||||
public void cleanUp(TaskToolbox toolbox, boolean failure) throws Exception
|
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
|
||||||
{
|
{
|
||||||
if (toolbox.getConfig().isEncapsulatedTask()) {
|
if (!toolbox.getConfig().isEncapsulatedTask()) {
|
||||||
// report back to the overlord
|
|
||||||
UpdateStatusAction status = new UpdateStatusAction("successful");
|
|
||||||
if (failure) {
|
|
||||||
status = new UpdateStatusAction("failure");
|
|
||||||
}
|
|
||||||
toolbox.getTaskActionClient().submit(status);
|
|
||||||
toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.unknown()));
|
|
||||||
|
|
||||||
if (reportsFile != null && reportsFile.exists()) {
|
|
||||||
toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
|
|
||||||
log.debug("Pushed task reports");
|
|
||||||
} else {
|
|
||||||
log.debug("No task reports file exists to push");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.debug("Not pushing task logs and reports from task.");
|
log.debug("Not pushing task logs and reports from task.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// report back to the overlord
|
||||||
|
UpdateStatusAction status = new UpdateStatusAction("successful");
|
||||||
|
if (taskStatus.isFailure()) {
|
||||||
|
status = new UpdateStatusAction("failure");
|
||||||
|
}
|
||||||
|
toolbox.getTaskActionClient().submit(status);
|
||||||
|
toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.unknown()));
|
||||||
|
|
||||||
|
if (reportsFile != null && reportsFile.exists()) {
|
||||||
|
toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
|
||||||
|
log.debug("Pushed task reports");
|
||||||
|
} else {
|
||||||
|
log.debug("No task reports file exists to push");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (statusFile != null) {
|
||||||
|
toolbox.getJsonMapper().writeValue(statusFile, taskStatus);
|
||||||
|
toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
|
||||||
|
Files.deleteIfExists(statusFile.toPath());
|
||||||
|
log.debug("Pushed task status");
|
||||||
|
} else {
|
||||||
|
log.debug("No task status file exists to push");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,12 +292,12 @@ public abstract class AbstractTask implements Task
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return "AbstractTask{" +
|
return "AbstractTask{" +
|
||||||
"id='" + id + '\'' +
|
"id='" + id + '\'' +
|
||||||
", groupId='" + groupId + '\'' +
|
", groupId='" + groupId + '\'' +
|
||||||
", taskResource=" + taskResource +
|
", taskResource=" + taskResource +
|
||||||
", dataSource='" + dataSource + '\'' +
|
", dataSource='" + dataSource + '\'' +
|
||||||
", context=" + context +
|
", context=" + context +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
public TaskStatus success()
|
public TaskStatus success()
|
||||||
|
@ -372,8 +383,8 @@ public abstract class AbstractTask implements Task
|
||||||
protected static IngestionMode computeBatchIngestionMode(@Nullable BatchIOConfig ioConfig)
|
protected static IngestionMode computeBatchIngestionMode(@Nullable BatchIOConfig ioConfig)
|
||||||
{
|
{
|
||||||
final boolean isAppendToExisting = ioConfig == null
|
final boolean isAppendToExisting = ioConfig == null
|
||||||
? BatchIOConfig.DEFAULT_APPEND_EXISTING
|
? BatchIOConfig.DEFAULT_APPEND_EXISTING
|
||||||
: ioConfig.isAppendToExisting();
|
: ioConfig.isAppendToExisting();
|
||||||
final boolean isDropExisting = ioConfig == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
|
final boolean isDropExisting = ioConfig == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
|
||||||
return computeIngestionMode(isAppendToExisting, isDropExisting);
|
return computeIngestionMode(isAppendToExisting, isDropExisting);
|
||||||
}
|
}
|
||||||
|
@ -388,7 +399,7 @@ public abstract class AbstractTask implements Task
|
||||||
return IngestionMode.REPLACE_LEGACY;
|
return IngestionMode.REPLACE_LEGACY;
|
||||||
}
|
}
|
||||||
throw new IAE("Cannot simultaneously replace and append to existing segments. "
|
throw new IAE("Cannot simultaneously replace and append to existing segments. "
|
||||||
+ "Either dropExisting or appendToExisting should be set to false");
|
+ "Either dropExisting or appendToExisting should be set to false");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void emitMetric(
|
public void emitMetric(
|
||||||
|
|
|
@ -65,6 +65,15 @@ public class FileTaskLogs implements TaskLogs
|
||||||
log.info("Wrote task report to: %s", outputFile);
|
log.info("Wrote task report to: %s", outputFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushTaskStatus(String taskid, File statusFile) throws IOException
|
||||||
|
{
|
||||||
|
FileUtils.mkdirp(config.getDirectory());
|
||||||
|
final File outputFile = fileForTask(taskid, statusFile.getName());
|
||||||
|
Files.copy(statusFile, outputFile);
|
||||||
|
log.info("Wrote task status to: %s", outputFile);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
|
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
|
||||||
{
|
{
|
||||||
|
@ -87,6 +96,17 @@ public class FileTaskLogs implements TaskLogs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<InputStream> streamTaskStatus(final String taskid) throws IOException
|
||||||
|
{
|
||||||
|
final File file = fileForTask(taskid, "status.json");
|
||||||
|
if (file.exists()) {
|
||||||
|
return Optional.of(LogUtils.streamFile(file, 0));
|
||||||
|
} else {
|
||||||
|
return Optional.absent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private File fileForTask(final String taskid, String filename)
|
private File fileForTask(final String taskid, String filename)
|
||||||
{
|
{
|
||||||
return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid, filename));
|
return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid, filename));
|
||||||
|
|
|
@ -19,15 +19,18 @@
|
||||||
|
|
||||||
package org.apache.druid.indexing.common.task;
|
package org.apache.druid.indexing.common.task;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.druid.indexer.TaskStatus;
|
import org.apache.druid.indexer.TaskStatus;
|
||||||
import org.apache.druid.indexing.common.TaskToolbox;
|
import org.apache.druid.indexing.common.TaskToolbox;
|
||||||
|
import org.apache.druid.indexing.common.TestUtils;
|
||||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||||
import org.apache.druid.indexing.common.actions.UpdateStatusAction;
|
import org.apache.druid.indexing.common.actions.UpdateStatusAction;
|
||||||
import org.apache.druid.indexing.common.config.TaskConfig;
|
import org.apache.druid.indexing.common.config.TaskConfig;
|
||||||
import org.apache.druid.server.DruidNode;
|
import org.apache.druid.server.DruidNode;
|
||||||
import org.apache.druid.tasklogs.TaskLogPusher;
|
import org.apache.druid.tasklogs.TaskLogPusher;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
@ -48,10 +51,17 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class AbstractTaskTest
|
public class AbstractTaskTest
|
||||||
{
|
{
|
||||||
|
private ObjectMapper objectMapper;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup()
|
||||||
|
{
|
||||||
|
objectMapper = new TestUtils().getTestObjectMapper();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetupAndCleanupIsCalledWtihParameter() throws Exception
|
public void testSetupAndCleanupIsCalledWtihParameter() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -73,6 +83,7 @@ public class AbstractTaskTest
|
||||||
File folder = temporaryFolder.newFolder();
|
File folder = temporaryFolder.newFolder();
|
||||||
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
|
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
|
||||||
when(toolbox.getConfig()).thenReturn(config);
|
when(toolbox.getConfig()).thenReturn(config);
|
||||||
|
when(toolbox.getJsonMapper()).thenReturn(objectMapper);
|
||||||
|
|
||||||
TaskActionClient taskActionClient = mock(TaskActionClient.class);
|
TaskActionClient taskActionClient = mock(TaskActionClient.class);
|
||||||
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
|
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
|
||||||
|
@ -89,7 +100,9 @@ public class AbstractTaskTest
|
||||||
String result = super.setup(toolbox);
|
String result = super.setup(toolbox);
|
||||||
File attemptDir = Paths.get(folder.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
|
File attemptDir = Paths.get(folder.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
|
||||||
File reportsDir = new File(attemptDir, "report.json");
|
File reportsDir = new File(attemptDir, "report.json");
|
||||||
|
File statusDir = new File(attemptDir, "status.json");
|
||||||
FileUtils.write(reportsDir, "foo", StandardCharsets.UTF_8);
|
FileUtils.write(reportsDir, "foo", StandardCharsets.UTF_8);
|
||||||
|
FileUtils.write(statusDir, "{}", StandardCharsets.UTF_8);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -98,6 +111,7 @@ public class AbstractTaskTest
|
||||||
// call it 3 times, once to update location in setup, then one for status and location in cleanup
|
// call it 3 times, once to update location in setup, then one for status and location in cleanup
|
||||||
Mockito.verify(taskActionClient, times(3)).submit(any());
|
Mockito.verify(taskActionClient, times(3)).submit(any());
|
||||||
verify(pusher, times(1)).pushTaskReports(eq("myID"), any());
|
verify(pusher, times(1)).pushTaskReports(eq("myID"), any());
|
||||||
|
verify(pusher, times(1)).pushTaskStatus(eq("myID"), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -117,6 +131,7 @@ public class AbstractTaskTest
|
||||||
File folder = temporaryFolder.newFolder();
|
File folder = temporaryFolder.newFolder();
|
||||||
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
|
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
|
||||||
when(toolbox.getConfig()).thenReturn(config);
|
when(toolbox.getConfig()).thenReturn(config);
|
||||||
|
when(toolbox.getJsonMapper()).thenReturn(objectMapper);
|
||||||
|
|
||||||
TaskActionClient taskActionClient = mock(TaskActionClient.class);
|
TaskActionClient taskActionClient = mock(TaskActionClient.class);
|
||||||
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
|
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
|
||||||
|
@ -161,6 +176,7 @@ public class AbstractTaskTest
|
||||||
File folder = temporaryFolder.newFolder();
|
File folder = temporaryFolder.newFolder();
|
||||||
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
|
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
|
||||||
when(toolbox.getConfig()).thenReturn(config);
|
when(toolbox.getConfig()).thenReturn(config);
|
||||||
|
when(toolbox.getJsonMapper()).thenReturn(objectMapper);
|
||||||
|
|
||||||
TaskActionClient taskActionClient = mock(TaskActionClient.class);
|
TaskActionClient taskActionClient = mock(TaskActionClient.class);
|
||||||
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
|
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
|
||||||
|
@ -169,7 +185,7 @@ public class AbstractTaskTest
|
||||||
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
|
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public TaskStatus runTask(TaskToolbox toolbox)
|
public TaskStatus runTask(TaskToolbox toolbox)
|
||||||
{
|
{
|
||||||
return TaskStatus.failure("myId", "failed");
|
return TaskStatus.failure("myId", "failed");
|
||||||
}
|
}
|
||||||
|
|
|
@ -244,7 +244,7 @@ public class TaskMonitorTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanUp(TaskToolbox toolbox, boolean failure)
|
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
|
||||||
{
|
{
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.io.ByteStreams;
|
import com.google.common.io.ByteStreams;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
|
import org.apache.druid.indexer.TaskStatus;
|
||||||
import org.apache.druid.indexing.common.TaskReport;
|
import org.apache.druid.indexing.common.TaskReport;
|
||||||
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
|
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
|
||||||
import org.apache.druid.java.util.common.FileUtils;
|
import org.apache.druid.java.util.common.FileUtils;
|
||||||
|
@ -95,6 +96,28 @@ public class FileTaskLogsTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleStatus() throws Exception
|
||||||
|
{
|
||||||
|
final ObjectMapper mapper = TestHelper.makeJsonMapper();
|
||||||
|
final File tmpDir = temporaryFolder.newFolder();
|
||||||
|
final File logDir = new File(tmpDir, "druid/myTask");
|
||||||
|
final File statusFile = new File(tmpDir, "status.json");
|
||||||
|
|
||||||
|
final String taskId = "myTask";
|
||||||
|
final TaskStatus taskStatus = TaskStatus.success(taskId);
|
||||||
|
final String taskStatusString = mapper.writeValueAsString(taskStatus);
|
||||||
|
Files.write(taskStatusString, statusFile, StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
|
||||||
|
taskLogs.pushTaskStatus(taskId, statusFile);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
taskStatusString,
|
||||||
|
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus(taskId).get()))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPushTaskLogDirCreationFails() throws Exception
|
public void testPushTaskLogDirCreationFails() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -163,7 +163,7 @@ public class SingleTaskBackgroundRunnerTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanUp(TaskToolbox toolbox, boolean failure)
|
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
|
||||||
{
|
{
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
@ -181,10 +181,10 @@ public class SingleTaskBackgroundRunnerTest
|
||||||
|
|
||||||
final QueryRunner<ScanResultValue> queryRunner =
|
final QueryRunner<ScanResultValue> queryRunner =
|
||||||
Druids.newScanQueryBuilder()
|
Druids.newScanQueryBuilder()
|
||||||
.dataSource("foo")
|
.dataSource("foo")
|
||||||
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
|
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
|
||||||
.build()
|
.build()
|
||||||
.getRunner(runner);
|
.getRunner(runner);
|
||||||
|
|
||||||
Assert.assertThat(queryRunner, CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class));
|
Assert.assertThat(queryRunner, CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class));
|
||||||
}
|
}
|
||||||
|
@ -262,7 +262,7 @@ public class SingleTaskBackgroundRunnerTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanUp(TaskToolbox toolbox, boolean failure)
|
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
|
||||||
{
|
{
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
@ -384,7 +384,7 @@ public class SingleTaskBackgroundRunnerTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanUp(TaskToolbox toolbox, boolean failure)
|
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
|
||||||
{
|
{
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
|
@ -412,10 +412,10 @@ public class TaskQueueTest extends IngestionTestBase
|
||||||
TaskLocation.create("worker", 1, 2)
|
TaskLocation.create("worker", 1, 2)
|
||||||
), workerHolder);
|
), workerHolder);
|
||||||
while (!taskRunner.getRunningTasks()
|
while (!taskRunner.getRunningTasks()
|
||||||
.stream()
|
.stream()
|
||||||
.map(TaskRunnerWorkItem::getTaskId)
|
.map(TaskRunnerWorkItem::getTaskId)
|
||||||
.collect(Collectors.toList())
|
.collect(Collectors.toList())
|
||||||
.contains(task.getId())) {
|
.contains(task.getId())) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
taskQueue.shutdown(task.getId(), "shutdown");
|
taskQueue.shutdown(task.getId(), "shutdown");
|
||||||
|
@ -435,7 +435,7 @@ public class TaskQueueTest extends IngestionTestBase
|
||||||
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
|
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
|
||||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||||
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
|
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
|
||||||
.andReturn(druidNodeDiscovery);
|
.andReturn(druidNodeDiscovery);
|
||||||
EasyMock.replay(druidNodeDiscoveryProvider);
|
EasyMock.replay(druidNodeDiscoveryProvider);
|
||||||
TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
|
TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
|
||||||
for (String taskId : runningTasks) {
|
for (String taskId : runningTasks) {
|
||||||
|
@ -520,7 +520,7 @@ public class TaskQueueTest extends IngestionTestBase
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanUp(TaskToolbox toolbox, boolean failure)
|
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
|
||||||
{
|
{
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,12 @@ public class NoopTaskLogs implements TaskLogs
|
||||||
log.info("Not pushing reports for task: %s", taskid);
|
log.info("Not pushing reports for task: %s", taskid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushTaskStatus(String taskid, File statusFile)
|
||||||
|
{
|
||||||
|
log.info("Not pushing status for task: %s", taskid);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void killAll()
|
public void killAll()
|
||||||
{
|
{
|
||||||
|
|
|
@ -35,4 +35,8 @@ public interface TaskLogPusher
|
||||||
default void pushTaskReports(String taskid, File reportFile) throws IOException
|
default void pushTaskReports(String taskid, File reportFile) throws IOException
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default void pushTaskStatus(String taskid, File reportFile) throws IOException
|
||||||
|
{
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,4 +44,9 @@ public interface TaskLogStreamer
|
||||||
{
|
{
|
||||||
return Optional.absent();
|
return Optional.absent();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default Optional<InputStream> streamTaskStatus(final String taskid) throws IOException
|
||||||
|
{
|
||||||
|
return Optional.absent();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.tasklogs;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class NoopTaskLogsTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void test_streamTaskStatus() throws IOException
|
||||||
|
{
|
||||||
|
TaskLogs taskLogs = new NoopTaskLogs();
|
||||||
|
Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.tasklogs;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class TaskLogPusherTest
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Test default implementation of pushTaskStatus in TaskLogPusher interface for code coverage
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_pushTaskStatus() throws IOException
|
||||||
|
{
|
||||||
|
TaskLogPusher taskLogPusher = new TaskLogPusher() {
|
||||||
|
@Override
|
||||||
|
public void pushTaskLog(String taskid, File logFile)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
};
|
||||||
|
taskLogPusher.pushTaskStatus("id", new File(""));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.tasklogs;
|
||||||
|
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
public class TaskLogStreamerTest
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Test default implemenation of streamTaskStatus in TaskLogStreamer interface for code coverage
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_streamTaskStatus() throws IOException
|
||||||
|
{
|
||||||
|
TaskLogStreamer taskLogStreamer = new TaskLogStreamer() {
|
||||||
|
@Override
|
||||||
|
public Optional<InputStream> streamTaskLog(String taskid, long offset)
|
||||||
|
{
|
||||||
|
return Optional.absent();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Assert.assertFalse(taskLogStreamer.streamTaskStatus("id").isPresent());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue