diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 9557ab5b759..8eacc7e2220 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -202,6 +202,21 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java
new file mode 100644
index 00000000000..e12f7fbf5dc
--- /dev/null
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Holds a {@link DataSegment} along with the temporary file path where its index zip file is currently stored
+ * and the final path to which that index zip file should eventually be moved.
+ * See {@link JobHelper#renameIndexFilesForSegments(HadoopIngestionSpec, List)}.
+ */
+public class DataSegmentAndIndexZipFilePath
+{
+ private final DataSegment segment;
+ private final String tmpIndexZipFilePath;
+ private final String finalIndexZipFilePath;
+
+ @JsonCreator
+ public DataSegmentAndIndexZipFilePath(
+ @JsonProperty("segment") DataSegment segment,
+ @JsonProperty("tmpIndexZipFilePath") String tmpIndexZipFilePath,
+ @JsonProperty("finalIndexZipFilePath") String finalIndexZipFilePath
+ )
+ {
+ this.segment = segment;
+ this.tmpIndexZipFilePath = tmpIndexZipFilePath;
+ this.finalIndexZipFilePath = finalIndexZipFilePath;
+ }
+
+ @JsonProperty
+ public DataSegment getSegment()
+ {
+ return segment;
+ }
+
+ @JsonProperty
+ public String getTmpIndexZipFilePath()
+ {
+ return tmpIndexZipFilePath;
+ }
+
+ @JsonProperty
+ public String getFinalIndexZipFilePath()
+ {
+ return finalIndexZipFilePath;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o instanceof DataSegmentAndIndexZipFilePath) {
+ DataSegmentAndIndexZipFilePath that = (DataSegmentAndIndexZipFilePath) o;
+ return segment.equals(((DataSegmentAndIndexZipFilePath) o).getSegment())
+ && tmpIndexZipFilePath.equals(that.getTmpIndexZipFilePath())
+ && finalIndexZipFilePath.equals(that.getFinalIndexZipFilePath());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(segment.getId(), tmpIndexZipFilePath);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DataSegmentAndIndexZipFilePath{" +
+ "segment=" + segment +
+ ", tmpIndexZipFilePath=" + tmpIndexZipFilePath +
+ ", finalIndexZipFilePath=" + finalIndexZipFilePath +
+ '}';
+ }
+}
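
As context for reviewers rather than part of the patch: a minimal sketch of round-tripping this holder through Jackson, which is essentially what happens when a reducer writes a descriptor file and `getPublishedSegmentAndIndexZipFilePaths` later reads it back. The wrapper class name, the paths, and the segment values below are purely illustrative; the mapper setup mirrors the serde test added further down in this patch.

```java
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;

public class DescriptorRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    // Illustrative segment, built the same way as in DataSegmentAndIndexZipFilePathTest below.
    DataSegment segment = new DataSegment(
        SegmentId.dummy("data-source", 1),
        null, null, null,
        new NumberedShardSpec(1, 10),
        null, 0, 0
    );

    // Pair the segment with the tmp location written by the hadoop job and the
    // final location the task will later rename it to (hypothetical paths).
    DataSegmentAndIndexZipFilePath holder = new DataSegmentAndIndexZipFilePath(
        segment,
        "/tmp/druid/working/index.zip.0",
        "/deep-storage/data-source/index.zip"
    );

    // As in the serde test: DataSegment deserialization needs a PruneSpecsHolder injectable.
    ObjectMapper mapper = new DefaultObjectMapper();
    mapper.setInjectableValues(
        new InjectableValues.Std()
            .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
    );

    String json = mapper.writeValueAsString(holder);
    DataSegmentAndIndexZipFilePath back = mapper.readValue(json, DataSegmentAndIndexZipFilePath.class);
    System.out.println(holder.equals(back)); // expected: true
  }
}
```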
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java
new file mode 100644
index 00000000000..96fde6b8ece
--- /dev/null
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * This class exists for testing purposes; see {@link JobHelperPowerMockTest}. Using the
+ * raw {@link FileSystem} class there resulted in Javassist errors.
+ */
+public class FileSystemHelper
+{
+ public static FileSystem get(URI uri, Configuration conf) throws IOException
+ {
+ return FileSystem.get(uri, conf);
+ }
+}
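
As a usage illustration (not patch text): production code resolves the output filesystem through this Druid-owned static seam instead of calling `FileSystem.get` directly, which is what lets `JobHelperPowerMockTest` (added below) stub the call without PowerMock having to instrument Hadoop's `FileSystem` classes. The class name and paths here are hypothetical.

```java
import org.apache.druid.indexer.FileSystemHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemHelperUsageSketch
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration();
    // Hypothetical tmp/final locations; real values come from a DataSegmentAndIndexZipFilePath.
    Path tmpPath = new Path("/tmp/druid/working/index.zip.0");
    Path finalPath = new Path("/deep-storage/data-source/index.zip");

    // Resolve the filesystem through the helper rather than FileSystem.get directly,
    // so tests can replace this static call with a mock.
    FileSystem outputFS = FileSystemHelper.get(finalPath.toUri(), conf);

    // Roughly the per-segment rename that JobHelper.renameIndexFilesForSegments performs.
    boolean renamed = outputFS.rename(tmpPath, finalPath);
    System.out.println("renamed=" + renamed);
  }
}
```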
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 8b5b4b6b0bb..ea37db1a10e 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -59,7 +59,12 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
if (config.isDeterminingPartitions()) {
job = createPartitionJob(config);
config.setHadoopJobIdFileName(hadoopJobIdFile);
- return JobHelper.runSingleJob(job, config);
+ boolean jobSucceeded = JobHelper.runSingleJob(job);
+ JobHelper.maybeDeleteIntermediatePath(
+ jobSucceeded,
+ config.getSchema()
+ );
+ return jobSucceeded;
} else {
final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
final int shardsPerInterval;
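
To restate the cleanup contract this caller now relies on (my paraphrase, not text from the patch): `JobHelper.runSingleJob` only runs the job, and `maybeDeleteIntermediatePath` decides separately whether the working directory is removed. A small sketch of that decision, assuming the `HadoopTuningConfig` accessors exercised by the tests in this patch:

```java
import org.apache.druid.indexer.HadoopTuningConfig;

class IntermediatePathCleanupSketch
{
  // Not part of the patch: restates when maybeDeleteIntermediatePath removes the working dir.
  static boolean shouldDeleteIntermediatePath(boolean jobSucceeded, HadoopTuningConfig tuningConfig)
  {
    // leaveIntermediate=true always preserves the working path (useful for debugging).
    if (tuningConfig.isLeaveIntermediate()) {
      return false;
    }
    // Otherwise delete after success, or after failure when cleanupOnFailure is enabled.
    return jobSucceeded || tuningConfig.isCleanupOnFailure();
  }
}
```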
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
index 25683f32bd8..58977ad4840 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -40,7 +39,7 @@ public class HadoopDruidIndexerJob implements Jobby
@Nullable
private IndexGeneratorJob indexJob;
@Nullable
- private volatile List<DataSegment> publishedSegments = null;
+ private volatile List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = null;
@Nullable
private String hadoopJobIdFile;
@@ -91,14 +90,14 @@ public class HadoopDruidIndexerJob implements Jobby
@Override
public boolean run()
{
- publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
+ publishedSegmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
return true;
}
}
);
config.setHadoopJobIdFileName(hadoopJobIdFile);
- return JobHelper.runJobs(jobs, config);
+ return JobHelper.runJobs(jobs);
}
@Override
@@ -122,12 +121,12 @@ public class HadoopDruidIndexerJob implements Jobby
return indexJob.getErrorMessage();
}
- public List<DataSegment> getPublishedSegments()
+ public List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths()
{
- if (publishedSegments == null) {
+ if (publishedSegmentAndIndexZipFilePaths == null) {
throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
}
- return publishedSegments;
+ return publishedSegmentAndIndexZipFilePaths;
}
public void setHadoopJobIdFile(String hadoopJobIdFile)
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index a12e76571db..9124b9b4c1d 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -102,14 +102,14 @@ public class IndexGeneratorJob implements Jobby
{
private static final Logger log = new Logger(IndexGeneratorJob.class);
- public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
+ public static List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config)
{
final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config);
config.addJobProperties(conf);
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER;
- ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+ ImmutableList.Builder<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePathsBuilder = ImmutableList.builder();
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
@@ -117,9 +117,9 @@ public class IndexGeneratorJob implements Jobby
FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
- final DataSegment segment = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegment.class);
- publishedSegmentsBuilder.add(segment);
- log.info("Adding segment %s to the list of published segments", segment.getId());
+ final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class);
+ publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath);
+ log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId());
}
}
catch (FileNotFoundException e) {
@@ -133,9 +133,9 @@ public class IndexGeneratorJob implements Jobby
catch (IOException e) {
throw new RuntimeException(e);
}
- List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
+ List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = publishedSegmentAndIndexZipFilePathsBuilder.build();
- return publishedSegments;
+ return publishedSegmentAndIndexZipFilePaths;
}
private final HadoopDruidIndexerConfig config;
@@ -809,7 +809,7 @@ public class IndexGeneratorJob implements Jobby
0
);
- final DataSegment segment = JobHelper.serializeOutIndex(
+ final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex(
segmentTemplate,
context.getConfiguration(),
context,
@@ -831,7 +831,7 @@ public class IndexGeneratorJob implements Jobby
HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
);
- Path descriptorPath = config.makeDescriptorInfoPath(segment);
+ Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment());
descriptorPath = JobHelper.prependFSIfNullScheme(
FileSystem.get(
descriptorPath.toUri(),
@@ -842,7 +842,7 @@ public class IndexGeneratorJob implements Jobby
log.info("Writing descriptor to path[%s]", descriptorPath);
JobHelper.writeSegmentDescriptor(
config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
- segment,
+ segmentAndIndexZipFilePath,
descriptorPath,
context
);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
index 7d99d03ad56..b8c29b82f7a 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
@@ -386,29 +386,13 @@ public class JobHelper
}
}
- public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
+ public static boolean runSingleJob(Jobby job)
{
boolean succeeded = job.run();
-
- if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
- if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
- Path workingPath = config.makeIntermediatePath();
- log.info("Deleting path[%s]", workingPath);
- try {
- Configuration conf = injectSystemProperties(new Configuration(), config);
- config.addJobProperties(conf);
- workingPath.getFileSystem(conf).delete(workingPath, true);
- }
- catch (IOException e) {
- log.error(e, "Failed to cleanup path[%s]", workingPath);
- }
- }
- }
-
return succeeded;
}
- public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config)
+ public static boolean runJobs(List<Jobby> jobs)
{
boolean succeeded = true;
for (Jobby job : jobs) {
@@ -418,25 +402,33 @@ public class JobHelper
}
}
+ return succeeded;
+ }
+
+ public static void maybeDeleteIntermediatePath(
+ boolean jobSucceeded,
+ HadoopIngestionSpec indexerSchema)
+ {
+ HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema);
+ final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config);
+ config.addJobProperties(configuration);
+ JobHelper.injectDruidProperties(configuration, config);
if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
- if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
+ if (jobSucceeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {
- Configuration conf = injectSystemProperties(new Configuration(), config);
- config.addJobProperties(conf);
- workingPath.getFileSystem(conf).delete(workingPath, true);
+ config.addJobProperties(configuration);
+ workingPath.getFileSystem(configuration).delete(workingPath, true);
}
catch (IOException e) {
log.error(e, "Failed to cleanup path[%s]", workingPath);
}
}
}
-
- return succeeded;
}
- public static DataSegment serializeOutIndex(
+ public static DataSegmentAndIndexZipFilePath serializeOutIndex(
final DataSegment segmentTemplate,
final Configuration configuration,
final Progressable progressable,
@@ -482,20 +474,16 @@ public class JobHelper
.withSize(size.get())
.withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));
- if (!renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) {
- throw new IOE(
- "Unable to rename [%s] to [%s]",
- tmpPath.toUri().toString(),
- finalIndexZipFilePath.toUri().toString()
- );
- }
-
- return finalSegment;
+ return new DataSegmentAndIndexZipFilePath(
+ finalSegment,
+ tmpPath.toUri().getPath(),
+ finalIndexZipFilePath.toUri().getPath()
+ );
}
public static void writeSegmentDescriptor(
final FileSystem outputFS,
- final DataSegment segment,
+ final DataSegmentAndIndexZipFilePath segmentAndPath,
final Path descriptorPath,
final Progressable progressable
)
@@ -511,9 +499,12 @@ public class JobHelper
try {
progressable.progress();
if (outputFS.exists(descriptorPath)) {
- if (!outputFS.delete(descriptorPath, false)) {
- throw new IOE("Failed to delete descriptor at [%s]", descriptorPath);
- }
+ // If the descriptor path already exists, don't overwrite it and risk clobbering it.
+ // An existing descriptor means the segment data has already been written to the
+ // tmp path, and the descriptor that was already written should give us the information we
+ // need to rename the segment index to the final path and publish it in the top-level task.
+ log.info("descriptor path [%s] already exists, not overwriting", descriptorPath);
+ return -1;
}
try (final OutputStream descriptorOut = outputFS.create(
descriptorPath,
@@ -521,7 +512,7 @@ public class JobHelper
DEFAULT_FS_BUFFER_SIZE,
progressable
)) {
- HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segment);
+ HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segmentAndPath);
}
}
catch (RuntimeException | IOException ex) {
@@ -632,7 +623,39 @@ public class JobHelper
}
/**
- * Rename the files. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename
+ * Renames the index files for the given segments. This works around some limitations of both FileContext (no s3n
+ * support) and NativeS3FileSystem.rename, which will not overwrite. Note: segments should be renamed in the index
+ * task, not in a hadoop job, because race conditions between job retries can cause the final segment index file path to get clobbered.
+ *
+ * @param indexerSchema the hadoop ingestion spec
+ * @param segmentAndIndexZipFilePaths the list of segments with their currently stored tmp path and the final path
+ * that they should be renamed to.
+ */
+ public static void renameIndexFilesForSegments(
+ HadoopIngestionSpec indexerSchema,
+ List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths
+ ) throws IOException
+ {
+ HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema);
+ final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config);
+ config.addJobProperties(configuration);
+ JobHelper.injectDruidProperties(configuration, config);
+ for (DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath : segmentAndIndexZipFilePaths) {
+ Path tmpPath = new Path(segmentAndIndexZipFilePath.getTmpIndexZipFilePath());
+ Path finalIndexZipFilePath = new Path(segmentAndIndexZipFilePath.getFinalIndexZipFilePath());
+ final FileSystem outputFS = FileSystemHelper.get(finalIndexZipFilePath.toUri(), configuration);
+ if (!renameIndexFile(outputFS, tmpPath, finalIndexZipFilePath)) {
+ throw new IOE(
+ "Unable to rename [%s] to [%s]",
+ tmpPath.toUri().toString(),
+ finalIndexZipFilePath.toUri().toString()
+ );
+ }
+ }
+ }
+
+ /**
+ * Rename the file. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename
* which will not overwrite
*
* @param outputFS The output fs
@@ -641,7 +664,7 @@ public class JobHelper
*
* @return False if a rename failed, true otherwise (rename success or no rename needed)
*/
- private static boolean renameIndexFiles(
+ private static boolean renameIndexFile(
final FileSystem outputFS,
final Path indexZipFilePath,
final Path finalIndexZipFilePath
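
Putting the JobHelper changes together, here is a sketch of the intended end-to-end sequence at the caller level. It mirrors the updated tests further down rather than quoting any single method from the patch, and the wrapper class name is made up.

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.IndexGeneratorJob;
import org.apache.druid.indexer.JobHelper;

import java.io.IOException;
import java.util.List;

class IndexGeneratorFlowSketch
{
  static boolean runAndFinalize(HadoopDruidIndexerConfig config) throws IOException
  {
    // 1) Run the generator job; JobHelper.runJobs no longer cleans up by itself.
    IndexGeneratorJob job = new IndexGeneratorJob(config);
    boolean succeeded = JobHelper.runJobs(ImmutableList.of(job));

    // 2) Read back the segments plus their tmp/final index zip paths from the descriptor files.
    List<DataSegmentAndIndexZipFilePath> segmentAndPaths =
        IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);

    // 3) Rename tmp -> final here, outside the hadoop job, so a retried job cannot
    //    clobber an index file that has already been moved into place.
    JobHelper.renameIndexFilesForSegments(config.getSchema(), segmentAndPaths);

    // 4) Drop the intermediate working path if the tuning config allows it.
    JobHelper.maybeDeleteIntermediatePath(succeeded, config.getSchema());
    return succeeded;
  }
}
```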
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java
index b7eb60bf093..bdadc95010e 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexer;
import org.apache.druid.timeline.DataSegment;
import java.util.List;
+import java.util.stream.Collectors;
/**
*/
@@ -42,7 +43,8 @@ public class MetadataStorageUpdaterJob implements Jobby
@Override
public boolean run()
{
- final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
+ final List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
+ final List<DataSegment> segments = segmentAndIndexZipFilePaths.stream().map(s -> s.getSegment()).collect(Collectors.toList());
final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable();
handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.JSON_MAPPER);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index 404d5ed67b8..913c6481e57 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -372,7 +372,15 @@ public class BatchDeltaIngestionTest
) throws Exception
{
IndexGeneratorJob job = new IndexGeneratorJob(config);
- Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
+ Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job)));
+
+ List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
+ IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
+ JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths);
+
+ JobHelper.maybeDeleteIntermediatePath(true, config.getSchema());
+ File workingPath = new File(config.makeIntermediatePath().toUri().getPath());
+ Assert.assertFalse(workingPath.exists());
File segmentFolder = new File(
StringUtils.format(
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
new file mode 100644
index 00000000000..3dcd2033e2f
--- /dev/null
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DataSegmentAndIndexZipFilePathTest
+{
+ private static final SegmentId SEGMENT_ID = SegmentId.dummy("data-source", 1);
+ private static final SegmentId OTHER_SEGMENT_ID = SegmentId.dummy("data-source2", 1);
+ private static final DataSegment SEGMENT = new DataSegment(
+ SEGMENT_ID,
+ null,
+ null,
+ null,
+ new NumberedShardSpec(1, 10),
+ null,
+ 0,
+ 0
+ );
+ private static final DataSegment OTHER_SEGMENT = new DataSegment(
+ OTHER_SEGMENT_ID,
+ null,
+ null,
+ null,
+ new NumberedShardSpec(1, 10),
+ null,
+ 0,
+ 0
+ );
+
+ private DataSegmentAndIndexZipFilePath target;
+
+ @Test
+ public void test_equals_otherNull_notEqual()
+ {
+ String tmpPath = "tmpPath";
+ String finalPath = "finalPath";
+ target = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+ Assert.assertNotEquals(target, null);
+ }
+
+ @Test
+ public void test_equals_differentSegmentId_notEqual()
+ {
+ String tmpPath = "tmpPath";
+ String finalPath = "finalPath";
+ target = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+
+ DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
+ OTHER_SEGMENT,
+ tmpPath,
+ finalPath
+ );
+ Assert.assertNotEquals(target, other);
+ }
+
+ @Test
+ public void test_equals_differentTmpPath_notEqual()
+ {
+ String tmpPath = "tmpPath";
+ String otherTmpPath = "otherTmpPath";
+ String finalPath = "finalPath";
+ target = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+
+ DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ otherTmpPath,
+ finalPath
+ );
+ Assert.assertNotEquals(target, other);
+ }
+
+ @Test
+ public void test_equals_differentFinalPath_notEqual()
+ {
+ String tmpPath = "tmpPath";
+ String finalPath = "finalPath";
+ String otherFinalPath = "otherFinalPath";
+ target = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+
+ DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ otherFinalPath
+ );
+ Assert.assertNotEquals(target, other);
+ }
+
+ @Test
+ public void test_equals_allFieldsEqualValue_equal()
+ {
+ String tmpPath = "tmpPath";
+ String finalPath = "finalPath";
+ target = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+
+ DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+ Assert.assertEquals(target, other);
+ }
+
+ @Test
+ public void test_equals_sameObject_equal()
+ {
+ String tmpPath = "tmpPath";
+ String finalPath = "finalPath";
+ target = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+
+ Assert.assertEquals(target, target);
+ }
+
+ @Test
+ public void test_serde() throws IOException
+ {
+ String tmpPath = "tmpPath";
+ String finalPath = "finalPath";
+ target = new DataSegmentAndIndexZipFilePath(
+ SEGMENT,
+ tmpPath,
+ finalPath
+ );
+
+ final InjectableValues.Std injectableValues = new InjectableValues.Std();
+ injectableValues.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT);
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(injectableValues);
+ final String json = mapper.writeValueAsString(target);
+ final DataSegmentAndIndexZipFilePath fromJson =
+ mapper.readValue(json, DataSegmentAndIndexZipFilePath.class);
+ Assert.assertEquals(target, fromJson);
+ }
+}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java
new file mode 100644
index 00000000000..8231b7ce4aa
--- /dev/null
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ JobHelper.class,
+ IndexGeneratorJob.class
+})
+@PowerMockIgnore({"javax.net.ssl.*"})
+public class HadoopDruidIndexerJobTest
+{
+ private HadoopDruidIndexerConfig config;
+ private MetadataStorageUpdaterJobHandler handler;
+ private HadoopDruidIndexerJob target;
+
+ @Test
+ public void test_run()
+ {
+ config = PowerMock.createMock(HadoopDruidIndexerConfig.class);
+ handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class);
+ PowerMock.mockStaticNice(JobHelper.class);
+ PowerMock.mockStaticNice(IndexGeneratorJob.class);
+ config.verify();
+ EasyMock.expectLastCall();
+ EasyMock.expect(config.isUpdaterJobSpecSet()).andReturn(false).anyTimes();
+ config.setHadoopJobIdFileName(EasyMock.anyString());
+ EasyMock.expectLastCall();
+ JobHelper.ensurePaths(config);
+ EasyMock.expectLastCall();
+ Capture<List<Jobby>> capturedJobs = Capture.newInstance();
+ EasyMock.expect(JobHelper.runJobs(EasyMock.capture(capturedJobs))).andReturn(true);
+ EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(EasyMock.anyObject())).andReturn(null);
+
+
+ PowerMock.replayAll();
+
+ target = new HadoopDruidIndexerJob(config, handler);
+ target.run();
+
+ List<Jobby> jobs = capturedJobs.getValue();
+ Assert.assertEquals(2, jobs.size());
+ jobs.stream().filter(job -> !(job instanceof IndexGeneratorJob)).forEach(job -> Assert.assertTrue(job.run()));
+
+ PowerMock.verifyAll();
+ }
+}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index 97967bdb3b7..24ff1227ee5 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -621,13 +621,21 @@ public class IndexGeneratorJobTest
private void verifyJob(IndexGeneratorJob job) throws IOException
{
- Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
+ Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job)));
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
IndexGeneratorJob
- .getPublishedSegments(config)
- .forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>())
- .add(segment));
+ .getPublishedSegmentAndIndexZipFilePaths(config)
+ .forEach(segmentAndIndexZipFilePath -> intervalToSegments.computeIfAbsent(segmentAndIndexZipFilePath.getSegment().getInterval(), k -> new ArrayList<>())
+ .add(segmentAndIndexZipFilePath.getSegment()));
+
+ List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
+ IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
+ JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths);
+
+ JobHelper.maybeDeleteIntermediatePath(true, config.getSchema());
+ File workingPath = new File(config.makeIntermediatePath().toUri().getPath());
+ Assert.assertTrue(workingPath.exists());
final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
int segmentNum = 0;
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java
new file mode 100644
index 00000000000..48f653a8c3b
--- /dev/null
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ FileSystemHelper.class,
+ HadoopDruidIndexerConfig.class
+})
+@PowerMockIgnore({"javax.net.ssl.*"})
+public class JobHelperPowerMockTest
+{
+ private static final String TMP_PATH = "/tmp/index.zip.0";
+ private static final String FINAL_PATH = "/final/index.zip.0";
+
+ private HadoopDruidIndexerConfig indexerConfig;
+
+ @Test
+ public void test_renameIndexFilesForSegments_emptySegments() throws IOException
+ {
+ HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
+ List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of();
+
+ PowerMock.replayAll();
+
+ JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void test_renameIndexFilesForSegments_segmentIndexFileRenamedSuccessfully()
+ throws IOException
+ {
+ HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
+ mockFileSystem(true);
+ DataSegment segment = PowerMock.createMock(DataSegment.class);
+
+ List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of(
+ new DataSegmentAndIndexZipFilePath(
+ segment,
+ TMP_PATH,
+ FINAL_PATH
+ )
+ );
+ PowerMock.replayAll();
+
+ JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test (expected = IOE.class)
+ public void test_renameIndexFilesForSegments_segmentIndexFileRenamedFailed_throwsException()
+ throws IOException
+ {
+ HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
+ mockFileSystem(false);
+ DataSegment segment = PowerMock.createMock(DataSegment.class);
+ List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of(
+ new DataSegmentAndIndexZipFilePath(
+ segment,
+ TMP_PATH,
+ FINAL_PATH
+ )
+ );
+
+ PowerMock.replayAll();
+
+ JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void test_maybeDeleteIntermediatePath_leaveIntermediate_doesNotDeleteIntermediatePath()
+ {
+ HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
+ HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
+ EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(true);
+ EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig);
+
+ PowerMock.replayAll();
+
+ JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexerJobSucceeded_deleteIntermediatePath()
+ throws IOException
+ {
+ HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
+ HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
+ Path workingPath = PowerMock.createMock(Path.class);
+ FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
+ EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
+ EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig);
+ EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true);
+ EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
+ EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
+
+ PowerMock.replayAll();
+
+ JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexJobFailedAndCleanupOnFailure_deleteIntermediatePath()
+ throws IOException
+ {
+ HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
+ HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
+ Path workingPath = PowerMock.createMock(Path.class);
+ FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
+ EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
+ EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true);
+ EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes();
+ EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true);
+ EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
+ EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
+
+ PowerMock.replayAll();
+
+ JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void test_maybeDeleteIntermediatePath_deleteThrowsException_noExceptionPropagated()
+ throws IOException
+ {
+ HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
+ HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
+ Path workingPath = PowerMock.createMock(Path.class);
+ FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
+ EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
+ EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true);
+ EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes();
+ EasyMock.expect(workingPathFs.delete(workingPath, true)).andThrow(new IOException("Delete Exception"));
+ EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
+ EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
+
+ PowerMock.replayAll();
+
+ JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec);
+
+ PowerMock.verifyAll();
+ }
+
+ private HadoopIngestionSpec mockIngestionSpec()
+ {
+ indexerConfig = PowerMock.createMock(HadoopDruidIndexerConfig.class);
+ HadoopIngestionSpec ingestionSpec = PowerMock.createMock(HadoopIngestionSpec.class);
+ PowerMock.mockStaticNice(HadoopDruidIndexerConfig.class);
+ EasyMock.expect(indexerConfig.getAllowedProperties()).andReturn(ImmutableMap.of()).anyTimes();
+ indexerConfig.addJobProperties(EasyMock.anyObject(Configuration.class));
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(HadoopDruidIndexerConfig.fromSpec(ingestionSpec)).andReturn(indexerConfig);
+ EasyMock.expect(indexerConfig.getSchema()).andReturn(ingestionSpec).anyTimes();
+ return ingestionSpec;
+ }
+
+ private void mockFileSystem(boolean renameSuccess) throws IOException
+ {
+ PowerMock.mockStaticNice(FileSystemHelper.class);
+ FileSystem fileSystem = PowerMock.createMock(FileSystem.class);
+ EasyMock.expect(FileSystemHelper.get(
+ EasyMock.anyObject(URI.class),
+ EasyMock.anyObject(Configuration.class)
+ )).andReturn(fileSystem);
+ EasyMock.expect(fileSystem.exists(EasyMock.anyObject(Path.class))).andReturn(false);
+ EasyMock.expect(fileSystem.rename(EasyMock.anyObject(Path.class), EasyMock.anyObject(Path.class)))
+ .andReturn(renameSuccess);
+ }
+}
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java
new file mode 100644
index 00000000000..0b867630cd1
--- /dev/null
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ IndexGeneratorJob.class
+})
+@PowerMockIgnore({"javax.net.ssl.*"})
+public class MetadataStorageUpdaterJobTest
+{
+ private static final List<DataSegmentAndIndexZipFilePath> DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS = ImmutableList.of(
+ new DataSegmentAndIndexZipFilePath(null, null, null)
+ );
+ private static final String SEGMENT_TABLE = "segments";
+ private HadoopIngestionSpec spec;
+ private HadoopIOConfig ioConfig;
+ private MetadataStorageUpdaterJobSpec metadataUpdateSpec;
+ private HadoopDruidIndexerConfig config;
+ private MetadataStorageUpdaterJobHandler handler;
+ private MetadataStorageUpdaterJob target;
+
+ @Test
+ public void test_run()
+ {
+ metadataUpdateSpec = PowerMock.createMock(MetadataStorageUpdaterJobSpec.class);
+ ioConfig = PowerMock.createMock(HadoopIOConfig.class);
+ spec = PowerMock.createMock(HadoopIngestionSpec.class);
+ config = PowerMock.createMock(HadoopDruidIndexerConfig.class);
+ handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class);
+ PowerMock.mockStaticNice(IndexGeneratorJob.class);
+
+ EasyMock.expect(metadataUpdateSpec.getSegmentTable()).andReturn(SEGMENT_TABLE);
+ EasyMock.expect(ioConfig.getMetadataUpdateSpec()).andReturn(metadataUpdateSpec);
+ EasyMock.expect(spec.getIOConfig()).andReturn(ioConfig);
+ EasyMock.expect(config.getSchema()).andReturn(spec);
+ EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config))
+ .andReturn(DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS);
+ handler.publishSegments(
+ SEGMENT_TABLE,
+ DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS.stream().map(s -> s.getSegment()).collect(
+ Collectors.toList()), HadoopDruidIndexerConfig.JSON_MAPPER);
+ EasyMock.expectLastCall();
+ target = new MetadataStorageUpdaterJob(config, handler);
+
+ PowerMock.replayAll();
+
+ target.run();
+
+ PowerMock.verifyAll();
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index b66ae471136..37ffb4cd790 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -30,6 +31,8 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath;
import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.HadoopDruidIndexerJob;
@@ -83,6 +86,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
@@ -307,170 +311,197 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
@SuppressWarnings("unchecked")
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
- registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
- String hadoopJobIdFile = getHadoopJobIdFileName();
- final ClassLoader loader = buildClassLoader(toolbox);
- boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
-
- HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
- spec,
- jsonMapper,
- new OverlordActionBasedUsedSegmentsRetriever(toolbox)
- );
-
- Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
- "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
- loader
- );
- determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);
-
- String[] determinePartitionsInput = new String[]{
- toolbox.getJsonMapper().writeValueAsString(spec),
- toolbox.getConfig().getHadoopWorkingPath(),
- toolbox.getSegmentPusher().getPathForHadoop(),
- hadoopJobIdFile
- };
-
- HadoopIngestionSpec indexerSchema;
- final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
- Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
- Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
- "runTask",
- determinePartitionsInput.getClass()
- );
+ boolean indexGeneratorJobAttempted = false;
+ boolean indexGeneratorJobSuccess = false;
+ HadoopIngestionSpec indexerSchema = null;
try {
- Thread.currentThread().setContextClassLoader(loader);
+ registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
+ String hadoopJobIdFile = getHadoopJobIdFileName();
+ final ClassLoader loader = buildClassLoader(toolbox);
+ boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
- ingestionState = IngestionState.DETERMINE_PARTITIONS;
-
- final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
- determinePartitionsInnerProcessingRunner,
- new Object[]{determinePartitionsInput}
+ HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
+ spec,
+ jsonMapper,
+ new OverlordActionBasedUsedSegmentsRetriever(toolbox)
);
+ Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
+ "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
+ loader
+ );
+ determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);
- determineConfigStatus = toolbox
- .getJsonMapper()
- .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);
+ String[] determinePartitionsInput = new String[]{
+ toolbox.getJsonMapper().writeValueAsString(spec),
+ toolbox.getConfig().getHadoopWorkingPath(),
+ toolbox.getSegmentPusher().getPathForHadoop(),
+ hadoopJobIdFile
+ };
- indexerSchema = determineConfigStatus.getSchema();
- if (indexerSchema == null) {
- errorMsg = determineConfigStatus.getErrorMsg();
- toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
- return TaskStatus.failure(
- getId(),
- errorMsg
+ final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+ Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
+ Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
+ "runTask",
+ determinePartitionsInput.getClass()
+ );
+ try {
+ Thread.currentThread().setContextClassLoader(loader);
+
+ ingestionState = IngestionState.DETERMINE_PARTITIONS;
+
+ final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
+ determinePartitionsInnerProcessingRunner,
+ new Object[]{determinePartitionsInput}
);
- }
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- finally {
- Thread.currentThread().setContextClassLoader(oldLoader);
- }
- // We should have a lock from before we started running only if interval was specified
- String version;
- if (determineIntervals) {
- Interval interval = JodaUtils.umbrellaInterval(
- JodaUtils.condenseIntervals(
- indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
- )
- );
- final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
- // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
- final TaskLock lock = Preconditions.checkNotNull(
- toolbox.getTaskActionClient().submit(
- new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
- ),
- "Cannot acquire a lock for interval[%s]", interval
- );
- version = lock.getVersion();
- } else {
- Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
- final TaskLock myLock = Iterables.getOnlyElement(locks);
- version = myLock.getVersion();
- }
- final String specVersion = indexerSchema.getTuningConfig().getVersion();
- if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
- if (specVersion.compareTo(version) < 0) {
- version = specVersion;
- } else {
- log.error(
- "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
- specVersion,
- version
- );
- toolbox.getTaskReportFileWriter().write(getId(), null);
- return TaskStatus.failure(getId());
- }
- }
+ determineConfigStatus = toolbox
+ .getJsonMapper()
+ .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);
- log.info("Setting version to: %s", version);
-
- Object innerProcessingRunner = getForeignClassloaderObject(
- "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
- loader
- );
- buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);
-
- String[] buildSegmentsInput = new String[]{
- toolbox.getJsonMapper().writeValueAsString(indexerSchema),
- version,
- hadoopJobIdFile
- };
-
- Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
- Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
-
- try {
- Thread.currentThread().setContextClassLoader(loader);
-
- ingestionState = IngestionState.BUILD_SEGMENTS;
- final String jobStatusString = (String) innerProcessingRunTask.invoke(
- innerProcessingRunner,
- new Object[]{buildSegmentsInput}
- );
-
- buildSegmentsStatus = toolbox.getJsonMapper().readValue(
- jobStatusString,
- HadoopIndexGeneratorInnerProcessingStatus.class
- );
-
- if (buildSegmentsStatus.getDataSegments() != null) {
- toolbox.publishSegments(buildSegmentsStatus.getDataSegments());
-
- // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
- // for awaitSegmentAvailabilityTimeoutMillis
- if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
- ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
- ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<>(buildSegmentsStatus.getDataSegments());
- segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
- toolbox,
- segmentsToWaitFor,
- spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
+ indexerSchema = determineConfigStatus.getSchema();
+ if (indexerSchema == null) {
+ errorMsg = determineConfigStatus.getErrorMsg();
+ toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+ return TaskStatus.failure(
+ getId(),
+ errorMsg
);
}
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(oldLoader);
+ }
- ingestionState = IngestionState.COMPLETED;
- toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
- return TaskStatus.success(getId());
- } else {
- errorMsg = buildSegmentsStatus.getErrorMsg();
- toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
- return TaskStatus.failure(
- getId(),
- errorMsg
+ // We should have a lock from before we started running only if interval was specified
+ String version;
+ if (determineIntervals) {
+ Interval interval = JodaUtils.umbrellaInterval(
+ JodaUtils.condenseIntervals(
+ indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
+ )
);
+ final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
+ // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
+ final TaskLock lock = Preconditions.checkNotNull(
+ toolbox.getTaskActionClient().submit(
+ new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
+ ),
+ "Cannot acquire a lock for interval[%s]", interval
+ );
+ version = lock.getVersion();
+ } else {
+ Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
+ final TaskLock myLock = Iterables.getOnlyElement(locks);
+ version = myLock.getVersion();
+ }
+
+ final String specVersion = indexerSchema.getTuningConfig().getVersion();
+ if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
+ if (specVersion.compareTo(version) < 0) {
+ version = specVersion;
+ } else {
+ log.error(
+ "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
+ specVersion,
+ version
+ );
+ toolbox.getTaskReportFileWriter().write(getId(), null);
+ return TaskStatus.failure(getId());
+ }
+ }
+
+ log.info("Setting version to: %s", version);
+
+ Object innerProcessingRunner = getForeignClassloaderObject(
+ "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
+ loader
+ );
+ buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);
+
+ String[] buildSegmentsInput = new String[]{
+ toolbox.getJsonMapper().writeValueAsString(indexerSchema),
+ version,
+ hadoopJobIdFile
+ };
+
+ Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
+ Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
+
+ try {
+ Thread.currentThread().setContextClassLoader(loader);
+
+ ingestionState = IngestionState.BUILD_SEGMENTS;
+ indexGeneratorJobAttempted = true;
+ final String jobStatusString = (String) innerProcessingRunTask.invoke(
+ innerProcessingRunner,
+ new Object[]{buildSegmentsInput}
+ );
+
+ buildSegmentsStatus = toolbox.getJsonMapper().readValue(
+ jobStatusString,
+ HadoopIndexGeneratorInnerProcessingStatus.class
+ );
+
+ List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
+ if (dataSegmentAndIndexZipFilePaths != null) {
+ indexGeneratorJobSuccess = true;
+ try {
+ Thread.currentThread().setContextClassLoader(oldLoader);
+ renameSegmentIndexFilesJob(
+ toolbox.getJsonMapper().writeValueAsString(indexerSchema),
+ toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
+ );
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(loader);
+ }
+ ArrayList<DataSegment> segments = new ArrayList<>(dataSegmentAndIndexZipFilePaths.stream()
+ .map(
+ DataSegmentAndIndexZipFilePath::getSegment)
+ .collect(Collectors.toList()));
+ toolbox.publishSegments(segments);
+
+ // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
+ // for awaitSegmentAvailabilityTimeoutMillis
+ if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
+ ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
+ segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
+ toolbox,
+ segments,
+ spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
+ );
+ }
+
+ ingestionState = IngestionState.COMPLETED;
+ toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+ return TaskStatus.success(getId());
+ } else {
+ errorMsg = buildSegmentsStatus.getErrorMsg();
+ toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+ return TaskStatus.failure(
+ getId(),
+ errorMsg
+ );
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(oldLoader);
}
}
- catch (Exception e) {
- throw new RuntimeException(e);
- }
finally {
- Thread.currentThread().setContextClassLoader(oldLoader);
+ indexerGeneratorCleanupJob(
+ indexGeneratorJobAttempted,
+ indexGeneratorJobSuccess,
+ indexerSchema == null ? null : toolbox.getJsonMapper().writeValueAsString(indexerSchema)
+ );
}
}
@@ -514,6 +545,96 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
}
+ private void renameSegmentIndexFilesJob(
+ String hadoopIngestionSpecStr,
+ String dataSegmentAndIndexZipFilePathListStr
+ )
+ {
+ final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ ClassLoader loader = HadoopTask.buildClassLoader(
+ getHadoopDependencyCoordinates(),
+ taskConfig.getDefaultHadoopCoordinates()
+ );
+
+ Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
+ "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
+ loader
+ );
+
+ String[] renameSegmentIndexFilesJobInput = new String[]{
+ hadoopIngestionSpecStr,
+ dataSegmentAndIndexZipFilePathListStr
+ };
+
+ Class<?> buildRenameSegmentIndexFilesJobRunnerClass = renameSegmentIndexFilesRunner.getClass();
+ Method renameSegmentIndexFiles = buildRenameSegmentIndexFilesJobRunnerClass.getMethod(
+ "runTask",
+ renameSegmentIndexFilesJobInput.getClass()
+ );
+
+ Thread.currentThread().setContextClassLoader(loader);
+ renameSegmentIndexFiles.invoke(
+ renameSegmentIndexFilesRunner,
+ new Object[]{renameSegmentIndexFilesJobInput}
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(oldLoader);
+ }
+ }
+
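+ /**
+ * Cleans up after the index generator job by reflectively invoking
+ * {@link HadoopIndexerGeneratorCleanupRunner} inside an isolated Hadoop classloader, which may
+ * delete the intermediate path depending on whether the index generator job succeeded.
+ * Failures are logged but do not fail the task.
+ */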
+ private void indexerGeneratorCleanupJob(
+ boolean indexGeneratorJobAttempted,
+ boolean indexGeneratorJobSuccess,
+ String hadoopIngestionSpecStr
+ )
+ {
+ if (!indexGeneratorJobAttempted) {
+ log.info("No need for cleanup as index generator job did not even run");
+ return;
+ }
+
+ final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ ClassLoader loader = HadoopTask.buildClassLoader(
+ getHadoopDependencyCoordinates(),
+ taskConfig.getDefaultHadoopCoordinates()
+ );
+
+ Object indexerGeneratorCleanupRunner = getForeignClassloaderObject(
+ "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner",
+ loader
+ );
+
+ String[] indexerGeneratorCleanupJobInput = new String[]{
+ indexGeneratorJobSuccess ? "true" : "false",
+ hadoopIngestionSpecStr,
+ };
+
+ Class<?> buildIndexerGeneratorCleanupRunnerClass = indexerGeneratorCleanupRunner.getClass();
+ Method indexerGeneratorCleanup = buildIndexerGeneratorCleanupRunnerClass.getMethod(
+ "runTask",
+ indexerGeneratorCleanupJobInput.getClass()
+ );
+
+ Thread.currentThread().setContextClassLoader(loader);
+ indexerGeneratorCleanup.invoke(
+ indexerGeneratorCleanupRunner,
+ new Object[]{indexerGeneratorCleanupJobInput}
+ );
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to cleanup after index generator job");
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(oldLoader);
+ }
+ }
+
@GET
@Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON)
@@ -722,7 +843,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
if (job.run()) {
return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
new HadoopIndexGeneratorInnerProcessingStatus(
- job.getPublishedSegments(),
+ job.getPublishedSegmentAndIndexZipFilePaths(),
job.getStats(),
null
)
@@ -790,28 +911,111 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
}
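+ /**
+ * Invoked reflectively (via getForeignClassloaderObject) from renameSegmentIndexFilesJob().
+ * Deserializes the ingestion spec and the list of {@link DataSegmentAndIndexZipFilePath} from its
+ * string arguments and calls {@link JobHelper#renameIndexFilesForSegments}.
+ */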
+ @SuppressWarnings("unused")
+ public static class HadoopRenameSegmentIndexFilesRunner
+ {
+ TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
+ new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
+ {
+ };
+
+ public void runTask(String[] args) throws Exception
+ {
+ if (args.length != 2) {
+ log.warn("HadoopRenameSegmentIndexFilesRunner called with improper number of arguments");
+ }
+ String hadoopIngestionSpecStr = args[0];
+ String dataSegmentAndIndexZipFilePathListStr = args[1];
+
+ HadoopIngestionSpec indexerSchema;
+ List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
+ try {
+ indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
+ hadoopIngestionSpecStr,
+ HadoopIngestionSpec.class
+ );
+ dataSegmentAndIndexZipFilePaths = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
+ dataSegmentAndIndexZipFilePathListStr,
+ LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH
+ );
+ }
+ catch (Exception e) {
+ log.warn(
+ e,
+ "HadoopRenameSegmentIndexFilesRunner: Error occurred while trying to read input parameters into data objects"
+ );
+ throw e;
+ }
+ JobHelper.renameIndexFilesForSegments(
+ indexerSchema,
+ dataSegmentAndIndexZipFilePaths
+ );
+ }
+ }
+
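+ /**
+ * Invoked reflectively (via getForeignClassloaderObject) from indexerGeneratorCleanupJob().
+ * Deserializes the ingestion spec and the index generator job's success flag from its string
+ * arguments and calls {@link JobHelper#maybeDeleteIntermediatePath}.
+ */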
+ @SuppressWarnings("unused")
+ public static class HadoopIndexerGeneratorCleanupRunner
+ {
+ TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
+ new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
+ {
+ };
+
+ public void runTask(String[] args) throws Exception
+ {
+ if (args.length != 2) {
+ log.warn("HadoopIndexerGeneratorCleanupRunner called with improper number of arguments");
+ }
+
+ String indexGeneratorJobSucceededStr = args[0];
+ String hadoopIngestionSpecStr = args[1];
+
+ HadoopIngestionSpec indexerSchema;
+ boolean indexGeneratorJobSucceeded;
+ List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
+ try {
+ indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
+ hadoopIngestionSpecStr,
+ HadoopIngestionSpec.class
+ );
+ indexGeneratorJobSucceeded = BooleanUtils.toBoolean(indexGeneratorJobSucceededStr);
+ }
+ catch (Exception e) {
+ log.warn(
+ e,
+ "HadoopIndexerGeneratorCleanupRunner: Error occurred while trying to read input parameters into data objects"
+ );
+ throw e;
+ }
+ JobHelper.maybeDeleteIntermediatePath(
+ indexGeneratorJobSucceeded,
+ indexerSchema
+ );
+ }
+ }
+
public static class HadoopIndexGeneratorInnerProcessingStatus
{
- private final List<DataSegment> dataSegments;
+ private final List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
private final Map<String, Object> metrics;
private final String errorMsg;
@JsonCreator
public HadoopIndexGeneratorInnerProcessingStatus(
- @JsonProperty("dataSegments") List dataSegments,
+ @JsonProperty("dataSegmentAndIndexZipFilePaths") List dataSegmentAndIndexZipFilePaths,
@JsonProperty("metrics") Map metrics,
@JsonProperty("errorMsg") String errorMsg
)
{
- this.dataSegments = dataSegments;
+ this.dataSegmentAndIndexZipFilePaths = dataSegmentAndIndexZipFilePaths;
this.metrics = metrics;
this.errorMsg = errorMsg;
}
@JsonProperty
- public List<DataSegment> getDataSegments()
+ public List<DataSegmentAndIndexZipFilePath> getDataSegmentAndIndexZipFilePaths()
{
- return dataSegments;
+ return dataSegmentAndIndexZipFilePaths;
}
@JsonProperty
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 2fb9adc239e..8ad9cec8ffa 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -102,6 +102,10 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
+ <dependency>
+ <groupId>com.squareup.okhttp</groupId>
+ <artifactId>okhttp</artifactId>
+ </dependency>
diff --git a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
index 4235abbf884..b227ababb44 100644
--- a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
@@ -117,9 +117,12 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
);
List<Jobby> jobs = new ArrayList<>();
+ HadoopDruidIndexerJob indexerJob = new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class));
jobs.add(new HadoopDruidDetermineConfigurationJob(config));
- jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));
- JobHelper.runJobs(jobs, config);
+ jobs.add(indexerJob);
+ boolean jobsSucceeded = JobHelper.runJobs(jobs);
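+ // Once the jobs have run, rename the index zip files recorded by the indexer job to their final
+ // paths and clean up the intermediate path if appropriate.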
+ JobHelper.renameIndexFilesForSegments(config.getSchema(), indexerJob.getPublishedSegmentAndIndexZipFilePaths());
+ JobHelper.maybeDeleteIntermediatePath(jobsSucceeded, config.getSchema());
}
catch (Exception e) {