Adjust HadoopIndexTask temp segment renaming to avoid potential race conditions (#11075)

* Do stuff

* Do more stuff

* Do more stuff

* Do more stuff

* working

* cleanup

* more cleanup

* more cleanup

* add license header

* Add unit tests

* add java docs

* add more unit tests

* Cleanup test

* Move removing of workingPath to index task rather than in hadoop job.

* Address review comments

* remove unused import

* Address review comments

* Do not overwrite segment descriptor for segment if it already exists.

* add comments to FileSystemHelper class

* fix local hadoop integration test
Author: zachjsh (2021-04-21 09:24:31 -10:00), committed by GitHub
parent 6d2b5cdd7e
commit a2892d9c40
17 changed files with 1185 additions and 220 deletions
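
Summary of the change: before this patch, the index-generator Hadoop job renamed each segment's index zip from its task-attempt tmp path to the final deep-storage path inside JobHelper.serializeOutIndex, and runSingleJob/runJobs also deleted the intermediate working path; retries of the job could therefore race and clobber the final segment index file path. With this patch the job only records both paths in the new DataSegmentAndIndexZipFilePath descriptor, and the rename plus intermediate-path cleanup are performed by the caller (HadoopIndexTask or CliInternalHadoopIndexer) after the jobs finish. A condensed sketch of the new caller-side sequence, modeled on the CliInternalHadoopIndexer and test changes below (config and handler are assumed to already be in scope; exception handling is omitted):

List<Jobby> jobs = new ArrayList<>();
jobs.add(new HadoopDruidDetermineConfigurationJob(config));
HadoopDruidIndexerJob indexerJob = new HadoopDruidIndexerJob(config, handler);
jobs.add(indexerJob);

// Run the Hadoop jobs; segment index zips are left at their tmp paths.
boolean jobsSucceeded = JobHelper.runJobs(jobs);

// The caller, not the Hadoop job, renames each index zip to its final path...
List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths =
    indexerJob.getPublishedSegmentAndIndexZipFilePaths();
JobHelper.renameIndexFilesForSegments(config.getSchema(), segmentAndIndexZipFilePaths);

// ...and then deletes the intermediate working path, honoring
// leaveIntermediate / cleanupOnFailure in the tuning config.
JobHelper.maybeDeleteIntermediatePath(jobsSucceeded, config.getSchema());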

View File

@ -202,6 +202,21 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.timeline.DataSegment;
import java.util.List;
import java.util.Objects;
/**
* holds a {@link DataSegment} with the temporary file path where the corresponding index zip file is currently stored
* and the final path where the index zip file should eventually be moved to.
* see {@link JobHelper#renameIndexFilesForSegments(HadoopIngestionSpec, List)}
*/
public class DataSegmentAndIndexZipFilePath
{
private final DataSegment segment;
private final String tmpIndexZipFilePath;
private final String finalIndexZipFilePath;
@JsonCreator
public DataSegmentAndIndexZipFilePath(
@JsonProperty("segment") DataSegment segment,
@JsonProperty("tmpIndexZipFilePath") String tmpIndexZipFilePath,
@JsonProperty("finalIndexZipFilePath") String finalIndexZipFilePath
)
{
this.segment = segment;
this.tmpIndexZipFilePath = tmpIndexZipFilePath;
this.finalIndexZipFilePath = finalIndexZipFilePath;
}
@JsonProperty
public DataSegment getSegment()
{
return segment;
}
@JsonProperty
public String getTmpIndexZipFilePath()
{
return tmpIndexZipFilePath;
}
@JsonProperty
public String getFinalIndexZipFilePath()
{
return finalIndexZipFilePath;
}
@Override
public boolean equals(Object o)
{
if (o instanceof DataSegmentAndIndexZipFilePath) {
DataSegmentAndIndexZipFilePath that = (DataSegmentAndIndexZipFilePath) o;
return segment.equals(((DataSegmentAndIndexZipFilePath) o).getSegment())
&& tmpIndexZipFilePath.equals(that.getTmpIndexZipFilePath())
&& finalIndexZipFilePath.equals(that.getFinalIndexZipFilePath());
}
return false;
}
@Override
public int hashCode()
{
return Objects.hash(segment.getId(), tmpIndexZipFilePath);
}
@Override
public String toString()
{
return "DataSegmentAndIndexZipFilePath{" +
"segment=" + segment +
", tmpIndexZipFilePath=" + tmpIndexZipFilePath +
", finalIndexZipFilePath=" + finalIndexZipFilePath +
'}';
}
}
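
Descriptor files written by the index-generator job now serialize this wrapper rather than a bare DataSegment, so the later rename step can recover both the tmp and final paths. A minimal round-trip sketch, mirroring the serde test further down in this diff (the segment variable and both paths are placeholders, and the snippet is assumed to live in a method that throws IOException):

ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
    new InjectableValues.Std()
        .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
);

DataSegmentAndIndexZipFilePath descriptor = new DataSegmentAndIndexZipFilePath(
    segment,                             // a DataSegment built by the reducer (placeholder)
    "/tmp/working/index.zip.0",          // placeholder tmp index zip path
    "/deep-storage/dataSource/index.zip" // placeholder final index zip path
);

String json = mapper.writeValueAsString(descriptor);
DataSegmentAndIndexZipFilePath fromJson =
    mapper.readValue(json, DataSegmentAndIndexZipFilePath.class);
// fromJson.equals(descriptor) holds, per the equals() contract above.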

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.net.URI;
/**
* This class exists for testing purposes, see {@link JobHelperPowerMockTest}. Using the
* raw {@link FileSystem} class resulted in errors with java assist.
*/
public class FileSystemHelper
{
public static FileSystem get(URI uri, Configuration conf) throws IOException
{
return FileSystem.get(uri, conf);
}
}

View File

@ -59,7 +59,12 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
if (config.isDeterminingPartitions()) { if (config.isDeterminingPartitions()) {
job = createPartitionJob(config); job = createPartitionJob(config);
config.setHadoopJobIdFileName(hadoopJobIdFile); config.setHadoopJobIdFileName(hadoopJobIdFile);
return JobHelper.runSingleJob(job, config); boolean jobSucceeded = JobHelper.runSingleJob(job);
JobHelper.maybeDeleteIntermediatePath(
jobSucceeded,
config.getSchema()
);
return jobSucceeded;
} else { } else {
final PartitionsSpec partitionsSpec = config.getPartitionsSpec(); final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
final int shardsPerInterval; final int shardsPerInterval;

View File

@ -22,7 +22,6 @@ package org.apache.druid.indexer;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
@ -40,7 +39,7 @@ public class HadoopDruidIndexerJob implements Jobby
@Nullable @Nullable
private IndexGeneratorJob indexJob; private IndexGeneratorJob indexJob;
@Nullable @Nullable
private volatile List<DataSegment> publishedSegments = null; private volatile List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = null;
@Nullable @Nullable
private String hadoopJobIdFile; private String hadoopJobIdFile;
@ -91,14 +90,14 @@ public class HadoopDruidIndexerJob implements Jobby
@Override @Override
public boolean run() public boolean run()
{ {
publishedSegments = IndexGeneratorJob.getPublishedSegments(config); publishedSegmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
return true; return true;
} }
} }
); );
config.setHadoopJobIdFileName(hadoopJobIdFile); config.setHadoopJobIdFileName(hadoopJobIdFile);
return JobHelper.runJobs(jobs, config); return JobHelper.runJobs(jobs);
} }
@Override @Override
@ -122,12 +121,12 @@ public class HadoopDruidIndexerJob implements Jobby
return indexJob.getErrorMessage(); return indexJob.getErrorMessage();
} }
public List<DataSegment> getPublishedSegments() public List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths()
{ {
if (publishedSegments == null) { if (publishedSegmentAndIndexZipFilePaths == null) {
throw new IllegalStateException("Job hasn't run yet. No segments have been published yet."); throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
} }
return publishedSegments; return publishedSegmentAndIndexZipFilePaths;
} }
public void setHadoopJobIdFile(String hadoopJobIdFile) public void setHadoopJobIdFile(String hadoopJobIdFile)

View File

@ -102,14 +102,14 @@ public class IndexGeneratorJob implements Jobby
{ {
private static final Logger log = new Logger(IndexGeneratorJob.class); private static final Logger log = new Logger(IndexGeneratorJob.class);
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) public static List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZipFilePaths(HadoopDruidIndexerConfig config)
{ {
final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config); final Configuration conf = JobHelper.injectSystemProperties(new Configuration(), config);
config.addJobProperties(conf); config.addJobProperties(conf);
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER; final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.JSON_MAPPER;
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder(); ImmutableList.Builder<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePathsBuilder = ImmutableList.builder();
final Path descriptorInfoDir = config.makeDescriptorInfoDir(); final Path descriptorInfoDir = config.makeDescriptorInfoDir();
@ -117,9 +117,9 @@ public class IndexGeneratorJob implements Jobby
FileSystem fs = descriptorInfoDir.getFileSystem(conf); FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) { for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegment segment = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegment.class); final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class);
publishedSegmentsBuilder.add(segment); publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath);
log.info("Adding segment %s to the list of published segments", segment.getId()); log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId());
} }
} }
catch (FileNotFoundException e) { catch (FileNotFoundException e) {
@ -133,9 +133,9 @@ public class IndexGeneratorJob implements Jobby
catch (IOException e) { catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
List<DataSegment> publishedSegments = publishedSegmentsBuilder.build(); List<DataSegmentAndIndexZipFilePath> publishedSegmentAndIndexZipFilePaths = publishedSegmentAndIndexZipFilePathsBuilder.build();
return publishedSegments; return publishedSegmentAndIndexZipFilePaths;
} }
private final HadoopDruidIndexerConfig config; private final HadoopDruidIndexerConfig config;
@ -809,7 +809,7 @@ public class IndexGeneratorJob implements Jobby
0 0
); );
final DataSegment segment = JobHelper.serializeOutIndex( final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = JobHelper.serializeOutIndex(
segmentTemplate, segmentTemplate,
context.getConfiguration(), context.getConfiguration(),
context, context,
@ -831,7 +831,7 @@ public class IndexGeneratorJob implements Jobby
HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER
); );
Path descriptorPath = config.makeDescriptorInfoPath(segment); Path descriptorPath = config.makeDescriptorInfoPath(segmentAndIndexZipFilePath.getSegment());
descriptorPath = JobHelper.prependFSIfNullScheme( descriptorPath = JobHelper.prependFSIfNullScheme(
FileSystem.get( FileSystem.get(
descriptorPath.toUri(), descriptorPath.toUri(),
@ -842,7 +842,7 @@ public class IndexGeneratorJob implements Jobby
log.info("Writing descriptor to path[%s]", descriptorPath); log.info("Writing descriptor to path[%s]", descriptorPath);
JobHelper.writeSegmentDescriptor( JobHelper.writeSegmentDescriptor(
config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()), config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
segment, segmentAndIndexZipFilePath,
descriptorPath, descriptorPath,
context context
); );

View File

@ -386,29 +386,13 @@ public class JobHelper
} }
} }
public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config) public static boolean runSingleJob(Jobby job)
{ {
boolean succeeded = job.run(); boolean succeeded = job.run();
if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {
Configuration conf = injectSystemProperties(new Configuration(), config);
config.addJobProperties(conf);
workingPath.getFileSystem(conf).delete(workingPath, true);
}
catch (IOException e) {
log.error(e, "Failed to cleanup path[%s]", workingPath);
}
}
}
return succeeded; return succeeded;
} }
public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config) public static boolean runJobs(List<Jobby> jobs)
{ {
boolean succeeded = true; boolean succeeded = true;
for (Jobby job : jobs) { for (Jobby job : jobs) {
@ -418,25 +402,33 @@ public class JobHelper
} }
} }
return succeeded;
}
public static void maybeDeleteIntermediatePath(
boolean jobSucceeded,
HadoopIngestionSpec indexerSchema)
{
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema);
final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config);
config.addJobProperties(configuration);
JobHelper.injectDruidProperties(configuration, config);
if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) { if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) {
if (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { if (jobSucceeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath(); Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath); log.info("Deleting path[%s]", workingPath);
try { try {
Configuration conf = injectSystemProperties(new Configuration(), config); config.addJobProperties(configuration);
config.addJobProperties(conf); workingPath.getFileSystem(configuration).delete(workingPath, true);
workingPath.getFileSystem(conf).delete(workingPath, true);
} }
catch (IOException e) { catch (IOException e) {
log.error(e, "Failed to cleanup path[%s]", workingPath); log.error(e, "Failed to cleanup path[%s]", workingPath);
} }
} }
} }
return succeeded;
} }
public static DataSegment serializeOutIndex( public static DataSegmentAndIndexZipFilePath serializeOutIndex(
final DataSegment segmentTemplate, final DataSegment segmentTemplate,
final Configuration configuration, final Configuration configuration,
final Progressable progressable, final Progressable progressable,
@ -482,20 +474,16 @@ public class JobHelper
.withSize(size.get()) .withSize(size.get())
.withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase)); .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));
if (!renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) { return new DataSegmentAndIndexZipFilePath(
throw new IOE( finalSegment,
"Unable to rename [%s] to [%s]", tmpPath.toUri().getPath(),
tmpPath.toUri().toString(), finalIndexZipFilePath.toUri().getPath()
finalIndexZipFilePath.toUri().toString() );
);
}
return finalSegment;
} }
public static void writeSegmentDescriptor( public static void writeSegmentDescriptor(
final FileSystem outputFS, final FileSystem outputFS,
final DataSegment segment, final DataSegmentAndIndexZipFilePath segmentAndPath,
final Path descriptorPath, final Path descriptorPath,
final Progressable progressable final Progressable progressable
) )
@ -511,9 +499,12 @@ public class JobHelper
try { try {
progressable.progress(); progressable.progress();
if (outputFS.exists(descriptorPath)) { if (outputFS.exists(descriptorPath)) {
if (!outputFS.delete(descriptorPath, false)) { // If the descriptor path already exists, don't overwrite, and risk clobbering it.
throw new IOE("Failed to delete descriptor at [%s]", descriptorPath); // If it already exists, it means that the segment data is already written to the
} // tmp path, and the existing descriptor written should give us the information we
// need to rename the segment index to final path and publish it in the top level task.
log.info("descriptor path [%s] already exists, not overwriting", descriptorPath);
return;
} }
try (final OutputStream descriptorOut = outputFS.create( try (final OutputStream descriptorOut = outputFS.create(
descriptorPath, descriptorPath,
@ -521,7 +512,7 @@ public class JobHelper
DEFAULT_FS_BUFFER_SIZE, DEFAULT_FS_BUFFER_SIZE,
progressable progressable
)) { )) {
HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segment); HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segmentAndPath);
} }
} }
catch (RuntimeException | IOException ex) { catch (RuntimeException | IOException ex) {
@ -632,7 +623,39 @@ public class JobHelper
} }
/** /**
* Rename the files. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename * Renames the index files for the segments. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename
* which will not overwrite. Note: segments should be renamed in the index task, not in a hadoop job, as race
* conditions between job retries can cause the final segment index file path to get clobbered.
*
* @param indexerSchema the hadoop ingestion spec
* @param segmentAndIndexZipFilePaths the list of segments with their currently stored tmp path and the final path
* that they should be renamed to.
*/
public static void renameIndexFilesForSegments(
HadoopIngestionSpec indexerSchema,
List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths
) throws IOException
{
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema);
final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config);
config.addJobProperties(configuration);
JobHelper.injectDruidProperties(configuration, config);
for (DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath : segmentAndIndexZipFilePaths) {
Path tmpPath = new Path(segmentAndIndexZipFilePath.getTmpIndexZipFilePath());
Path finalIndexZipFilePath = new Path(segmentAndIndexZipFilePath.getFinalIndexZipFilePath());
final FileSystem outputFS = FileSystemHelper.get(finalIndexZipFilePath.toUri(), configuration);
if (!renameIndexFile(outputFS, tmpPath, finalIndexZipFilePath)) {
throw new IOE(
"Unable to rename [%s] to [%s]",
tmpPath.toUri().toString(),
finalIndexZipFilePath.toUri().toString()
);
}
}
}
/**
* Rename the file. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename
* which will not overwrite * which will not overwrite
* *
* @param outputFS The output fs * @param outputFS The output fs
@ -641,7 +664,7 @@ public class JobHelper
* *
* @return False if a rename failed, true otherwise (rename success or no rename needed) * @return False if a rename failed, true otherwise (rename success or no rename needed)
*/ */
private static boolean renameIndexFiles( private static boolean renameIndexFile(
final FileSystem outputFS, final FileSystem outputFS,
final Path indexZipFilePath, final Path indexZipFilePath,
final Path finalIndexZipFilePath final Path finalIndexZipFilePath

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexer;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
*/ */
@ -42,7 +43,8 @@ public class MetadataStorageUpdaterJob implements Jobby
@Override @Override
public boolean run() public boolean run()
{ {
final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config); final List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
final List<DataSegment> segments = segmentAndIndexZipFilePaths.stream().map(s -> s.getSegment()).collect(Collectors.toList());
final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable(); final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable();
handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.JSON_MAPPER); handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.JSON_MAPPER);

View File

@ -372,7 +372,15 @@ public class BatchDeltaIngestionTest
) throws Exception ) throws Exception
{ {
IndexGeneratorJob job = new IndexGeneratorJob(config); IndexGeneratorJob job = new IndexGeneratorJob(config);
Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config)); Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job)));
List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths);
JobHelper.maybeDeleteIntermediatePath(true, config.getSchema());
File workingPath = new File(config.makeIntermediatePath().toUri().getPath());
Assert.assertFalse(workingPath.exists());
File segmentFolder = new File( File segmentFolder = new File(
StringUtils.format( StringUtils.format(

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class DataSegmentAndIndexZipFilePathTest
{
private static final SegmentId SEGMENT_ID = SegmentId.dummy("data-source", 1);
private static final SegmentId OTHER_SEGMENT_ID = SegmentId.dummy("data-source2", 1);
private static final DataSegment SEGMENT = new DataSegment(
SEGMENT_ID,
null,
null,
null,
new NumberedShardSpec(1, 10),
null,
0,
0
);
private static final DataSegment OTHER_SEGMENT = new DataSegment(
OTHER_SEGMENT_ID,
null,
null,
null,
new NumberedShardSpec(1, 10),
null,
0,
0
);
private DataSegmentAndIndexZipFilePath target;
@Test
public void test_equals_otherNull_notEqual()
{
String tmpPath = "tmpPath";
String finalPath = "finalPath";
target = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
Assert.assertNotEquals(target, null);
}
@Test
public void test_equals_differentSegmentId_notEqual()
{
String tmpPath = "tmpPath";
String finalPath = "finalPath";
target = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
OTHER_SEGMENT,
tmpPath,
finalPath
);
Assert.assertNotEquals(target, other);
}
@Test
public void test_equals_differentTmpPath_notEqual()
{
String tmpPath = "tmpPath";
String otherTmpPath = "otherTmpPath";
String finalPath = "finalPath";
target = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
SEGMENT,
otherTmpPath,
finalPath
);
Assert.assertNotEquals(target, other);
}
@Test
public void test_equals_differentFinalPath_notEqual()
{
String tmpPath = "tmpPath";
String finalPath = "finalPath";
String otherFinalPath = "otherFinalPath";
target = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
otherFinalPath
);
Assert.assertNotEquals(target, other);
}
@Test
public void test_equals_allFieldsEqualValue_equal()
{
String tmpPath = "tmpPath";
String finalPath = "finalPath";
target = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
DataSegmentAndIndexZipFilePath other = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
Assert.assertEquals(target, other);
}
@Test
public void test_equals_sameObject_equal()
{
String tmpPath = "tmpPath";
String finalPath = "finalPath";
target = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
Assert.assertEquals(target, target);
}
@Test
public void test_serde() throws IOException
{
String tmpPath = "tmpPath";
String finalPath = "finalPath";
target = new DataSegmentAndIndexZipFilePath(
SEGMENT,
tmpPath,
finalPath
);
final InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT);
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(injectableValues);
final String json = mapper.writeValueAsString(target);
final DataSegmentAndIndexZipFilePath fromJson =
mapper.readValue(json, DataSegmentAndIndexZipFilePath.class);
Assert.assertEquals(target, fromJson);
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
JobHelper.class,
IndexGeneratorJob.class
})
@PowerMockIgnore({"javax.net.ssl.*"})
public class HadoopDruidIndexerJobTest
{
private HadoopDruidIndexerConfig config;
private MetadataStorageUpdaterJobHandler handler;
private HadoopDruidIndexerJob target;
@Test
public void test_run()
{
config = PowerMock.createMock(HadoopDruidIndexerConfig.class);
handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class);
PowerMock.mockStaticNice(JobHelper.class);
PowerMock.mockStaticNice(IndexGeneratorJob.class);
config.verify();
EasyMock.expectLastCall();
EasyMock.expect(config.isUpdaterJobSpecSet()).andReturn(false).anyTimes();
config.setHadoopJobIdFileName(EasyMock.anyString());
EasyMock.expectLastCall();
JobHelper.ensurePaths(config);
EasyMock.expectLastCall();
Capture<List<Jobby>> capturedJobs = Capture.newInstance();
EasyMock.expect(JobHelper.runJobs(EasyMock.capture(capturedJobs))).andReturn(true);
EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(EasyMock.anyObject())).andReturn(null);
PowerMock.replayAll();
target = new HadoopDruidIndexerJob(config, handler);
target.run();
List<Jobby> jobs = capturedJobs.getValue();
Assert.assertEquals(2, jobs.size());
jobs.stream().filter(job -> !(job instanceof IndexGeneratorJob)).forEach(job -> Assert.assertTrue(job.run()));
PowerMock.verifyAll();
}
}

View File

@ -621,13 +621,21 @@ public class IndexGeneratorJobTest
private void verifyJob(IndexGeneratorJob job) throws IOException private void verifyJob(IndexGeneratorJob job) throws IOException
{ {
Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config)); Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job)));
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>(); final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
IndexGeneratorJob IndexGeneratorJob
.getPublishedSegments(config) .getPublishedSegmentAndIndexZipFilePaths(config)
.forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()) .forEach(segmentAndIndexZipFilePath -> intervalToSegments.computeIfAbsent(segmentAndIndexZipFilePath.getSegment().getInterval(), k -> new ArrayList<>())
.add(segment)); .add(segmentAndIndexZipFilePath.getSegment()));
List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);
JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths);
JobHelper.maybeDeleteIntermediatePath(true, config.getSchema());
File workingPath = new File(config.makeIntermediatePath().toUri().getPath());
Assert.assertTrue(workingPath.exists());
final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>(); final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
int segmentNum = 0; int segmentNum = 0;

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import java.net.URI;
import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
FileSystemHelper.class,
HadoopDruidIndexerConfig.class
})
@PowerMockIgnore({"javax.net.ssl.*"})
public class JobHelperPowerMockTest
{
private static final String TMP_PATH = "/tmp/index.zip.0";
private static final String FINAL_PATH = "/final/index.zip.0";
private HadoopDruidIndexerConfig indexerConfig;
@Test
public void test_renameIndexFilesForSegments_emptySegments() throws IOException
{
HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of();
PowerMock.replayAll();
JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
PowerMock.verifyAll();
}
@Test
public void test_renameIndexFilesForSegments_segmentIndexFileRenamedSuccessfully()
throws IOException
{
HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
mockFileSystem(true);
DataSegment segment = PowerMock.createMock(DataSegment.class);
List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of(
new DataSegmentAndIndexZipFilePath(
segment,
TMP_PATH,
FINAL_PATH
)
);
PowerMock.replayAll();
JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
PowerMock.verifyAll();
}
@Test (expected = IOE.class)
public void test_renameIndexFilesForSegments_segmentIndexFileRenamedFailed_throwsException()
throws IOException
{
HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
mockFileSystem(false);
DataSegment segment = PowerMock.createMock(DataSegment.class);
List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths = ImmutableList.of(
new DataSegmentAndIndexZipFilePath(
segment,
TMP_PATH,
FINAL_PATH
)
);
PowerMock.replayAll();
JobHelper.renameIndexFilesForSegments(ingestionSpec, segmentAndIndexZipFilePaths);
PowerMock.verifyAll();
}
@Test
public void test_maybeDeleteIntermediatePath_leaveIntermediate_doesNotDeleteIntermediatePath()
{
HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(true);
EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig);
PowerMock.replayAll();
JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec);
PowerMock.verifyAll();
}
@Test
public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexerJobSucceeded_deleteIntermediatePath()
throws IOException
{
HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
Path workingPath = PowerMock.createMock(Path.class);
FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig);
EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true);
EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
PowerMock.replayAll();
JobHelper.maybeDeleteIntermediatePath(true, ingestionSpec);
PowerMock.verifyAll();
}
@Test
public void test_maybeDeleteIntermediatePath_doNotleaveIntermediateAndIndexJobFailedAndCleanupOnFailure_deleteIntermediatePath()
throws IOException
{
HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
Path workingPath = PowerMock.createMock(Path.class);
FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true);
EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes();
EasyMock.expect(workingPathFs.delete(workingPath, true)).andReturn(true);
EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
PowerMock.replayAll();
JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec);
PowerMock.verifyAll();
}
@Test
public void test_maybeDeleteIntermediatePath_deleteThrowsException_noExceptionPropogated()
throws IOException
{
HadoopIngestionSpec ingestionSpec = mockIngestionSpec();
HadoopTuningConfig tuningConfig = PowerMock.createMock(HadoopTuningConfig.class);
Path workingPath = PowerMock.createMock(Path.class);
FileSystem workingPathFs = PowerMock.createMock(FileSystem.class);
EasyMock.expect(tuningConfig.isLeaveIntermediate()).andReturn(false);
EasyMock.expect(tuningConfig.isCleanupOnFailure()).andReturn(true);
EasyMock.expect(ingestionSpec.getTuningConfig()).andReturn(tuningConfig).anyTimes();
EasyMock.expect(workingPathFs.delete(workingPath, true)).andThrow(new IOException("Delete Exception"));
EasyMock.expect(workingPath.getFileSystem(EasyMock.anyObject())).andReturn(workingPathFs);
EasyMock.expect(indexerConfig.makeIntermediatePath()).andReturn(workingPath);
PowerMock.replayAll();
JobHelper.maybeDeleteIntermediatePath(false, ingestionSpec);
PowerMock.verifyAll();
}
private HadoopIngestionSpec mockIngestionSpec()
{
indexerConfig = PowerMock.createMock(HadoopDruidIndexerConfig.class);
HadoopIngestionSpec ingestionSpec = PowerMock.createMock(HadoopIngestionSpec.class);
PowerMock.mockStaticNice(HadoopDruidIndexerConfig.class);
EasyMock.expect(indexerConfig.getAllowedProperties()).andReturn(ImmutableMap.of()).anyTimes();
indexerConfig.addJobProperties(EasyMock.anyObject(Configuration.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(HadoopDruidIndexerConfig.fromSpec(ingestionSpec)).andReturn(indexerConfig);
EasyMock.expect(indexerConfig.getSchema()).andReturn(ingestionSpec).anyTimes();
return ingestionSpec;
}
private void mockFileSystem(boolean renameSuccess) throws IOException
{
PowerMock.mockStaticNice(FileSystemHelper.class);
FileSystem fileSystem = PowerMock.createMock(FileSystem.class);
EasyMock.expect(FileSystemHelper.get(
EasyMock.anyObject(URI.class),
EasyMock.anyObject(Configuration.class)
)).andReturn(fileSystem);
EasyMock.expect(fileSystem.exists(EasyMock.anyObject(Path.class))).andReturn(false);
EasyMock.expect(fileSystem.rename(EasyMock.anyObject(Path.class), EasyMock.anyObject(Path.class)))
.andReturn(renameSuccess);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
import java.util.stream.Collectors;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
IndexGeneratorJob.class
})
@PowerMockIgnore({"javax.net.ssl.*"})
public class MetadataStorageUpdaterJobTest
{
private static final List<DataSegmentAndIndexZipFilePath> DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS = ImmutableList.of(
new DataSegmentAndIndexZipFilePath(null, null, null)
);
private static final String SEGMENT_TABLE = "segments";
private HadoopIngestionSpec spec;
private HadoopIOConfig ioConfig;
private MetadataStorageUpdaterJobSpec metadataUpdateSpec;
private HadoopDruidIndexerConfig config;
private MetadataStorageUpdaterJobHandler handler;
private MetadataStorageUpdaterJob target;
@Test
public void test_run()
{
metadataUpdateSpec = PowerMock.createMock(MetadataStorageUpdaterJobSpec.class);
ioConfig = PowerMock.createMock(HadoopIOConfig.class);
spec = PowerMock.createMock(HadoopIngestionSpec.class);
config = PowerMock.createMock(HadoopDruidIndexerConfig.class);
handler = PowerMock.createMock(MetadataStorageUpdaterJobHandler.class);
PowerMock.mockStaticNice(IndexGeneratorJob.class);
EasyMock.expect(metadataUpdateSpec.getSegmentTable()).andReturn(SEGMENT_TABLE);
EasyMock.expect(ioConfig.getMetadataUpdateSpec()).andReturn(metadataUpdateSpec);
EasyMock.expect(spec.getIOConfig()).andReturn(ioConfig);
EasyMock.expect(config.getSchema()).andReturn(spec);
EasyMock.expect(IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config))
.andReturn(DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS);
handler.publishSegments(
SEGMENT_TABLE,
DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATHS.stream().map(s -> s.getSegment()).collect(
Collectors.toList()), HadoopDruidIndexerConfig.JSON_MAPPER);
EasyMock.expectLastCall();
target = new MetadataStorageUpdaterJob(config, handler);
PowerMock.replayAll();
target.run();
PowerMock.verifyAll();
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -30,6 +31,8 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.apache.commons.lang.BooleanUtils;
import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath;
import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob; import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob;
import org.apache.druid.indexer.HadoopDruidIndexerConfig; import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.HadoopDruidIndexerJob; import org.apache.druid.indexer.HadoopDruidIndexerJob;
@ -83,6 +86,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
public class HadoopIndexTask extends HadoopTask implements ChatHandler public class HadoopIndexTask extends HadoopTask implements ChatHandler
{ {
@ -307,170 +311,197 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{ {
registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); boolean indexGeneratorJobAttempted = false;
String hadoopJobIdFile = getHadoopJobIdFileName(); boolean indexGeneratorJobSuccess = false;
final ClassLoader loader = buildClassLoader(toolbox); HadoopIngestionSpec indexerSchema = null;
boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
spec,
jsonMapper,
new OverlordActionBasedUsedSegmentsRetriever(toolbox)
);
Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
loader
);
determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);
String[] determinePartitionsInput = new String[]{
toolbox.getJsonMapper().writeValueAsString(spec),
toolbox.getConfig().getHadoopWorkingPath(),
toolbox.getSegmentPusher().getPathForHadoop(),
hadoopJobIdFile
};
HadoopIngestionSpec indexerSchema;
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
"runTask",
determinePartitionsInput.getClass()
);
try { try {
Thread.currentThread().setContextClassLoader(loader); registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
String hadoopJobIdFile = getHadoopJobIdFileName();
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
ingestionState = IngestionState.DETERMINE_PARTITIONS; HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
spec,
final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke( jsonMapper,
determinePartitionsInnerProcessingRunner, new OverlordActionBasedUsedSegmentsRetriever(toolbox)
new Object[]{determinePartitionsInput}
); );
Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner",
loader
);
determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner);
determineConfigStatus = toolbox String[] determinePartitionsInput = new String[]{
.getJsonMapper() toolbox.getJsonMapper().writeValueAsString(spec),
.readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class); toolbox.getConfig().getHadoopWorkingPath(),
toolbox.getSegmentPusher().getPathForHadoop(),
hadoopJobIdFile
};
indexerSchema = determineConfigStatus.getSchema(); final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
if (indexerSchema == null) { Class<?> determinePartitionsRunnerClass = determinePartitionsInnerProcessingRunner.getClass();
errorMsg = determineConfigStatus.getErrorMsg(); Method determinePartitionsInnerProcessingRunTask = determinePartitionsRunnerClass.getMethod(
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); "runTask",
return TaskStatus.failure( determinePartitionsInput.getClass()
getId(), );
errorMsg try {
Thread.currentThread().setContextClassLoader(loader);
ingestionState = IngestionState.DETERMINE_PARTITIONS;
final String determineConfigStatusString = (String) determinePartitionsInnerProcessingRunTask.invoke(
determinePartitionsInnerProcessingRunner,
new Object[]{determinePartitionsInput}
); );
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
// We should have a lock from before we started running only if interval was specified
String version;
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
)
);
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
),
"Cannot acquire a lock for interval[%s]", interval
);
version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
final TaskLock myLock = Iterables.getOnlyElement(locks);
version = myLock.getVersion();
}
final String specVersion = indexerSchema.getTuningConfig().getVersion(); determineConfigStatus = toolbox
if (indexerSchema.getTuningConfig().isUseExplicitVersion()) { .getJsonMapper()
if (specVersion.compareTo(version) < 0) { .readValue(determineConfigStatusString, HadoopDetermineConfigInnerProcessingStatus.class);
version = specVersion;
} else {
log.error(
"Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
specVersion,
version
);
toolbox.getTaskReportFileWriter().write(getId(), null);
return TaskStatus.failure(getId());
}
}
log.info("Setting version to: %s", version); indexerSchema = determineConfigStatus.getSchema();
if (indexerSchema == null) {
Object innerProcessingRunner = getForeignClassloaderObject( errorMsg = determineConfigStatus.getErrorMsg();
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner", toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
loader return TaskStatus.failure(
); getId(),
buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner); errorMsg
String[] buildSegmentsInput = new String[]{
toolbox.getJsonMapper().writeValueAsString(indexerSchema),
version,
hadoopJobIdFile
};
Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
try {
Thread.currentThread().setContextClassLoader(loader);
ingestionState = IngestionState.BUILD_SEGMENTS;
final String jobStatusString = (String) innerProcessingRunTask.invoke(
innerProcessingRunner,
new Object[]{buildSegmentsInput}
);
buildSegmentsStatus = toolbox.getJsonMapper().readValue(
jobStatusString,
HadoopIndexGeneratorInnerProcessingStatus.class
);
if (buildSegmentsStatus.getDataSegments() != null) {
toolbox.publishSegments(buildSegmentsStatus.getDataSegments());
// Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
// for awaitSegmentAvailabilityTimeoutMillis
if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
ArrayList<DataSegment> segmentsToWaitFor = new ArrayList<>(buildSegmentsStatus.getDataSegments());
segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
toolbox,
segmentsToWaitFor,
spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
); );
} }
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
ingestionState = IngestionState.COMPLETED; // We should have a lock from before we started running only if interval was specified
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); String version;
return TaskStatus.success(getId()); if (determineIntervals) {
} else { Interval interval = JodaUtils.umbrellaInterval(
errorMsg = buildSegmentsStatus.getErrorMsg(); JodaUtils.condenseIntervals(
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports()); indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
return TaskStatus.failure( )
getId(),
errorMsg
); );
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, interval, lockTimeoutMs)
),
"Cannot acquire a lock for interval[%s]", interval
);
version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());
final TaskLock myLock = Iterables.getOnlyElement(locks);
version = myLock.getVersion();
}
final String specVersion = indexerSchema.getTuningConfig().getVersion();
if (indexerSchema.getTuningConfig().isUseExplicitVersion()) {
if (specVersion.compareTo(version) < 0) {
version = specVersion;
} else {
log.error(
"Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
specVersion,
version
);
toolbox.getTaskReportFileWriter().write(getId(), null);
return TaskStatus.failure(getId());
}
}
log.info("Setting version to: %s", version);
Object innerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner",
loader
);
buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner);
String[] buildSegmentsInput = new String[]{
toolbox.getJsonMapper().writeValueAsString(indexerSchema),
version,
hadoopJobIdFile
};
Class<?> buildSegmentsRunnerClass = innerProcessingRunner.getClass();
Method innerProcessingRunTask = buildSegmentsRunnerClass.getMethod("runTask", buildSegmentsInput.getClass());
try {
Thread.currentThread().setContextClassLoader(loader);
ingestionState = IngestionState.BUILD_SEGMENTS;
indexGeneratorJobAttempted = true;
final String jobStatusString = (String) innerProcessingRunTask.invoke(
innerProcessingRunner,
new Object[]{buildSegmentsInput}
);
buildSegmentsStatus = toolbox.getJsonMapper().readValue(
jobStatusString,
HadoopIndexGeneratorInnerProcessingStatus.class
);
List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
if (dataSegmentAndIndexZipFilePaths != null) {
indexGeneratorJobSuccess = true;
try {
Thread.currentThread().setContextClassLoader(oldLoader);
renameSegmentIndexFilesJob(
toolbox.getJsonMapper().writeValueAsString(indexerSchema),
toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
);
}
finally {
Thread.currentThread().setContextClassLoader(loader);
}
ArrayList<DataSegment> segments = new ArrayList<>(dataSegmentAndIndexZipFilePaths.stream()
.map(
DataSegmentAndIndexZipFilePath::getSegment)
.collect(Collectors.toList()));
toolbox.publishSegments(segments);
// Try to wait for segments to be loaded by the cluster if the tuning config specifies a non-zero value
// for awaitSegmentAvailabilityTimeoutMillis
if (spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis() > 0) {
ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
segmentAvailabilityConfirmationCompleted = waitForSegmentAvailability(
toolbox,
segments,
spec.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis()
);
}
ingestionState = IngestionState.COMPLETED;
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.success(getId());
} else {
errorMsg = buildSegmentsStatus.getErrorMsg();
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
return TaskStatus.failure(
getId(),
errorMsg
);
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
} }
} }
catch (Exception e) {
throw new RuntimeException(e);
}
finally { finally {
Thread.currentThread().setContextClassLoader(oldLoader); indexerGeneratorCleanupJob(
indexGeneratorJobAttempted,
indexGeneratorJobSuccess,
indexerSchema == null ? null : toolbox.getJsonMapper().writeValueAsString(indexerSchema)
);
} }
} }
@ -514,6 +545,96 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
} }
} }
private void renameSegmentIndexFilesJob(
String hadoopIngestionSpecStr,
String dataSegmentAndIndexZipFilePathListStr
)
{
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader loader = HadoopTask.buildClassLoader(
getHadoopDependencyCoordinates(),
taskConfig.getDefaultHadoopCoordinates()
);
Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
loader
);
String[] renameSegmentIndexFilesJobInput = new String[]{
hadoopIngestionSpecStr,
dataSegmentAndIndexZipFilePathListStr
};
Class<?> buildRenameSegmentIndexFilesJobRunnerClass = renameSegmentIndexFilesRunner.getClass();
Method renameSegmentIndexFiles = buildRenameSegmentIndexFilesJobRunnerClass.getMethod(
"runTask",
renameSegmentIndexFilesJobInput.getClass()
);
Thread.currentThread().setContextClassLoader(loader);
renameSegmentIndexFiles.invoke(
renameSegmentIndexFilesRunner,
new Object[]{renameSegmentIndexFilesJobInput}
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
}
private void indexerGeneratorCleanupJob(
boolean indexGeneratorJobAttempted,
boolean indexGeneratorJobSuccess,
String hadoopIngestionSpecStr
)
{
if (!indexGeneratorJobAttempted) {
log.info("No need for cleanup as index generator job did not even run");
return;
}
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader loader = HadoopTask.buildClassLoader(
getHadoopDependencyCoordinates(),
taskConfig.getDefaultHadoopCoordinates()
);
Object indexerGeneratorCleanupRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner",
loader
);
String[] indexerGeneratorCleanupJobInput = new String[]{
indexGeneratorJobSuccess ? "true" : "false",
hadoopIngestionSpecStr,
};
Class<?> buildIndexerGeneratorCleanupRunnerClass = indexerGeneratorCleanupRunner.getClass();
Method indexerGeneratorCleanup = buildIndexerGeneratorCleanupRunnerClass.getMethod(
"runTask",
indexerGeneratorCleanupJobInput.getClass()
);
Thread.currentThread().setContextClassLoader(loader);
indexerGeneratorCleanup.invoke(
indexerGeneratorCleanupRunner,
new Object[]{indexerGeneratorCleanupJobInput}
);
}
catch (Exception e) {
log.warn(e, "Failed to cleanup after index generator job");
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
}
}
@GET @GET
@Path("/rowStats") @Path("/rowStats")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@ -722,7 +843,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
if (job.run()) { if (job.run()) {
return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString( return HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(
new HadoopIndexGeneratorInnerProcessingStatus( new HadoopIndexGeneratorInnerProcessingStatus(
job.getPublishedSegments(), job.getPublishedSegmentAndIndexZipFilePaths(),
job.getStats(), job.getStats(),
null null
) )
@ -790,28 +911,111 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
} }
} }
@SuppressWarnings("unused")
public static class HadoopRenameSegmentIndexFilesRunner
{
TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
{
};
public void runTask(String[] args) throws Exception
{
if (args.length != 2) {
log.warn("HadoopRenameSegmentIndexFilesRunner called with improper number of arguments");
}
String hadoopIngestionSpecStr = args[0];
String dataSegmentAndIndexZipFilePathListStr = args[1];
HadoopIngestionSpec indexerSchema;
List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
try {
indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
hadoopIngestionSpecStr,
HadoopIngestionSpec.class
);
dataSegmentAndIndexZipFilePaths = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
dataSegmentAndIndexZipFilePathListStr,
LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH
);
}
catch (Exception e) {
log.warn(
e,
"HadoopRenameSegmentIndexFilesRunner: Error occurred while trying to read input parameters into data objects"
);
throw e;
}
JobHelper.renameIndexFilesForSegments(
indexerSchema,
dataSegmentAndIndexZipFilePaths
);
}
}
@SuppressWarnings("unused")
public static class HadoopIndexerGeneratorCleanupRunner
{
TypeReference<List<DataSegmentAndIndexZipFilePath>> LIST_DATA_SEGMENT_AND_INDEX_ZIP_FILE_PATH =
new TypeReference<List<DataSegmentAndIndexZipFilePath>>()
{
};
public void runTask(String[] args) throws Exception
{
if (args.length != 2) {
log.warn("HadoopIndexerGeneratorCleanupRunner called with improper number of arguments");
}
String indexGeneratorJobSucceededStr = args[0];
String hadoopIngestionSpecStr = args[1];
HadoopIngestionSpec indexerSchema;
boolean indexGeneratorJobSucceeded;
List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
try {
indexerSchema = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
hadoopIngestionSpecStr,
HadoopIngestionSpec.class
);
indexGeneratorJobSucceeded = BooleanUtils.toBoolean(indexGeneratorJobSucceededStr);
}
catch (Exception e) {
log.warn(
e,
"HadoopIndexerGeneratorCleanupRunner: Error occurred while trying to read input parameters into data objects"
);
throw e;
}
JobHelper.maybeDeleteIntermediatePath(
indexGeneratorJobSucceeded,
indexerSchema
);
}
}
public static class HadoopIndexGeneratorInnerProcessingStatus public static class HadoopIndexGeneratorInnerProcessingStatus
{ {
private final List<DataSegment> dataSegments; private final List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths;
private final Map<String, Object> metrics; private final Map<String, Object> metrics;
private final String errorMsg; private final String errorMsg;
@JsonCreator @JsonCreator
public HadoopIndexGeneratorInnerProcessingStatus( public HadoopIndexGeneratorInnerProcessingStatus(
@JsonProperty("dataSegments") List<DataSegment> dataSegments, @JsonProperty("dataSegmentAndIndexZipFilePaths") List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths,
@JsonProperty("metrics") Map<String, Object> metrics, @JsonProperty("metrics") Map<String, Object> metrics,
@JsonProperty("errorMsg") String errorMsg @JsonProperty("errorMsg") String errorMsg
) )
{ {
this.dataSegments = dataSegments; this.dataSegmentAndIndexZipFilePaths = dataSegmentAndIndexZipFilePaths;
this.metrics = metrics; this.metrics = metrics;
this.errorMsg = errorMsg; this.errorMsg = errorMsg;
} }
@JsonProperty @JsonProperty
public List<DataSegment> getDataSegments() public List<DataSegmentAndIndexZipFilePath> getDataSegmentAndIndexZipFilePaths()
{ {
return dataSegments; return dataSegmentAndIndexZipFilePaths;
} }
@JsonProperty @JsonProperty

View File

@ -102,6 +102,10 @@
<groupId>javax.servlet</groupId> <groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId> <artifactId>servlet-api</artifactId>
</exclusion> </exclusion>
<exclusion>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -117,9 +117,12 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
); );
List<Jobby> jobs = new ArrayList<>(); List<Jobby> jobs = new ArrayList<>();
HadoopDruidIndexerJob indexerJob = new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class));
jobs.add(new HadoopDruidDetermineConfigurationJob(config)); jobs.add(new HadoopDruidDetermineConfigurationJob(config));
jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class))); jobs.add(indexerJob);
JobHelper.runJobs(jobs, config); boolean jobsSucceeded = JobHelper.runJobs(jobs);
JobHelper.renameIndexFilesForSegments(config.getSchema(), indexerJob.getPublishedSegmentAndIndexZipFilePaths());
JobHelper.maybeDeleteIntermediatePath(jobsSucceeded, config.getSchema());
} }
catch (Exception e) { catch (Exception e) {