Namespace optimization for hdfs data segments. (#3877)

* NameNode (NN) optimization for HDFS data segments.

* HdfsDataSegmentKiller and HdfsDataSegmentFinder changes to use the new storage
format. Docs updated.

* Common utility function in DataSegmentPusherUtil.

* new static method `makeSegmentOutputPathUptoVersionForHdfs` in JobHelper

* reuse getHdfsStorageDirUptoVersion in
DataSegmentPusherUtil.getHdfsStorageDir()

* Addressed comments.

* Review comments.

* HdfsDataSegmentKiller requested changes.

* extra newline

* Add maprfs.
Akash Dwivedi, 2017-03-01 09:51:20 -08:00 (committed by cheddar)
parent cc20133e70
commit 94da5e80f9
17 changed files with 465 additions and 134 deletions

View File

@ -30,11 +30,12 @@ import java.util.Set;
public interface DataSegmentFinder
{
/**
* This method should first recursively look for descriptor.json underneath workingDirPath and then verify that
* index.zip exists in the same folder. If not, it should throw SegmentLoadingException to let the caller know that
* descriptor.json exists while index.zip doesn't. If a segment is found and updateDescriptor is set, then this method
* should update the loadSpec in descriptor.json to reflect the location from where it was found. After the search,
* this method should return the set of segments that were found.
* This method should first recursively look for descriptor.json (partitionNum_descriptor.json for HDFS data storage) underneath
* workingDirPath and then verify that index.zip (partitionNum_index.zip for HDFS data storage) exists in the same folder.
* If not, it should throw SegmentLoadingException to let the caller know that descriptor.json exists
* while index.zip doesn't. If a segment is found and updateDescriptor is set, then this method should update the
* loadSpec in descriptor.json to reflect the location from where it was found. After the search, this method
* should return the set of segments that were found.
*
* @param workingDirPath the String representation of the working directory path
* @param updateDescriptor if true, update loadSpec in descriptor.json if loadSpec's location is different from where

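A minimal usage sketch of this contract, assuming the two-argument `HdfsDataSegmentFinder(Configuration, ObjectMapper)` constructor and a hypothetical working directory; this is an illustration only, not code from this patch:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.storage.hdfs.HdfsDataSegmentFinder;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;

import java.util.Set;

public class FindSegmentsExample
{
  public static void main(String[] args) throws Exception
  {
    // Constructor arguments are an assumption based on how the finder is wired up elsewhere.
    ObjectMapper mapper = new DefaultObjectMapper();
    HdfsDataSegmentFinder finder = new HdfsDataSegmentFinder(new Configuration(), mapper);

    // Recursively scans workingDirPath for *descriptor.json, checks that the matching
    // *index.zip sits next to it, and returns every segment it finds. Passing true
    // instead of false would rewrite stale loadSpec paths in the descriptors it visits.
    Set<DataSegment> segments = finder.findSegments("hdfs://nn1/druid/storage/wikipedia", false);
    for (DataSegment segment : segments) {
      System.out.println(segment.getIdentifier() + " -> " + segment.getLoadSpec().get("path"));
    }
  }
}
```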
View File

@ -60,8 +60,7 @@ public class DataSegmentPusherUtil
segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
),
segment.getVersion().replaceAll(":", "_"),
segment.getShardSpec().getPartitionNum()
segment.getVersion().replaceAll(":", "_")
);
}
}

View File

@ -31,7 +31,8 @@ import java.util.Arrays;
public class DataSegmentPusherUtilTest
{
@Test
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception {
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception
{
Interval interval = new Interval("2011-10-01/2011-10-02");
ImmutableMap<String, Object> loadSpec = ImmutableMap.<String, Object>of("something", "or_other");
@ -49,7 +50,7 @@ public class DataSegmentPusherUtilTest
);
String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0", storageDir);
Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir);
}
}

View File

@ -85,7 +85,7 @@ In this way, you can validate both push (at realtime node) and pull (at historic
* DataSegmentPusher
Wherever your data storage (cloud storage service, distributed file system, etc.) is, you should be able to see two new files: `descriptor.json` and `index.zip` after your ingestion task ends.
Wherever your data storage (cloud storage service, distributed file system, etc.) is, you should be able to see two new files: `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) and `index.zip` (`partitionNum_index.zip` for HDFS data storage) after your ingestion task ends.
* DataSegmentPuller
@ -118,7 +118,7 @@ To mark a segment as not used, you need to connect to your metadata storage and
To start a segment killing task, you need to access the old Coordinator console `http://<COORDINATOR_IP>:<COORDINATOR_PORT>/old-console/kill.html`, then select the appropriate datasource and input a time range (e.g. `2000/3000`).
After the killing task ends, both `descriptor.json` and `index.zip` files should be deleted from the data storage.
After the killing task ends, both `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) and `index.zip` (`partitionNum_index.zip` for HDFS data storage) files should be deleted from the data storage.
### Adding a new Firehose

View File

@ -25,7 +25,7 @@ or runtime.properties file. Specifically, this tool needs to know
`druid.storage.type`
Besides the properties above, you also need to specify the location where the segments are stored and whether you want to
update descriptor.json. These two can be provided through command line arguments.
update descriptor.json (`partitionNum_descriptor.json` for HDFS data storage). These two can be provided through command line arguments.
`--workingDir` (Required)
@ -36,11 +36,11 @@ update descriptor.json. These two can be provided through command line arguments
`--updateDescriptor` (Optional)
if set to true, this tool will update `loadSpec` field in `descriptor.json` if the path in `loadSpec` is different from
where `desciptor.json` was found. Default value is `true`.
if set to true, this tool will update `loadSpec` field in `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) if the path in `loadSpec` is different from
where `desciptor.json` (`partitionNum_descriptor.json` for HDFS data storage) was found. Default value is `true`.
Note: you will also need to load different Druid extensions per the metadata and deep storage you use. For example, if you
use `mysql` as metadata storage and `HDFS` as deep storage, you should load `mysql-metadata-storage` and `druid-hdfs-storage`
use `mysql` as metadata storage and HDFS as deep storage, you should load `mysql-metadata-storage` and `druid-hdfs-storage`
extensions.
@ -54,24 +54,20 @@ Directory path: /druid/storage/wikipedia
├── 2013-08-31T000000.000Z_2013-09-01T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   └── 0
│   ├── descriptor.json
│   └── index.zip
│   ├── 0_descriptor.json
│   └── 0_index.zip
├── 2013-09-01T000000.000Z_2013-09-02T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   └── 0
│   ├── descriptor.json
│   └── index.zip
│   ├── 0_descriptor.json
│   └── 0_index.zip
├── 2013-09-02T000000.000Z_2013-09-03T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   └── 0
│   ├── descriptor.json
│   └── index.zip
│   ├── 0_descriptor.json
│   └── 0_index.zip
└── 2013-09-03T000000.000Z_2013-09-04T000000.000Z
└── 2015-10-21T22_07_57.074Z
└── 0
├── descriptor.json
└── index.zip
├── 0_descriptor.json
└── 0_index.zip
```
To load all these segments into `mysql`, you can fire the command below,

View File

@ -27,6 +27,7 @@ import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@ -79,8 +80,16 @@ public class HdfsDataSegmentFinder implements DataSegmentFinder
while (it.hasNext()) {
final LocatedFileStatus locatedFileStatus = it.next();
final Path path = locatedFileStatus.getPath();
if (path.getName().equals("descriptor.json")) {
final Path indexZip = new Path(path.getParent(), "index.zip");
if (path.getName().endsWith("descriptor.json")) {
final Path indexZip;
final String descriptorParts[] = path.getName().split("_");
if (descriptorParts.length == 2
&& descriptorParts[1].equals("descriptor.json")
&& StringUtils.isNumeric(descriptorParts[0])) {
indexZip = new Path(path.getParent(), String.format("%s_index.zip", descriptorParts[0]));
} else {
indexZip = new Path(path.getParent(), "index.zip");
}
if (fs.exists(indexZip)) {
final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);

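The filename rule applied above, condensed into a standalone sketch: a descriptor named `N_descriptor.json` with a numeric `N` pairs with `N_index.zip`, and anything else falls back to a plain `index.zip` in the same directory. This is an illustration rather than Druid API; only commons-lang is assumed:

```java
import org.apache.commons.lang.StringUtils;

public class DescriptorNaming
{
  // Mirrors the finder's check: "0_descriptor.json" pairs with "0_index.zip" (new HDFS layout),
  // anything else falls back to a sibling "index.zip" (old layout).
  static String indexFileFor(String descriptorFileName)
  {
    String[] parts = descriptorFileName.split("_");
    if (parts.length == 2 && "descriptor.json".equals(parts[1]) && StringUtils.isNumeric(parts[0])) {
      return String.format("%s_index.zip", parts[0]);
    }
    return "index.zip";
  }

  public static void main(String[] args)
  {
    System.out.println(indexFileFor("descriptor.json"));   // index.zip
    System.out.println(indexFileFor("0_descriptor.json")); // 0_index.zip
  }
}
```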
View File

@ -20,11 +20,11 @@
package io.druid.storage.hdfs;
import com.google.inject.Inject;
import io.druid.java.util.common.logger.Logger;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -33,7 +33,7 @@ import java.io.IOException;
public class HdfsDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(HdfsDataSegmentKiller.class);
private static final EmittingLogger log = new EmittingLogger(HdfsDataSegmentKiller.class);
private static final String PATH_KEY = "path";
@ -48,42 +48,71 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
this.storageDirectory = new Path(pusherConfig.getStorageDirectory());
}
private static Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get(PATH_KEY)));
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
final Path path = getPath(segment);
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path);
final Path segmentPath = getPath(segment);
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), segmentPath);
try {
if (path.getName().endsWith(".zip")) {
String segmentLocation = segmentPath.getName();
final FileSystem fs = segmentPath.getFileSystem(config);
if (!segmentLocation.endsWith(".zip")) {
throw new SegmentLoadingException("Unknown file type[%s]", segmentPath);
} else {
final FileSystem fs = path.getFileSystem(config);
if (!fs.exists(path)) {
log.warn("Segment Path [%s] does not exist. It appears to have been deleted already.", path);
if (!fs.exists(segmentPath)) {
log.warn("Segment Path [%s] does not exist. It appears to have been deleted already.", segmentPath);
return;
}
// path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
Path partitionNumDir = path.getParent();
if (!fs.delete(partitionNumDir, true)) {
String[] zipParts = segmentLocation.split("_");
// for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip
if (zipParts.length == 2
&& zipParts[1].equals("index.zip")
&& StringUtils.isNumeric(zipParts[0])) {
if (!fs.delete(segmentPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete dir [%s]",
partitionNumDir.toString()
"Unable to kill segment, failed to delete [%s]",
segmentPath.toString()
);
}
Path descriptorPath = new Path(segmentPath.getParent(), String.format("%s_descriptor.json", zipParts[0]));
//delete partitionNumber_descriptor.json
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
descriptorPath.toString()
);
}
//for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip
// max depth to look is 2, i.e version directory and interval.
mayBeDeleteParentsUpto(fs, segmentPath, 2);
//try to delete other directories if possible
Path versionDir = partitionNumDir.getParent();
if (safeNonRecursiveDelete(fs, versionDir)) {
Path intervalDir = versionDir.getParent();
if (safeNonRecursiveDelete(fs, intervalDir)) {
Path dataSourceDir = intervalDir.getParent();
safeNonRecursiveDelete(fs, dataSourceDir);
} else { //for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/
// index.zip
if (!fs.delete(segmentPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
segmentPath.toString()
);
}
Path descriptorPath = new Path(segmentPath.getParent(), "descriptor.json");
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
descriptorPath.toString()
);
}
//for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/index.zip
//max depth to look is 3, i.e partition number directory,version directory and interval.
mayBeDeleteParentsUpto(fs, segmentPath, 3);
}
} else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
catch (IOException e) {
@ -99,18 +128,19 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
fs.delete(storageDirectory, true);
}
private boolean safeNonRecursiveDelete(FileSystem fs, Path path)
private void mayBeDeleteParentsUpto(final FileSystem fs, final Path segmentPath, final int maxDepthTobeDeleted)
{
Path path = segmentPath;
try {
return fs.delete(path, false);
}
catch (Exception ex) {
return false;
for (int i = 1; i <= maxDepthTobeDeleted; i++) {
path = path.getParent();
if (fs.listStatus(path).length != 0 || !fs.delete(path, false)) {
break;
}
}
private Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get(PATH_KEY)));
}
catch (Exception e) {
log.makeAlert(e, "uncaught exception during segment killer").emit();
}
}
}

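The `mayBeDeleteParentsUpto` walk replaces the old chain of `safeNonRecursiveDelete` calls: once the segment's files are deleted, it climbs at most `maxDepthTobeDeleted` parents and removes each directory only while it is empty (depth 2 for the new layout, depth 3 for the old layout with its extra partitionNum directory). A local-filesystem analogue of the same idea, for illustration only:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class PruneEmptyParents
{
  // deletedFile is the path of the already-deleted segment file (e.g. .../version/0_index.zip).
  static void pruneEmptyParents(Path deletedFile, int maxDepth)
  {
    Path dir = deletedFile;
    try {
      for (int i = 1; i <= maxDepth; i++) {
        dir = dir.getParent();
        if (dir == null) {
          break;
        }
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
          if (entries.iterator().hasNext()) {
            break; // directory still has content, stop pruning
          }
        }
        Files.delete(dir); // remove the now-empty directory, then look one level up
      }
    }
    catch (IOException e) {
      // mirror the killer's behaviour: failures here are reported, never rethrown
      System.err.println("stopped pruning at " + dir + ": " + e);
    }
  }
}
```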
View File

@ -94,10 +94,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
);
Path tmpFile = new Path(String.format(
"%s/%s/%s/index.zip",
"%s/%s/%s/%s_index.zip",
fullyQualifiedStorageDirectory,
segment.getDataSource(),
UUIDUtils.generateUuid()
UUIDUtils.generateUuid(),
segment.getShardSpec().getPartitionNum()
));
FileSystem fs = tmpFile.getFileSystem(hadoopConfig);
@ -108,18 +109,21 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
final DataSegment dataSegment;
try (FSDataOutputStream out = fs.create(tmpFile)) {
size = CompressionUtils.zip(inDir, out);
final Path outFile = new Path(String.format(
"%s/%s/index.zip",
Path outFile = new Path(String.format(
"%s/%s/%d_index.zip",
fullyQualifiedStorageDirectory,
storageDir
storageDir,
segment.getShardSpec().getPartitionNum()
));
final Path outDir = outFile.getParent();
dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
tmpFile.getParent(),
fs
fs,
segment.getShardSpec().getPartitionNum()
);
// Create parent if it does not exist, recreation is not an error
@ -154,9 +158,9 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
return dataSegment;
}
private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException
private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs, final int partitionNumber) throws IOException
{
final Path descriptorFile = new Path(outDir, "descriptor.json");
final Path descriptorFile = new Path(outDir, String.format("%s_descriptor.json", partitionNumber));
log.info("Creating descriptor file at[%s]", descriptorFile);
ByteSource
.wrap(jsonMapper.writeValueAsBytes(segment))

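The net effect of the pusher change is that the partition number moves from a directory level into the file names. A throwaway sketch with hypothetical values, mirroring the format strings above:

```java
public class HdfsSegmentLayout
{
  public static void main(String[] args)
  {
    // Hypothetical values for illustration only.
    String base = "hdfs://nn1/druid/storage";
    String storageDir = "wikipedia/20130831T000000.000Z_20130901T000000.000Z/2015-10-21T22_07_57.074Z";
    int partitionNum = 0;

    // New layout: the partition number is a file-name prefix, not a directory.
    System.out.println(String.format("%s/%s/%d_index.zip", base, storageDir, partitionNum));
    System.out.println(String.format("%s/%s/%d_descriptor.json", base, storageDir, partitionNum));

    // Old layout kept the partition number as an extra directory level.
    System.out.println(String.format("%s/%s/%d/index.zip", base, storageDir, partitionNum));
  }
}
```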
View File

@ -110,6 +110,26 @@ public class HdfsDataSegmentFinderTest
.shardSpec(new NumberedShardSpec(1, 2))
.build();
private static final DataSegment SEGMENT_5 = DataSegment.builder()
.dataSource("wikipedia")
.interval(
new Interval(
"2013-09-03T00:00:00.000Z/2013-09-04T00:00:00.000Z"
)
)
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"hdfs",
"path",
"hdfs://abc.com:1234/somewhere/1_index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static MiniDFSCluster miniCluster;
private static File hdfsTmpDir;
private static URI uriBase;
@ -122,11 +142,13 @@ public class HdfsDataSegmentFinderTest
private Path descriptor3;
private Path descriptor4_0;
private Path descriptor4_1;
private Path descriptor5;
private Path indexZip1;
private Path indexZip2;
private Path indexZip3;
private Path indexZip4_0;
private Path indexZip4_1;
private Path indexZip5;
@BeforeClass
public static void setupStatic() throws IOException
@ -162,23 +184,29 @@ public class HdfsDataSegmentFinderTest
descriptor3 = new Path(dataSourceDir, "interval3/v2/0/" + DESCRIPTOR_JSON);
descriptor4_0 = new Path(dataSourceDir, "interval4/v1/0/" + DESCRIPTOR_JSON);
descriptor4_1 = new Path(dataSourceDir, "interval4/v1/1/" + DESCRIPTOR_JSON);
descriptor5 = new Path(dataSourceDir, "interval5/v1/1/" + "1_" +DESCRIPTOR_JSON);
indexZip1 = new Path(descriptor1.getParent(), INDEX_ZIP);
indexZip2 = new Path(descriptor2.getParent(), INDEX_ZIP);
indexZip3 = new Path(descriptor3.getParent(), INDEX_ZIP);
indexZip4_0 = new Path(descriptor4_0.getParent(), INDEX_ZIP);
indexZip4_1 = new Path(descriptor4_1.getParent(), INDEX_ZIP);
indexZip5 = new Path(descriptor5.getParent(), "1_" + INDEX_ZIP);
mapper.writeValue(fs.create(descriptor1), SEGMENT_1);
mapper.writeValue(fs.create(descriptor2), SEGMENT_2);
mapper.writeValue(fs.create(descriptor3), SEGMENT_3);
mapper.writeValue(fs.create(descriptor4_0), SEGMENT_4_0);
mapper.writeValue(fs.create(descriptor4_1), SEGMENT_4_1);
mapper.writeValue(fs.create(descriptor5), SEGMENT_5);
create(indexZip1);
create(indexZip2);
create(indexZip3);
create(indexZip4_0);
create(indexZip4_1);
create(indexZip5);
}
private void create(Path indexZip1) throws IOException
@ -194,13 +222,15 @@ public class HdfsDataSegmentFinderTest
final Set<DataSegment> segments = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
Assert.assertEquals(5, segments.size());
Assert.assertEquals(6, segments.size());
DataSegment updatedSegment1 = null;
DataSegment updatedSegment2 = null;
DataSegment updatedSegment3 = null;
DataSegment updatedSegment4_0 = null;
DataSegment updatedSegment4_1 = null;
DataSegment updatedSegment5 = null;
for (DataSegment dataSegment : segments) {
if (dataSegment.getIdentifier().equals(SEGMENT_1.getIdentifier())) {
updatedSegment1 = dataSegment;
@ -212,7 +242,10 @@ public class HdfsDataSegmentFinderTest
updatedSegment4_0 = dataSegment;
} else if (dataSegment.getIdentifier().equals(SEGMENT_4_1.getIdentifier())) {
updatedSegment4_1 = dataSegment;
} else {
} else if (dataSegment.getIdentifier().equals(SEGMENT_5.getIdentifier())) {
updatedSegment5 = dataSegment;
}
else {
Assert.fail("Unexpected segment");
}
}
@ -222,12 +255,16 @@ public class HdfsDataSegmentFinderTest
Assert.assertEquals(descriptor3.toUri().getPath(), getDescriptorPath(updatedSegment3));
Assert.assertEquals(descriptor4_0.toUri().getPath(), getDescriptorPath(updatedSegment4_0));
Assert.assertEquals(descriptor4_1.toUri().getPath(), getDescriptorPath(updatedSegment4_1));
Assert.assertEquals(descriptor5.toUri().getPath(), getDescriptorPathWithPartitionNum(updatedSegment5, 1));
final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
final String serializedSegment5 = mapper.writeValueAsString(updatedSegment5);
// since updateDescriptor was not enabled, descriptor.json still has stale information
Assert.assertNotEquals(serializedSegment1, readContent(descriptor1));
@ -235,6 +272,7 @@ public class HdfsDataSegmentFinderTest
Assert.assertNotEquals(serializedSegment3, readContent(descriptor3));
Assert.assertNotEquals(serializedSegment4_0, readContent(descriptor4_0));
Assert.assertNotEquals(serializedSegment4_1, readContent(descriptor4_1));
Assert.assertNotEquals(serializedSegment5, readContent(descriptor5));
// enable updateDescriptor so that descriptors.json will be updated to relfect the new loadSpec
final Set<DataSegment> segments2 = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), true);
@ -245,6 +283,7 @@ public class HdfsDataSegmentFinderTest
Assert.assertEquals(serializedSegment3, readContent(descriptor3));
Assert.assertEquals(serializedSegment4_0, readContent(descriptor4_0));
Assert.assertEquals(serializedSegment4_1, readContent(descriptor4_1));
Assert.assertEquals(serializedSegment5, readContent(descriptor5));
}
@Test(expected = SegmentLoadingException.class)
@ -277,6 +316,12 @@ public class HdfsDataSegmentFinderTest
return indexzip.getParent().toString() + "/" + DESCRIPTOR_JSON;
}
private String getDescriptorPathWithPartitionNum(DataSegment segment, int partitionNum)
{
final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent().toString() + "/" + partitionNum + "_" + DESCRIPTOR_JSON;
}
private String readContent(Path descriptor) throws IOException
{
final FSDataInputStream is = fs.open(descriptor);

View File

@ -102,7 +102,41 @@ public class HdfsDataSegmentKillerTest
killer.kill(getSegmentWithPath(new Path(partition012Dir, "index.zip").toString()));
Assert.assertFalse(fs.exists(dataSourceDir));
Assert.assertTrue(fs.exists(dataSourceDir));
Assert.assertTrue(fs.delete(dataSourceDir, false));
}
@Test
public void testKillForSegmentPathWithoutPartitionNumber() throws Exception
{
Configuration config = new Configuration();
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
config,
new HdfsDataSegmentPusherConfig()
{
@Override
public String getStorageDirectory()
{
return "/tmp";
}
}
);
FileSystem fs = FileSystem.get(config);
Path dataSourceDir = new Path("/tmp/dataSourceNew");
Path interval1Dir = new Path(dataSourceDir, "intervalNew");
Path version11Dir = new Path(interval1Dir, "v1");
makePartitionDirWithIndexWitNewFormat(fs, version11Dir, 3);
killer.kill(getSegmentWithPath(new Path(version11Dir, "3_index.zip").toString()));
Assert.assertFalse(fs.exists(version11Dir));
Assert.assertFalse(fs.exists(interval1Dir));
Assert.assertTrue(fs.exists(dataSourceDir));
Assert.assertTrue(fs.exists(new Path("/tmp")));
Assert.assertTrue(fs.exists(dataSourceDir));
Assert.assertTrue(fs.delete(dataSourceDir, false));
}
@Test
@ -126,7 +160,21 @@ public class HdfsDataSegmentKillerTest
private void makePartitionDirWithIndex(FileSystem fs, Path path) throws IOException
{
Assert.assertTrue(fs.mkdirs(path));
try (FSDataOutputStream os = fs.create(new Path(path, "index.zip"))) {
try (FSDataOutputStream os = fs.create(new Path(path, "index.zip")); FSDataOutputStream oos = fs.create(new Path(
path,
"descriptor.json"
))) {
}
}
private void makePartitionDirWithIndexWitNewFormat(FileSystem fs, Path path, Integer partitionNumber)
throws IOException
{
Assert.assertTrue(fs.mkdirs(path));
try (FSDataOutputStream os = fs.create(new Path(
path,
String.format("%s_index.zip", partitionNumber)
)); FSDataOutputStream oos = fs.create(new Path(path, String.format("%s_descriptor.json", partitionNumber)))) {
}
}

View File

@ -109,23 +109,38 @@ public class HdfsDataSegmentPusherTest
DataSegment segment = pusher.push(segmentDir, segmentToPush);
String indexUri = String.format(
"%s/%s/%d_index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush),
segmentToPush.getShardSpec().getPartitionNum()
);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(segmentToPush, segment);
Assert.assertEquals(ImmutableMap.of(
"type",
"hdfs",
"path",
String.format(
"%s/%s/index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush)
)
indexUri
), segment.getLoadSpec());
// rename directory after push
final String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(segment);
File indexFile = new File(String.format("%s/%s/index.zip", storageDirectory, segmentPath));
File indexFile = new File(String.format(
"%s/%s/%d_index.zip",
storageDirectory,
segmentPath,
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());
File descriptorFile = new File(String.format("%s/%s/descriptor.json", storageDirectory, segmentPath));
File descriptorFile = new File(String.format(
"%s/%s/%d_descriptor.json",
storageDirectory,
segmentPath,
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(descriptorFile.exists());
// push twice will fail and temp dir cleaned

View File

@ -735,12 +735,24 @@ public class IndexGeneratorJob implements Jobby
segmentTemplate,
context.getConfiguration(),
context,
context.getTaskAttemptID(),
mergedBase,
JobHelper.makeSegmentOutputPath(
JobHelper.makeFileNamePath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate
segmentTemplate,
JobHelper.INDEX_ZIP
),
JobHelper.makeFileNamePath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate,
JobHelper.DESCRIPTOR_JSON
),
JobHelper.makeTmpPath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate,
context.getTaskAttemptID()
)
);

View File

@ -75,8 +75,6 @@ import java.util.zip.ZipOutputStream;
public class JobHelper
{
private static final Logger log = new Logger(JobHelper.class);
private static final int NUM_RETRIES = 8;
private static final int SECONDS_BETWEEN_RETRIES = 2;
private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
@ -91,6 +89,8 @@ public class JobHelper
{
return new Path(base, "classpath");
}
public static final String INDEX_ZIP = "index.zip";
public static final String DESCRIPTOR_JSON = "descriptor.json";
/**
* Dose authenticate against a secured hadoop cluster
@ -376,14 +376,14 @@ public class JobHelper
final DataSegment segmentTemplate,
final Configuration configuration,
final Progressable progressable,
final TaskAttemptID taskAttemptID,
final File mergedBase,
final Path segmentBasePath
final Path finalIndexZipFilePath,
final Path finalDescriptorPath,
final Path tmpPath
)
throws IOException
{
final FileSystem outputFS = FileSystem.get(segmentBasePath.toUri(), configuration);
final Path tmpPath = new Path(segmentBasePath, String.format("index.zip.%d", taskAttemptID.getId()));
final FileSystem outputFS = FileSystem.get(finalIndexZipFilePath.toUri(), configuration);
final AtomicLong size = new AtomicLong(0L);
final DataPusher zipPusher = (DataPusher) RetryProxy.create(
DataPusher.class, new DataPusher()
@ -412,7 +412,6 @@ public class JobHelper
zipPusher.push();
log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri());
final Path finalIndexZipFilePath = new Path(segmentBasePath, "index.zip");
final URI indexOutURI = finalIndexZipFilePath.toUri();
final ImmutableMap<String, Object> loadSpec;
// TODO: Make this a part of Pushers or Pullers
@ -463,10 +462,11 @@ public class JobHelper
)
);
}
writeSegmentDescriptor(
outputFS,
finalSegment,
new Path(segmentBasePath, "descriptor.json"),
finalDescriptorPath,
progressable
);
return finalSegment;
@ -576,16 +576,77 @@ public class JobHelper
out.putNextEntry(new ZipEntry(file.getName()));
}
public static Path makeSegmentOutputPath(
Path basePath,
FileSystem fileSystem,
DataSegment segment
public static boolean isHdfs(FileSystem fs)
{
return "hdfs".equals(fs.getScheme()) || "viewfs".equals(fs.getScheme()) || "maprfs".equals(fs.getScheme());
}
public static Path makeFileNamePath(
final Path basePath,
final FileSystem fs,
final DataSegment segmentTemplate,
final String baseFileName
)
{
String segmentDir = "hdfs".equals(fileSystem.getScheme()) || "viewfs".equals(fileSystem.getScheme())
? DataSegmentPusherUtil.getHdfsStorageDir(segment)
: DataSegmentPusherUtil.getStorageDir(segment);
return new Path(prependFSIfNullScheme(fileSystem, basePath), String.format("./%s", segmentDir));
final Path finalIndexZipPath;
final String segmentDir;
if (isHdfs(fs)) {
segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate);
finalIndexZipPath = new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%d_%s",
segmentDir,
segmentTemplate.getShardSpec().getPartitionNum(),
baseFileName
)
);
} else {
segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate);
finalIndexZipPath = new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%s",
segmentDir,
baseFileName
)
);
}
return finalIndexZipPath;
}
public static Path makeTmpPath(
final Path basePath,
final FileSystem fs,
final DataSegment segmentTemplate,
final TaskAttemptID taskAttemptID
)
{
final String segmentDir;
if (isHdfs(fs)) {
segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate);
return new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%d_index.zip.%d",
segmentDir,
segmentTemplate.getShardSpec().getPartitionNum(),
taskAttemptID.getId()
)
);
} else {
segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate);
return new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%d_index.zip.%d",
segmentDir,
segmentTemplate.getShardSpec().getPartitionNum(),
taskAttemptID.getId()
)
);
}
}
/**

View File

@ -553,12 +553,24 @@ public class HadoopConverterJob
finalSegmentTemplate,
context.getConfiguration(),
context,
context.getTaskAttemptID(),
outDir,
JobHelper.makeSegmentOutputPath(
JobHelper.makeFileNamePath(
baseOutputPath,
outputFS,
finalSegmentTemplate
finalSegmentTemplate,
JobHelper.INDEX_ZIP
),
JobHelper.makeFileNamePath(
baseOutputPath,
outputFS,
finalSegmentTemplate,
JobHelper.DESCRIPTOR_JSON
),
JobHelper.makeTmpPath(
baseOutputPath,
outputFS,
finalSegmentTemplate,
context.getTaskAttemptID()
)
);
context.progress();

View File

@ -38,6 +38,8 @@ import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -105,7 +107,7 @@ public class HadoopDruidIndexerConfigTest
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = JobHelper.makeSegmentOutputPath(
Path path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
@ -118,12 +120,59 @@ public class HadoopDruidIndexerConfigTest
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
)
),
JobHelper.INDEX_ZIP
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712",
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_index.zip",
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0)
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_index.zip.0",
path.toString()
);
}
@Test
@ -165,7 +214,7 @@ public class HadoopDruidIndexerConfigTest
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = JobHelper.makeSegmentOutputPath(
Path path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
@ -178,10 +227,56 @@ public class HadoopDruidIndexerConfigTest
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
)
),
JobHelper.INDEX_ZIP
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/index.zip",
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0)
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/4712_index.zip.0",
path.toString()
);
@ -193,7 +288,10 @@ public class HadoopDruidIndexerConfigTest
List<HadoopyShardSpec> specs = Lists.newArrayList();
final int partitionCount = 10;
for (int i = 0; i < partitionCount; i++) {
specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()), i));
specs.add(new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()),
i
));
}
HadoopIngestionSpec spec = new HadoopIngestionSpec(

View File

@ -297,9 +297,10 @@ public class RealtimeIndexTask extends AbstractTask
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and
// NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the
// NOTE: "same" segment.
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip
// NOTE: (partitionNum_index.zip for HDFS data storage) and descriptor.json (partitionNum_descriptor.json for
// NOTE: HDFS data storage) to mismatch, or it can cause historical nodes to load different instances of
// NOTE: the "same" segment.
final PlumberSchool plumberSchool = new RealtimePlumberSchool(
toolbox.getEmitter(),
toolbox.getQueryRunnerFactoryConglomerate(),

View File

@ -27,7 +27,6 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.druid.guice.JsonConfigProvider;
@ -57,7 +56,7 @@ public class InsertSegment extends GuiceRunnable
@Option(name = "--workingDir", description = "The directory path where segments are stored. This tool will recursively look for segments underneath this directory and insert/update these segments in metdata storage.", required = true)
private String workingDirPath;
@Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json if the path in loadSpec is different from where desciptor.json was found. Default value is true", required = false)
@Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json (partitionNum_descriptor.json for HDFS data storage) if the path in loadSpec is different from where desciptor.json (partitionNum_descriptor.json for HDFS data storage) was found. Default value is true", required = false)
private String updateDescriptor;
private ObjectMapper mapper;