diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
index abc55e5ab67..dec9b2abd93 100644
--- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSink;
 import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
-
 import io.druid.common.utils.UUIDUtils;
 import io.druid.java.util.common.CompressionUtils;
 import io.druid.java.util.common.logger.Logger;
@@ -35,6 +34,7 @@ import io.druid.timeline.DataSegment;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HadoopFsWrapper;
 import org.apache.hadoop.fs.Path;
 
 import java.io.File;
@@ -116,7 +116,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
       // Create parent if it does not exist, recreation is not an error
       fs.mkdirs(outDir.getParent());
 
-      if (!fs.rename(tmpFile.getParent(), outDir)) {
+      if (!HadoopFsWrapper.rename(fs, tmpFile.getParent(), outDir)) {
         if (fs.exists(outDir)) {
           log.info(
               "Unable to rename temp directory[%s] to segment directory[%s]. It is already pushed by a replica task.",
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
new file mode 100644
index 00000000000..041417219d8
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.hadoop.fs;
+
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.IOException;
+
+/**
+ * This wrapper class exists to provide access to some of the "protected" methods inside Hadoop's
+ * FileSystem class. These are expected to become public eventually, or more appropriate alternatives
+ * will be provided.
+ * This is a hack and should be removed when no longer necessary.
+ */
+public class HadoopFsWrapper
+{
+  private static final Logger log = new Logger(HadoopFsWrapper.class);
+
+  private HadoopFsWrapper() {}
+
+  /**
+   * Same as FileSystem.rename(from, to, Options.Rename.NONE). That is, it returns "false" when the "to"
+   * directory already exists, unlike FileSystem.rename(from, to), which moves "from" inside "to" if it exists.
+   *
+   * @param fs   FileSystem on which the rename is performed
+   * @param from source directory
+   * @param to   destination directory; the rename fails if it already exists
+   * @return true if the rename succeeded, false otherwise
+   * @throws IOException currently never thrown; rename failures are caught, logged, and reported as false
+   */
+  public static boolean rename(FileSystem fs, Path from, Path to) throws IOException
+  {
+    try {
+      fs.rename(from, to, Options.Rename.NONE);
+      return true;
+    }
+    catch (IOException ex) {
+      log.warn(ex, "Failed to rename [%s] to [%s].", from, to);
+      return false;
+    }
+  }
+}
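For context (not part of the patch): a minimal usage sketch of the calling pattern that HdfsDataSegmentPusher now follows. The Configuration setup and the tmpDir/segmentDir paths below are illustrative assumptions, not code from this PR.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HadoopFsWrapper;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class RenameExample
{
  public static void main(String[] args) throws IOException
  {
    Configuration conf = new Configuration();
    Path tmpDir = new Path("/tmp/druid/push-1234");         // hypothetical temp directory
    Path segmentDir = new Path("/druid/segments/ds/2017");  // hypothetical segment directory

    FileSystem fs = segmentDir.getFileSystem(conf);
    fs.mkdirs(segmentDir.getParent());

    // Unlike fs.rename(tmpDir, segmentDir), this returns false instead of nesting tmpDir
    // inside segmentDir when segmentDir already exists (e.g. pushed by a replica task).
    if (!HadoopFsWrapper.rename(fs, tmpDir, segmentDir)) {
      if (fs.exists(segmentDir)) {
        // Segment directory already present; treat as success and clean up the temp directory.
        fs.delete(tmpDir, true);
      } else {
        throw new IOException("Failed to rename " + tmpDir + " to " + segmentDir);
      }
    }
  }
}
```

The point of the wrapper is that a plain fs.rename(from, to) would silently move the temp directory inside an already-existing segment directory, whereas Options.Rename.NONE makes the collision detectable, so a push that lost the race to a replica task can be handled explicitly.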