diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
index ac81289e7b7..6b80e3619c2 100644
--- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -93,79 +93,87 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
         storageDir
     );
 
-    Path tmpFile = new Path(String.format(
+    Path tmpIndexFile = new Path(String.format(
         "%s/%s/%s/%s_index.zip",
         fullyQualifiedStorageDirectory,
         segment.getDataSource(),
         UUIDUtils.generateUuid(),
         segment.getShardSpec().getPartitionNum()
     ));
-    FileSystem fs = tmpFile.getFileSystem(hadoopConfig);
+    FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig);
 
-    fs.mkdirs(tmpFile.getParent());
-    log.info("Compressing files from[%s] to [%s]", inDir, tmpFile);
+    fs.mkdirs(tmpIndexFile.getParent());
+    log.info("Compressing files from[%s] to [%s]", inDir, tmpIndexFile);
 
     final long size;
     final DataSegment dataSegment;
-    try (FSDataOutputStream out = fs.create(tmpFile)) {
+    try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
       size = CompressionUtils.zip(inDir, out);
-      Path outFile = new Path(String.format(
+      final Path outIndexFile = new Path(String.format(
           "%s/%s/%d_index.zip",
           fullyQualifiedStorageDirectory,
           storageDir,
           segment.getShardSpec().getPartitionNum()
       ));
-      final Path outDir = outFile.getParent();
-      dataSegment = createDescriptorFile(
-          segment.withLoadSpec(makeLoadSpec(outFile))
-                 .withSize(size)
-                 .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
-          tmpFile.getParent(),
-          fs,
+      final Path outDescriptorFile = new Path(String.format(
+          "%s/%s/%d_descriptor.json",
+          fullyQualifiedStorageDirectory,
+          storageDir,
           segment.getShardSpec().getPartitionNum()
+      ));
+
+      dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile))
+                           .withSize(size)
+                           .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir));
+
+      final Path tmpDescriptorFile = new Path(
+          tmpIndexFile.getParent(),
+          String.format("%s_descriptor.json", dataSegment.getShardSpec().getPartitionNum())
       );
 
+      log.info("Creating descriptor file at[%s]", tmpDescriptorFile);
+      ByteSource
+          .wrap(jsonMapper.writeValueAsBytes(dataSegment))
+          .copyTo(new HdfsOutputStreamSupplier(fs, tmpDescriptorFile));
+
       // Create parent if it does not exist, recreation is not an error
-      fs.mkdirs(outDir.getParent());
-      if (!HadoopFsWrapper.rename(fs, tmpFile.getParent(), outDir)) {
-        if (fs.exists(outDir)) {
-          log.info(
-              "Unable to rename temp directory[%s] to segment directory[%s]. It is already pushed by a replica task.",
-              tmpFile.getParent(),
-              outDir
-          );
-        } else {
-          throw new IOException(String.format(
-              "Failed to rename temp directory[%s] and segment directory[%s] is not present.",
-              tmpFile.getParent(),
-              outDir
-          ));
-        }
-      }
+      fs.mkdirs(outIndexFile.getParent());
+      copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile);
+      copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
     }
     finally {
       try {
-        if (fs.exists(tmpFile.getParent()) && !fs.delete(tmpFile.getParent(), true)) {
-          log.error("Failed to delete temp directory[%s]", tmpFile.getParent());
+        if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) {
+          log.error("Failed to delete temp directory[%s]", tmpIndexFile.getParent());
        }
       }
       catch (IOException ex) {
-        log.error(ex, "Failed to delete temp directory[%s]", tmpFile.getParent());
+        log.error(ex, "Failed to delete temp directory[%s]", tmpIndexFile.getParent());
       }
     }
 
     return dataSegment;
   }
 
-  private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs, final int partitionNumber) throws IOException
+  private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to) throws IOException
   {
-    final Path descriptorFile = new Path(outDir, String.format("%s_descriptor.json", partitionNumber));
-    log.info("Creating descriptor file at[%s]", descriptorFile);
-    ByteSource
-        .wrap(jsonMapper.writeValueAsBytes(segment))
-        .copyTo(new HdfsOutputStreamSupplier(fs, descriptorFile));
-    return segment;
+    if (!HadoopFsWrapper.rename(fs, from, to)) {
+      if (fs.exists(to)) {
+        log.info(
+            "Unable to rename temp Index file[%s] to final segment path [%s]. "
+            + "It is already pushed by a replica task.",
+            from,
+            to
+        );
+      } else {
+        throw new IOException(String.format(
+            "Failed to rename temp Index file[%s] and final segment path[%s] is not present.",
+            from,
+            to
+        ));
+      }
+    }
   }
 
   private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
index e6c8479873c..f341cd4d65d 100644
--- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
@@ -19,6 +19,17 @@
 
 package io.druid.storage.hdfs;
 
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -27,6 +38,7 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +62,8 @@ public class HdfsDataSegmentPusherTest
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
 
+  TestObjectMapper objectMapper = new TestObjectMapper();
+
   @Test
   public void testPushWithScheme() throws Exception
   {
@@ -73,6 +87,12 @@ public class HdfsDataSegmentPusherTest
     testUsingScheme(null);
   }
 
+  @Test
+  public void testPushWithMultipleSegments() throws Exception
+  {
+    testUsingSchemeForMultipleSegments("file", 3);
+  }
+
   private void testUsingScheme(final String scheme) throws Exception
   {
     Configuration conf = new Configuration(true);
@@ -153,4 +173,148 @@ public class HdfsDataSegmentPusherTest
       Assert.fail("should not throw exception");
     }
   }
+
+  private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
+  {
+    Configuration conf = new Configuration(true);
+    DataSegment[] segments = new DataSegment[numberOfSegments];
+
+    // Create a mock segment on disk
+    File segmentDir = tempFolder.newFolder();
+    File tmp = new File(segmentDir, "version.bin");
+
+    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+    Files.write(data, tmp);
+    final long size = data.length;
+
+    HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
+    final File storageDirectory = tempFolder.newFolder();
+
+    config.setStorageDirectory(
+        scheme != null
+        ? String.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
+        : storageDirectory.getAbsolutePath()
+    );
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+
+    for (int i = 0; i < numberOfSegments; i++) {
+      segments[i] = new DataSegment(
+          "foo",
+          new Interval("2015/2016"),
+          "0",
+          Maps.<String, Object>newHashMap(),
+          Lists.<String>newArrayList(),
+          Lists.<String>newArrayList(),
+          new NumberedShardSpec(i, i),
+          0,
+          size
+      );
+    }
+
+    for (int i = 0; i < numberOfSegments; i++) {
+      final DataSegment pushedSegment = pusher.push(segmentDir, segments[i]);
+
+      String indexUri = String.format(
+          "%s/%s/%d_index.zip",
+          FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
+          DataSegmentPusherUtil.getHdfsStorageDir(segments[i]),
+          segments[i].getShardSpec().getPartitionNum()
+      );
+
+      Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
+      Assert.assertEquals(segments[i], pushedSegment);
+      Assert.assertEquals(ImmutableMap.of(
+          "type",
+          "hdfs",
+          "path",
+          indexUri
+      ), pushedSegment.getLoadSpec());
+      // rename directory after push
+      String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(pushedSegment);
+
+      File indexFile = new File(String.format(
+          "%s/%s/%d_index.zip",
+          storageDirectory,
+          segmentPath,
+          pushedSegment.getShardSpec().getPartitionNum()
+      ));
+      Assert.assertTrue(indexFile.exists());
+      File descriptorFile = new File(String.format(
+          "%s/%s/%d_descriptor.json",
+          storageDirectory,
+          segmentPath,
+          pushedSegment.getShardSpec().getPartitionNum()
+      ));
+      Assert.assertTrue(descriptorFile.exists());
+
+      // read actual data from descriptor file.
+      DataSegment fromDescriptorFileDataSegment = objectMapper.readValue(descriptorFile, DataSegment.class);
+
+      Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
+      Assert.assertEquals(segments[i], pushedSegment);
+      Assert.assertEquals(ImmutableMap.of(
+          "type",
+          "hdfs",
+          "path",
+          indexUri
+      ), fromDescriptorFileDataSegment.getLoadSpec());
+      // rename directory after push
+      segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(fromDescriptorFileDataSegment);
+
+      indexFile = new File(String.format(
+          "%s/%s/%d_index.zip",
+          storageDirectory,
+          segmentPath,
+          fromDescriptorFileDataSegment.getShardSpec().getPartitionNum()
+      ));
+      Assert.assertTrue(indexFile.exists());
+
+      // push twice will fail and temp dir cleaned
+      File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), segmentPath));
+      outDir.setReadOnly();
+      try {
+        pusher.push(segmentDir, segments[i]);
+      }
+      catch (IOException e) {
+        Assert.fail("should not throw exception");
+      }
+    }
+  }
+
+  public class TestObjectMapper extends ObjectMapper
+  {
+    public TestObjectMapper()
+    {
+      configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+      configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+      configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+      configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+      configure(SerializationFeature.INDENT_OUTPUT, false);
+      registerModule(new TestModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec")));
+    }
+
+    public class TestModule extends SimpleModule
+    {
+      TestModule()
+      {
+        addSerializer(Interval.class, ToStringSerializer.instance);
+        addSerializer(NumberedShardSpec.class, ToStringSerializer.instance);
+        addDeserializer(
+            Interval.class, new StdDeserializer<Interval>(Interval.class)
+            {
+              @Override
+              public Interval deserialize(
+                  JsonParser jsonParser, DeserializationContext deserializationContext
+              ) throws IOException, JsonProcessingException
+              {
+                return new Interval(jsonParser.getText());
+              }
+            }
+        );
+      }
+    }
+  }
+
 }
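For reviewers skimming the patch: the key behavioral change is that the pusher now moves two files per partition (the index zip and the descriptor json) from a temp location into the final per-partition path, and an already-existing destination is treated as a successful push by a replica task rather than an error. Below is a minimal standalone sketch of that rename-or-skip idiom, not part of the patch: it uses plain org.apache.hadoop.fs.FileSystem.rename() instead of Druid's HadoopFsWrapper.rename() helper used in copyFilesWithChecks(), and the class name and paths are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameOrSkipSketch
{
  // Move "from" to "to"; if the destination already exists (e.g. a replica task
  // pushed the same segment first), treat it as success instead of failing.
  static void renameOrSkip(final FileSystem fs, final Path from, final Path to) throws IOException
  {
    if (!fs.rename(from, to)) {
      if (fs.exists(to)) {
        return; // already pushed by another task; nothing to do
      }
      throw new IOException(String.format("Failed to rename [%s] to [%s]", from, to));
    }
  }

  public static void main(String[] args) throws IOException
  {
    final FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical per-partition layout mirroring <partitionNum>_index.zip / <partitionNum>_descriptor.json
    renameOrSkip(fs, new Path("/tmp/push/0_index.zip"), new Path("/druid/segments/foo/0_index.zip"));
    renameOrSkip(fs, new Path("/tmp/push/0_descriptor.json"), new Path("/druid/segments/foo/0_descriptor.json"));
  }
}

Renaming each file individually (rather than renaming the whole temp directory, as the old code did) is what lets several tasks writing different partitions of the same segment share one final directory without clobbering each other.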