1) Changes to allow for local storage

Eric Tschetter 2013-02-19 19:22:59 -06:00
parent dc3459d3f9
commit f8c54a72c2
27 changed files with 433 additions and 120 deletions

View File

@ -174,7 +174,7 @@ public class DefaultObjectMapper extends ObjectMapper
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
configure(MapperFeature.AUTO_DETECT_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_CREATORS, false);
// configure(MapperFeature.AUTO_DETECT_CREATORS, false); https://github.com/FasterXML/jackson-databind/issues/170
configure(MapperFeature.AUTO_DETECT_FIELDS, false);
configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_SETTERS, false);

View File

@ -21,6 +21,7 @@ package com.metamx.druid.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
@ -29,6 +30,7 @@ import sun.misc.IOUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -36,6 +38,7 @@ import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
*/
@ -43,10 +46,43 @@ public class CompressionUtils
{
private static final Logger log = new Logger(CompressionUtils.class);
public static long zip(File directory, File outputZipFile) throws IOException
{
if (!directory.isDirectory()) {
throw new IOException(String.format("directory[%s] is not a directory", directory));
}
if (!outputZipFile.getName().endsWith(".zip")) {
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
}
long totalSize = 0;
ZipOutputStream zipOut = null;
try {
zipOut = new ZipOutputStream(new FileOutputStream(outputZipFile));
File[] files = directory.listFiles();
for (File file : files) {
log.info("Adding file[%s] with size[%,d]. Total size[%,d]", file, file.length(), totalSize);
if (file.length() >= Integer.MAX_VALUE) {
zipOut.close();
outputZipFile.delete();
throw new IOException(String.format("file[%s] too large [%,d]", file, file.length()));
}
zipOut.putNextEntry(new ZipEntry(file.getName()));
totalSize += ByteStreams.copy(Files.newInputStreamSupplier(file), zipOut);
}
}
finally {
Closeables.closeQuietly(zipOut);
}
return totalSize;
}
public static void unzip(File pulledFile, File outDir) throws IOException
{
if (!(outDir.exists() && outDir.isDirectory())) {
throw new ISE("outDir[%s] must exist and be a directory");
throw new ISE("outDir[%s] must exist and be a directory", outDir);
}
log.info("Unzipping file[%s] to [%s]", pulledFile, outDir);

View File

@ -48,10 +48,12 @@ public class RealtimeStandaloneMain
rn.setPhoneBook(dummyPhoneBook);
MetadataUpdater dummyMetadataUpdater =
new MetadataUpdater(new DefaultObjectMapper(),
new MetadataUpdater(
new DefaultObjectMapper(),
Config.createFactory(Initialization.loadProperties()).build(MetadataUpdaterConfig.class),
dummyPhoneBook,
null) {
null
) {
@Override
public void publishSegment(DataSegment segment) throws IOException
{

View File

@ -266,8 +266,7 @@ public class DeterminePartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
// Create group key
// TODO -- There are more efficient ways to do this
// Create group key, there are probably more efficient ways of doing this
final Map<String, Set<String>> dims = Maps.newTreeMap();
for(final String dim : inputRow.getDimensions()) {
final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));

View File

@ -662,8 +662,9 @@ public class HadoopDruidIndexerConfig
return new Path(
String.format(
"%s/%s_%s/%s/%s",
"%s/%s/%s_%s/%s/%s",
getSegmentOutputDir(),
dataSource,
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion().toString(),
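
The hunk above inserts the data source into the segment output path. A sketch of the path the new pattern produces, with hypothetical values (the final component lies outside the visible hunk and is assumed to be the partition/bucket number):

public class SegmentOutputPathSketch
{
  public static void main(String[] args)
  {
    String path = String.format(
        "%s/%s/%s_%s/%s/%s",
        "hdfs://namenode/druid/segmentOutput",   // getSegmentOutputDir()
        "wikipedia",                             // dataSource (newly added component)
        "2013-02-19T00:00:00.000Z",              // bucketInterval start
        "2013-02-20T00:00:00.000Z",              // bucketInterval end
        "2013-02-19T19:22:59.000Z",              // getVersion()
        0                                        // assumed partition/bucket number
    );
    System.out.println(path);
    // hdfs://namenode/druid/segmentOutput/wikipedia/2013-02-19T00:00:00.000Z_2013-02-20T00:00:00.000Z/2013-02-19T19:22:59.000Z/0
  }
}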

View File

@ -379,7 +379,8 @@ public class IndexGeneratorJob implements Jobby
);
} else if (outputFS instanceof LocalFileSystem) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "test"
"type", "local",
"path", indexOutURI.getPath()
);
} else {
throw new ISE("Unknown file system[%s]", outputFS.getClass());

View File

@ -25,6 +25,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.merger.common.task.Task;
@ -94,7 +95,14 @@ public class TaskToolbox
final SingleSegmentLoader loader = new SingleSegmentLoader(
new S3DataSegmentPuller(s3Client),
new MMappedQueryableIndexFactory(),
new File(config.getTaskDir(task), "fetched_segments")
new SegmentLoaderConfig()
{
@Override
public File getCacheDirectory()
{
return new File(config.getTaskDir(task), "fetched_segments");
}
}
);
Map<DataSegment, File> retVal = Maps.newLinkedHashMap();

View File

@ -45,11 +45,11 @@ import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.Sink;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -120,13 +120,13 @@ public class YeOldePlumberSchool implements PlumberSchool
@Override
public void finishJob()
{
// The segment we will upload
File fileToUpload = null;
try {
// User should have persisted everything by now.
Preconditions.checkState(!theSink.swappable(), "All data must be persisted before finishing the job!");
// The segment we will upload
final File fileToUpload;
if(spilled.size() == 0) {
throw new IllegalStateException("Nothing indexed?");
} else if(spilled.size() == 1) {
@ -160,6 +160,17 @@ public class YeOldePlumberSchool implements PlumberSchool
log.warn(e, "Failed to merge and upload");
throw Throwables.propagate(e);
}
finally {
try {
if (fileToUpload != null) {
log.info("Deleting Index File[%s]", fileToUpload);
FileUtils.deleteDirectory(fileToUpload);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", fileToUpload);
}
}
}
private void spillIfSwappable()

View File

@ -22,6 +22,7 @@ package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.common.logger.Logger;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
@ -37,7 +38,7 @@ import com.metamx.druid.merger.coordinator.TaskContext;
import com.metamx.druid.shard.NoneShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -102,6 +103,9 @@ public class DeleteTask extends AbstractTask
segment.getVersion()
);
log.info("Deleting Uploaded Files[%s]", fileToUpload);
FileUtils.deleteDirectory(fileToUpload);
return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment));
}
}

View File

@ -46,7 +46,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -168,6 +168,9 @@ public abstract class MergeTask extends AbstractTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
log.info("Deleting Uploaded Files[%s]", fileToUpload);
FileUtils.deleteDirectory(fileToUpload);
return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment));
}
catch (Exception e) {

View File

@ -51,7 +51,7 @@ import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@ -405,7 +405,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
);
final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher(
s3Client,
configFactory.build(S3SegmentPusherConfig.class),
configFactory.build(S3DataSegmentPusherConfig.class),
jsonMapper
);
taskToolbox = new TaskToolbox(config, emitter, s3Client, dataSegmentPusher, jsonMapper);

View File

@ -37,6 +37,7 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@ -45,7 +46,6 @@ import com.metamx.druid.merger.worker.TaskMonitor;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@ -292,7 +292,7 @@ public class WorkerNode extends RegisteringNode
);
final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher(
s3Client,
configFactory.build(S3SegmentPusherConfig.class),
configFactory.build(S3DataSegmentPusherConfig.class),
jsonMapper
);
taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, dataSegmentPusher, jsonMapper);

View File

@ -19,6 +19,7 @@
package com.metamx.druid.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
@ -31,6 +32,7 @@ public class FireDepartmentConfig
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
@JsonCreator
public FireDepartmentConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod

View File

@ -47,8 +47,10 @@ import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
@ -258,6 +260,13 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
{
if (dataSegmentPusher == null) {
final Properties props = getProps();
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
dataSegmentPusher = new LocalDataSegmentPusher(
getConfigFactory().build(LocalDataSegmentPusherConfig.class), getJsonMapper()
);
}
else {
final RestS3Service s3Client;
try {
s3Client = new RestS3Service(
@ -271,7 +280,10 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
throw Throwables.propagate(e);
}
dataSegmentPusher = new S3DataSegmentPusher(s3Client, getConfigFactory().build(S3SegmentPusherConfig.class), getJsonMapper());
dataSegmentPusher = new S3DataSegmentPusher(
s3Client, getConfigFactory().build(S3DataSegmentPusherConfig.class), getJsonMapper()
);
}
}
}
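
A sketch of the runtime properties that would select the local pusher in the branch above; both keys appear in this commit (druid.pusher.local here, druid.pusher.local.storageDirectory in LocalDataSegmentPusherConfig below), while the values are hypothetical:

import java.util.Properties;

public class LocalPusherPropsSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // Steers RealtimeNode away from S3DataSegmentPusher and onto LocalDataSegmentPusher.
    props.setProperty("druid.pusher.local", "true");
    // Read by LocalDataSegmentPusherConfig.getStorageDirectory().
    props.setProperty("druid.pusher.local.storageDirectory", "/tmp/druid/localStorage");
    System.out.println(props);
  }
}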

View File

@ -307,7 +307,7 @@ public class RealtimePlumberSchool implements PlumberSchool
}
}
final File mergedFile;
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
@ -337,6 +337,19 @@ public class RealtimePlumberSchool implements PlumberSchool
.addData("interval", interval)
.emit();
}
if (mergedFile != null) {
try {
if (mergedFile != null) {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);

View File

@ -21,7 +21,7 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -35,7 +35,7 @@ public class S3SegmentPusher extends com.metamx.druid.loading.S3DataSegmentPushe
{
public S3SegmentPusher(
RestS3Service s3Client,
S3SegmentPusherConfig config,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{

View File

@ -39,7 +39,7 @@ import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
@ -172,7 +172,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
);
setSegmentLoader(
ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(QueryableLoaderConfig.class))
ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(SegmentLoaderConfig.class))
);
}
catch (S3ServiceException e) {

View File

@ -26,15 +26,16 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.loading.DelegatingSegmentLoader;
import com.metamx.druid.loading.LocalDataSegmentPuller;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.query.group.GroupByQueryEngine;
import com.metamx.druid.query.group.GroupByQueryEngineConfig;
import com.metamx.druid.Query;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.group.GroupByQuery;
@ -63,25 +64,22 @@ public class ServerInit
public static SegmentLoader makeDefaultQueryableLoader(
RestS3Service s3Client,
QueryableLoaderConfig config
SegmentLoaderConfig config
)
{
DelegatingSegmentLoader delegateLoader = new DelegatingSegmentLoader();
final S3DataSegmentPuller segmentGetter = new S3DataSegmentPuller(s3Client);
final QueryableIndexFactory factory = new MMappedQueryableIndexFactory();
final QueryableIndexFactory factory;
if ("mmap".equals(config.getQueryableFactoryType())) {
factory = new MMappedQueryableIndexFactory();
} else {
throw new ISE("Unknown queryableFactoryType[%s]", config.getQueryableFactoryType());
}
SingleSegmentLoader s3segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config);
SingleSegmentLoader localSegmentLoader = new SingleSegmentLoader(new LocalDataSegmentPuller(), factory, config);
SingleSegmentLoader segmentLoader = new SingleSegmentLoader(segmentGetter, factory, config.getCacheDirectory());
delegateLoader.setLoaderTypes(
ImmutableMap.<String, SegmentLoader>builder()
.put("s3", segmentLoader)
.put("s3_zip", segmentLoader)
.put("s3", s3segmentLoader)
.put("s3_zip", s3segmentLoader)
.put("local", localSegmentLoader)
.build()
);

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
import com.metamx.druid.client.DataSegment;
/**
*/
public class DataSegmentPusherUtil
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
public static String getStorageDir(DataSegment segment)
{
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
}
}
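
A sketch of the directory layout getStorageDir() produces, using hypothetical segment values; this is the same shape the S3 pusher previously assembled inline (see the S3DataSegmentPusher hunk below):

import com.google.common.base.Joiner;

public class StorageDirSketch
{
  public static void main(String[] args)
  {
    // Mirrors DataSegmentPusherUtil.getStorageDir() with stand-in values.
    String dir = Joiner.on("/").skipNulls().join(
        "wikipedia",                                           // dataSource
        "2013-02-19T00:00:00.000Z_2013-02-20T00:00:00.000Z",   // interval start_end
        "2013-02-19T19:22:59.000Z",                            // version
        0                                                      // shard partition number
    );
    System.out.println(dir);
    // wikipedia/2013-02-19T00:00:00.000Z_2013-02-20T00:00:00.000Z/2013-02-19T19:22:59.000Z/0
  }
}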

View File

@ -0,0 +1,105 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.common.io.Files;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
*/
public class LocalDataSegmentPuller implements DataSegmentPuller
{
private static final Logger log = new Logger(LocalDataSegmentPuller.class);
@Override
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
final File path = getFile(segment);
if (path.isDirectory()) {
if (path.equals(dir)) {
log.info("Asked to load [%s] into itself, done!", dir);
return;
}
log.info("Copying files from [%s] to [%s]", path, dir);
File file = null;
try {
final File[] files = path.listFiles();
for (int i = 0; i < files.length; ++i) {
file = files[i];
Files.copy(file, new File(dir, file.getName()));
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to copy file[%s].", file);
}
} else {
if (!path.getName().endsWith(".zip")) {
throw new SegmentLoadingException("File is not a zip file[%s]", path);
}
log.info("Unzipping local file[%s] to [%s]", path, dir);
try {
CompressionUtils.unzip(path, dir);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to unzip file[%s]", path);
}
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
final File file = getFile(segment);
long lastModified = Long.MAX_VALUE;
if (file.isDirectory()) {
for (File childFile : file.listFiles()) {
lastModified = Math.min(childFile.lastModified(), lastModified);
}
}
else {
lastModified = file.lastModified();
}
return lastModified;
}
private File getFile(DataSegment segment) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final File path = new File(MapUtils.getString(loadSpec, "path"));
if (!path.exists()) {
throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path);
}
return path;
}
}
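
The puller above accepts two loadSpec shapes, sketched here with hypothetical paths: a path that is a directory gets copied file-by-file into the cache directory (or short-circuits if it already is the cache directory), while anything else must end in .zip and is unpacked via CompressionUtils.unzip():

import java.util.Map;

import com.google.common.collect.ImmutableMap;

public class LocalLoadSpecVariants
{
  public static void main(String[] args)
  {
    String base = "/tmp/druid/localStorage/wikipedia/2013-02-19T00:00:00.000Z_2013-02-20T00:00:00.000Z/2013-02-19T19:22:59.000Z/0";

    // Directory form: files are copied into the segment cache.
    Map<String, Object> dirSpec = ImmutableMap.<String, Object>of("type", "local", "path", base);

    // Zip form: the archive is unzipped into the segment cache.
    Map<String, Object> zipSpec = ImmutableMap.<String, Object>of("type", "local", "path", base + "/index.zip");

    System.out.println(dirSpec);
    System.out.println(zipSpec);
  }
}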

View File

@ -0,0 +1,96 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;
/**
*/
public class LocalDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(LocalDataSegmentPusher.class);
private final LocalDataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
public LocalDataSegmentPusher(
LocalDataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
{
File outDir = new File(config.getStorageDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
if (dataSegmentFile.equals(outDir)) {
long size = 0;
for (File file : dataSegmentFile.listFiles()) {
size += file.length();
}
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outDir))
.withSize(size)
.withBinaryVersion(IndexIO.getVersionFromDir(dataSegmentFile)),
outDir
);
}
outDir.mkdirs();
File outFile = new File(outDir, "index.zip");
log.info("Compressing files from[%s] to [%s]", dataSegmentFile, outFile);
long size = CompressionUtils.zip(dataSegmentFile, outFile);
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile))
.withSize(size)
.withBinaryVersion(IndexIO.getVersionFromDir(dataSegmentFile)),
outDir
);
}
private DataSegment createDescriptorFile(DataSegment segment, File outDir) throws IOException
{
File descriptorFile = new File(outDir, "descriptor.json");
log.info("Creating descriptor file at[%s]", descriptorFile);
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
return segment;
}
private ImmutableMap<String, Object> makeLoadSpec(File outFile)
{
return ImmutableMap.<String, Object>of("type", "local", "path", outFile.toString());
}
}
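
A construction sketch for the new pusher, stubbing its config inline the same way TaskToolbox stubs SegmentLoaderConfig earlier in this commit; the storage directory is hypothetical, and the push() call itself is only described in comments because it needs a real DataSegment produced by an indexing task:

import java.io.File;

import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;

public class LocalPushSketch
{
  public static void main(String[] args)
  {
    LocalDataSegmentPusher pusher = new LocalDataSegmentPusher(
        new LocalDataSegmentPusherConfig()
        {
          @Override
          public File getStorageDirectory()
          {
            return new File("/tmp/druid/localStorage");   // hypothetical location
          }
        },
        new DefaultObjectMapper()
    );

    // pusher.push(segmentDir, segment) zips segmentDir into
    // <storageDirectory>/<DataSegmentPusherUtil.getStorageDir(segment)>/index.zip,
    // writes a descriptor.json beside it, and returns the segment rewritten with a
    // {"type": "local", "path": ...} loadSpec that LocalDataSegmentPuller can resolve.
    System.out.println(pusher);
  }
}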

View File

@ -25,8 +25,8 @@ import java.io.File;
/**
*/
public abstract class S3SegmentGetterConfig
public abstract class LocalDataSegmentPusherConfig
{
@Config("druid.paths.indexCache")
public abstract File getCacheDirectory();
@Config("druid.pusher.local.storageDirectory")
public abstract File getStorageDirectory();
}

View File

@ -66,7 +66,7 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
catch (IOException e2) {
log.error(e, "Problem deleting parentDir[%s]", parentDir);
}
throw new SegmentLoadingException(e, e.getMessage());
throw new SegmentLoadingException(e, "%s", e.getMessage());
}
}
}

View File

@ -22,24 +22,20 @@ package com.metamx.druid.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.common.StreamUtils;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.utils.CompressionUtils;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.*;
import java.io.File;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
public class S3DataSegmentPusher implements DataSegmentPusher
{
@ -47,12 +43,12 @@ public class S3DataSegmentPusher implements DataSegmentPusher
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final RestS3Service s3Client;
private final S3SegmentPusherConfig config;
private final S3DataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
public S3DataSegmentPusher(
RestS3Service s3Client,
S3SegmentPusherConfig config,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
@ -67,35 +63,11 @@ public class S3DataSegmentPusher implements DataSegmentPusher
log.info("Uploading [%s] to S3", indexFilesDir);
String outputKey = JOINER.join(
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
DataSegmentPusherUtil.getStorageDir(segment)
);
long indexSize = 0;
final File zipOutFile = File.createTempFile("druid", "index.zip");
ZipOutputStream zipOut = null;
try {
zipOut = new ZipOutputStream(new FileOutputStream(zipOutFile));
File[] indexFiles = indexFilesDir.listFiles();
for (File indexFile : indexFiles) {
log.info("Adding indexFile[%s] with size[%,d]. Total size[%,d]", indexFile, indexFile.length(), indexSize);
if (indexFile.length() >= Integer.MAX_VALUE) {
throw new ISE("indexFile[%s] too large [%,d]", indexFile, indexFile.length());
}
zipOut.putNextEntry(new ZipEntry(indexFile.getName()));
IOUtils.copy(new FileInputStream(indexFile), zipOut);
indexSize += indexFile.length();
}
}
finally {
Closeables.closeQuietly(zipOut);
}
long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
try {
S3Object toPush = new S3Object(zipOutFile);
@ -119,7 +91,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
StreamUtils.copyToFileAndClose(new ByteArrayInputStream(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(outputKey + "/descriptor.json");
@ -128,9 +100,6 @@ public class S3DataSegmentPusher implements DataSegmentPusher
log.info("Pushing %s", descriptorObject);
s3Client.putObject(outputBucket, descriptorObject);
log.info("Deleting Index File[%s]", indexFilesDir);
FileUtils.deleteDirectory(indexFilesDir);
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();

View File

@ -24,7 +24,7 @@ import org.skife.config.Default;
/**
*/
public abstract class S3SegmentPusherConfig
public abstract class S3DataSegmentPusherConfig
{
@Config("druid.pusher.s3.bucket")
public abstract String getBucket();

View File

@ -21,13 +21,18 @@ package com.metamx.druid.loading;
import org.skife.config.Config;
import java.io.File;
/**
*/
public abstract class QueryableLoaderConfig extends S3SegmentGetterConfig
public abstract class SegmentLoaderConfig
{
@Config("druid.queryable.factory")
public String getQueryableFactoryType()
@Config({"druid.paths.indexCache", "druid.segmentCache.path"})
public abstract File getCacheDirectory();
@Config("druid.segmentCache.deleteOnRemove")
public boolean deleteOnRemove()
{
return "mmap";
return true;
}
}
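
A sketch of binding the new config by hand; the skife ConfigurationObjectFactory usage is an assumption about the config library Druid's nodes wrap, not something shown in this diff, and the cache path is hypothetical:

import java.util.Properties;

import org.skife.config.ConfigurationObjectFactory;

import com.metamx.druid.loading.SegmentLoaderConfig;

public class SegmentLoaderConfigSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // Either key resolves getCacheDirectory(); druid.paths.indexCache is carried over
    // from the old S3SegmentGetterConfig, druid.segmentCache.path is the new name.
    props.setProperty("druid.segmentCache.path", "/tmp/druid/indexCache");

    SegmentLoaderConfig config = new ConfigurationObjectFactory(props).build(SegmentLoaderConfig.class);
    System.out.println(config.getCacheDirectory());   // /tmp/druid/indexCache
    System.out.println(config.deleteOnRemove());      // true (the default above)
  }
}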

View File

@ -20,6 +20,7 @@
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.StreamUtils;
import com.metamx.common.logger.Logger;
@ -39,19 +40,19 @@ public class SingleSegmentLoader implements SegmentLoader
private final DataSegmentPuller dataSegmentPuller;
private final QueryableIndexFactory factory;
private File cacheDirectory;
private final SegmentLoaderConfig config;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
@Inject
public SingleSegmentLoader(
DataSegmentPuller dataSegmentPuller,
QueryableIndexFactory factory,
File cacheDirectory
SegmentLoaderConfig config
)
{
this.dataSegmentPuller = dataSegmentPuller;
this.factory = factory;
this.cacheDirectory = cacheDirectory;
this.config = config;
}
@Override
@ -65,34 +66,37 @@ public class SingleSegmentLoader implements SegmentLoader
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
File cacheFile = getCacheFile(segment);
if (cacheFile.exists()) {
long localLastModified = cacheFile.lastModified();
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
long localLastModified = localStorageDir.lastModified();
long remoteLastModified = dataSegmentPuller.getLastModified(segment);
if (remoteLastModified > 0 && localLastModified >= remoteLastModified) {
log.info(
"Found cacheFile[%s] with modified[%s], which is same or after remote[%s]. Using.",
cacheFile,
localLastModified,
remoteLastModified
"Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.",
localStorageDir, localLastModified, remoteLastModified
);
return cacheFile;
return localStorageDir;
}
}
dataSegmentPuller.getSegmentFiles(segment, cacheFile);
if (!cacheFile.getParentFile().mkdirs()) {
log.info("Unable to make parent file[%s]", cacheFile.getParentFile());
if (localStorageDir.exists()) {
try {
FileUtils.deleteDirectory(localStorageDir);
}
if (cacheFile.exists()) {
cacheFile.delete();
catch (IOException e) {
log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
}
}
if (!localStorageDir.mkdirs()) {
log.info("Unable to make parent file[%s]", localStorageDir);
}
return cacheFile;
dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
return localStorageDir;
}
private File getCacheFile(DataSegment segment)
private File getLocalStorageDir(DataSegment segment)
{
String outputKey = JOINER.join(
segment.getDataSource(),
@ -105,7 +109,7 @@ public class SingleSegmentLoader implements SegmentLoader
segment.getShardSpec().getPartitionNum()
);
return new File(cacheDirectory, outputKey);
return new File(config.getCacheDirectory(), outputKey);
}
private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
@ -134,7 +138,7 @@ public class SingleSegmentLoader implements SegmentLoader
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
File cacheFile = getCacheFile(segment).getParentFile();
File cacheFile = getLocalStorageDir(segment).getParentFile();
try {
log.info("Deleting directory[%s]", cacheFile);