Mirror of https://github.com/apache/druid.git, synced 2025-02-17 07:25:02 +00:00
Kill Hadoop MR task on kill of Hadoop ingestion task (#6828)
* KillTask from the overlord UI now makes sure that it terminates the underlying MR job, thus saving unnecessary compute.

  Run in Jobby is now split into two steps: 1. submitAndGetHadoopJobId, followed by 2. run. submitAndGetHadoopJobId is responsible for submitting the job and returning the jobId as a string; run monitors this job for completion. JobHelper writes this jobId to the path provided by HadoopIndexTask, which in turn is provided by the ForkingTaskRunner. HadoopIndexTask reads this path when the kill task is clicked, to get the jobId and fire the kill command via the YARN API. This is taken care of in the stopGracefully method, which is called in SingleTaskBackgroundRunner. The `canRestore` method now returns `true` for HadoopIndexTask so that stopGracefully gets called. The Hadoop*Job files have been changed to incorporate the changes to Jobby.

* Addressing PR comments

* Addressing PR comments - Fix taskDir

* Addressing PR comments - Changing the contract of Task.stopGracefully(): `SingleTaskBackgroundRunner` calls stopGracefully in stop() and then checks the canRestore condition to return the status of the task.

* Addressing PR comments
  1. Formatting
  2. Removing `submitAndGetHadoopJobId` from `Jobby` and calling writeJobIdToFile in the job itself

* Addressing PR comments
  1. POM change: moving the hadoop dependency to indexing-hadoop

* Addressing PR comments
  1. stopGracefully now accepts TaskConfig as a param. Handling isRestoreOnRestart in stopGracefully for `AppenderatorDriverRealtimeIndexTask`, `RealtimeIndexTask`, and `SeekableStreamIndexTask`. Changing tests to set the TaskConfig param isRestoreOnRestart to true.
parent e1033bb412
commit 8492d94f59
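Before the diff, a condensed view of the mechanism the commit message describes: record the Hadoop job id right after submission, then on kill read it back and terminate the job. This is only an illustrative, self-contained sketch, not code from the patch — the class name HadoopJobKillSketch and the plain-file I/O are assumptions made here for brevity; the patch itself serializes the id with Druid's JSON_MAPPER and routes the kill through HadoopKillMRJobIdProcessingRunner, as shown in the hunks below.

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class HadoopJobKillSketch
{
  // Submit the MR job and record its id so a later kill request can find it.
  // (The patch writes the id into the task's working directory via Jackson.)
  public static void submitAndRecord(Job job, Path jobIdFile) throws Exception
  {
    job.submit();
    if (job.getJobID() != null) {
      Files.write(jobIdFile, job.getJobID().toString().getBytes(StandardCharsets.UTF_8));
    }
    job.waitForCompletion(true);
  }

  // Called on kill: read the recorded id back and terminate the job through the
  // mapred CLI, the same "-kill <jobId>" mechanism the patch's
  // HadoopKillMRJobIdProcessingRunner uses.
  public static void killIfRunning(Path jobIdFile) throws Exception
  {
    if (!Files.exists(jobIdFile)) {
      return; // nothing was submitted, or the id was never recorded
    }
    String jobId = new String(Files.readAllBytes(jobIdFile), StandardCharsets.UTF_8).trim();
    int res = ToolRunner.run(new JobClient(), new String[]{"-kill", jobId});
    System.out.println(res == 0 ? "Killed " + jobId : "Kill failed for " + jobId);
  }
}

The actual patch wires these two halves into the Jobby implementations (write after submit()) and into HadoopIndexTask.stopGracefully() (read and kill), as the diff below shows.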
@@ -345,7 +345,7 @@ public class KafkaIndexTaskTest
   {
     synchronized (runningTasks) {
       for (Task task : runningTasks) {
-        task.stopGracefully();
+        task.stopGracefully(toolboxFactory.build(task).getConfig());
       }

       runningTasks.clear();
@@ -1848,7 +1848,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(2, countEvents(task1));

     // Stop without publishing segment
-    task1.stopGracefully();
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
     unlockAppenderatorBasePersistDirForTask(task1);

     Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -2339,7 +2339,7 @@ public class KafkaIndexTaskTest
         null,
         50000,
         null,
-        false,
+        true,
         null,
         null
     );
@@ -331,7 +331,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   {
     synchronized (runningTasks) {
       for (Task task : runningTasks) {
-        task.stopGracefully();
+        task.stopGracefully(toolboxFactory.build(task).getConfig());
       }

       runningTasks.clear();
@@ -2126,7 +2126,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(2, countEvents(task1));

     // Stop without publishing segment
-    task1.stopGracefully();
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
     unlockAppenderatorBasePersistDirForTask(task1);

     Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -2651,7 +2651,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         null,
         50000,
         null,
-        false,
+        true,
         null,
         null
     );
@@ -130,6 +130,22 @@
             <version>${hadoop.compile.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-server</artifactId>
@@ -75,6 +75,7 @@ public class DetermineHashedPartitionsJob implements Jobby
   private final HadoopDruidIndexerConfig config;
   private String failureCause;
   private Job groupByJob;
+  private long startTime;

   public DetermineHashedPartitionsJob(
       HadoopDruidIndexerConfig config
@@ -91,7 +92,7 @@ public class DetermineHashedPartitionsJob implements Jobby
        * Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
        * in the final segment.
        */
-      final long startTime = System.currentTimeMillis();
+      startTime = System.currentTimeMillis();
       groupByJob = Job.getInstance(
           new Configuration(),
           StringUtils.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
@@ -125,6 +126,11 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.submit();
       log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());

+      // Store the jobId in the file
+      if (groupByJob.getJobID() != null) {
+        JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
+      }
+
       if (!groupByJob.waitForCompletion(true)) {
         log.error("Job failed: %s", groupByJob.getJobID());
         failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
@@ -161,6 +161,12 @@ public class DeterminePartitionsJob implements Jobby
       groupByJob.submit();
       log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());

+      // Store the jobId in the file
+      if (groupByJob.getJobID() != null) {
+        JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
+      }
+
+
       if (!groupByJob.waitForCompletion(true)) {
         log.error("Job failed: %s", groupByJob.getJobID());
         failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
@@ -218,6 +224,12 @@ public class DeterminePartitionsJob implements Jobby
           dimSelectionJob.getTrackingURL()
       );

+      // Store the jobId in the file
+      if (dimSelectionJob.getJobID() != null) {
+        JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), dimSelectionJob.getJobID().toString());
+      }
+
+
       if (!dimSelectionJob.waitForCompletion(true)) {
         log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
         failureCause = Utils.getFailureMessage(dimSelectionJob, config.JSON_MAPPER);
@@ -39,6 +39,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
   private static final Logger log = new Logger(HadoopDruidDetermineConfigurationJob.class);
   private final HadoopDruidIndexerConfig config;
   private Jobby job;
+  private String hadoopJobIdFile;

   @Inject
   public HadoopDruidDetermineConfigurationJob(
@@ -55,6 +56,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby

     if (config.isDeterminingPartitions()) {
       job = config.getPartitionsSpec().getPartitionJob(config);
+      config.setHadoopJobIdFileName(hadoopJobIdFile);
       return JobHelper.runSingleJob(job, config);
     } else {
       int shardsPerInterval = config.getPartitionsSpec().getNumShards();
@@ -109,4 +111,9 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby

     return job.getErrorMessage();
   }
+
+  public void setHadoopJobIdFile(String hadoopJobIdFile)
+  {
+    this.hadoopJobIdFile = hadoopJobIdFile;
+  }
 }
@@ -212,6 +212,7 @@ public class HadoopDruidIndexerConfig

   private HadoopIngestionSpec schema;
   private PathSpec pathSpec;
+  private String hadoopJobIdFileName;
   private final Map<Long, ShardSpecLookup> shardSpecLookups = new HashMap<>();
   private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = new HashMap<>();
   private final Granularity rollupGran;
@@ -375,6 +376,16 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().getMaxParseExceptions();
   }

+  public void setHadoopJobIdFileName(String hadoopJobIdFileName)
+  {
+    this.hadoopJobIdFileName = hadoopJobIdFileName;
+  }
+
+  public String getHadoopJobIdFileName()
+  {
+    return hadoopJobIdFileName;
+  }
+
   /**
    * Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
    * or via injected system properties) before this method is called. The {@link PathSpec} may
@@ -38,6 +38,7 @@ public class HadoopDruidIndexerJob implements Jobby
   private final MetadataStorageUpdaterJob metadataStorageUpdaterJob;
   private IndexGeneratorJob indexJob;
   private volatile List<DataSegment> publishedSegments = null;
+  private String hadoopJobIdFile;

   @Inject
   public HadoopDruidIndexerJob(
@@ -92,7 +93,7 @@ public class HadoopDruidIndexerJob implements Jobby
         }
     );

-
+    config.setHadoopJobIdFileName(hadoopJobIdFile);
     return JobHelper.runJobs(jobs, config);
   }

@@ -124,4 +125,9 @@ public class HadoopDruidIndexerJob implements Jobby
     }
     return publishedSegments;
   }
+
+  public void setHadoopJobIdFile(String hadoopJobIdFile)
+  {
+    this.hadoopJobIdFile = hadoopJobIdFile;
+  }
 }
@@ -208,6 +208,11 @@ public class IndexGeneratorJob implements Jobby
       job.submit();
       log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());

+      // Store the jobId in the file
+      if (job.getJobID() != null) {
+        JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), job.getJobID().toString());
+      }
+
       boolean success = job.waitForCompletion(true);

       Counters counters = job.getCounters();
@@ -59,8 +59,10 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -347,6 +349,24 @@ public class JobHelper
     }
   }

+  public static void writeJobIdToFile(String hadoopJobIdFileName, String hadoopJobId)
+  {
+    if (hadoopJobId != null && hadoopJobIdFileName != null) {
+      try {
+        HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(
+            new OutputStreamWriter(new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
+            hadoopJobId
+        );
+        log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName);
+      }
+      catch (IOException e) {
+        log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
+      }
+    } else {
+      log.info("Either job id or file name is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
+    }
+  }
+
   public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
   {
     boolean succeeded = job.run();
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -153,11 +154,14 @@ public abstract class AbstractTask implements Task
     return false;
   }

+  /**
+   * Should be called independent of canRestore so that resource cleaning can be achieved.
+   * If resource cleaning is required, concrete class should override this method
+   */
   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    // Should not be called when canRestore = false.
-    throw new UnsupportedOperationException("Cannot stop gracefully");
+    // Do nothing and let the concrete class handle it
   }

   @Override
@@ -50,6 +50,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
 import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
@@ -401,29 +402,31 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
   }

   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    try {
-      synchronized (this) {
-        if (!gracefullyStopped) {
-          gracefullyStopped = true;
-          if (firehose == null) {
-            log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
-          } else if (finishingJob) {
-            log.info("stopGracefully: Interrupting finishJob.");
-            runThread.interrupt();
-          } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
-            log.info("stopGracefully: Draining firehose.");
-            firehose.close();
-          } else {
-            log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
-            runThread.interrupt();
+    if (taskConfig.isRestoreTasksOnRestart()) {
+      try {
+        synchronized (this) {
+          if (!gracefullyStopped) {
+            gracefullyStopped = true;
+            if (firehose == null) {
+              log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
+            } else if (finishingJob) {
+              log.info("stopGracefully: Interrupting finishJob.");
+              runThread.interrupt();
+            } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+              log.info("stopGracefully: Draining firehose.");
+              firehose.close();
+            } else {
+              log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+              runThread.interrupt();
+            }
           }
         }
       }
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
     }
   }

@@ -48,6 +48,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockAcquireAction;
 import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
 import org.apache.druid.java.util.common.DateTimes;
@@ -59,6 +60,8 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.util.ToolRunner;
 import org.joda.time.Interval;

 import javax.servlet.http.HttpServletRequest;
@@ -69,6 +72,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
@@ -80,6 +84,8 @@ import java.util.SortedSet;
 public class HadoopIndexTask extends HadoopTask implements ChatHandler
 {
   private static final Logger log = new Logger(HadoopIndexTask.class);
+  private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
+  private TaskConfig taskConfig = null;

   private static String getTheDataSource(HadoopIngestionSpec spec)
   {
@@ -218,10 +224,16 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     return classpathPrefix;
   }

+  public String getHadoopJobIdFileName()
+  {
+    return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath();
+  }
+
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     try {
+      taskConfig = toolbox.getConfig();
       if (chatHandlerProvider.isPresent()) {
         log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
         chatHandlerProvider.get().register(getId(), this, false);
@@ -259,6 +271,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @SuppressWarnings("unchecked")
   private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   {
+    String hadoopJobIdFile = getHadoopJobIdFileName();
     final ClassLoader loader = buildClassLoader(toolbox);
     boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();

@@ -277,7 +290,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     String[] determinePartitionsInput = new String[]{
         toolbox.getObjectMapper().writeValueAsString(spec),
         toolbox.getConfig().getHadoopWorkingPath(),
-        toolbox.getSegmentPusher().getPathForHadoop()
+        toolbox.getSegmentPusher().getPathForHadoop(),
+        hadoopJobIdFile
     };

     HadoopIngestionSpec indexerSchema;
@@ -367,7 +381,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler

     String[] buildSegmentsInput = new String[]{
         toolbox.getObjectMapper().writeValueAsString(indexerSchema),
-        version
+        version,
+        hadoopJobIdFile
     };

     Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
@@ -412,6 +427,57 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }

+  @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    File hadoopJobIdFile = new File(getHadoopJobIdFileName());
+    String jobId = null;
+
+    try {
+      if (hadoopJobIdFile.exists()) {
+        jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
+      }
+    }
+    catch (Exception e) {
+      log.warn(e, "exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
+    }
+
+    try {
+      if (jobId != null) {
+        ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
+                                                         taskConfig.getDefaultHadoopCoordinates());
+
+        Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
+            "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
+            loader
+        );
+        String[] buildKillJobInput = new String[]{
+            "-kill",
+            jobId
+        };
+
+        Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
+        Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
+
+        Thread.currentThread().setContextClassLoader(loader);
+        final String killStatusString = (String) innerProcessingRunTask.invoke(
+            killMRJobInnerProcessingRunner,
+            new Object[]{buildKillJobInput}
+        );
+
+        log.info(StringUtils.format("Tried killing job %s , status: %s", jobId, killStatusString));
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(oldLoader);
+    }
+
+  }
+
   @GET
   @Path("/rowStats")
   @Produces(MediaType.APPLICATION_JSON)
@@ -540,6 +606,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       final String schema = args[0];
       final String workingPath = args[1];
       final String segmentOutputPath = args[2];
+      final String hadoopJobIdFile = args[3];

       final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
           .readValue(
@@ -553,6 +620,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       );

       job = new HadoopDruidDetermineConfigurationJob(config);
+      job.setHadoopJobIdFile(hadoopJobIdFile);

       log.info("Starting a hadoop determine configuration job...");
       if (job.run()) {
@@ -585,6 +653,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     {
       final String schema = args[0];
       String version = args[1];
+      final String hadoopJobIdFile = args[2];

       final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
           .readValue(
@@ -606,6 +675,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
         maybeHandler = null;
       }
       job = new HadoopDruidIndexerJob(config, maybeHandler);
+      job.setHadoopJobIdFile(hadoopJobIdFile);

       log.info("Starting a hadoop index generator job...");
       try {
@@ -649,6 +719,16 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }

+  @SuppressWarnings("unused")
+  public static class HadoopKillMRJobIdProcessingRunner
+  {
+    public String runTask(String[] args) throws Exception
+    {
+      int res = ToolRunner.run(new JobClient(), args);
+      return res == 0 ? "Success" : "Fail";
+    }
+  }
+
   public static class HadoopIndexGeneratorInnerProcessingStatus
   {
     private final List<DataSegment> dataSegments;
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockAcquireAction;
 import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
@@ -494,29 +495,31 @@ public class RealtimeIndexTask extends AbstractTask
   }

   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    try {
-      synchronized (this) {
-        if (!gracefullyStopped) {
-          gracefullyStopped = true;
-          if (firehose == null) {
-            log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
-          } else if (finishingJob) {
-            log.info("stopGracefully: Interrupting finishJob.");
-            runThread.interrupt();
-          } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
-            log.info("stopGracefully: Draining firehose.");
-            firehose.close();
-          } else {
-            log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
-            runThread.interrupt();
+    if (taskConfig.isRestoreTasksOnRestart()) {
+      try {
+        synchronized (this) {
+          if (!gracefullyStopped) {
+            gracefullyStopped = true;
+            if (firehose == null) {
+              log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
+            } else if (finishingJob) {
+              log.info("stopGracefully: Interrupting finishJob.");
+              runThread.interrupt();
+            } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+              log.info("stopGracefully: Draining firehose.");
+              firehose.close();
+            } else {
+              log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+              runThread.interrupt();
+            }
           }
         }
       }
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
     }
   }

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSubTask;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.query.Query;
@@ -163,11 +164,12 @@ public interface Task
   boolean canRestore();

   /**
-   * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
-   * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
+   * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be terminated with
    * extreme prejudice.
+   *
+   * @param taskConfig TaskConfig for this task
    */
-  void stopGracefully();
+  void stopGracefully(TaskConfig taskConfig);

   /**
    * Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
@@ -178,17 +178,15 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
     if (runningItem != null) {
       final Task task = runningItem.getTask();
       final long start = System.currentTimeMillis();
-      final boolean graceful;
       final long elapsed;
       boolean error = false;

-      if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
-        // Attempt graceful shutdown.
-        graceful = true;
-        log.info("Starting graceful shutdown of task[%s].", task.getId());
+      // stopGracefully for resource cleaning
+      log.info("Starting graceful shutdown of task[%s].", task.getId());
+      task.stopGracefully(taskConfig);

+      if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
         try {
-          task.stopGracefully();
           final TaskStatus taskStatus = runningItem.getResult().get(
               new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
               TimeUnit.MILLISECONDS
@@ -213,7 +211,6 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
           TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
         }
       } else {
-        graceful = false;
         TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
       }

@@ -223,7 +220,7 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
           .builder()
           .setDimension("task", task.getId())
           .setDimension("dataSource", task.getDataSource())
-          .setDimension("graceful", String.valueOf(graceful))
+          .setDimension("graceful", "true") // for backward compatibility
           .setDimension("error", String.valueOf(error));

       emitter.emit(metricBuilder.build("task/interrupt/count", 1L));
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -179,9 +180,11 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   }

   @Override
-  public void stopGracefully()
+  public void stopGracefully(TaskConfig taskConfig)
   {
-    runner.stopGracefully();
+    if (taskConfig.isRestoreTasksOnRestart()) {
+      runner.stopGracefully();
+    }
   }

   @Override
@@ -1029,7 +1029,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
     );

     // Trigger graceful shutdown.
-    task1.stopGracefully();
+    task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());

     // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
     final TaskStatus taskStatus = statusFuture.get();
@@ -1129,7 +1129,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
     Assert.assertEquals(1, sumMetric(task1, null, "rows").longValue());

     // Trigger graceful shutdown.
-    task1.stopGracefully();
+    task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());

     // Wait for the task to finish. The status doesn't really matter.
     while (!statusFuture.isDone()) {
@@ -1202,7 +1202,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
     );

     // Trigger graceful shutdown.
-    task1.stopGracefully();
+    task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());

     // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
     final TaskStatus taskStatus = statusFuture.get();
@@ -1257,7 +1257,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest

     final AppenderatorDriverRealtimeIndexTask task1 = makeRealtimeTask(null);

-    task1.stopGracefully();
+    task1.stopGracefully(taskToolboxFactory.build(task1).getConfig());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task1);

     // Wait for the task to finish.
@@ -1517,7 +1517,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
         return result;
       }
     };
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
+    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);

     final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
         taskLockbox,
@@ -600,7 +600,7 @@ public class RealtimeIndexTaskTest
     );

     // Trigger graceful shutdown.
-    task1.stopGracefully();
+    task1.stopGracefully(taskToolbox.getConfig());

     // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
     final TaskStatus taskStatus = statusFuture.get();
@@ -708,7 +708,7 @@ public class RealtimeIndexTaskTest
     Assert.assertEquals(1, sumMetric(task1, null, "rows").longValue());

     // Trigger graceful shutdown.
-    task1.stopGracefully();
+    task1.stopGracefully(taskToolbox.getConfig());

     // Wait for the task to finish. The status doesn't really matter.
     while (!statusFuture.isDone()) {
@@ -788,7 +788,7 @@ public class RealtimeIndexTaskTest
     );

     // Trigger graceful shutdown.
-    task1.stopGracefully();
+    task1.stopGracefully(taskToolbox.getConfig());

     // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
     final TaskStatus taskStatus = statusFuture.get();
@@ -837,9 +837,9 @@ public class RealtimeIndexTaskTest
     final File directory = tempFolder.newFolder();
     final RealtimeIndexTask task1 = makeRealtimeTask(null);

-    task1.stopGracefully();
     final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
     final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
+    task1.stopGracefully(taskToolbox.getConfig());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);

     // Wait for the task to finish.
@@ -970,7 +970,7 @@ public class RealtimeIndexTaskTest
       final File directory
   )
   {
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
+    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
     try {
       taskStorage.insert(task, TaskStatus.running(task.getId()));
@@ -195,7 +195,7 @@ public class SingleTaskBackgroundRunnerTest
       }

       @Override
-      public void stopGracefully()
+      public void stopGracefully(TaskConfig taskConfig)
       {
         gracefullyStopped.set();
       }
@@ -161,7 +161,7 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
       log.info("Starting graceful shutdown of task[%s].", task.getId());

       try {
-        task.stopGracefully();
+        task.stopGracefully(taskConfig);
         final TaskStatus taskStatus = item.getResult().get(
             new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
             TimeUnit.MILLISECONDS
pom.xml (18 additions)
@@ -633,6 +633,24 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.mapdb</groupId>
             <artifactId>mapdb</artifactId>