diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 0b6e7c9e000..7bf83ddd3a0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -345,7 +345,7 @@ public class KafkaIndexTaskTest
{
synchronized (runningTasks) {
for (Task task : runningTasks) {
- task.stopGracefully();
+ task.stopGracefully(toolboxFactory.build(task).getConfig());
}
runningTasks.clear();
@@ -1848,7 +1848,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(2, countEvents(task1));
// Stop without publishing segment
- task1.stopGracefully();
+ task1.stopGracefully(toolboxFactory.build(task1).getConfig());
unlockAppenderatorBasePersistDirForTask(task1);
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -2339,7 +2339,7 @@ public class KafkaIndexTaskTest
null,
50000,
null,
- false,
+ true,
null,
null
);
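
Note on the test changes above (mirrored in the Kinesis suite below): `Task.stopGracefully()` now takes the worker's `TaskConfig`, and the restore logic inside `stopGracefully` only runs when `restoreTasksOnRestart` is enabled, which is why the test `TaskConfig` flips that flag from `false` to `true`. A minimal sketch of the new call pattern, using the fixture names from these tests:

```java
// Sketch of the new shutdown call pattern; toolboxFactory and task come
// from the test fixture shown in the diff above.
TaskToolbox toolbox = toolboxFactory.build(task);
task.stopGracefully(toolbox.getConfig()); // restore path runs only if restoreTasksOnRestart=true
```
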
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 31bedd9ae89..a62d45ae451 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -331,7 +331,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
{
synchronized (runningTasks) {
for (Task task : runningTasks) {
- task.stopGracefully();
+ task.stopGracefully(toolboxFactory.build(task).getConfig());
}
runningTasks.clear();
@@ -2126,7 +2126,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(2, countEvents(task1));
// Stop without publishing segment
- task1.stopGracefully();
+ task1.stopGracefully(toolboxFactory.build(task1).getConfig());
unlockAppenderatorBasePersistDirForTask(task1);
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
@@ -2651,7 +2651,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
null,
50000,
null,
- false,
+ true,
null,
null
);
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index bdf6c87772d..92cd1e778fd 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -130,6 +130,22 @@
    <version>${hadoop.compile.version}</version>
    <scope>test</scope>
</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-common</artifactId>
+    <scope>provided</scope>
+</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-mapreduce-client-core</artifactId>
+    <scope>provided</scope>
+    <exclusions>
+        <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
<dependency>
    <groupId>org.apache.druid</groupId>
    <artifactId>druid-server</artifactId>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index c8696b563a7..976b78d718a 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -75,6 +75,7 @@ public class DetermineHashedPartitionsJob implements Jobby
private final HadoopDruidIndexerConfig config;
private String failureCause;
private Job groupByJob;
+ private long startTime;
public DetermineHashedPartitionsJob(
HadoopDruidIndexerConfig config
@@ -91,7 +92,7 @@ public class DetermineHashedPartitionsJob implements Jobby
* Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
* in the final segment.
*/
- final long startTime = System.currentTimeMillis();
+ startTime = System.currentTimeMillis();
groupByJob = Job.getInstance(
new Configuration(),
StringUtils.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
@@ -125,6 +126,11 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.submit();
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
+ // Store the jobId in the file
+ if (groupByJob.getJobID() != null) {
+ JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
+ }
+
if (!groupByJob.waitForCompletion(true)) {
log.error("Job failed: %s", groupByJob.getJobID());
failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
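
Every Hadoop job Druid submits now follows the same pattern: persist the MR job id right after `submit()` returns and before blocking in `waitForCompletion()`, so that a concurrent task kill can locate and stop the job. A condensed sketch of the pattern, using the `JobHelper.writeJobIdToFile` helper added later in this patch:

```java
// Condensed submit-then-record pattern shared by the indexer jobs in this patch.
job.submit();                                  // returns once the MR job is accepted
if (job.getJobID() != null) {                  // the id is known after submission
  JobHelper.writeJobIdToFile(
      config.getHadoopJobIdFileName(),         // may be null; the helper then skips the write
      job.getJobID().toString()
  );
}
boolean success = job.waitForCompletion(true); // only now block on the job
```
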
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index 27cc9c3c215..d8c8ae2c5d9 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -161,6 +161,12 @@ public class DeterminePartitionsJob implements Jobby
groupByJob.submit();
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
+ // Store the jobId in the file
+ if (groupByJob.getJobID() != null) {
+ JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
+ }
+
+
if (!groupByJob.waitForCompletion(true)) {
log.error("Job failed: %s", groupByJob.getJobID());
failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
@@ -218,6 +224,12 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.getTrackingURL()
);
+ // Store the jobId in the file
+ if (dimSelectionJob.getJobID() != null) {
+ JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), dimSelectionJob.getJobID().toString());
+ }
+
+
if (!dimSelectionJob.waitForCompletion(true)) {
log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
failureCause = Utils.getFailureMessage(dimSelectionJob, config.JSON_MAPPER);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 294c6767f20..10551bab001 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -39,6 +39,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
private static final Logger log = new Logger(HadoopDruidDetermineConfigurationJob.class);
private final HadoopDruidIndexerConfig config;
private Jobby job;
+ private String hadoopJobIdFile;
@Inject
public HadoopDruidDetermineConfigurationJob(
@@ -55,6 +56,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
if (config.isDeterminingPartitions()) {
job = config.getPartitionsSpec().getPartitionJob(config);
+ config.setHadoopJobIdFileName(hadoopJobIdFile);
return JobHelper.runSingleJob(job, config);
} else {
int shardsPerInterval = config.getPartitionsSpec().getNumShards();
@@ -109,4 +111,9 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
return job.getErrorMessage();
}
+
+ public void setHadoopJobIdFile(String hadoopJobIdFile)
+ {
+ this.hadoopJobIdFile = hadoopJobIdFile;
+ }
}
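
The job-id file name originates in `HadoopIndexTask` and reaches this wrapper through `setHadoopJobIdFile`, which is copied onto the shared `HadoopDruidIndexerConfig` before any MR job is submitted. A sketch of the wiring (the file path is hypothetical):

```java
// Hypothetical wiring, mirroring what HadoopIndexTask does further down.
HadoopDruidDetermineConfigurationJob job = new HadoopDruidDetermineConfigurationJob(config);
job.setHadoopJobIdFile("/var/druid/task/some-task-id/mapReduceJobId.json"); // hypothetical path
job.run(); // calls config.setHadoopJobIdFileName(...) before the partition job submits
```
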
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
index 12228a4e55c..22266ac9c38 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
@@ -212,6 +212,7 @@ public class HadoopDruidIndexerConfig
private HadoopIngestionSpec schema;
private PathSpec pathSpec;
+ private String hadoopJobIdFileName;
private final Map<Long, ShardSpecLookup> shardSpecLookups = new HashMap<>();
private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = new HashMap<>();
private final Granularity rollupGran;
@@ -375,6 +376,16 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getMaxParseExceptions();
}
+ public void setHadoopJobIdFileName(String hadoopJobIdFileName)
+ {
+ this.hadoopJobIdFileName = hadoopJobIdFileName;
+ }
+
+ public String getHadoopJobIdFileName()
+ {
+ return hadoopJobIdFileName;
+ }
+
/**
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
index 331863abc9d..6d20dbe9a54 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
@@ -38,6 +38,7 @@ public class HadoopDruidIndexerJob implements Jobby
private final MetadataStorageUpdaterJob metadataStorageUpdaterJob;
private IndexGeneratorJob indexJob;
+ private volatile List<DataSegment> publishedSegments = null;
+ private String hadoopJobIdFile;
@Inject
public HadoopDruidIndexerJob(
@@ -92,7 +93,7 @@ public class HadoopDruidIndexerJob implements Jobby
}
);
-
+ config.setHadoopJobIdFileName(hadoopJobIdFile);
return JobHelper.runJobs(jobs, config);
}
@@ -124,4 +125,9 @@ public class HadoopDruidIndexerJob implements Jobby
}
return publishedSegments;
}
+
+ public void setHadoopJobIdFile(String hadoopJobIdFile)
+ {
+ this.hadoopJobIdFile = hadoopJobIdFile;
+ }
}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index f2ae2b9ae6f..eed19061428 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -208,6 +208,11 @@ public class IndexGeneratorJob implements Jobby
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
+ // Store the jobId in the file
+ if (job.getJobID() != null) {
+ JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), job.getJobID().toString());
+ }
+
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
index da5b9e113a3..f5fb82a6219 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
@@ -59,8 +59,10 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -347,6 +349,24 @@ public class JobHelper
}
}
+ public static void writeJobIdToFile(String hadoopJobIdFileName, String hadoopJobId)
+ {
+ if (hadoopJobId != null && hadoopJobIdFileName != null) {
+ try {
+ HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(
+ new OutputStreamWriter(new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
+ hadoopJobId
+ );
+ log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName);
+ }
+ catch (IOException e) {
+ log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
+ }
+ } else {
+ log.info("Either job id or file name is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
+ }
+ }
+
public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
{
boolean succeeded = job.run();
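
The helper serializes the id as a JSON string through `HadoopDruidIndexerConfig.JSON_MAPPER`, and `HadoopIndexTask.stopGracefully` (below) reads it back with the same mapper. A round-trip sketch, assuming a writable temp file and a made-up job id:

```java
import java.io.File;

// Round trip of the job-id file; the id value is fabricated for illustration.
File idFile = File.createTempFile("mapReduceJobId", ".json");
JobHelper.writeJobIdToFile(idFile.getAbsolutePath(), "job_1546300800000_0001");
String jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(idFile, String.class);
// jobId is now "job_1546300800000_0001"
```
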
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index cac4e5ab4af..27e585e326f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@@ -153,11 +154,14 @@ public abstract class AbstractTask implements Task
return false;
}
+ /**
+ * Called independently of {@link #canRestore()} so that resource cleanup can happen on every shutdown.
+ * Concrete classes that need to release resources should override this method.
+ */
@Override
- public void stopGracefully()
+ public void stopGracefully(TaskConfig taskConfig)
{
- // Should not be called when canRestore = false.
- throw new UnsupportedOperationException("Cannot stop gracefully");
+ // Do nothing and let the concrete class handle it
}
@Override
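
With the base implementation a no-op rather than an `UnsupportedOperationException`, the runner can call `stopGracefully` on every task unconditionally. A schematic sketch of the override shape this contract expects from subclasses (both helper methods are hypothetical):

```java
@Override
public void stopGracefully(TaskConfig taskConfig)
{
  releaseExternalResources();          // hypothetical helper: always safe to run
  if (taskConfig.isRestoreTasksOnRestart()) {
    drainAndPersistStateForRestore();  // hypothetical helper: restore path only
  }
}
```
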
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 1dcf92b4b85..1cb83cbb74e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
@@ -401,29 +402,31 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
}
@Override
- public void stopGracefully()
+ public void stopGracefully(TaskConfig taskConfig)
{
- try {
- synchronized (this) {
- if (!gracefullyStopped) {
- gracefullyStopped = true;
- if (firehose == null) {
- log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
- } else if (finishingJob) {
- log.info("stopGracefully: Interrupting finishJob.");
- runThread.interrupt();
- } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
- log.info("stopGracefully: Draining firehose.");
- firehose.close();
- } else {
- log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
- runThread.interrupt();
+ if (taskConfig.isRestoreTasksOnRestart()) {
+ try {
+ synchronized (this) {
+ if (!gracefullyStopped) {
+ gracefullyStopped = true;
+ if (firehose == null) {
+ log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
+ } else if (finishingJob) {
+ log.info("stopGracefully: Interrupting finishJob.");
+ runThread.interrupt();
+ } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+ log.info("stopGracefully: Draining firehose.");
+ firehose.close();
+ } else {
+ log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+ runThread.interrupt();
+ }
}
}
}
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 104123fc100..398ed96a2f3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -48,6 +48,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockAcquireAction;
import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
import org.apache.druid.java.util.common.DateTimes;
@@ -59,6 +60,8 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.util.ToolRunner;
import org.joda.time.Interval;
import javax.servlet.http.HttpServletRequest;
@@ -69,6 +72,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
@@ -80,6 +84,8 @@ import java.util.SortedSet;
public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
private static final Logger log = new Logger(HadoopIndexTask.class);
+ private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
+ private TaskConfig taskConfig = null;
private static String getTheDataSource(HadoopIngestionSpec spec)
{
@@ -218,10 +224,16 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
return classpathPrefix;
}
+ public String getHadoopJobIdFileName()
+ {
+ return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath();
+ }
+
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
try {
+ taskConfig = toolbox.getConfig();
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(getId(), this, false);
@@ -259,6 +271,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
@SuppressWarnings("unchecked")
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
+ String hadoopJobIdFile = getHadoopJobIdFileName();
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
@@ -277,7 +290,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
String[] determinePartitionsInput = new String[]{
toolbox.getObjectMapper().writeValueAsString(spec),
toolbox.getConfig().getHadoopWorkingPath(),
- toolbox.getSegmentPusher().getPathForHadoop()
+ toolbox.getSegmentPusher().getPathForHadoop(),
+ hadoopJobIdFile
};
HadoopIngestionSpec indexerSchema;
@@ -367,7 +381,8 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
String[] buildSegmentsInput = new String[]{
toolbox.getObjectMapper().writeValueAsString(indexerSchema),
- version
+ version,
+ hadoopJobIdFile
};
Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
@@ -412,6 +427,57 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
}
+ @Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+ File hadoopJobIdFile = new File(getHadoopJobIdFileName());
+ String jobId = null;
+
+ try {
+ if (hadoopJobIdFile.exists()) {
+ jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
+ }
+ }
+ catch (Exception e) {
+ log.warn(e, "exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
+ }
+
+ try {
+ if (jobId != null) {
+ ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
+ taskConfig.getDefaultHadoopCoordinates());
+
+ Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
+ "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
+ loader
+ );
+ String[] buildKillJobInput = new String[]{
+ "-kill",
+ jobId
+ };
+
+ Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
+ Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
+
+ Thread.currentThread().setContextClassLoader(loader);
+ final String killStatusString = (String) innerProcessingRunTask.invoke(
+ killMRJobInnerProcessingRunner,
+ new Object[]{buildKillJobInput}
+ );
+
+ log.info("Tried killing job %s, status: %s", jobId, killStatusString);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(oldLoader);
+ }
+
+ }
+
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
@@ -540,6 +606,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
final String schema = args[0];
final String workingPath = args[1];
final String segmentOutputPath = args[2];
+ final String hadoopJobIdFile = args[3];
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
.readValue(
@@ -553,6 +620,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
);
job = new HadoopDruidDetermineConfigurationJob(config);
+ job.setHadoopJobIdFile(hadoopJobIdFile);
log.info("Starting a hadoop determine configuration job...");
if (job.run()) {
@@ -585,6 +653,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
final String schema = args[0];
String version = args[1];
+ final String hadoopJobIdFile = args[2];
final HadoopIngestionSpec theSchema = HadoopDruidIndexerConfig.JSON_MAPPER
.readValue(
@@ -606,6 +675,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
maybeHandler = null;
}
job = new HadoopDruidIndexerJob(config, maybeHandler);
+ job.setHadoopJobIdFile(hadoopJobIdFile);
log.info("Starting a hadoop index generator job...");
try {
@@ -649,6 +719,16 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
}
+ @SuppressWarnings("unused")
+ public static class HadoopKillMRJobIdProcessingRunner
+ {
+ public String runTask(String[] args) throws Exception
+ {
+ int res = ToolRunner.run(new JobClient(), args);
+ return res == 0 ? "Success" : "Fail";
+ }
+ }
+
public static class HadoopIndexGeneratorInnerProcessingStatus
{
private final List<DataSegment> dataSegments;
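
The kill runner reuses Hadoop's own tooling: `JobClient` implements `Tool`, so `ToolRunner.run(new JobClient(), new String[]{"-kill", jobId})` behaves like `mapred job -kill <jobId>` on the command line, with a zero exit code meaning the kill request was accepted. A standalone sketch of the same invocation:

```java
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.util.ToolRunner;

public class KillHadoopJobExample
{
  public static void main(String[] args) throws Exception
  {
    // Same call the HadoopKillMRJobIdProcessingRunner above performs;
    // args[0] is expected to be a Hadoop job id, e.g. job_1546300800000_0001.
    int res = ToolRunner.run(new JobClient(), new String[]{"-kill", args[0]});
    System.out.println(res == 0 ? "Success" : "Fail");
  }
}
```
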
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index 83130e6c6ab..8a7fe6a136c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockAcquireAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
@@ -494,29 +495,31 @@ public class RealtimeIndexTask extends AbstractTask
}
@Override
- public void stopGracefully()
+ public void stopGracefully(TaskConfig taskConfig)
{
- try {
- synchronized (this) {
- if (!gracefullyStopped) {
- gracefullyStopped = true;
- if (firehose == null) {
- log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
- } else if (finishingJob) {
- log.info("stopGracefully: Interrupting finishJob.");
- runThread.interrupt();
- } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
- log.info("stopGracefully: Draining firehose.");
- firehose.close();
- } else {
- log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
- runThread.interrupt();
+ if (taskConfig.isRestoreTasksOnRestart()) {
+ try {
+ synchronized (this) {
+ if (!gracefullyStopped) {
+ gracefullyStopped = true;
+ if (firehose == null) {
+ log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
+ } else if (finishingJob) {
+ log.info("stopGracefully: Interrupting finishJob.");
+ runThread.interrupt();
+ } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+ log.info("stopGracefully: Draining firehose.");
+ firehose.close();
+ } else {
+ log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+ runThread.interrupt();
+ }
}
}
}
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 3ed45482cc4..4523fc2dd58 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.query.Query;
@@ -163,11 +164,12 @@ public interface Task
boolean canRestore();
/**
- * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
- * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
+ * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be terminated with
* extreme prejudice.
+ *
+ * @param taskConfig TaskConfig for this task
*/
- void stopGracefully();
+ void stopGracefully(TaskConfig taskConfig);
/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index a9b631713d3..cb222ea88c1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -178,17 +178,15 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
if (runningItem != null) {
final Task task = runningItem.getTask();
final long start = System.currentTimeMillis();
- final boolean graceful;
final long elapsed;
boolean error = false;
- if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
- // Attempt graceful shutdown.
- graceful = true;
- log.info("Starting graceful shutdown of task[%s].", task.getId());
+ // Always invoke stopGracefully so the task can release resources, even when restore is disabled
+ log.info("Starting graceful shutdown of task[%s].", task.getId());
+ task.stopGracefully(taskConfig);
+ if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
try {
- task.stopGracefully();
final TaskStatus taskStatus = runningItem.getResult().get(
new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
@@ -213,7 +211,6 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
}
} else {
- graceful = false;
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
}
@@ -223,7 +220,7 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
.builder()
.setDimension("task", task.getId())
.setDimension("dataSource", task.getDataSource())
- .setDimension("graceful", String.valueOf(graceful))
+ .setDimension("graceful", "true") // for backward compatibility
.setDimension("error", String.valueOf(error));
emitter.emit(metricBuilder.build("task/interrupt/count", 1L));
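
The shutdown sequence changes shape here: `stopGracefully` is now invoked for every running task so resources are always released, and only afterwards does the runner decide whether a restorable exit is worth waiting for. Condensed control flow of the patched logic:

```java
// Condensed from the patched stop() above; not the verbatim method body.
task.stopGracefully(taskConfig);                 // always: resource cleanup
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
  // Wait up to gracefulShutdownTimeout for the task to exit on its own,
  // then report its final status (or failure on timeout/error).
} else {
  // Not restorable: report failure immediately.
}
```
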
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 7f279917a3f..29186961ce6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -179,9 +179,11 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
@Override
- public void stopGracefully()
+ public void stopGracefully(TaskConfig taskConfig)
{
- getRunner().stopGracefully();
+ if (taskConfig.isRestoreTasksOnRestart()) {
+ getRunner().stopGracefully();
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ ... @@ public class AppenderatorDriverRealtimeIndexTaskTest
final ListenableFuture<TaskStatus> statusFuture = runTask(task1);
// Wait for the task to finish.
@@ -1517,7 +1517,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
return result;
}
};
- final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
+ final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 5317b66e4a4..9a6a8e98e6e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -600,7 +600,7 @@ public class RealtimeIndexTaskTest
);
// Trigger graceful shutdown.
- task1.stopGracefully();
+ task1.stopGracefully(taskToolbox.getConfig());
// Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
final TaskStatus taskStatus = statusFuture.get();
@@ -708,7 +708,7 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(1, sumMetric(task1, null, "rows").longValue());
// Trigger graceful shutdown.
- task1.stopGracefully();
+ task1.stopGracefully(taskToolbox.getConfig());
// Wait for the task to finish. The status doesn't really matter.
while (!statusFuture.isDone()) {
@@ -788,7 +788,7 @@ public class RealtimeIndexTaskTest
);
// Trigger graceful shutdown.
- task1.stopGracefully();
+ task1.stopGracefully(taskToolbox.getConfig());
// Wait for the task to finish. The status doesn't really matter, but we'll check it anyway.
final TaskStatus taskStatus = statusFuture.get();
@@ -837,9 +837,9 @@ public class RealtimeIndexTaskTest
final File directory = tempFolder.newFolder();
final RealtimeIndexTask task1 = makeRealtimeTask(null);
- task1.stopGracefully();
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
+ task1.stopGracefully(taskToolbox.getConfig());
final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);
// Wait for the task to finish.
@@ -970,7 +970,7 @@ public class RealtimeIndexTaskTest
final File directory
)
{
- final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
+ final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 9f30ff97414..c8f9380714b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -195,7 +195,7 @@ public class SingleTaskBackgroundRunnerTest
}
@Override
- public void stopGracefully()
+ public void stopGracefully(TaskConfig taskConfig)
{
gracefullyStopped.set();
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
index cb6cffee0d9..c9163611c61 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
@@ -161,7 +161,7 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
log.info("Starting graceful shutdown of task[%s].", task.getId());
try {
- task.stopGracefully();
+ task.stopGracefully(taskConfig);
final TaskStatus taskStatus = item.getResult().get(
new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
diff --git a/pom.xml b/pom.xml
index 58642a79434..668ba3e1770 100644
--- a/pom.xml
+++ b/pom.xml
@@ -633,6 +633,24 @@
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-common</artifactId>
+    <version>${hadoop.compile.version}</version>
+    <scope>provided</scope>
+</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-mapreduce-client-core</artifactId>
+    <version>${hadoop.compile.version}</version>
+    <scope>provided</scope>
+    <exclusions>
+        <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
<dependency>
    <groupId>org.mapdb</groupId>
    <artifactId>mapdb</artifactId>