fix datasegment metadata (#3555)

Parag Jain 2016-10-07 16:30:33 -05:00 committed by Himanshu
parent 078de4fcf9
commit c255dd8b19
2 changed files with 15 additions and 8 deletions

View File

@@ -103,14 +103,14 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     final DataSegment dataSegment;
     try (FSDataOutputStream out = fs.create(tmpFile)) {
       size = CompressionUtils.zip(inDir, out);
+      Path outDir = new Path(String.format("%s/%s", config.getStorageDirectory(), storageDir));
       dataSegment = createDescriptorFile(
-          segment.withLoadSpec(makeLoadSpec(tmpFile))
+          segment.withLoadSpec(makeLoadSpec(new Path(String.format("%s/%s", outDir.toUri().getPath(), "index.zip"))))
                  .withSize(size)
                  .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
           tmpFile.getParent(),
           fs
       );
-      Path outDir = new Path(String.format("%s/%s", config.getStorageDirectory(), storageDir));

       // Create parent if it does not exist, recreation is not an error
       fs.mkdirs(outDir.getParent());
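
For context, a minimal sketch of the path logic after this change, not taken from the commit: it shows the load spec pointing at index.zip under the final output directory instead of the temporary push file. The storageDirectory and storageDir values below are made-up examples standing in for config.getStorageDirectory() and the per-segment storage directory.

    import org.apache.hadoop.fs.Path;

    public class LoadSpecPathSketch
    {
      public static void main(String[] args)
      {
        // Assumed example values; in the pusher these come from the config
        // and from the segment being pushed.
        String storageDirectory = "/druid/segments";
        String storageDir = "wikipedia/20161007T000000.000Z_20161008T000000.000Z/v1/0";

        // Same construction as the patched code: build the final output dir first,
        // then point the load spec at <outDir>/index.zip rather than the tmp file.
        Path outDir = new Path(String.format("%s/%s", storageDirectory, storageDir));
        Path indexPath = new Path(String.format("%s/%s", outDir.toUri().getPath(), "index.zip"));

        // Prints the final segment location ending in /index.zip
        System.out.println(indexPath.toUri().getPath());
      }
    }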

View File

@@ -19,6 +19,7 @@
 package io.druid.storage.hdfs;

+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
@@ -27,14 +28,10 @@ import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
 import org.joda.time.Interval;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.internal.runners.statements.Fail;
 import org.junit.rules.TemporaryFolder;

 import java.io.File;
@@ -80,7 +77,16 @@ public class HdfsDataSegmentPusherTest
     DataSegment segment = pusher.push(segmentDir, segmentToPush);
     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
     Assert.assertEquals(segmentToPush, segment);
+    Assert.assertEquals(ImmutableMap.of(
+        "type",
+        "hdfs",
+        "path",
+        String.format("%s/%s/index.zip",
+            config.getStorageDirectory(),
+            DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush)
+        )
+    ), segment.getLoadSpec());

     // rename directory after push
     final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
     File indexFile = new File(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
@@ -93,7 +99,8 @@ public class HdfsDataSegmentPusherTest
     outDir.setReadOnly();
     try {
       pusher.push(segmentDir, segmentToPush);
-    }catch (IOException e){
+    }
+    catch (IOException e) {
       Assert.fail("should not throw exception");
     }
   }
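
The new assertion pins down the shape of the pushed segment's load spec. A minimal sketch of that map, assembled with Guava's ImmutableMap as in the test; the directory values are assumed examples, not values from the commit.

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    public class LoadSpecShapeSketch
    {
      public static void main(String[] args)
      {
        // Assumed example values for the storage directory and the per-segment dir.
        String storageDirectory = "/druid/segments";
        String storageDir = "wikipedia/20161007T000000.000Z_20161008T000000.000Z/v1/0";

        // The hdfs load spec the test asserts: a type tag plus the path to index.zip
        // under the configured storage directory.
        Map<String, Object> loadSpec = ImmutableMap.<String, Object>of(
            "type", "hdfs",
            "path", String.format("%s/%s/index.zip", storageDirectory, storageDir)
        );

        System.out.println(loadSpec);
      }
    }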