From a2892d9c40793027ba8a8977c85b3de4a949e11c Mon Sep 17 00:00:00 2001
From: zachjsh
Date: Wed, 21 Apr 2021 09:24:31 -1000
Subject: [PATCH] Adjust HadoopIndexTask temp segment renaming to avoid potential race conditions (#11075)

* Do stuff
* Do more stuff
* * Do more stuff
* * Do more stuff
* * working
* * cleanup
* * more cleanup
* * more cleanup
* * add license header
* * Add unit tests
* * add java docs
* * add more unit tests
* * Cleanup test
* * Move removing of workingPath to index task rather than in hadoop job.
* * Address review comments
* * remove unused import
* * Address review comments
* Do not overwrite segment descriptor for segment if it already exists.
* * add comments to FileSystemHelper class
* * fix local hadoop integration test
---
 indexing-hadoop/pom.xml                        |  15 +
 .../DataSegmentAndIndexZipFilePath.java        |  97 ++++
 .../druid/indexer/FileSystemHelper.java        |  38 ++
 .../HadoopDruidDetermineConfigurationJob.java  |   7 +-
 .../druid/indexer/HadoopDruidIndexerJob.java   |  13 +-
 .../druid/indexer/IndexGeneratorJob.java       |  20 +-
 .../org/apache/druid/indexer/JobHelper.java    | 105 ++--
 .../indexer/MetadataStorageUpdaterJob.java     |   4 +-
 .../indexer/BatchDeltaIngestionTest.java       |  10 +-
 .../DataSegmentAndIndexZipFilePathTest.java    | 185 +++++++
 .../indexer/HadoopDruidIndexerJobTest.java     |  76 +++
 .../druid/indexer/IndexGeneratorJobTest.java   |  16 +-
 .../druid/indexer/JobHelperPowerMockTest.java  | 216 ++++++++
 .../MetadataStorageUpdaterJobTest.java         |  82 +++
 .../indexing/common/task/HadoopIndexTask.java  | 510 ++++++++++++------
 integration-tests/pom.xml                      |   4 +
 .../druid/cli/CliInternalHadoopIndexer.java    |   7 +-
 17 files changed, 1185 insertions(+), 220 deletions(-)
 create mode 100644 indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java
 create mode 100644 indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java
 create mode 100644 indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
 create mode 100644 indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java
 create mode 100644 indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java
 create mode 100644 indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java

diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 9557ab5b759..8eacc7e2220 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -202,6 +202,21 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java
new file mode 100644
index 00000000000..e12f7fbf5dc
--- /dev/null
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePath.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * holds a {@link DataSegment} with the temporary file path where the corresponding index zip file is currently stored
+ * and the final path where the index zip file should eventually be moved to.
+ * see {@link JobHelper#renameIndexFilesForSegments(HadoopIngestionSpec, List)}
+ */
+public class DataSegmentAndIndexZipFilePath
+{
+  private final DataSegment segment;
+  private final String tmpIndexZipFilePath;
+  private final String finalIndexZipFilePath;
+
+  @JsonCreator
+  public DataSegmentAndIndexZipFilePath(
+      @JsonProperty("segment") DataSegment segment,
+      @JsonProperty("tmpIndexZipFilePath") String tmpIndexZipFilePath,
+      @JsonProperty("finalIndexZipFilePath") String finalIndexZipFilePath
+  )
+  {
+    this.segment = segment;
+    this.tmpIndexZipFilePath = tmpIndexZipFilePath;
+    this.finalIndexZipFilePath = finalIndexZipFilePath;
+  }
+
+  @JsonProperty
+  public DataSegment getSegment()
+  {
+    return segment;
+  }
+
+  @JsonProperty
+  public String getTmpIndexZipFilePath()
+  {
+    return tmpIndexZipFilePath;
+  }
+
+  @JsonProperty
+  public String getFinalIndexZipFilePath()
+  {
+    return finalIndexZipFilePath;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o instanceof DataSegmentAndIndexZipFilePath) {
+      DataSegmentAndIndexZipFilePath that = (DataSegmentAndIndexZipFilePath) o;
+      return segment.equals(((DataSegmentAndIndexZipFilePath) o).getSegment())
+             && tmpIndexZipFilePath.equals(that.getTmpIndexZipFilePath())
+             && finalIndexZipFilePath.equals(that.getFinalIndexZipFilePath());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(segment.getId(), tmpIndexZipFilePath);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DataSegmentAndIndexZipFilePath{" +
+           "segment=" + segment +
+           ", tmpIndexZipFilePath=" + tmpIndexZipFilePath +
+           ", finalIndexZipFilePath=" + finalIndexZipFilePath +
+           '}';
+  }
+}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java
new file mode 100644
index 00000000000..96fde6b8ece
--- /dev/null
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/FileSystemHelper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; +import java.net.URI; + +/** + * This class exists for testing purposes, see {@link JobHelperPowerMockTest}. Using the + * raw {@link FileSystem} class resulted in errors with java assist. + */ +public class FileSystemHelper +{ + public static FileSystem get(URI uri, Configuration conf) throws IOException + { + return FileSystem.get(uri, conf); + } +} diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java index 8b5b4b6b0bb..ea37db1a10e 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java @@ -59,7 +59,12 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby if (config.isDeterminingPartitions()) { job = createPartitionJob(config); config.setHadoopJobIdFileName(hadoopJobIdFile); - return JobHelper.runSingleJob(job, config); + boolean jobSucceeded = JobHelper.runSingleJob(job); + JobHelper.maybeDeleteIntermediatePath( + jobSucceeded, + config.getSchema() + ); + return jobSucceeded; } else { final PartitionsSpec partitionsSpec = config.getPartitionsSpec(); final int shardsPerInterval; diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java index 25683f32bd8..58977ad4840 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerJob.java @@ -22,7 +22,6 @@ package org.apache.druid.indexer; import com.google.common.base.Preconditions; import com.google.inject.Inject; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; import java.util.ArrayList; @@ -40,7 +39,7 @@ public class HadoopDruidIndexerJob implements Jobby @Nullable private IndexGeneratorJob indexJob; @Nullable - private volatile List publishedSegments = null; + private volatile List publishedSegmentAndIndexZipFilePaths = null; @Nullable private String hadoopJobIdFile; @@ -91,14 +90,14 @@ public class HadoopDruidIndexerJob implements Jobby @Override public boolean run() { - publishedSegments = IndexGeneratorJob.getPublishedSegments(config); + publishedSegmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); return true; } } ); config.setHadoopJobIdFileName(hadoopJobIdFile); - return JobHelper.runJobs(jobs, config); + return JobHelper.runJobs(jobs); } @Override @@ -122,12 +121,12 @@ public class HadoopDruidIndexerJob implements Jobby return indexJob.getErrorMessage(); } - public List getPublishedSegments() + public List getPublishedSegmentAndIndexZipFilePaths() { - if 
(publishedSegments == null) { + if (publishedSegmentAndIndexZipFilePaths == null) { throw new IllegalStateException("Job hasn't run yet. No segments have been published yet."); } - return publishedSegments; + return publishedSegmentAndIndexZipFilePaths; } public void setHadoopJobIdFile(String hadoopJobIdFile) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index a12e76571db..9124b9b4c1d 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -102,14 +102,14 @@ public class IndexGeneratorJob implements Jobby { private static final Logger log = new Logger(IndexGeneratorJob.class); - public static List getPublishedSegments(HadoopDruidIndexerConfig config) + public static List getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config) { final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config); config.addJobProperties(conf); final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER; - ImmutableList.Builder publishedSegmentsBuilder = ImmutableList.builder(); + ImmutableList.Builder publishedSegmentAndIndexZipFilePathsBuilder = ImmutableList.builder(); final Path descriptorInfoDir = config.makeDescriptorInfoDir(); @@ -117,9 +117,9 @@ public class IndexGeneratorJob implements Jobby FileSystem fs = descriptorInfoDir.getFileSystem(conf); for (FileStatus status : fs.listStatus(descriptorInfoDir)) { - final DataSegment segment = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegment.class); - publishedSegmentsBuilder.add(segment); - log.info("Adding segment %s to the list of published segments", segment.getId()); + final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class); + publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath); + log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId()); } } catch (FileNotFoundException e) { @@ -133,9 +133,9 @@ public class IndexGeneratorJob implements Jobby catch (IOException e) { throw new RuntimeException(e); } - List publishedSegments = publishedSegmentsBuilder.build(); + List publishedSegmentAndIndexZipFilePaths = publishedSegmentAndIndexZipFilePathsBuilder.build(); - return publishedSegments; + return publishedSegmentAndIndexZipFilePaths; } private final HadoopDruidIndexerConfig config; @@ -809,7 +809,7 @@ public class IndexGeneratorJob implements Jobby 0 ); - final DataSegment segment = JobHelper.serializeOutIndex( + final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex( segmentTemplate, context.getConfiguration(), context, @@ -831,7 +831,7 @@ public class IndexGeneratorJob implements Jobby HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER ); - Path descriptorPath = config.makeDescriptorInfoPath(segment); + Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment()); descriptorPath = JobHelper.prependFSIfNullScheme( FileSystem.get( descriptorPath.toUri(), @@ -842,7 +842,7 @@ public class IndexGeneratorJob implements Jobby log.info("Writing descriptor to path[%s]", descriptorPath); JobHelper.writeSegmentDescriptor( config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()), - segment, 
+ segmentAndIndexZipFilePath, descriptorPath, context ); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java index 7d99d03ad56..b8c29b82f7a 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java @@ -386,29 +386,13 @@ public class JobHelper } } - public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config) + public static boolean runSingleJob(Jobby job) { boolean succeeded = job.run(); - - if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) { - if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { - Path workingPath = config.makeIntermediatePath(); - log.info("Deleting path[%s]", workingPath); - try { - Configuration conf = injectSystemProperties(new Configuration(), config); - config.addJobProperties(conf); - workingPath.getFileSystem(conf).delete(workingPath, true); - } - catch (IOException e) { - log.error(e, "Failed to cleanup path[%s]", workingPath); - } - } - } - return succeeded; } - public static boolean runJobs(List jobs, HadoopDruidIndexerConfig config) + public static boolean runJobs(List jobs) { boolean succeeded = true; for (Jobby job : jobs) { @@ -418,25 +402,33 @@ public class JobHelper } } + return succeeded; + } + + public static void maybeDeleteIntermediatePath( + boolean jobSucceeded, + HadoopIngestionSpec indexerSchema) + { + HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema); + final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config); + config.addJobProperties(configuration); + JobHelper.injectDruidProperties(configuration, config); if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) { - if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { + if (jobSucceeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { Path workingPath = config.makeIntermediatePath(); log.info("Deleting path[%s]", workingPath); try { - Configuration conf = injectSystemProperties(new Configuration(), config); - config.addJobProperties(conf); - workingPath.getFileSystem(conf).delete(workingPath, true); + config.addJobProperties(configuration); + workingPath.getFileSystem(configuration).delete(workingPath, true); } catch (IOException e) { log.error(e, "Failed to cleanup path[%s]", workingPath); } } } - - return succeeded; } - public static DataSegment serializeOutIndex( + public static DataSegmentAndIndexZipFilePath serializeOutIndex( final DataSegment segmentTemplate, final Configuration configuration, final Progressable progressable, @@ -482,20 +474,16 @@ public class JobHelper .withSize(size.get()) .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase)); - if (!renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) { - throw new IOE( - "Unable to rename [%s] to [%s]", - tmpPath.toUri().toString(), - finalIndexZipFilePath.toUri().toString() - ); - } - - return finalSegment; + return new DataSegmentAndIndexZipFilePath( + finalSegment, + tmpPath.toUri().getPath(), + finalIndexZipFilePath.toUri().getPath() + ); } public static void writeSegmentDescriptor( final FileSystem outputFS, - final DataSegment segment, + final DataSegmentAndIndexZipFilePath segmentAndPath, final Path descriptorPath, final Progressable progressable ) @@ -511,9 +499,12 @@ public class JobHelper try { progressable.progress(); if 
(outputFS.exists(descriptorPath)) { - if (!outputFS.delete(descriptorPath, false)) { - throw new IOE("Failed to delete descriptor at [%s]", descriptorPath); - } + // If the descriptor path already exists, don't overwrite, and risk clobbering it. + // If it already exists, it means that the segment data is already written to the + // tmp path, and the existing descriptor written should give us the information we + // need to rename the segment index to final path and publish it in the top level task. + log.info("descriptor path [%s] already exists, not overwriting", descriptorPath); + return -1; } try (final OutputStream descriptorOut = outputFS.create( descriptorPath, @@ -521,7 +512,7 @@ public class JobHelper DEFAULT_FS_BUFFER_SIZE, progressable )) { - HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segment); + HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segmentAndPath); } } catch (RuntimeException | IOException ex) { @@ -632,7 +623,39 @@ public class JobHelper } /** - * Rename the files. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename + * Renames the index files for the segments. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename + * which will not overwrite. Note: segments should be renamed in the index task, not in a hadoop job, as race + * conditions between job retries can cause the final segment index file path to get clobbered. + * + * @param indexerSchema the hadoop ingestion spec + * @param segmentAndIndexZipFilePaths the list of segments with their currently stored tmp path and the final path + * that they should be renamed to. + */ + public static void renameIndexFilesForSegments( + HadoopIngestionSpec indexerSchema, + List segmentAndIndexZipFilePaths + ) throws IOException + { + HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema); + final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config); + config.addJobProperties(configuration); + JobHelper.injectDruidProperties(configuration, config); + for (DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath : segmentAndIndexZipFilePaths) { + Path tmpPath = new Path(segmentAndIndexZipFilePath.getTmpIndexZipFilePath()); + Path finalIndexZipFilePath = new Path(segmentAndIndexZipFilePath.getFinalIndexZipFilePath()); + final FileSystem outputFS = FileSystemHelper.get(finalIndexZipFilePath.toUri(), configuration); + if (!renameIndexFile(outputFS, tmpPath, finalIndexZipFilePath)) { + throw new IOE( + "Unable to rename [%s] to [%s]", + tmpPath.toUri().toString(), + finalIndexZipFilePath.toUri().toString() + ); + } + } + } + + /** + * Rename the file. 
This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename * which will not overwrite * * @param outputFS The output fs @@ -641,7 +664,7 @@ public class JobHelper * * @return False if a rename failed, true otherwise (rename success or no rename needed) */ - private static boolean renameIndexFiles( + private static boolean renameIndexFile( final FileSystem outputFS, final Path indexZipFilePath, final Path finalIndexZipFilePath diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java index b7eb60bf093..bdadc95010e 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/MetadataStorageUpdaterJob.java @@ -22,6 +22,7 @@ package org.apache.druid.indexer; import org.apache.druid.timeline.DataSegment; import java.util.List; +import java.util.stream.Collectors; /** */ @@ -42,7 +43,8 @@ public class MetadataStorageUpdaterJob implements Jobby @Override public boolean run() { - final List segments = IndexGeneratorJob.getPublishedSegments(config); + final List segmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); + final List segments = segmentAndIndexZipFilePaths.stream().map(s -> s.getSegment()).collect(Collectors.toList()); final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable(); handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.JSON_MAPPER); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 404d5ed67b8..913c6481e57 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -372,7 +372,15 @@ public class BatchDeltaIngestionTest ) throws Exception { IndexGeneratorJob job = new IndexGeneratorJob(config); - Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config)); + Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job))); + + List dataSegmentAndIndexZipFilePaths = + IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); + JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths); + + JobHelper.maybeDeleteIntermediatePath(true, config.getSchema()); + File workingPath = new File(config.makeIntermediatePath().toUri().getPath()); + Assert.assertFalse(workingPath.exists()); File segmentFolder = new File( StringUtils.format( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java new file mode 100644 index 00000000000..3dcd2033e2f --- /dev/null +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class DataSegmentAndIndexZipFilePathTest +{ + private static final SegmentId SEGMENT_ID = SegmentId.dummy("data-source", 1); + private static final SegmentId OTHER_SEGMENT_ID = SegmentId.dummy("data-source2", 1); + private static final DataSegment SEGMENT = new DataSegment( + SEGMENT_ID, + null, + null, + null, + new NumberedShardSpec(1, 10), + null, + 0, + 0 + ); + private static final DataSegment OTHER_SEGMENT = new DataSegment( + OTHER_SEGMENT_ID, + null, + null, + null, + new NumberedShardSpec(1, 10), + null, + 0, + 0 + ); + + private DataSegmentAndIndexZipFilePath target; + + @Test + public void test_equals_otherNull_notEqual() + { + String tmpPath = "tmpPath"; + String finalPath = "finalPath"; + target = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + Assert.assertNotEquals(target, null); + } + + @Test + public void test_equals_differentSegmentId_notEqual() + { + String tmpPath = "tmpPath"; + String finalPath = "finalPath"; + target = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + + DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( + OTHER_SEGMENT, + tmpPath, + finalPath + ); + Assert.assertNotEquals(target, other); + } + + @Test + public void test_equals_differentTmpPath_notEqual() + { + String tmpPath = "tmpPath"; + String otherTmpPath = "otherTmpPath"; + String finalPath = "finalPath"; + target = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + + DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( + SEGMENT, + otherTmpPath, + finalPath + ); + Assert.assertNotEquals(target, other); + } + + @Test + public void test_equals_differentFinalPath_notEqual() + { + String tmpPath = "tmpPath"; + String finalPath = "finalPath"; + String otherFinalPath = "otherFinalPath"; + target = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + + DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + otherFinalPath + ); + Assert.assertNotEquals(target, other); + } + + @Test + public void test_equals_allFieldsEqualValue_equal() + { + String tmpPath = "tmpPath"; + String finalPath = "finalPath"; + target = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + + DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + Assert.assertEquals(target, other); + } + + @Test + public void test_equals_sameObject_equal() + { + String tmpPath = "tmpPath"; + String finalPath = "finalPath"; + target = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + 
+ Assert.assertEquals(target, target); + } + + @Test + public void test_serde() throws IOException + { + String tmpPath = "tmpPath"; + String finalPath = "finalPath"; + target = new DataSegmentAndIndexZipFilePath( + SEGMENT, + tmpPath, + finalPath + ); + + final InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT); + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues(injectableValues); + final String json = mapper.writeValueAsString(target); + final DataSegmentAndIndexZipFilePath fromJson = + mapper.readValue(json, DataSegmentAndIndexZipFilePath.class); + Assert.assertEquals(target, fromJson); + } +} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java new file mode 100644 index 00000000000..8231b7ce4aa --- /dev/null +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerJobTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexer; + +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.List; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + JobHelper.class, + IndexGeneratorJob.class +}) +@PowerMockIgnore({"javax.net.ssl.*"}) +public class HadoopDruidIndexerJobTest +{ + private HadoopDruidIndexerConfig config; + private MetadataStorageUpdaterJobHandler handler; + private HadoopDruidIndexerJob target; + + @Test + public void test_run() + { + config = PowerMock.createMock(HadoopDruidIndexerConfig.class); + handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class); + PowerMock.mockStaticNice(JobHelper.class); + PowerMock.mockStaticNice(IndexGeneratorJob.class); + config.verify(); + EasyMock.expectLastCall(); + EasyMock.expect(config.isUpdaterJobSpecSet()).andReturn(false).anyTimes(); + config.setHadoopJobIdFileName(EasyMock.anyString()); + EasyMock.expectLastCall(); + JobHelper.ensurePaths(config); + EasyMock.expectLastCall(); + Capture> capturedJobs = Capture.newInstance(); + EasyMock.expect(JobHelper.runJobs(EasyMock.capture(capturedJobs))).andReturn(true); + EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(EasyMock.anyObject())).andReturn(null); + + + PowerMock.replayAll(); + + target = new HadoopDruidIndexerJob(config, handler); + target.run(); + + List jobs = capturedJobs.getValue(); + Assert.assertEquals(2, jobs.size()); + jobs.stream().filter(job -> !(job instanceof IndexGeneratorJob)).forEach(job -> Assert.assertTrue(job.run())); + + PowerMock.verifyAll(); + } +} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 97967bdb3b7..24ff1227ee5 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -621,13 +621,21 @@ public class IndexGeneratorJobTest private void verifyJob(IndexGeneratorJob job) throws IOException { - Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config)); + Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job))); final Map> intervalToSegments = new HashMap<>(); IndexGeneratorJob - .getPublishedSegments(config) - .forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()) - .add(segment)); + .getPublishedSegmentAndIndexZipFilePaths(config) + .forEach(segmentAndIndexZipFilePath -> intervalToSegments.computeIfAbsent(segmentAndIndexZipFilePath.getSegment().getInterval(), k -> new ArrayList<>()) + .add(segmentAndIndexZipFilePath.getSegment())); + + List dataSegmentAndIndexZipFilePaths = + IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config); + JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths); + + JobHelper.maybeDeleteIntermediatePath(true, config.getSchema()); + File workingPath = new File(config.makeIntermediatePath().toUri().getPath()); + Assert.assertTrue(workingPath.exists()); final Map> intervalToIndexFiles = new HashMap<>(); int segmentNum = 0; diff --git 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java new file mode 100644 index 00000000000..48f653a8c3b --- /dev/null +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperPowerMockTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.IOE; +import org.apache.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + FileSystemHelper.class, + HadoopDruidIndexerConfig.class +}) +@PowerMockIgnore({"javax.net.ssl.*"}) +public class JobHelperPowerMockTest +{ + private static final String TMP_PATH = "/tmp/index.zip.0"; + private static final String FINAL_PATH = "/final/index.zip.0"; + + private HadoopDruidIndexerConfig indexerConfig; + + @Test + public void test_renameIndexFilesForSegments_emptySegments() throws IOException + { + HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); + List segmentAndIndexZipFilePaths = ImmutableList.of(); + + PowerMock.replayAll(); + + JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths); + + PowerMock.verifyAll(); + } + + @Test + public void test_renameIndexFilesForSegments_segmentIndexFileRenamedSuccessfully() + throws IOException + { + HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); + mockFileSystem(true); + DataSegment segment = PowerMock.createMock(DataSegment.class); + + List segmentAndIndexZipFilePaths = ImmutableList.of( + new DataSegmentAndIndexZipFilePath( + segment, + TMP_PATH, + FINAL_PATH + ) + ); + PowerMock.replayAll(); + + JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths); + + PowerMock.verifyAll(); + } + + @Test (expected = IOE.class) + public void test_renameIndexFilesForSegments_segmentIndexFileRenamedFailed_throwsException() + throws IOException + { + HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); + mockFileSystem(false); + DataSegment segment = PowerMock.createMock(DataSegment.class); + List segmentAndIndexZipFilePaths = ImmutableList.of( 
+ new DataSegmentAndIndexZipFilePath( + segment, + TMP_PATH, + FINAL_PATH + ) + ); + + PowerMock.replayAll(); + + JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths); + + PowerMock.verifyAll(); + } + + @Test + public void test_maybeDeleteIntermediatePath_leaveIntermediate_doesNotDeleteIntermediatePath() + { + HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); + HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); + EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(true); + EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig); + + PowerMock.replayAll(); + + JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec); + + PowerMock.verifyAll(); + } + + @Test + public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexerJobSucceeded_deleteIntermediatePath() + throws IOException + { + HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); + HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); + Path workingPath = PowerMock.createMock(Path.class); + FileSystem workingPathFs = PowerMock.createMock(FileSystem.class); + EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false); + EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig); + EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true); + EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs); + EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath); + + PowerMock.replayAll(); + + JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec); + + PowerMock.verifyAll(); + } + + @Test + public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexJobFailedAndCleanupOnFailure_deleteIntermediatePath() + throws IOException + { + HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); + HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); + Path workingPath = PowerMock.createMock(Path.class); + FileSystem workingPathFs = PowerMock.createMock(FileSystem.class); + EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false); + EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true); + EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes(); + EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true); + EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs); + EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath); + + PowerMock.replayAll(); + + JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec); + + PowerMock.verifyAll(); + } + + @Test + public void test_maybeDeleteIntermediatePath_deleteThrowsException_noExceptionPropogated() + throws IOException + { + HadoopIngestionSpec ingestionSpec = mockIngestionSpec(); + HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class); + Path workingPath = PowerMock.createMock(Path.class); + FileSystem workingPathFs = PowerMock.createMock(FileSystem.class); + EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false); + EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true); + EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes(); + EasyMock.expect(workingPathFs.delete(workingPath, true)).andThrow(new IOException("Delete Exception")); + EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs); + 
EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath); + + PowerMock.replayAll(); + + JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec); + + PowerMock.verifyAll(); + } + + private HadoopIngestionSpec mockIngestionSpec() + { + indexerConfig = PowerMock.createMock(HadoopDruidIndexerConfig.class); + HadoopIngestionSpec ingestionSpec = PowerMock.createMock(HadoopIngestionSpec.class); + PowerMock.mockStaticNice(HadoopDruidIndexerConfig.class); + EasyMock.expect(indexerConfig.getAllowedProperties()).andReturn(ImmutableMap.of()).anyTimes(); + indexerConfig.addJobProperties(EasyMock.anyObject(Configuration.class)); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(HadoopDruidIndexerConfig.fromSpec(ingestionSpec)).andReturn(indexerConfig); + EasyMock.expect(indexerConfig.getSchema()).andReturn(ingestionSpec).anyTimes(); + return ingestionSpec; + } + + private void mockFileSystem(boolean renameSuccess) throws IOException + { + PowerMock.mockStaticNice(FileSystemHelper.class); + FileSystem fileSystem = PowerMock.createMock(FileSystem.class); + EasyMock.expect(FileSystemHelper.get( + EasyMock.anyObject(URI.class), + EasyMock.anyObject(Configuration.class) + )).andReturn(fileSystem); + EasyMock.expect(fileSystem.exists(EasyMock.anyObject(Path.class))).andReturn(false); + EasyMock.expect(fileSystem.rename(EasyMock.anyObject(Path.class), EasyMock.anyObject(Path.class))) + .andReturn(renameSuccess); + } +} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java new file mode 100644 index 00000000000..0b867630cd1 --- /dev/null +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/MetadataStorageUpdaterJobTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexer; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.indexer.updater.MetadataStorageUpdaterJobSpec; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.List; +import java.util.stream.Collectors; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + IndexGeneratorJob.class +}) +@PowerMockIgnore({"javax.net.ssl.*"}) +public class MetadataStorageUpdaterJobTest +{ + private static final List DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS = ImmutableList.of( + new DataSegmentAndIndexZipFilePath(null, null, null) + ); + private static final String SEGMENT_TABLE = "segments"; + private HadoopIngestionSpec spec; + private HadoopIOConfig ioConfig; + private MetadataStorageUpdaterJobSpec metadataUpdateSpec; + private HadoopDruidIndexerConfig config; + private MetadataStorageUpdaterJobHandler handler; + private MetadataStorageUpdaterJob target; + + @Test + public void test_run() + { + metadataUpdateSpec = PowerMock.createMock(MetadataStorageUpdaterJobSpec.class); + ioConfig = PowerMock.createMock(HadoopIOConfig.class); + spec = PowerMock.createMock(HadoopIngestionSpec.class); + config = PowerMock.createMock(HadoopDruidIndexerConfig.class); + handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class); + PowerMock.mockStaticNice(IndexGeneratorJob.class); + + EasyMock.expect(metadataUpdateSpec.getSegmentTable()).andReturn(SEGMENT_TABLE); + EasyMock.expect(ioConfig.getMetadataUpdateSpec()).andReturn(metadataUpdateSpec); + EasyMock.expect(spec.getIOConfig()).andReturn(ioConfig); + EasyMock.expect(config.getSchema()).andReturn(spec); + EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config)) + .andReturn(DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS); + handler.publishSegments( + SEGMENT_TABLE, + DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS.stream().map(s -> s.getSegment()).collect( + Collectors.toList()), HadoopDruidIndexerConfig.JSON_MAPPER); + EasyMock.expectLastCall(); + target = new MetadataStorageUpdaterJob(config, handler); + + PowerMock.replayAll(); + + target.run(); + + PowerMock.verifyAll(); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index b66ae471136..37ffb4cd790 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -30,6 +31,8 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import org.apache.commons.lang.BooleanUtils; +import 
org.apache.druid.indexer.DataSegmentAndIndexZipFilePath; import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob; import org.apache.druid.indexer.HadoopDruidIndexerConfig; import org.apache.druid.indexer.HadoopDruidIndexerJob; @@ -83,6 +86,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class HadoopIndexTask extends HadoopTask implements ChatHandler { @@ -307,170 +311,197 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler @SuppressWarnings("unchecked") private TaskStatus runInternal(TaskToolbox toolbox) throws Exception { - registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); - String hadoopJobIdFile = getHadoopJobIdFileName(); - final ClassLoader loader = buildClassLoader(toolbox); - boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(); - - HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( - spec, - jsonMapper, - new OverlordActionBasedUsedSegmentsRetriever(toolbox) - ); - - Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner", - loader - ); - determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner); - - String[] determinePartitionsInput = new String[]{ - toolbox.getJsonMapper().writeValueAsString(spec), - toolbox.getConfig().getHadoopWorkingPath(), - toolbox.getSegmentPusher().getPathForHadoop(), - hadoopJobIdFile - }; - - HadoopIngestionSpec indexerSchema; - final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); - Class determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass(); - Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod( - "runTask", - determinePartitionsInput.getClass() - ); + boolean indexGeneratorJobAttempted = false; + boolean indexGeneratorJobSuccess = false; + HadoopIngestionSpec indexerSchema = null; try { - Thread.currentThread().setContextClassLoader(loader); + registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); + String hadoopJobIdFile = getHadoopJobIdFileName(); + final ClassLoader loader = buildClassLoader(toolbox); + boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(); - ingestionState = IngestionState.DETERMINE_PARTITIONS; - - final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke( - determinePartitionsInnerProcessingRunner, - new Object[]{determinePartitionsInput} + HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( + spec, + jsonMapper, + new OverlordActionBasedUsedSegmentsRetriever(toolbox) ); + Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject( + "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner", + loader + ); + determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner); - determineConfigStatus = toolbox - .getJsonMapper() - .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class); + String[] determinePartitionsInput = new String[]{ + toolbox.getJsonMapper().writeValueAsString(spec), + toolbox.getConfig().getHadoopWorkingPath(), + toolbox.getSegmentPusher().getPathForHadoop(), + hadoopJobIdFile + }; - indexerSchema = 
determineConfigStatus.getSchema(); - if (indexerSchema == null) { - errorMsg = determineConfigStatus.getErrorMsg(); - toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); - return TaskStatus.failure( - getId(), - errorMsg + final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); + Class determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass(); + Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod( + "runTask", + determinePartitionsInput.getClass() + ); + try { + Thread.currentThread().setContextClassLoader(loader); + + ingestionState = IngestionState.DETERMINE_PARTITIONS; + + final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke( + determinePartitionsInnerProcessingRunner, + new Object[]{determinePartitionsInput} ); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - Thread.currentThread().setContextClassLoader(oldLoader); - } - // We should have a lock from before we started running only if interval was specified - String version; - if (determineIntervals) { - Interval interval = JodaUtils.umbrellaInterval( - JodaUtils.condenseIntervals( - indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals() - ) - ); - final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS); - // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. - final TaskLock lock = Preconditions.checkNotNull( - toolbox.getTaskActionClient().submit( - new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs) - ), - "Cannot acquire a lock for interval[%s]", interval - ); - version = lock.getVersion(); - } else { - Iterable locks = getTaskLocks(toolbox.getTaskActionClient()); - final TaskLock myLock = Iterables.getOnlyElement(locks); - version = myLock.getVersion(); - } - final String specVersion = indexerSchema.getTuningConfig().getVersion(); - if (indexerSchema.getTuningConfig().isUseExplicitVersion()) { - if (specVersion.compareTo(version) < 0) { - version = specVersion; - } else { - log.error( - "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].", - specVersion, - version - ); - toolbox.getTaskReportFileWriter().write(getId(), null); - return TaskStatus.failure(getId()); - } - } + determineConfigStatus = toolbox + .getJsonMapper() + .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class); - log.info("Setting version to: %s", version); - - Object innerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner", - loader - ); - buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner); - - String[] buildSegmentsInput = new String[]{ - toolbox.getJsonMapper().writeValueAsString(indexerSchema), - version, - hadoopJobIdFile - }; - - Class buildSegmentsRunnerClass = innerProcessingRunner.getClass(); - Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass()); - - try { - Thread.currentThread().setContextClassLoader(loader); - - ingestionState = IngestionState.BUILD_SEGMENTS; - final String jobStatusString = (String) innerProcessingRunTask.invoke( - innerProcessingRunner, - new Object[]{buildSegmentsInput} - ); - - buildSegmentsStatus = 
toolbox.getJsonMapper().readValue( - jobStatusString, - HadoopIndexGeneratorInnerProcessingStatus.class - ); - - if (buildSegmentsStatus.getDataSegments() != null) { - toolbox.publishSegments(buildSegmentsStatus.getDataSegments()); - - // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value - // for awaitSegmentAvailabilityTimeoutMillis - if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) { - ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; - ArrayList segmentsToWaitFor = new ArrayList<>(buildSegmentsStatus.getDataSegments()); - segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( - toolbox, - segmentsToWaitFor, - spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() + indexerSchema = determineConfigStatus.getSchema(); + if (indexerSchema == null) { + errorMsg = determineConfigStatus.getErrorMsg(); + toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + return TaskStatus.failure( + getId(), + errorMsg ); } + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + Thread.currentThread().setContextClassLoader(oldLoader); + } - ingestionState = IngestionState.COMPLETED; - toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); - return TaskStatus.success(getId()); - } else { - errorMsg = buildSegmentsStatus.getErrorMsg(); - toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); - return TaskStatus.failure( - getId(), - errorMsg + // We should have a lock from before we started running only if interval was specified + String version; + if (determineIntervals) { + Interval interval = JodaUtils.umbrellaInterval( + JodaUtils.condenseIntervals( + indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals() + ) ); + final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. 
+ final TaskLock lock = Preconditions.checkNotNull( + toolbox.getTaskActionClient().submit( + new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs) + ), + "Cannot acquire a lock for interval[%s]", interval + ); + version = lock.getVersion(); + } else { + Iterable locks = getTaskLocks(toolbox.getTaskActionClient()); + final TaskLock myLock = Iterables.getOnlyElement(locks); + version = myLock.getVersion(); + } + + final String specVersion = indexerSchema.getTuningConfig().getVersion(); + if (indexerSchema.getTuningConfig().isUseExplicitVersion()) { + if (specVersion.compareTo(version) < 0) { + version = specVersion; + } else { + log.error( + "Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].", + specVersion, + version + ); + toolbox.getTaskReportFileWriter().write(getId(), null); + return TaskStatus.failure(getId()); + } + } + + log.info("Setting version to: %s", version); + + Object innerProcessingRunner = getForeignClassloaderObject( + "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner", + loader + ); + buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner); + + String[] buildSegmentsInput = new String[]{ + toolbox.getJsonMapper().writeValueAsString(indexerSchema), + version, + hadoopJobIdFile + }; + + Class buildSegmentsRunnerClass = innerProcessingRunner.getClass(); + Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass()); + + try { + Thread.currentThread().setContextClassLoader(loader); + + ingestionState = IngestionState.BUILD_SEGMENTS; + indexGeneratorJobAttempted = true; + final String jobStatusString = (String) innerProcessingRunTask.invoke( + innerProcessingRunner, + new Object[]{buildSegmentsInput} + ); + + buildSegmentsStatus = toolbox.getJsonMapper().readValue( + jobStatusString, + HadoopIndexGeneratorInnerProcessingStatus.class + ); + + List dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths(); + if (dataSegmentAndIndexZipFilePaths != null) { + indexGeneratorJobSuccess = true; + try { + Thread.currentThread().setContextClassLoader(oldLoader); + renameSegmentIndexFilesJob( + toolbox.getJsonMapper().writeValueAsString(indexerSchema), + toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths) + ); + } + finally { + Thread.currentThread().setContextClassLoader(loader); + } + ArrayList segments = new ArrayList<>(dataSegmentAndIndexZipFilePaths.stream() + .map( + DataSegmentAndIndexZipFilePath::getSegment) + .collect(Collectors.toList())); + toolbox.publishSegments(segments); + + // Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value + // for awaitSegmentAvailabilityTimeoutMillis + if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) { + ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; + segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability( + toolbox, + segments, + spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() + ); + } + + ingestionState = IngestionState.COMPLETED; + toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + return TaskStatus.success(getId()); + } else { + errorMsg = buildSegmentsStatus.getErrorMsg(); + toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); + return TaskStatus.failure( + getId(), + errorMsg + ); + } + } + catch (Exception e) { + 
@@ -514,6 +545,96 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
    }
  }

+  private void renameSegmentIndexFilesJob(
+      String hadoopIngestionSpecStr,
+      String dataSegmentAndIndexZipFilePathListStr
+  )
+  {
+    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      ClassLoader loader = HadoopTask.buildClassLoader(
+          getHadoopDependencyCoordinates(),
+          taskConfig.getDefaultHadoopCoordinates()
+      );
+
+      Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
+          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
+          loader
+      );
+
+      String[] renameSegmentIndexFilesJobInput = new String[]{
+          hadoopIngestionSpecStr,
+          dataSegmentAndIndexZipFilePathListStr
+      };
+
+      Class<?> buildRenameSegmentIndexFilesJobRunnerClass = renameSegmentIndexFilesRunner.getClass();
+      Method renameSegmentIndexFiles = buildRenameSegmentIndexFilesJobRunnerClass.getMethod(
+          "runTask",
+          renameSegmentIndexFilesJobInput.getClass()
+      );
+
+      Thread.currentThread().setContextClassLoader(loader);
+      renameSegmentIndexFiles.invoke(
+          renameSegmentIndexFilesRunner,
+          new Object[]{renameSegmentIndexFilesJobInput}
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(oldLoader);
+    }
+  }
+
+  private void indexerGeneratorCleanupJob(
+      boolean indexGeneratorJobAttempted,
+      boolean indexGeneratorJobSuccess,
+      String hadoopIngestionSpecStr
+  )
+  {
+    if (!indexGeneratorJobAttempted) {
+      log.info("No need for cleanup as index generator job did not even run");
+      return;
+    }
+
+    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      ClassLoader loader = HadoopTask.buildClassLoader(
+          getHadoopDependencyCoordinates(),
+          taskConfig.getDefaultHadoopCoordinates()
+      );
+
+      Object indexerGeneratorCleanupRunner = getForeignClassloaderObject(
+          "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner",
+          loader
+      );
+
+      String[] indexerGeneratorCleanupJobInput = new String[]{
+          indexGeneratorJobSuccess ? "true" : "false",
+          hadoopIngestionSpecStr,
+      };
+
+      Class<?> buildIndexerGeneratorCleanupRunnerClass = indexerGeneratorCleanupRunner.getClass();
+      Method indexerGeneratorCleanup = buildIndexerGeneratorCleanupRunnerClass.getMethod(
+          "runTask",
+          indexerGeneratorCleanupJobInput.getClass()
+      );
+
+      Thread.currentThread().setContextClassLoader(loader);
+      indexerGeneratorCleanup.invoke(
+          indexerGeneratorCleanupRunner,
+          new Object[]{indexerGeneratorCleanupJobInput}
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Failed to cleanup after index generator job");
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(oldLoader);
+    }
+  }
+
  @GET
  @Path("/rowStats")
  @Produces(MediaType.APPLICATION_JSON)
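Both helpers added above follow the pattern HadoopIndexTask already uses for its other Hadoop jobs: build an isolated Hadoop classloader, instantiate the runner inside it, and invoke its runTask(String[]) reflectively so that Hadoop classes never leak into the task's own classloader. A condensed sketch of that pattern; "com.example.SomeRunner" and the argument values are placeholders, while buildClassLoader and getForeignClassloaderObject are the HadoopTask helpers used in the methods above:

    // Condensed sketch; assumes java.lang.reflect.Method is imported.
    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
    try {
      ClassLoader loader = HadoopTask.buildClassLoader(
          getHadoopDependencyCoordinates(),
          taskConfig.getDefaultHadoopCoordinates()
      );
      Object runner = getForeignClassloaderObject("com.example.SomeRunner", loader);
      Method runTask = runner.getClass().getMethod("runTask", String[].class);
      Thread.currentThread().setContextClassLoader(loader);
      runTask.invoke(runner, new Object[]{new String[]{"arg0", "arg1"}});
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    finally {
      Thread.currentThread().setContextClassLoader(oldLoader);
    }
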
"true" : "false", + hadoopIngestionSpecStr, + }; + + Class buildIndexerGeneratorCleanupRunnerClass = indexerGeneratorCleanupRunner.getClass(); + Method indexerGeneratorCleanup = buildIndexerGeneratorCleanupRunnerClass.getMethod( + "runTask", + indexerGeneratorCleanupJobInput.getClass() + ); + + Thread.currentThread().setContextClassLoader(loader); + indexerGeneratorCleanup.invoke( + indexerGeneratorCleanupRunner, + new Object[]{indexerGeneratorCleanupJobInput} + ); + } + catch (Exception e) { + log.warn(e, "Failed to cleanup after index generator job"); + } + finally { + Thread.currentThread().setContextClassLoader(oldLoader); + } + } + @GET @Path("/rowStats") @Produces(MediaType.APPLICATION_JSON) @@ -722,7 +843,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler if (job.run()) { return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString( new HadoopIndexGeneratorInnerProcessingStatus( - job.getPublishedSegments(), + job.getPublishedSegmentAndIndexZipFilePaths(), job.getStats(), null ) @@ -790,28 +911,111 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler } } + @SuppressWarnings("unused") + public static class HadoopRenameSegmentIndexFilesRunner + { + TypeReference> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH = + new TypeReference>() + { + }; + + public void runTask(String[] args) throws Exception + { + if (args.length != 2) { + log.warn("HadoopRenameSegmentIndexFilesRunner called with improper number of arguments"); + } + String hadoopIngestionSpecStr = args[0]; + String dataSegmentAndIndexZipFilePathListStr = args[1]; + + HadoopIngestionSpec indexerSchema; + List dataSegmentAndIndexZipFilePaths; + try { + indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( + hadoopIngestionSpecStr, + HadoopIngestionSpec.class + ); + dataSegmentAndIndexZipFilePaths = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( + dataSegmentAndIndexZipFilePathListStr, + LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH + ); + } + catch (Exception e) { + log.warn( + e, + "HadoopRenameSegmentIndexFilesRunner: Error occurred while trying to read input parameters into data objects" + ); + throw e; + } + JobHelper.renameIndexFilesForSegments( + indexerSchema, + dataSegmentAndIndexZipFilePaths + ); + } + } + + @SuppressWarnings("unused") + public static class HadoopIndexerGeneratorCleanupRunner + { + TypeReference> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH = + new TypeReference>() + { + }; + + public void runTask(String[] args) throws Exception + { + if (args.length != 2) { + log.warn("HadoopIndexerGeneratorCleanupRunner called with improper number of arguments"); + } + + String indexGeneratorJobSucceededStr = args[0]; + String hadoopIngestionSpecStr = args[1]; + + HadoopIngestionSpec indexerSchema; + boolean indexGeneratorJobSucceeded; + List dataSegmentAndIndexZipFilePaths; + try { + indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( + hadoopIngestionSpecStr, + HadoopIngestionSpec.class + ); + indexGeneratorJobSucceeded = BooleanUtils.toBoolean(indexGeneratorJobSucceededStr); + } + catch (Exception e) { + log.warn( + e, + "HadoopIndexerGeneratorCleanupRunner: Error occurred while trying to read input parameters into data objects" + ); + throw e; + } + JobHelper.maybeDeleteIntermediatePath( + indexGeneratorJobSucceeded, + indexerSchema + ); + } + } + public static class HadoopIndexGeneratorInnerProcessingStatus { - private final List dataSegments; + private final List dataSegmentAndIndexZipFilePaths; private final Map metrics; private final String errorMsg; 
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 2fb9adc239e..8ad9cec8ffa 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -102,6 +102,10 @@
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp</groupId>
+            <artifactId>okhttp</artifactId>
+        </dependency>
diff --git a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
index 4235abbf884..b227ababb44 100644
--- a/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliInternalHadoopIndexer.java
@@ -117,9 +117,12 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
      );

      List<Jobby> jobs = new ArrayList<>();
+      HadoopDruidIndexerJob indexerJob = new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class));
      jobs.add(new HadoopDruidDetermineConfigurationJob(config));
-      jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));
-      JobHelper.runJobs(jobs, config);
+      jobs.add(indexerJob);
+      boolean jobsSucceeded = JobHelper.runJobs(jobs);
+      JobHelper.renameIndexFilesForSegments(config.getSchema(), indexerJob.getPublishedSegmentAndIndexZipFilePaths());
+      JobHelper.maybeDeleteIntermediatePath(jobsSucceeded, config.getSchema());
    }
    catch (Exception e) {