Fix race in jar upload during Hadoop indexing - https://github.com/druid-io/druid/issues/582

A few fixes:

- Delete the intermediate file early
- Better exception handling
- Use a static Pattern instead of compiling it every time
- Add retry for transient exceptions
- Remove usage of a deprecated method
- Add a test
- Fix imports
- Fix javadoc
- Review comment
- Review comment: handle crazy snapshot naming
- Review comments
- Remove the default retry count in favour of the already present constant
- Review comment
- Make the intermediate and final paths random
- Review comment: use TemporaryFolder where possible
Author: Nishant
Committed: 2015-10-09 20:47:56 +05:30
Parent: e4ac78e43d
Commit: 3641a0e553
6 changed files with 410 additions and 42 deletions
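The race in question: several concurrent Hadoop indexing jobs could upload the same jar to the same shared HDFS classpath location and clobber each other mid-write. The fix below uploads each non-snapshot jar to a job-private intermediate path first, then renames it into the shared classpath, so the shared file only ever appears whole. A minimal sketch of that publication pattern, with illustrative names (publishJar, intermediateDir, sharedDir are not Druid's actual API):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class ClasspathPublisherSketch
    {
      // Publish localJar into sharedDir without racing other jobs.
      // intermediateDir must be private to the calling job.
      static void publishJar(FileSystem fs, Path localJar, Path intermediateDir, Path sharedDir)
          throws IOException
      {
        final Path dest = new Path(sharedDir, localJar.getName());
        if (fs.exists(dest)) {
          return; // an earlier job already published this jar
        }
        fs.mkdirs(sharedDir); // rename fails if the destination directory is missing
        final Path tmp = new Path(intermediateDir, localJar.getName());
        fs.copyFromLocalFile(localJar, tmp); // only this job writes here, so no race
        if (!fs.rename(tmp, dest) && !fs.exists(dest)) {
          // Losing the rename race to a concurrent job is fine, as long as
          // *someone* produced the destination file.
          throw new IOException("Failed to publish jar to " + dest);
        }
        fs.delete(tmp, false); // best-effort cleanup; tmp is already gone if the rename won
      }
    }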

File: DetermineHashedPartitionsJob.java

@@ -102,7 +102,11 @@ public class DetermineHashedPartitionsJob implements Jobby
     } else {
       groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
     }
-    JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
+    JobHelper.setupClasspath(
+        JobHelper.distributedClassPath(config.getWorkingPath()),
+        JobHelper.distributedClassPath(config.makeIntermediatePath()),
+        groupByJob
+    );
     config.addInputPaths(groupByJob);
     config.intoConfiguration(groupByJob);

File: DeterminePartitionsJob.java

@@ -136,7 +136,11 @@ public class DeterminePartitionsJob implements Jobby
       groupByJob.setOutputKeyClass(BytesWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
-      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
+      JobHelper.setupClasspath(
+          JobHelper.distributedClassPath(config.getWorkingPath()),
+          JobHelper.distributedClassPath(config.makeIntermediatePath()),
+          groupByJob
+      );
       config.addInputPaths(groupByJob);
       config.intoConfiguration(groupByJob);
@@ -186,7 +190,11 @@ public class DeterminePartitionsJob implements Jobby
     dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
     dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
     dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
-    JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob);
+    JobHelper.setupClasspath(
+        JobHelper.distributedClassPath(config.getWorkingPath()),
+        JobHelper.distributedClassPath(config.makeIntermediatePath()),
+        dimSelectionJob
+    );
     config.intoConfiguration(dimSelectionJob);
     FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());

File: IndexGeneratorJob.java

@@ -190,7 +190,11 @@ public class IndexGeneratorJob implements Jobby
       config.intoConfiguration(job);
-      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), job);
+      JobHelper.setupClasspath(
+          JobHelper.distributedClassPath(config.getWorkingPath()),
+          JobHelper.distributedClassPath(config.makeIntermediatePath()),
+          job
+      );
       job.submit();
       log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());

File: JobHelper.java

@@ -18,6 +18,7 @@
 package io.druid.indexer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
@@ -33,22 +34,6 @@ import io.druid.indexer.updater.HadoopDruidConverterConfig;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.SegmentUtils;
 import io.druid.timeline.DataSegment;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progressable;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -64,9 +49,24 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
 
 /**
  */
@@ -79,6 +79,7 @@ public class JobHelper
   private static final int NUM_RETRIES = 8;
   private static final int SECONDS_BETWEEN_RETRIES = 2;
   private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
+  private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*\\-SNAPSHOT(-selfcontained)?\\.jar$");
 
   public static Path distributedClassPath(String path)
   {
@@ -90,9 +91,21 @@ public class JobHelper
     return new Path(base, "classpath");
   }
 
+  /**
+   * Uploads jar files to HDFS and configures the classpath.
+   * Snapshot jar files are uploaded to the intermediateClassPath and not shared across multiple jobs.
+   * Non-snapshot jar files are uploaded to the distributedClassPath and shared across multiple jobs.
+   *
+   * @param distributedClassPath  classpath shared across multiple jobs
+   * @param intermediateClassPath classpath exclusive to this job, used to upload SNAPSHOT jar files
+   * @param job                   job to run
+   *
+   * @throws IOException
+   */
   public static void setupClasspath(
-      Path distributedClassPath,
-      Job job
+      final Path distributedClassPath,
+      final Path intermediateClassPath,
+      final Job job
   )
       throws IOException
   {
@@ -111,34 +124,160 @@ public class JobHelper
     }
 
     for (String jarFilePath : jarFiles) {
-      File jarFile = new File(jarFilePath);
+      final File jarFile = new File(jarFilePath);
       if (jarFile.getName().endsWith(".jar")) {
-        final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
-
-        if (!existing.contains(hdfsPath)) {
-          if (jarFile.getName().matches(".*SNAPSHOT(-selfcontained)?\\.jar$") || !fs.exists(hdfsPath)) {
-            log.info("Uploading jar to path[%s]", hdfsPath);
-            ByteStreams.copy(
-                Files.newInputStreamSupplier(jarFile),
-                new OutputSupplier<OutputStream>()
-                {
-                  @Override
-                  public OutputStream getOutput() throws IOException
-                  {
-                    return fs.create(hdfsPath);
-                  }
-                }
-            );
-          }
-
-          existing.add(hdfsPath);
-        }
-
-        DistributedCache.addFileToClassPath(hdfsPath, conf, fs);
+        try {
+          RetryUtils.retry(
+              new Callable<Boolean>()
+              {
+                @Override
+                public Boolean call() throws Exception
+                {
+                  if (isSnapshot(jarFile)) {
+                    addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
+                  } else {
+                    addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
+                  }
+                  return true;
+                }
+              },
+              shouldRetryPredicate(),
+              NUM_RETRIES
+          );
+        }
+        catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
       }
     }
   }
 
+  public static final Predicate<Throwable> shouldRetryPredicate()
+  {
+    return new Predicate<Throwable>()
+    {
+      @Override
+      public boolean apply(Throwable input)
+      {
+        if (input == null) {
+          return false;
+        }
+        if (input instanceof IOException) {
+          return true;
+        }
+        return apply(input.getCause());
+      }
+    };
+  }
+
+  static void addJarToClassPath(
+      File jarFile,
+      Path distributedClassPath,
+      Path intermediateClassPath,
+      FileSystem fs,
+      Job job
+  )
+      throws IOException
+  {
+    // Create the distributed directory if it does not exist.
+    // rename will always fail if the destination does not exist.
+    fs.mkdirs(distributedClassPath);
+
+    // Non-snapshot jar files are uploaded to the shared classpath.
+    final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
+    if (!fs.exists(hdfsPath)) {
+      // Multiple jobs can try to upload the jar here. To prevent them from overwriting each other,
+      // first upload to the intermediateClassPath and then rename to the distributedClassPath.
+      final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName());
+      uploadJar(jarFile, intermediateHdfsPath, fs);
+      IOException exception = null;
+      try {
+        log.info("Renaming jar to path[%s]", hdfsPath);
+        fs.rename(intermediateHdfsPath, hdfsPath);
+        if (!fs.exists(hdfsPath)) {
+          throw new IOException(
+              String.format(
+                  "File does not exist even after moving from[%s] to [%s]",
+                  intermediateHdfsPath,
+                  hdfsPath
+              )
+          );
+        }
+      }
+      catch (IOException e) {
+        // The rename failed, possibly due to a race condition. Check whether some other job has uploaded the jar file.
+        try {
+          if (!fs.exists(hdfsPath)) {
+            log.error(e, "IOException while renaming jar file");
+            exception = e;
+          }
+        }
+        catch (IOException e1) {
+          e.addSuppressed(e1);
+          exception = e;
+        }
+      }
+      finally {
+        try {
+          if (fs.exists(intermediateHdfsPath)) {
+            fs.delete(intermediateHdfsPath, false);
+          }
+        }
+        catch (IOException e) {
+          if (exception == null) {
+            exception = e;
+          } else {
+            exception.addSuppressed(e);
+          }
+        }
+        if (exception != null) {
+          throw exception;
+        }
+      }
+    }
+    job.addFileToClassPath(hdfsPath);
+  }
+
+  static void addSnapshotJarToClassPath(
+      File jarFile,
+      Path intermediateClassPath,
+      FileSystem fs,
+      Job job
+  ) throws IOException
+  {
+    // Snapshot jars are uploaded to a non-shared intermediate directory.
+    final Path hdfsPath = new Path(intermediateClassPath, jarFile.getName());
+    // existing is used to prevent uploading the file multiple times in the same run.
+    if (!existing.contains(hdfsPath)) {
+      uploadJar(jarFile, hdfsPath, fs);
+      existing.add(hdfsPath);
+    }
+    job.addFileToClassPath(hdfsPath);
+  }
+
+  static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException
+  {
+    log.info("Uploading jar to path[%s]", path);
+    ByteStreams.copy(
+        Files.newInputStreamSupplier(jarFile),
+        new OutputSupplier<OutputStream>()
+        {
+          @Override
+          public OutputStream getOutput() throws IOException
+          {
+            return fs.create(path);
+          }
+        }
+    );
+  }
+
+  static boolean isSnapshot(File jarFile)
+  {
+    return SNAPSHOT_JAR.matcher(jarFile.getName()).matches();
+  }
+
   public static void injectSystemProperties(Job job)
   {
     injectSystemProperties(job.getConfiguration());
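One subtlety in addJarToClassPath above: Hadoop's FileSystem.rename() typically reports failure by returning false rather than throwing, so a rename can silently do nothing. That is why the code follows the rename with an explicit fs.exists(hdfsPath) check and treats "the destination exists, whoever produced it" as success. Sketched in isolation (variable names as in the diff, error handling simplified):

    // FileSystem.rename() signals most failures by returning false instead of
    // throwing, so success has to be verified explicitly.
    if (!fs.rename(intermediateHdfsPath, hdfsPath) && !fs.exists(hdfsPath)) {
      // Neither this job nor a concurrent one produced the destination:
      // a real failure, not just a lost race.
      throw new IOException(
          String.format("Failed to rename [%s] to [%s]", intermediateHdfsPath, hdfsPath)
      );
    }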

File: HadoopConverterJob.java

@@ -140,11 +140,17 @@ public class HadoopConverterJob
     return new Path(getJobPath(jobID, workingDirectory), taskAttemptID.toString());
   }
 
+  public static Path getJobClassPathDir(String jobName, Path workingDirectory) throws IOException
+  {
+    return new Path(workingDirectory, jobName.replace(":", ""));
+  }
+
   public static void cleanup(Job job) throws IOException
   {
     final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory());
     final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
     fs.delete(jobDir, true);
+    fs.delete(getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true);
   }
@@ -231,7 +237,11 @@ public class HadoopConverterJob
     job.setMapSpeculativeExecution(false);
     job.setOutputFormatClass(ConvertingOutputFormat.class);
 
-    JobHelper.setupClasspath(JobHelper.distributedClassPath(jobConf.getWorkingDirectory()), job);
+    JobHelper.setupClasspath(
+        JobHelper.distributedClassPath(jobConf.getWorkingDirectory()),
+        JobHelper.distributedClassPath(getJobClassPathDir(job.getJobName(), jobConf.getWorkingDirectory())),
+        job
+    );
 
     Throwable throwable = null;
     try {

File: HdfsClasspathSetupTest.java (new file)

@@ -0,0 +1,203 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.StringUtils;
import io.druid.common.utils.UUIDUtils;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import junit.framework.Assert;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class HdfsClasspathSetupTest
{
private static MiniDFSCluster miniCluster;
private static File hdfsTmpDir;
private static Configuration conf;
private static String dummyJarString = "This is a test jar file.";
private File dummyJarFile;
private Path finalClasspath;
private Path intermediatePath;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@BeforeClass
public static void setupStatic() throws IOException, ClassNotFoundException
{
hdfsTmpDir = File.createTempFile("hdfsClasspathSetupTest", "dir");
hdfsTmpDir.deleteOnExit();
if (!hdfsTmpDir.delete()) {
throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()));
}
conf = new Configuration(true);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
miniCluster = new MiniDFSCluster.Builder(conf).build();
}
@Before
public void setUp() throws IOException
{
// intermediatePath and finalClasspath are randomized per test inside the MiniDFSCluster namespace.
intermediatePath = new Path(String.format("/tmp/intermediate/%s", UUIDUtils.generateUuid()));
finalClasspath = new Path(String.format("/tmp/classpath/%s", UUIDUtils.generateUuid()));
dummyJarFile = tempFolder.newFile("dummy-test.jar");
Files.copy(
new ByteArrayInputStream(StringUtils.toUtf8(dummyJarString)),
dummyJarFile.toPath(),
StandardCopyOption.REPLACE_EXISTING
);
}
@AfterClass
public static void tearDownStatic() throws IOException
{
if (miniCluster != null) {
miniCluster.shutdown(true);
}
}
@After
public void tearDown() throws IOException
{
dummyJarFile.delete();
Assert.assertFalse(dummyJarFile.exists());
miniCluster.getFileSystem().delete(finalClasspath, true);
Assert.assertFalse(miniCluster.getFileSystem().exists(finalClasspath));
miniCluster.getFileSystem().delete(intermediatePath, true);
Assert.assertFalse(miniCluster.getFileSystem().exists(intermediatePath));
}
@Test
public void testAddSnapshotJarToClasspath() throws IOException
{
Job job = Job.getInstance(conf, "test-job");
DistributedFileSystem fs = miniCluster.getFileSystem();
JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, fs, job);
Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName());
// check file gets uploaded to HDFS
Assert.assertTrue(fs.exists(expectedJarPath));
// check file gets added to the classpath
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
}
@Test
public void testAddNonSnapshotJarToClasspath() throws IOException
{
Job job = Job.getInstance(conf, "test-job");
DistributedFileSystem fs = miniCluster.getFileSystem();
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job);
Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
// check file gets uploaded to final HDFS path
Assert.assertTrue(fs.exists(expectedJarPath));
// check that the intermediate file gets deleted
Assert.assertFalse(fs.exists(new Path(intermediatePath, dummyJarFile.getName())));
// check file gets added to the classpath
Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
}
@Test
public void testIsSnapshot()
{
Assert.assertTrue(JobHelper.isSnapshot(new File("test-SNAPSHOT.jar")));
Assert.assertTrue(JobHelper.isSnapshot(new File("test-SNAPSHOT-selfcontained.jar")));
Assert.assertFalse(JobHelper.isSnapshot(new File("test.jar")));
Assert.assertFalse(JobHelper.isSnapshot(new File("test-selfcontained.jar")));
Assert.assertFalse(JobHelper.isSnapshot(new File("iAmNotSNAPSHOT.jar")));
Assert.assertFalse(JobHelper.isSnapshot(new File("iAmNotSNAPSHOT-selfcontained.jar")));
}
@Test
public void testConcurrentUpload() throws IOException, InterruptedException, ExecutionException, TimeoutException
{
final int concurrency = 10;
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency));
// barrier ensures that all jobs try to add files to classpath at same time.
final CyclicBarrier barrier = new CyclicBarrier(concurrency);
final DistributedFileSystem fs = miniCluster.getFileSystem();
final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
for (int i = 0; i < concurrency; i++) {
futures.add(
pool.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
int id = barrier.await();
Job job = Job.getInstance(conf, "test-job-" + id);
Path intermediatePathForJob = new Path(intermediatePath, "job-" + id);
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job);
// check file gets uploaded to final HDFS path
Assert.assertTrue(fs.exists(expectedJarPath));
// check that the intermediate file is not present
Assert.assertFalse(fs.exists(new Path(intermediatePathForJob, dummyJarFile.getName())));
// check file gets added to the classpath
Assert.assertEquals(
expectedJarPath.toString(),
job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES)
);
return true;
}
}
)
);
}
Futures.allAsList(futures).get(30, TimeUnit.SECONDS);
pool.shutdownNow();
}
}
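One behavior the tests above do not exercise directly: shouldRetryPredicate() walks the exception's cause chain, so an IOException wrapped by Hadoop or Guava still triggers a retry. A hypothetical check of that logic, not part of the commit (run with assertions enabled, -ea):

    import java.io.IOException;

    import com.google.common.base.Predicate;

    class RetryPredicateSketch
    {
      public static void main(String[] args)
      {
        Predicate<Throwable> p = JobHelper.shouldRetryPredicate();
        // A direct IOException is retried.
        assert p.apply(new IOException("transient"));
        // An IOException buried inside a RuntimeException is still retried.
        assert p.apply(new RuntimeException(new IOException("transient")));
        // No IOException anywhere in the chain: no retry.
        assert !p.apply(new IllegalStateException("logic error"));
      }
    }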