From 3641a0e55380da227e024e0ce966ddf1e8c8e355 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Fri, 9 Oct 2015 20:47:56 +0530
Subject: [PATCH] Fix Race in jar upload during hadoop indexing -
 https://github.com/druid-io/druid/issues/582

few fixes
delete intermediate file early
better exception handling
use static pattern instead of compiling it every time
Add retry for transient exceptions
remove usage of deprecated method.
Add test
fix imports
fix javadoc
review comment.
review comment: handle crazy snapshot naming
review comments
remove default retry count in favour of already present constant
review comment make random intermediate and final paths.
review comment, use temporaryFolder where possible
---
 .../indexer/DetermineHashedPartitionsJob.java |   6 +-
 .../druid/indexer/DeterminePartitionsJob.java |  12 +-
 .../io/druid/indexer/IndexGeneratorJob.java   |   6 +-
 .../main/java/io/druid/indexer/JobHelper.java | 213 +++++++++++++++---
 .../indexer/updater/HadoopConverterJob.java   |  12 +-
 .../druid/indexer/HdfsClasspathSetupTest.java | 203 +++++++++++++++++
 6 files changed, 410 insertions(+), 42 deletions(-)
 create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/HdfsClasspathSetupTest.java

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 829a03a6844..594841f5201 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -102,7 +102,11 @@ public class DetermineHashedPartitionsJob implements Jobby
       } else {
         groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
       }
-      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
+      JobHelper.setupClasspath(
+          JobHelper.distributedClassPath(config.getWorkingPath()),
+          JobHelper.distributedClassPath(config.makeIntermediatePath()),
+          groupByJob
+      );
 
       config.addInputPaths(groupByJob);
       config.intoConfiguration(groupByJob);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index a502c340c1b..0fe4bb9c510 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -136,7 +136,11 @@ public class DeterminePartitionsJob implements Jobby
       groupByJob.setOutputKeyClass(BytesWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
-      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
+      JobHelper.setupClasspath(
+          JobHelper.distributedClassPath(config.getWorkingPath()),
+          JobHelper.distributedClassPath(config.makeIntermediatePath()),
+          groupByJob
+      );
 
       config.addInputPaths(groupByJob);
       config.intoConfiguration(groupByJob);
@@ -186,7 +190,11 @@ public class DeterminePartitionsJob implements Jobby
     dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
     dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
     dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
-    JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob);
+    JobHelper.setupClasspath(
+        JobHelper.distributedClassPath(config.getWorkingPath()),
+        JobHelper.distributedClassPath(config.makeIntermediatePath()),
+        dimSelectionJob
+    );
 
     config.intoConfiguration(dimSelectionJob);
     FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 16de192049f..e137d8bae64 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -190,7 +190,11 @@ public class IndexGeneratorJob
 
       config.intoConfiguration(job);
 
-      JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), job);
+      JobHelper.setupClasspath(
+          JobHelper.distributedClassPath(config.getWorkingPath()),
+          JobHelper.distributedClassPath(config.makeIntermediatePath()),
+          job
+      );
 
       job.submit();
       log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index a9c282ddfad..dc9c6e53c57 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -18,6 +18,7 @@
 package io.druid.indexer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
@@ -33,22 +34,6 @@ import io.druid.indexer.updater.HadoopDruidConverterConfig;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.SegmentUtils;
 import io.druid.timeline.DataSegment;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progressable;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.format.ISODateTimeFormat;
-
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -64,9 +49,24 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import java.util.zip.ZipOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
 
 /**
  */
@@ -79,6 +79,7 @@ public class JobHelper
   private static final int NUM_RETRIES = 8;
   private static final int SECONDS_BETWEEN_RETRIES = 2;
   private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
+  private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*\\-SNAPSHOT(-selfcontained)?\\.jar$");
 
   public static Path distributedClassPath(String path)
   {
@@ -90,9 +91,21 @@ public class JobHelper
     return new Path(base, "classpath");
   }
 
+  /**
+   * Uploads jar files to HDFS and configures the job classpath.
+   * Snapshot jar files are uploaded to the intermediateClassPath and are not shared across multiple jobs.
+   * Non-snapshot jar files are uploaded to the distributedClassPath and are shared across multiple jobs.
+   *
+   * @param distributedClassPath  classpath shared across multiple jobs
+   * @param intermediateClassPath classpath exclusive to this job; used to upload SNAPSHOT jar files
+   * @param job                   job to run
+   *
+   * @throws IOException if a jar cannot be uploaded or added to the classpath
+   */
   public static void setupClasspath(
-      Path distributedClassPath,
-      Job job
+      final Path distributedClassPath,
+      final Path intermediateClassPath,
+      final Job job
   )
       throws IOException
   {
@@ -111,34 +124,160 @@ public class JobHelper
     }
 
     for (String jarFilePath : jarFiles) {
-      File jarFile = new File(jarFilePath);
+
+      final File jarFile = new File(jarFilePath);
       if (jarFile.getName().endsWith(".jar")) {
-        final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
-
-        if (!existing.contains(hdfsPath)) {
-          if (jarFile.getName().matches(".*SNAPSHOT(-selfcontained)?\\.jar$") || !fs.exists(hdfsPath)) {
-            log.info("Uploading jar to path[%s]", hdfsPath);
-            ByteStreams.copy(
-                Files.newInputStreamSupplier(jarFile),
-                new OutputSupplier<OutputStream>()
+        try {
+          RetryUtils.retry(
+              new Callable<Boolean>()
+              {
+                @Override
+                public Boolean call() throws Exception
                 {
-                  @Override
-                  public OutputStream getOutput() throws IOException
-                  {
-                    return fs.create(hdfsPath);
+                  if (isSnapshot(jarFile)) {
+                    addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
+                  } else {
+                    addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
                   }
+                  return true;
                 }
-            );
-          }
-
-          existing.add(hdfsPath);
+              },
+              shouldRetryPredicate(),
+              NUM_RETRIES
+          );
+        }
+        catch (Exception e) {
+          throw Throwables.propagate(e);
         }
-
-        DistributedCache.addFileToClassPath(hdfsPath, conf, fs);
       }
     }
   }
 
+  public static final Predicate<Throwable> shouldRetryPredicate()
+  {
+    return new Predicate<Throwable>()
+    {
+      @Override
+      public boolean apply(Throwable input)
+      {
+        if (input == null) {
+          return false;
+        }
+        if (input instanceof IOException) {
+          return true;
+        }
+        return apply(input.getCause());
+      }
+    };
+  }
+
+  static void addJarToClassPath(
+      File jarFile,
+      Path distributedClassPath,
+      Path intermediateClassPath,
+      FileSystem fs,
+      Job job
+  )
+      throws IOException
+  {
+    // Create the distributed classpath directory if it does not exist;
+    // rename will always fail if the destination does not exist.
+    fs.mkdirs(distributedClassPath);
+
+    // Non-snapshot jar files are uploaded to the shared classpath.
+    final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
+    if (!fs.exists(hdfsPath)) {
+      // Multiple jobs can try to upload the jar here at the same time. To keep them from
+      // overwriting each other's files, upload to the intermediateClassPath first and then
+      // rename into the distributedClassPath.
+      final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName());
+      uploadJar(jarFile, intermediateHdfsPath, fs);
+      IOException exception = null;
+      try {
+        log.info("Renaming jar to path[%s]", hdfsPath);
+        fs.rename(intermediateHdfsPath, hdfsPath);
+        if (!fs.exists(hdfsPath)) {
+          throw new IOException(
+              String.format(
+                  "File does not exist even after moving from [%s] to [%s]",
+                  intermediateHdfsPath,
+                  hdfsPath
+              )
+          );
+        }
+      }
+      catch (IOException e) {
+        // The rename failed, possibly due to a race condition; check whether some other job has already uploaded the jar file.
+        try {
+          if (!fs.exists(hdfsPath)) {
+            log.error(e, "IOException while renaming jar file");
+            exception = e;
+          }
+        }
+        catch (IOException e1) {
+          e.addSuppressed(e1);
+          exception = e;
+        }
+      }
+      finally {
+        try {
+          if (fs.exists(intermediateHdfsPath)) {
+            fs.delete(intermediateHdfsPath, false);
+          }
+        }
+        catch (IOException e) {
+          if (exception == null) {
+            exception = e;
+          } else {
+            exception.addSuppressed(e);
+          }
+        }
+        if (exception != null) {
+          throw exception;
+        }
+      }
+    }
+    job.addFileToClassPath(hdfsPath);
+  }
+
+  static void addSnapshotJarToClassPath(
+      File jarFile,
+      Path intermediateClassPath,
+      FileSystem fs,
+      Job job
+  ) throws IOException
+  {
+    // Snapshot jars are uploaded to the non-shared intermediate directory.
+    final Path hdfsPath = new Path(intermediateClassPath, jarFile.getName());
+
+    // existing is used to prevent uploading the same file multiple times in the same run.
+    if (!existing.contains(hdfsPath)) {
+      uploadJar(jarFile, hdfsPath, fs);
+      existing.add(hdfsPath);
+    }
+    job.addFileToClassPath(hdfsPath);
+  }
+
+  static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException
+  {
+    log.info("Uploading jar to path[%s]", path);
+    ByteStreams.copy(
+        Files.newInputStreamSupplier(jarFile),
+        new OutputSupplier<OutputStream>()
+        {
+          @Override
+          public OutputStream getOutput() throws IOException
+          {
+            return fs.create(path);
+          }
+        }
+    );
+  }
+
+  static boolean isSnapshot(File jarFile)
+  {
+    return SNAPSHOT_JAR.matcher(jarFile.getName()).matches();
+  }
+
   public static void injectSystemProperties(Job job)
   {
     injectSystemProperties(job.getConfiguration());
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
index a1a411bf3ff..0e13ed00cf5 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
@@ -140,11 +140,17 @@ public class HadoopConverterJob
     return new Path(getJobPath(jobID, workingDirectory), taskAttemptID.toString());
   }
 
+  public static Path getJobClassPathDir(String jobName, Path workingDirectory) throws IOException
+  {
+    return new Path(workingDirectory, jobName.replace(":", ""));
+  }
+
   public static void cleanup(Job job) throws IOException
   {
     final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory());
     final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
     fs.delete(jobDir, true);
+    fs.delete(getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true);
   }
 
@@ -231,7 +237,11 @@ public class HadoopConverterJob
     job.setMapSpeculativeExecution(false);
     job.setOutputFormatClass(ConvertingOutputFormat.class);
 
-    JobHelper.setupClasspath(JobHelper.distributedClassPath(jobConf.getWorkingDirectory()), job);
+    JobHelper.setupClasspath(
+        JobHelper.distributedClassPath(jobConf.getWorkingDirectory()),
+        JobHelper.distributedClassPath(getJobClassPathDir(job.getJobName(), jobConf.getWorkingDirectory())),
+        job
+    );
 
     Throwable throwable = null;
     try {
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HdfsClasspathSetupTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HdfsClasspathSetupTest.java
new file mode 100644
index 00000000000..6204c04d1de
--- /dev/null
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HdfsClasspathSetupTest.java
@@ -0,0 +1,203 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.indexer;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.StringUtils;
+import io.druid.common.utils.UUIDUtils;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import junit.framework.Assert;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class HdfsClasspathSetupTest
+{
+  private static MiniDFSCluster miniCluster;
+  private static File hdfsTmpDir;
+  private static Configuration conf;
+  private static String dummyJarString = "This is a test jar file.";
+  private File dummyJarFile;
+  private Path finalClasspath;
+  private Path intermediatePath;
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setupStatic() throws IOException, ClassNotFoundException
+  {
+    hdfsTmpDir = File.createTempFile("hdfsClasspathSetupTest", "dir");
+    hdfsTmpDir.deleteOnExit();
+    if (!hdfsTmpDir.delete()) {
+      throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath()));
+    }
+    conf = new Configuration(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
+    miniCluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @Before
+  public void setUp() throws IOException
+  {
+    // intermediatePath and finalClasspath are relative to the hdfsTmpDir directory.
+    intermediatePath = new Path(String.format("/tmp/intermediate/%s", UUIDUtils.generateUuid()));
+    finalClasspath = new Path(String.format("/tmp/classpath/%s", UUIDUtils.generateUuid()));
+    dummyJarFile = tempFolder.newFile("dummy-test.jar");
+    Files.copy(
+        new ByteArrayInputStream(StringUtils.toUtf8(dummyJarString)),
+        dummyJarFile.toPath(),
+        StandardCopyOption.REPLACE_EXISTING
+    );
+  }
+
+  @AfterClass
+  public static void tearDownStatic() throws IOException
+  {
+    if (miniCluster != null) {
+      miniCluster.shutdown(true);
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    dummyJarFile.delete();
+    Assert.assertFalse(dummyJarFile.exists());
+    miniCluster.getFileSystem().delete(finalClasspath, true);
+    Assert.assertFalse(miniCluster.getFileSystem().exists(finalClasspath));
+    miniCluster.getFileSystem().delete(intermediatePath, true);
+    Assert.assertFalse(miniCluster.getFileSystem().exists(intermediatePath));
+  }
+
+  @Test
+  public void testAddSnapshotJarToClasspath() throws IOException
+  {
+    Job job = Job.getInstance(conf, "test-job");
+    DistributedFileSystem fs = miniCluster.getFileSystem();
+    Path intermediatePath = new Path("/tmp/classpath");
+    JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, fs, job);
+    Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName());
+    // check that the file gets uploaded to HDFS
+    Assert.assertTrue(fs.exists(expectedJarPath));
+    // check that the file gets added to the classpath
+    Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
+    Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
+  }
+
+  @Test
+  public void testAddNonSnapshotJarToClasspath() throws IOException
+  {
+    Job job = Job.getInstance(conf, "test-job");
+    DistributedFileSystem fs = miniCluster.getFileSystem();
+    JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job);
+    Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
+    // check that the file gets uploaded to the final HDFS path
+    Assert.assertTrue(fs.exists(expectedJarPath));
+    // check that the intermediate file gets deleted
+    Assert.assertFalse(fs.exists(new Path(intermediatePath, dummyJarFile.getName())));
+    // check that the file gets added to the classpath
+    Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES));
+    Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath))));
+  }
+
+  @Test
+  public void testIsSnapshot()
+  {
+    Assert.assertTrue(JobHelper.isSnapshot(new File("test-SNAPSHOT.jar")));
+    Assert.assertTrue(JobHelper.isSnapshot(new File("test-SNAPSHOT-selfcontained.jar")));
+    Assert.assertFalse(JobHelper.isSnapshot(new File("test.jar")));
+    Assert.assertFalse(JobHelper.isSnapshot(new File("test-selfcontained.jar")));
+    Assert.assertFalse(JobHelper.isSnapshot(new File("iAmNotSNAPSHOT.jar")));
+    Assert.assertFalse(JobHelper.isSnapshot(new File("iAmNotSNAPSHOT-selfcontained.jar")));
+  }
+
+  @Test
+  public void testConcurrentUpload() throws IOException, InterruptedException, ExecutionException, TimeoutException
+  {
+    final int concurrency = 10;
+    ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency));
+    // The barrier ensures that all jobs try to add files to the classpath at the same time.
+    final CyclicBarrier barrier = new CyclicBarrier(concurrency);
+    final DistributedFileSystem fs = miniCluster.getFileSystem();
+    final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
+    List<ListenableFuture<Boolean>> futures = new ArrayList<>();
+
+    for (int i = 0; i < concurrency; i++) {
+      futures.add(
+          pool.submit(
+              new Callable<Boolean>()
+              {
+                @Override
+                public Boolean call() throws Exception
+                {
+                  int id = barrier.await();
+                  Job job = Job.getInstance(conf, "test-job-" + id);
+                  Path intermediatePathForJob = new Path(intermediatePath, "job-" + id);
+                  JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job);
+                  // check that the file gets uploaded to the final HDFS path
+                  Assert.assertTrue(fs.exists(expectedJarPath));
+                  // check that the intermediate file is not present
+                  Assert.assertFalse(fs.exists(new Path(intermediatePathForJob, dummyJarFile.getName())));
+                  // check that the file gets added to the classpath
+                  Assert.assertEquals(
+                      expectedJarPath.toString(),
+                      job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES)
+                  );
+                  return true;
+                }
+              }
+          )
+      );
+    }
+
+    Futures.allAsList(futures).get(30, TimeUnit.SECONDS);
+
+    pool.shutdownNow();
+  }
+}
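
Note for reviewers, not part of the patch: the core race-avoidance idiom above is "upload to a job-private intermediate path, rename into the shared path, and treat a lost rename race as success as long as the destination exists". The sketch below restates that flow against a local filesystem using only java.nio, so it can be read and run without a MiniDFSCluster. The class and method names (AtomicPublishSketch, publish) are illustrative and do not exist in Druid, and unlike HDFS's FileSystem.rename, which returns false when the destination already exists, Files.move signals the lost race with an exception.

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class AtomicPublishSketch
{
  /**
   * Copies src into a writer-private staging directory, then renames it to its
   * final shared location, mirroring JobHelper.addJarToClassPath: losing the
   * rename race is fine because every writer publishes identical content.
   */
  public static void publish(Path src, Path stagingDir, Path sharedDir) throws IOException
  {
    Files.createDirectories(stagingDir);
    Files.createDirectories(sharedDir);
    final Path staged = stagingDir.resolve(src.getFileName());
    final Path published = sharedDir.resolve(src.getFileName());
    if (Files.exists(published)) {
      return; // another writer already published the file
    }
    Files.copy(src, staged);
    try {
      // Files.move without REPLACE_EXISTING fails if the destination exists,
      // which is how a lost race shows up here.
      Files.move(staged, published);
    }
    catch (FileAlreadyExistsException e) {
      // Lost the race: the file is already in place, so this is not an error.
    }
    finally {
      // Delete the intermediate copy early, as the patch does in its finally block.
      Files.deleteIfExists(staged);
    }
  }
}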
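
Similarly illustrative: the retry wiring in setupClasspath only retries failures whose cause chain contains an IOException. The snippet below (again hypothetical, not in the patch) exercises shouldRetryPredicate directly to show which failures RetryUtils.retry would re-attempt, up to NUM_RETRIES times.

import com.google.common.base.Predicate;
import io.druid.indexer.JobHelper;
import java.io.IOException;

public class RetryPredicateDemo
{
  public static void main(String[] args)
  {
    final Predicate<Throwable> shouldRetry = JobHelper.shouldRetryPredicate();

    // A bare IOException is treated as transient and retried.
    System.out.println(shouldRetry.apply(new IOException("connection reset")));     // true

    // The predicate recurses through getCause(), so wrapped IOExceptions retry too.
    System.out.println(shouldRetry.apply(new RuntimeException(new IOException()))); // true

    // No IOException anywhere in the cause chain: fail fast, no retry.
    System.out.println(shouldRetry.apply(new IllegalStateException("bug")));        // false
  }
}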