diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java index 19303ffbf1b..44d68b1e395 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java @@ -52,6 +52,7 @@ import org.apache.hadoop.util.Progressable; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -209,7 +210,7 @@ public class JobHelper // Non-snapshot jar files are uploaded to the shared classpath. final Path hdfsPath = new Path(distributedClassPath, jarFile.getName()); - if (!fs.exists(hdfsPath)) { + if (shouldUploadOrReplace(jarFile, hdfsPath, fs)) { // Muliple jobs can try to upload the jar here, // to avoid them from overwriting files, first upload to intermediateClassPath and then rename to the distributedClasspath. final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName()); @@ -256,6 +257,22 @@ public class JobHelper job.addFileToClassPath(hdfsPath); } + static boolean shouldUploadOrReplace( + File jarFile, + Path hdfsPath, + FileSystem fs + ) + throws IOException + { + try { + FileStatus status = fs.getFileStatus(hdfsPath); + return status == null || status.getLen() != jarFile.length(); + } + catch (FileNotFoundException e) { + return true; + } + } + static void addSnapshotJarToClassPath( File jarFile, Path intermediateClassPath,