mirror of https://github.com/apache/druid.git
HdfsDataSegmentPusher bug fix (#4003)
* Fix for HdfsDataSegmentPusher. * Add the missing loadSpec to the descriptor file that is actually written, and add tests that check the actual content of the descriptor file.
This commit is contained in:
parent
df623ebfe3
commit
bebf9f34c7
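In short, after this change each pushed segment ends up with a partitionNum_index.zip and a matching partitionNum_descriptor.json under its segment directory, and the descriptor is serialized from the DataSegment that already carries the hdfs loadSpec, size, and binary version. Below is a small illustrative sketch (not part of the commit) of the resulting paths and loadSpec content; the String.format patterns are the ones used in HdfsDataSegmentPusher in the diff below, while the storage directory, segment path, and partition number are invented.

import com.google.common.collect.ImmutableMap;

// Illustrative sketch only: prints the index/descriptor paths and the loadSpec that the
// fixed pusher writes into the descriptor. Directory, segment path, and partition number
// are invented; the format strings come from HdfsDataSegmentPusher.
public class DescriptorLayoutSketch
{
  public static void main(String[] args)
  {
    String fullyQualifiedStorageDirectory = "hdfs://namenode:8020/druid/segments"; // invented
    String storageDir = "foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/0"; // invented
    int partitionNum = 0;

    String indexPath = String.format("%s/%s/%d_index.zip", fullyQualifiedStorageDirectory, storageDir, partitionNum);
    String descriptorPath = String.format("%s/%s/%d_descriptor.json", fullyQualifiedStorageDirectory, storageDir, partitionNum);

    // The descriptor written at descriptorPath serializes the segment after withLoadSpec(...)
    // has been applied, so its loadSpec is equivalent to:
    ImmutableMap<String, Object> loadSpec = ImmutableMap.<String, Object>of("type", "hdfs", "path", indexPath);

    System.out.println(indexPath);
    System.out.println(descriptorPath);
    System.out.println(loadSpec);
  }
}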
@@ -93,79 +93,87 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
         storageDir
     );

-    Path tmpFile = new Path(String.format(
+    Path tmpIndexFile = new Path(String.format(
         "%s/%s/%s/%s_index.zip",
         fullyQualifiedStorageDirectory,
         segment.getDataSource(),
         UUIDUtils.generateUuid(),
         segment.getShardSpec().getPartitionNum()
     ));
-    FileSystem fs = tmpFile.getFileSystem(hadoopConfig);
+    FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig);

-    fs.mkdirs(tmpFile.getParent());
-    log.info("Compressing files from[%s] to [%s]", inDir, tmpFile);
+    fs.mkdirs(tmpIndexFile.getParent());
+    log.info("Compressing files from[%s] to [%s]", inDir, tmpIndexFile);

     final long size;
     final DataSegment dataSegment;
-    try (FSDataOutputStream out = fs.create(tmpFile)) {
+    try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
       size = CompressionUtils.zip(inDir, out);
-      Path outFile = new Path(String.format(
+      final Path outIndexFile = new Path(String.format(
           "%s/%s/%d_index.zip",
           fullyQualifiedStorageDirectory,
           storageDir,
           segment.getShardSpec().getPartitionNum()
       ));
-      final Path outDir = outFile.getParent();
-      dataSegment = createDescriptorFile(
-          segment.withLoadSpec(makeLoadSpec(outFile))
-                 .withSize(size)
-                 .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
-          tmpFile.getParent(),
-          fs,
-          segment.getShardSpec().getPartitionNum()
-      );
+      final Path outDescriptorFile = new Path(String.format(
+          "%s/%s/%d_descriptor.json",
+          fullyQualifiedStorageDirectory,
+          storageDir,
+          segment.getShardSpec().getPartitionNum()
+      ));
+
+      dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile))
+                           .withSize(size)
+                           .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir));
+
+      final Path tmpDescriptorFile = new Path(
+          tmpIndexFile.getParent(),
+          String.format("%s_descriptor.json", dataSegment.getShardSpec().getPartitionNum())
+      );
+
+      log.info("Creating descriptor file at[%s]", tmpDescriptorFile);
+      ByteSource
+          .wrap(jsonMapper.writeValueAsBytes(dataSegment))
+          .copyTo(new HdfsOutputStreamSupplier(fs, tmpDescriptorFile));
+
       // Create parent if it does not exist, recreation is not an error
-      fs.mkdirs(outDir.getParent());
-      if (!HadoopFsWrapper.rename(fs, tmpFile.getParent(), outDir)) {
-        if (fs.exists(outDir)) {
-          log.info(
-              "Unable to rename temp directory[%s] to segment directory[%s]. It is already pushed by a replica task.",
-              tmpFile.getParent(),
-              outDir
-          );
-        } else {
-          throw new IOException(String.format(
-              "Failed to rename temp directory[%s] and segment directory[%s] is not present.",
-              tmpFile.getParent(),
-              outDir
-          ));
-        }
-      }
+      fs.mkdirs(outIndexFile.getParent());
+      copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile);
+      copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
     }
     finally {
       try {
-        if (fs.exists(tmpFile.getParent()) && !fs.delete(tmpFile.getParent(), true)) {
-          log.error("Failed to delete temp directory[%s]", tmpFile.getParent());
+        if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) {
+          log.error("Failed to delete temp directory[%s]", tmpIndexFile.getParent());
         }
       }
       catch (IOException ex) {
-        log.error(ex, "Failed to delete temp directory[%s]", tmpFile.getParent());
+        log.error(ex, "Failed to delete temp directory[%s]", tmpIndexFile.getParent());
       }
     }

     return dataSegment;
   }

-  private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs, final int partitionNumber) throws IOException
+  private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to) throws IOException
   {
-    final Path descriptorFile = new Path(outDir, String.format("%s_descriptor.json", partitionNumber));
-    log.info("Creating descriptor file at[%s]", descriptorFile);
-    ByteSource
-        .wrap(jsonMapper.writeValueAsBytes(segment))
-        .copyTo(new HdfsOutputStreamSupplier(fs, descriptorFile));
-    return segment;
+    if (!HadoopFsWrapper.rename(fs, from, to)) {
+      if (fs.exists(to)) {
+        log.info(
+            "Unable to rename temp Index file[%s] to final segment path [%s]. "
+            + "It is already pushed by a replica task.",
+            from,
+            to
+        );
+      } else {
+        throw new IOException(String.format(
+            "Failed to rename temp Index file[%s] and final segment path[%s] is not present.",
+            from,
+            to
+        ));
+      }
+    }
   }

   private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
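The structural change above: instead of writing the descriptor via createDescriptorFile and renaming the whole temp directory into place, the pusher now writes both the index zip and the descriptor into the temp directory and moves each one individually through copyFilesWithChecks, treating an already existing destination as a successful push by a replica task. A minimal standalone sketch of that move pattern follows; it is illustrative only, uses plain org.apache.hadoop.fs.FileSystem.rename rather than Druid's HadoopFsWrapper.rename (which has non-overwriting semantics), and all paths are invented.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the "move, but tolerate an existing copy from a replica" idea. The real code
// calls HadoopFsWrapper.rename; plain FileSystem.rename is used here only to keep the
// example self-contained, and it differs in how it treats existing destinations.
public class MoveWithChecksSketch
{
  static void moveWithChecks(FileSystem fs, Path from, Path to) throws IOException
  {
    if (!fs.rename(from, to)) {
      if (fs.exists(to)) {
        // Another replica task already pushed this file; not an error.
        System.out.println(String.format("[%s] already exists, assuming a replica pushed it", to));
      } else {
        throw new IOException(String.format("Failed to rename [%s] to [%s]", from, to));
      }
    }
  }

  public static void main(String[] args) throws IOException
  {
    FileSystem fs = FileSystem.get(new Configuration()); // local FS unless configured otherwise
    Path from = new Path("/tmp/druid-push/0_index.zip");  // invented paths
    Path to = new Path("/tmp/druid-segments/foo/0_index.zip");
    fs.mkdirs(to.getParent());
    moveWithChecks(fs, from, to);
  }
}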
@@ -19,6 +19,17 @@

 package io.druid.storage.hdfs;

+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -27,6 +38,7 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +62,8 @@ public class HdfsDataSegmentPusherTest
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();

+  TestObjectMapper objectMapper = new TestObjectMapper();
+
   @Test
   public void testPushWithScheme() throws Exception
   {
@@ -73,6 +87,12 @@ public class HdfsDataSegmentPusherTest
     testUsingScheme(null);
   }

+  @Test
+  public void testPushWithMultipleSegments() throws Exception
+  {
+    testUsingSchemeForMultipleSegments("file", 3);
+  }
+
   private void testUsingScheme(final String scheme) throws Exception
   {
     Configuration conf = new Configuration(true);
@@ -153,4 +173,148 @@ public class HdfsDataSegmentPusherTest
       Assert.fail("should not throw exception");
     }
   }
+
+  private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
+  {
+    Configuration conf = new Configuration(true);
+    DataSegment[] segments = new DataSegment[numberOfSegments];
+
+    // Create a mock segment on disk
+    File segmentDir = tempFolder.newFolder();
+    File tmp = new File(segmentDir, "version.bin");
+
+    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+    Files.write(data, tmp);
+    final long size = data.length;
+
+    HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
+    final File storageDirectory = tempFolder.newFolder();
+
+    config.setStorageDirectory(
+        scheme != null
+        ? String.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
+        : storageDirectory.getAbsolutePath()
+    );
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+
+    for (int i = 0; i < numberOfSegments; i++) {
+      segments[i] = new DataSegment(
+          "foo",
+          new Interval("2015/2016"),
+          "0",
+          Maps.<String, Object>newHashMap(),
+          Lists.<String>newArrayList(),
+          Lists.<String>newArrayList(),
+          new NumberedShardSpec(i, i),
+          0,
+          size
+      );
+    }
+
+    for (int i = 0; i < numberOfSegments; i++) {
+      final DataSegment pushedSegment = pusher.push(segmentDir, segments[i]);
+
+      String indexUri = String.format(
+          "%s/%s/%d_index.zip",
+          FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
+          DataSegmentPusherUtil.getHdfsStorageDir(segments[i]),
+          segments[i].getShardSpec().getPartitionNum()
+      );
+
+      Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
+      Assert.assertEquals(segments[i], pushedSegment);
+      Assert.assertEquals(ImmutableMap.of(
+          "type",
+          "hdfs",
+          "path",
+          indexUri
+      ), pushedSegment.getLoadSpec());
+      // rename directory after push
+      String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(pushedSegment);
+
+      File indexFile = new File(String.format(
+          "%s/%s/%d_index.zip",
+          storageDirectory,
+          segmentPath,
+          pushedSegment.getShardSpec().getPartitionNum()
+      ));
+      Assert.assertTrue(indexFile.exists());
+      File descriptorFile = new File(String.format(
+          "%s/%s/%d_descriptor.json",
+          storageDirectory,
+          segmentPath,
+          pushedSegment.getShardSpec().getPartitionNum()
+      ));
+      Assert.assertTrue(descriptorFile.exists());
+
+      //read actual data from descriptor file.
+      DataSegment fromDescriptorFileDataSegment = objectMapper.readValue(descriptorFile, DataSegment.class);
+
+      Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
+      Assert.assertEquals(segments[i], pushedSegment);
+      Assert.assertEquals(ImmutableMap.of(
+          "type",
+          "hdfs",
+          "path",
+          indexUri
+      ), fromDescriptorFileDataSegment.getLoadSpec());
+      // rename directory after push
+      segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(fromDescriptorFileDataSegment);
+
+      indexFile = new File(String.format(
+          "%s/%s/%d_index.zip",
+          storageDirectory,
+          segmentPath,
+          fromDescriptorFileDataSegment.getShardSpec().getPartitionNum()
+      ));
+      Assert.assertTrue(indexFile.exists());
+
+      // push twice will fail and temp dir cleaned
+      File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), segmentPath));
+      outDir.setReadOnly();
+      try {
+        pusher.push(segmentDir, segments[i]);
+      }
+      catch (IOException e) {
+        Assert.fail("should not throw exception");
+      }
+    }
+  }
+
+  public class TestObjectMapper extends ObjectMapper
+  {
+    public TestObjectMapper()
+    {
+      configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+      configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+      configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+      configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+      configure(SerializationFeature.INDENT_OUTPUT, false);
+      registerModule(new TestModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec")));
+    }
+
+    public class TestModule extends SimpleModule
+    {
+      TestModule()
+      {
+        addSerializer(Interval.class, ToStringSerializer.instance);
+        addSerializer(NumberedShardSpec.class, ToStringSerializer.instance);
+        addDeserializer(
+            Interval.class, new StdDeserializer<Interval>(Interval.class)
+            {
+              @Override
+              public Interval deserialize(
+                  JsonParser jsonParser, DeserializationContext deserializationContext
+              ) throws IOException, JsonProcessingException
+              {
+                return new Interval(jsonParser.getText());
+              }
+            }
+        );
+      }
+    }
+  }
+
 }
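For reference, a compact sketch of the path convention the new test relies on: DataSegmentPusherUtil.getHdfsStorageDir maps a segment to its directory under the configured storage root, and the index zip and descriptor sit next to each other, keyed by partition number. The DataSegment construction mirrors the one in the test above; the storage root is invented.

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;

// Sketch (not part of the commit) of where a given segment ends up after the fix.
public class SegmentPathSketch
{
  public static void main(String[] args)
  {
    DataSegment segment = new DataSegment(
        "foo",
        new Interval("2015/2016"),
        "0",
        Maps.<String, Object>newHashMap(),
        Lists.<String>newArrayList(),
        Lists.<String>newArrayList(),
        new NumberedShardSpec(0, 3),
        0,
        4
    );

    String storageDirectory = "hdfs://namenode:8020/druid/segments"; // invented storage root
    String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(segment);

    // One index zip and one descriptor per partition, side by side under the segment path:
    System.out.println(String.format("%s/%s/%d_index.zip", storageDirectory, segmentPath, segment.getShardSpec().getPartitionNum()));
    System.out.println(String.format("%s/%s/%d_descriptor.json", storageDirectory, segmentPath, segment.getShardSpec().getPartitionNum()));
  }
}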