mirror of https://github.com/apache/druid.git
[FIX] ReleaseException when the path is being written by multiple tasks (#3494)
* fix ReleaseException when the path is being written by multiple tasks
* Do not throw IOException if another replica wins the race for segment creation
* fix if check
* handle logging comments
* fix test
parent f8d71fc602
commit 6099d20303
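In outline, the patch below stops writing index.zip directly to the final segment path and instead writes it under a temporary, UUID-named directory, builds the descriptor there, and then renames that directory into place. If the rename fails because a replica task already pushed the segment, the temp directory is deleted and the push is treated as successful; an IOException is thrown only when the rename fails and the segment directory is still absent. The following is a minimal sketch of that rename-as-commit pattern against the Hadoop FileSystem API; the class name RaceTolerantRename, the example paths, and the condensed signature are illustrative only and are not part of the commit.

// Minimal sketch (not the commit's code): publish a temp dir by renaming it into place.
// A lost race against a replica that already pushed the segment is treated as success;
// only a failed rename with no existing segment directory is a real error.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class RaceTolerantRename
{
  public static void commit(FileSystem fs, Path tmpDir, Path outDir) throws IOException
  {
    if (!fs.rename(tmpDir, outDir)) {
      // Rename failed: clean up our temp output either way.
      if (!fs.delete(tmpDir, true)) {
        System.err.printf("Failed to delete temp directory[%s]%n", tmpDir);
      }
      if (!fs.exists(outDir)) {
        // Nobody else produced the segment directory, so this is a genuine failure.
        throw new IOException(String.format("Failed to rename [%s] to [%s]", tmpDir, outDir));
      }
      // Otherwise a replica task won the race; the segment is already in place.
    }
  }

  public static void main(String[] args) throws IOException
  {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path tmp = new Path("/tmp/druid-push-sketch/" + java.util.UUID.randomUUID());
    Path out = new Path("/tmp/druid-push-sketch/segment");
    fs.mkdirs(tmp);
    commit(fs, tmp, out);
  }
}

The key design point, visible in the diff, is that the index zip and descriptor are fully written under the temp directory first, so the rename is the single step that publishes the segment, and a losing replica never partially overwrites a segment another task has already committed.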
HdfsDataSegmentPusher.java
@@ -26,6 +26,7 @@ import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import com.metamx.common.CompressionUtils;
 import com.metamx.common.logger.Logger;
+import io.druid.common.utils.UUIDUtils;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.DataSegmentPusherUtil;
@@ -88,24 +89,49 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
         storageDir
     );
 
-    Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
-    FileSystem fs = outFile.getFileSystem(hadoopConfig);
+    Path tmpFile = new Path(String.format(
+        "%s/%s/index.zip",
+        config.getStorageDirectory(),
+        UUIDUtils.generateUuid()
+    ));
+    FileSystem fs = tmpFile.getFileSystem(hadoopConfig);
 
-    fs.mkdirs(outFile.getParent());
-    log.info("Compressing files from[%s] to [%s]", inDir, outFile);
+    fs.mkdirs(tmpFile.getParent());
+    log.info("Compressing files from[%s] to [%s]", inDir, tmpFile);
 
     final long size;
-    try (FSDataOutputStream out = fs.create(outFile)) {
+    final DataSegment dataSegment;
+    try (FSDataOutputStream out = fs.create(tmpFile)) {
       size = CompressionUtils.zip(inDir, out);
+      dataSegment = createDescriptorFile(
+          segment.withLoadSpec(makeLoadSpec(tmpFile))
+                 .withSize(size)
+                 .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
+          tmpFile.getParent(),
+          fs
+      );
+      Path outDir = new Path(String.format("%s/%s", config.getStorageDirectory(), storageDir));
+      if (!fs.rename(tmpFile.getParent(), outDir)) {
+        if (!fs.delete(tmpFile.getParent(), true)) {
+          log.error("Failed to delete temp directory[%s]", tmpFile);
+        }
+        if (fs.exists(outDir)) {
+          log.info(
+              "Unable to rename temp directory[%s] to segment directory[%s]. It is already pushed by a replica task.",
+              tmpFile,
+              outDir
+          );
+        } else {
+          throw new IOException(String.format(
+              "Failed to rename temp directory[%s] and segment directory[%s] is not present.",
+              tmpFile,
+              outDir
+          ));
+        }
+      }
     }
 
-    return createDescriptorFile(
-        segment.withLoadSpec(makeLoadSpec(outFile))
-               .withSize(size)
-               .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
-        outFile.getParent(),
-        fs
-    );
+    return dataSegment;
   }
 
   private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException
HdfsDataSegmentPusherTest.java
@@ -23,16 +23,22 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
 import org.joda.time.Interval;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.internal.runners.statements.Fail;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 
 /**
  */
@@ -74,5 +80,21 @@ public class HdfsDataSegmentPusherTest
     DataSegment segment = pusher.push(segmentDir, segmentToPush);
 
     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+
+    // rename directory after push
+    final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
+    File indexFile = new File(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
+    Assert.assertTrue(indexFile.exists());
+    File descriptorFile = new File(String.format("%s/%s/descriptor.json", config.getStorageDirectory(), storageDir));
+    Assert.assertTrue(descriptorFile.exists());
+
+    // push twice will fail and temp dir cleaned
+    File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), storageDir));
+    outDir.setReadOnly();
+    try {
+      pusher.push(segmentDir, segmentToPush);
+    } catch (IOException e) {
+      Assert.fail("should not throw exception");
+    }
   }
 }