1) Add support for storing segments in HDFS

This commit is contained in:
cheddar 2013-04-30 12:47:43 -05:00
parent cd535fcd79
commit 60b279b0d3
9 changed files with 288 additions and 24 deletions

View File

@ -46,32 +46,49 @@ public class CompressionUtils
public static long zip(File directory, File outputZipFile) throws IOException
{
if (!directory.isDirectory()) {
throw new IOException(String.format("directory[%s] is not a directory", directory));
}
if (!outputZipFile.getName().endsWith(".zip")) {
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
}
final FileOutputStream out = new FileOutputStream(outputZipFile);
try {
final long retVal = zip(directory, out);
out.close();
return retVal;
}
finally {
Closeables.closeQuietly(out);
}
}
public static long zip(File directory, OutputStream out) throws IOException
{
if (!directory.isDirectory()) {
throw new IOException(String.format("directory[%s] is not a directory", directory));
}
long totalSize = 0;
ZipOutputStream zipOut = null;
try {
zipOut = new ZipOutputStream(new FileOutputStream(outputZipFile));
zipOut = new ZipOutputStream(out);
File[] files = directory.listFiles();
for (File file : files) {
log.info("Adding file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
if (file.length() >= Integer.MAX_VALUE) {
zipOut.close();
outputZipFile.delete();
zipOut.finish();
throw new IOException(String.format("file[%s] too large [%,d]", file, file.length()));
}
zipOut.putNextEntry(new ZipEntry(file.getName()));
totalSize += ByteStreams.copy(Files.newInputStreamSupplier(file), zipOut);
}
zipOut.closeEntry();
}
finally {
Closeables.closeQuietly(zipOut);
if (zipOut != null) {
zipOut.finish();
}
}
return totalSize;
@ -100,11 +117,12 @@ public class CompressionUtils
ZipEntry entry;
while ((entry = zipIn.getNextEntry()) != null) {
OutputStream out = null;
FileOutputStream out = null;
try {
out = new FileOutputStream(new File(outDir, entry.getName()));
ByteStreams.copy(zipIn, out);
zipIn.closeEntry();
out.close();
}
finally {
Closeables.closeQuietly(out);

View File

@ -70,13 +70,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>

View File

@ -19,6 +19,7 @@
package com.metamx.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InvalidJobConfException;
@ -58,7 +60,6 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -417,6 +418,11 @@ public class IndexGeneratorJob implements Jobby
"type", "local",
"path", indexOutURI.getPath()
);
} else if (outputFS instanceof DistributedFileSystem) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", indexOutURI.getPath()
);
} else {
throw new ISE("Unknown file system[%s]", outputFS.getClass());
}

11
pom.xml
View File

@ -138,6 +138,17 @@
<artifactId>curator-x-discovery</artifactId>
<version>${netflix.curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>

View File

@ -168,6 +168,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency>

View File

@ -27,8 +27,13 @@ import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.Query;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.DelegatingSegmentLoader;
import com.metamx.druid.loading.HdfsDataSegmentPuller;
import com.metamx.druid.loading.HdfsDataSegmentPusher;
import com.metamx.druid.loading.HdfsDataSegmentPusherConfig;
import com.metamx.druid.loading.LocalDataSegmentPuller;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
@ -37,15 +42,13 @@ import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryEngineConfig;
import com.metamx.druid.Query;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryEngineConfig;
import com.metamx.druid.query.group.GroupByQueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQueryRunnerFactoryConfig;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
@ -57,6 +60,7 @@ import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.utils.PropUtils;
import org.apache.hadoop.conf.Configuration;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
@ -85,13 +89,13 @@ public class ServerInit
final QueryableIndexFactory factory = new MMappedQueryableIndexFactory();
SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
SingleSegmentLoader localSegmentLoader = new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config);
delegateLoader.setLoaderTypes(
ImmutableMap.<String, SegmentLoader>builder()
.put("s3", s3segmentLoader)
.put("s3_zip", s3segmentLoader)
.put("local", localSegmentLoader)
.put("local", new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config))
.put("hdfs", new SingleSegmentLoader(new HdfsDataSegmentPuller(new Configuration()), factory, config))
.build()
);
@ -167,6 +171,11 @@ public class ServerInit
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper);
}
else if (Boolean.parseBoolean(props.getProperty("druid.pusher.hdfs", "false"))) {
final HdfsDataSegmentPusherConfig config = configFactory.build(HdfsDataSegmentPusherConfig.class);
return new HdfsDataSegmentPusher(config, new Configuration(), jsonMapper);
}
else {
final RestS3Service s3Client;

View File

@ -0,0 +1,85 @@
package com.metamx.druid.loading;
import com.google.common.io.Closeables;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
/**
*/
public class HdfsDataSegmentPuller implements DataSegmentPuller
{
private final Configuration config;
public HdfsDataSegmentPuller(final Configuration config)
{
this.config = config;
}
@Override
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
final Path path = getPath(segment);
final FileSystem fs = checkPathAndGetFilesystem(path);
FSDataInputStream in = null;
try {
if (path.getName().endsWith(".zip")) {
in = fs.open(path);
CompressionUtils.unzip(in, dir);
in.close();
}
else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Some IOException");
}
finally {
Closeables.closeQuietly(in);
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
Path path = getPath(segment);
FileSystem fs = checkPathAndGetFilesystem(path);
try {
return fs.getFileStatus(path).getModificationTime();
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problem loading status of path[%s]", path);
}
}
private Path getPath(DataSegment segment) {
return new Path(String.valueOf(segment.getLoadSpec().get("path")));
}
private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException
{
FileSystem fs;
try {
fs = path.getFileSystem(config);
if (!fs.exists(path)) {
throw new SegmentLoadingException("Path[%s] doesn't exist.", path);
}
return fs;
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path);
}
}
}

View File

@ -0,0 +1,106 @@
package com.metamx.druid.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
/**
*/
public class HdfsDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(HdfsDataSegmentPusher.class);
private final HdfsDataSegmentPusherConfig config;
private final Configuration hadoopConfig;
private final ObjectMapper jsonMapper;
public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
Configuration hadoopConfig,
ObjectMapper jsonMapper
)
{
this.config = config;
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
}
@Override
public DataSegment push(File inDir, DataSegment segment) throws IOException
{
final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
FileSystem fs = outFile.getFileSystem(hadoopConfig);
fs.mkdirs(outFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, outFile);
FSDataOutputStream out = null;
long size;
try {
out = fs.create(outFile);
size = CompressionUtils.zip(inDir, out);
out.close();
}
finally {
Closeables.closeQuietly(out);
}
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(IndexIO.CURRENT_VERSION_ID),
outFile.getParent(),
fs
);
}
private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException
{
final Path descriptorFile = new Path(outDir, "descriptor.json");
log.info("Creating descriptor file at[%s]", descriptorFile);
ByteStreams.copy(
ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)),
new HdfsOutputStreamSupplier(fs, descriptorFile)
);
return segment;
}
private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
{
return ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toString());
}
private static class HdfsOutputStreamSupplier implements OutputSupplier<OutputStream>
{
private final FileSystem fs;
private final Path descriptorFile;
public HdfsOutputStreamSupplier(FileSystem fs, Path descriptorFile)
{
this.fs = fs;
this.descriptorFile = descriptorFile;
}
@Override
public OutputStream getOutput() throws IOException
{
return fs.create(descriptorFile);
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import org.skife.config.Config;
import java.io.File;
/**
*/
public abstract class HdfsDataSegmentPusherConfig
{
@Config("druid.pusher.hdfs.storageDirectory")
public abstract File getStorageDirectory();
}