Abstract the Hadoop indexer configuration.

* Moves many items to JobHelper
* Removes the dependency of these functions on HadoopDruidIndexerConfig in favor of more general arguments
* Changes some of the path methods so that they always return a path with a scheme
* Adds retries to uploads
* Changes how the output loadSpec is determined: it now uses outputFS.getScheme() instead of outputFS.getClass().getName()
This commit is contained in:
Charles Allen 2015-06-05 16:56:29 -07:00
parent 92d7316ed8
commit 2a76bdc60a
4 changed files with 324 additions and 293 deletions

View File

@ -53,9 +53,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup; import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -246,7 +244,8 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getPartitionsSpec(); return schema.getTuningConfig().getPartitionsSpec();
} }
public IndexSpec getIndexSpec() { public IndexSpec getIndexSpec()
{
return schema.getTuningConfig().getIndexSpec(); return schema.getTuningConfig().getIndexSpec();
} }
@ -488,35 +487,6 @@ public class HadoopDruidIndexerConfig
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", ""))); return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
} }
/**
 * Builds the output directory of the segment generated for the given bucket, laid out as
 * {@code <segmentOutputPath>/<dataSource>/<start>_<end>/<version>/<partitionNum>}.
 *
 * When the target is a DistributedFileSystem the interval bounds are rendered with the ISO basic
 * date-time format and every ":" in the version becomes "_" (presumably because ":" is not usable in
 * HDFS path components -- confirm). For any other file system the default string forms are used.
 *
 * @param fileSystem file system the segment will be written to; only its type is inspected
 * @param bucket     bucket whose time selects the interval and whose partitionNum ends the path
 */
public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
{
  final Interval bucketInterval = schema.getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get();
  final boolean onHdfs = fileSystem instanceof DistributedFileSystem;

  final String segmentDir = String.format(
      "%s/%s/%s_%s/%s/%s",
      schema.getIOConfig().getSegmentOutputPath(),
      schema.getDataSchema().getDataSource(),
      onHdfs
      ? bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime())
      : bucketInterval.getStart().toString(),
      onHdfs
      ? bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime())
      : bucketInterval.getEnd().toString(),
      onHdfs
      ? schema.getTuningConfig().getVersion().replace(":", "_")
      : schema.getTuningConfig().getVersion(),
      bucket.partitionNum
  );
  return new Path(segmentDir);
}
public void addJobProperties(Job job) public void addJobProperties(Job job)
{ {
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();

View File

@ -22,17 +22,14 @@ import com.google.common.base.Optional;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction; import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException; import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool; import io.druid.collections.StupidPool;
@ -43,11 +40,9 @@ import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO; import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker; import io.druid.segment.IndexMaker;
import io.druid.segment.IndexSpec;
import io.druid.segment.LoggingProgressIndicator; import io.druid.segment.LoggingProgressIndicator;
import io.druid.segment.ProgressIndicator; import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OffheapIncrementalIndex;
@ -56,7 +51,7 @@ import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -71,21 +66,13 @@ import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.BufferedOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/** /**
*/ */
@ -160,7 +147,7 @@ public class IndexGeneratorJob implements Jobby
SortableBytes.useSortableBytesAsMapOutputKey(job); SortableBytes.useSortableBytesAsMapOutputKey(job);
int numReducers = Iterables.size(config.getAllBuckets().get()); int numReducers = Iterables.size(config.getAllBuckets().get());
if(numReducers == 0) { if (numReducers == 0) {
throw new RuntimeException("No buckets?? seems there is no data to index."); throw new RuntimeException("No buckets?? seems there is no data to index.");
} }
job.setNumReduceTasks(numReducers); job.setNumReduceTasks(numReducers);
@ -179,8 +166,8 @@ public class IndexGeneratorJob implements Jobby
// once IndexIO doesn't rely on globally injected properties, we can move this into the HadoopTuningConfig. // once IndexIO doesn't rely on globally injected properties, we can move this into the HadoopTuningConfig.
final String bitmapProperty = "druid.processing.bitmap.type"; final String bitmapProperty = "druid.processing.bitmap.type";
final String bitmapType = HadoopDruidIndexerConfig.properties.getProperty(bitmapProperty); final String bitmapType = HadoopDruidIndexerConfig.properties.getProperty(bitmapProperty);
if(bitmapType != null) { if (bitmapType != null) {
for(String property : new String[] {"mapreduce.reduce.java.opts", "mapreduce.map.java.opts"}) { for (String property : new String[]{"mapreduce.reduce.java.opts", "mapreduce.map.java.opts"}) {
// prepend property to allow overriding using hadoop.xxx properties by JobHelper.injectSystemProperties above // prepend property to allow overriding using hadoop.xxx properties by JobHelper.injectSystemProperties above
String value = Strings.nullToEmpty(job.getConfiguration().get(property)); String value = Strings.nullToEmpty(job.getConfiguration().get(property));
job.getConfiguration().set(property, String.format("-D%s=%s %s", bitmapProperty, bitmapType, value)); job.getConfiguration().set(property, String.format("-D%s=%s %s", bitmapProperty, bitmapType, value));
@ -376,7 +363,8 @@ public class IndexGeneratorJob implements Jobby
allDimensionNames.addAll(inputRow.getDimensions()); allDimensionNames.addAll(inputRow.getDimensions());
numRows = index.add(inputRow); numRows = index.add(inputRow);
} catch (ParseException e) { }
catch (ParseException e) {
if (config.isIgnoreInvalidRows()) { if (config.isIgnoreInvalidRows()) {
log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString()); log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
@ -437,7 +425,50 @@ public class IndexGeneratorJob implements Jobby
indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
); );
} }
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames)); final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
.getFileSystem(context.getConfiguration());
final DataSegment segment = JobHelper.serializeOutIndex(
new DataSegment(
config.getDataSource(),
interval,
config.getSchema().getTuningConfig().getVersion(),
null,
ImmutableList.copyOf(allDimensionNames),
metricNames,
config.getShardSpec(bucket).getActualSpec(),
-1,
-1
),
context.getConfiguration(),
context,
context.getTaskAttemptID(),
mergedBase,
JobHelper.makeSegmentOutputPath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
config.getSchema().getDataSchema().getDataSource(),
config.getSchema().getTuningConfig().getVersion(),
config.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
bucket.partitionNum
)
);
Path descriptorPath = config.makeDescriptorInfoPath(segment);
descriptorPath = JobHelper.prependFSIfNullScheme(
FileSystem.get(
descriptorPath.toUri(),
context.getConfiguration()
), descriptorPath
);
log.info("Writing descriptor to path[%s]", descriptorPath);
JobHelper.writeSegmentDescriptor(
config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
segment,
descriptorPath,
FileContext.getFileContext(descriptorPath.toUri(), context.getConfiguration()),
context
);
for (File file : toMerge) { for (File file : toMerge) {
FileUtils.deleteDirectory(file); FileUtils.deleteDirectory(file);
} }
@ -447,237 +478,6 @@ public class IndexGeneratorJob implements Jobby
} }
} }
/**
 * Zips the merged index directory to the segment output location and publishes it as index.zip.
 *
 * Steps, in order:
 * 1. zip every entry listed in mergedBase into "index.zip.<attemptNumber>" under the segment output path;
 * 2. derive the loadSpec from the class name of the output file system;
 * 3. build the DataSegment describing the upload;
 * 4. rename the attempt-specific zip to "index.zip" (and write the descriptors) via renameIndexFiles,
 *    retrying up to 6 times with a 10 second sleep between attempts;
 * 5. if every attempt failed, reconcile the temporary and final zip files by comparing their lengths.
 *
 * @param context        reducer context; supplies the configuration, the task attempt id and progress reporting
 * @param bucket         bucket being written; selects the interval, the shard spec and the output path
 * @param mergedBase     local directory holding the merged index files
 * @param dimensionNames dimension names recorded in the segment descriptor
 * @throws IOException on file system failures
 */
private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List<String> dimensionNames)
throws IOException
{
Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
int attemptNumber = context.getTaskAttemptID().getId();
final FileSystem intermediateFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()).getFileSystem(
context.getConfiguration()
);
final Path indexBasePath = config.makeSegmentOutputPath(outputFS, bucket);
// The zip is first written under an attempt-specific name and only later renamed to index.zip.
final Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
outputFS.mkdirs(indexBasePath);
Exception caughtException = null;
ZipOutputStream out = null;
long size = 0;
try {
out = new ZipOutputStream(new BufferedOutputStream(outputFS.create(indexZipFilePath), 256 * 1024));
List<String> filesToCopy = Arrays.asList(mergedBase.list());
for (String file : filesToCopy) {
size += copyFile(context, out, mergedBase, file);
}
}
catch (Exception e) {
caughtException = e;
}
finally {
// On success the stream is closed normally so that a failing close() is reported;
// on failure it is closed quietly and the exception caught above is rethrown.
if (caughtException == null) {
Closeables.close(out, false);
} else {
CloseQuietly.close(out);
throw Throwables.propagate(caughtException);
}
}
Path finalIndexZipFilePath = new Path(indexBasePath, "index.zip");
final URI indexOutURI = finalIndexZipFilePath.toUri();
ImmutableMap<String, Object> loadSpec;
// We do String comparison instead of instanceof checks here because in Hadoop 2.6.0
// NativeS3FileSystem got moved to a separate jar (hadoop-aws) that is not guaranteed
// to be part of the core code anymore. The instanceof check requires that the class exist
// but we do not have any guarantee that it will exist, so instead we must pull out
// the String name of it and verify that. We do a full package-qualified test in order
// to be as explicit as possible.
String fsClazz = outputFS.getClass().getName();
if ("org.apache.hadoop.fs.s3native.NativeS3FileSystem".equals(fsClazz)) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", indexOutURI.getHost(),
"key", indexOutURI.getPath().substring(1) // remove the leading "/"
);
} else if ("org.apache.hadoop.fs.LocalFileSystem".equals(fsClazz)) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "local",
"path", indexOutURI.getPath()
);
} else if ("org.apache.hadoop.hdfs.DistributedFileSystem".equals(fsClazz)) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", indexOutURI.toString()
);
} else {
throw new ISE("Unknown file system[%s]", fsClazz);
}
// size is the sum of the byte counts returned by copyFile for each zipped file.
DataSegment segment = new DataSegment(
config.getDataSource(),
interval,
config.getSchema().getTuningConfig().getVersion(),
loadSpec,
dimensionNames,
metricNames,
config.getShardSpec(bucket).getActualSpec(),
SegmentUtils.getVersionFromDir(mergedBase),
size
);
// retry 1 minute
boolean success = false;
for (int i = 0; i < 6; i++) {
if (renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
success = true;
break;
} else {
log.info("Failed to rename [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
try {
Thread.sleep(10000);
context.progress();
}
catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored and the InterruptedException is not attached as the cause.
throw new ISE(
"Thread error in retry loop for renaming [%s] to [%s]",
indexZipFilePath.toUri().getPath(),
finalIndexZipFilePath.toUri().getPath()
);
}
}
}
// Every rename attempt failed: decide which of the two zip files survives.
if (!success) {
if (!outputFS.exists(indexZipFilePath)) {
throw new ISE("File [%s] does not exist after retry loop.", indexZipFilePath.toUri().getPath());
}
// Equal lengths: the existing final zip is kept and the temporary one is removed.
// Different lengths: the final zip is removed and the rename is attempted one last time.
if (outputFS.getFileStatus(indexZipFilePath).getLen() == outputFS.getFileStatus(finalIndexZipFilePath)
.getLen()) {
outputFS.delete(indexZipFilePath, true);
} else {
outputFS.delete(finalIndexZipFilePath, true);
if (!renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
throw new ISE(
"Files [%s] and [%s] are different, but still cannot rename after retry loop",
indexZipFilePath.toUri().getPath(),
finalIndexZipFilePath.toUri().getPath()
);
}
}
}
}
/**
 * Moves the temporary index zip to its final name and writes the segment descriptors.
 *
 * If the final zip already exists it is deleted and replaced when the temporary zip is at least as recent
 * (modification time) or has a different length; otherwise the existing final zip is kept and no rename
 * is performed. After that, the segment is written as "descriptor.json" under indexBasePath on the output
 * file system and to the descriptor info path on the intermediate file system.
 *
 * @param intermediateFS        file system receiving the descriptor at config.makeDescriptorInfoPath(segment)
 * @param outputFS              file system holding the zip files
 * @param indexBasePath         directory that receives "descriptor.json"
 * @param indexZipFilePath      temporary zip produced by this attempt
 * @param finalIndexZipFilePath final zip location
 * @param segment               segment serialized into both descriptors
 * @return false when a rename was needed and outputFS.rename reported failure (no descriptor is written in
 * that case); true otherwise
 * @throws IOException on file system failures
 */
private boolean renameIndexFiles(
FileSystem intermediateFS,
FileSystem outputFS,
Path indexBasePath,
Path indexZipFilePath,
Path finalIndexZipFilePath,
DataSegment segment
)
throws IOException
{
final boolean needRename;
if (outputFS.exists(finalIndexZipFilePath)) {
// NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first
final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
// The existing final zip is replaced unless it is strictly newer than the temporary zip AND has the same length.
if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
|| zipFile.getLen() != finalIndexZipFile.getLen()) {
log.info(
"File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
finalIndexZipFile.getPath(),
new DateTime(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen(),
zipFile.getPath(),
new DateTime(zipFile.getModificationTime()),
zipFile.getLen()
);
outputFS.delete(finalIndexZipFilePath, false);
needRename = true;
} else {
log.info(
"File[%s / %s / %sB] existed and will be kept",
finalIndexZipFile.getPath(),
new DateTime(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen()
);
needRename = false;
}
} else {
needRename = true;
}
// A failed rename is reported to the caller, which decides whether to retry.
if (needRename && !outputFS.rename(indexZipFilePath, finalIndexZipFilePath)) {
return false;
}
// The descriptor is written twice: next to the zip, and to the descriptor info path on the intermediate file system.
writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json"));
final Path descriptorPath = config.makeDescriptorInfoPath(segment);
log.info("Writing descriptor to path[%s]", descriptorPath);
intermediateFS.mkdirs(descriptorPath.getParent());
writeSegmentDescriptor(intermediateFS, segment, descriptorPath);
return true;
}
/**
 * Serializes the segment as JSON to descriptorPath on the given file system. A file already present at
 * that path is deleted first.
 *
 * @throws IOException if the file system calls or the JSON serialization fail
 */
private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath)
    throws IOException
{
  final boolean alreadyPresent = outputFS.exists(descriptorPath);
  if (alreadyPresent) {
    outputFS.delete(descriptorPath, false);
  }

  final FSDataOutputStream jsonStream = outputFS.create(descriptorPath);
  try {
    HadoopDruidIndexerConfig.jsonMapper.writeValue(jsonStream, segment);
  }
  finally {
    // Always release the stream, whether or not serialization succeeded.
    jsonStream.close();
  }
}
/**
 * Adds the file {@code filename} from {@code mergedBase} to the zip stream as an entry of the same name.
 * Progress is reported after every chunk written and once more after the entry is closed.
 *
 * @return number of bytes read from the file
 * @throws IOException if reading the file or writing the zip entry fails
 */
private long copyFile(
    Context context, ZipOutputStream out, File mergedBase, final String filename
) throws IOException
{
  createNewZipEntry(out, filename);

  long totalBytes = 0;
  InputStream source = null;
  try {
    source = new FileInputStream(new File(mergedBase, filename));
    final byte[] chunk = new byte[0x10000];
    // read() returns -1 once the end of the file has been reached
    for (int len = source.read(chunk); len != -1; len = source.read(chunk)) {
      out.write(chunk, 0, len);
      totalBytes += len;
      context.progress();
    }
  }
  finally {
    CloseQuietly.close(source);
  }

  out.closeEntry();
  context.progress();
  return totalBytes;
}
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool) private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool)
{ {
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
@ -702,12 +502,6 @@ public class IndexGeneratorJob implements Jobby
); );
} }
} }
/**
 * Logs the entry name and positions the zip stream at the start of a new entry with that name.
 */
private void createNewZipEntry(ZipOutputStream out, String name) throws IOException
{
  log.info("Creating new ZipEntry[%s]", name);
  final ZipEntry entry = new ZipEntry(name);
  out.putNextEntry(entry);
}
} }
public static class IndexGeneratorOutputFormat extends TextOutputFormat public static class IndexGeneratorOutputFormat extends TextOutputFormat

View File

@ -18,26 +18,47 @@
package io.druid.indexer; package io.druid.indexer;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.io.OutputSupplier; import com.google.common.io.OutputSupplier;
import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Progressable;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/** /**
*/ */
@ -47,6 +68,9 @@ public class JobHelper
private static final Set<Path> existing = Sets.newHashSet(); private static final Set<Path> existing = Sets.newHashSet();
private static final int NUM_RETRIES = 6;
private static final int SECONDS_BETWEEN_RETRIES = 10;
public static void setupClasspath( public static void setupClasspath(
HadoopDruidIndexerConfig config, HadoopDruidIndexerConfig config,
@ -103,7 +127,8 @@ public class JobHelper
injectSystemProperties(job.getConfiguration()); injectSystemProperties(job.getConfiguration());
} }
public static Configuration injectSystemProperties(Configuration conf) { public static Configuration injectSystemProperties(Configuration conf)
{
for (String propName : System.getProperties().stringPropertyNames()) { for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) { if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName)); conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
@ -164,7 +189,7 @@ public class JobHelper
public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig) public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig)
{ {
if(indexerConfig.getInputFormatClass() != null) { if (indexerConfig.getInputFormatClass() != null) {
job.setInputFormatClass(indexerConfig.getInputFormatClass()); job.setInputFormatClass(indexerConfig.getInputFormatClass());
} else if (indexerConfig.isCombineText()) { } else if (indexerConfig.isCombineText()) {
job.setInputFormatClass(CombineTextInputFormat.class); job.setInputFormatClass(CombineTextInputFormat.class);
@ -172,4 +197,232 @@ public class JobHelper
job.setInputFormatClass(TextInputFormat.class); job.setInputFormatClass(TextInputFormat.class);
} }
} }
/**
 * Zips the merged index directory into {@code segmentBasePath}, publishes it as {@code index.zip} and writes
 * {@code descriptor.json} next to it.
 *
 * The zip is first uploaded as {@code index.zip.<attempt id>} and then renamed over {@code index.zip}. The
 * upload goes through a Hadoop RetryProxy using a fixed-sleep policy built from NUM_RETRIES and
 * SECONDS_BETWEEN_RETRIES. The loadSpec is derived from the scheme of the output file system
 * ({@code hdfs}, {@code s3}/{@code s3n} or {@code file}) and is resolved before anything is uploaded, so an
 * unsupported scheme fails immediately instead of after a complete upload that would leave an orphaned
 * temporary zip behind.
 *
 * @param segmentTemplate segment whose loadSpec, size and binary version are filled in by this method
 * @param configuration   Hadoop configuration used to resolve the FileSystem and FileContext of segmentBasePath
 * @param progressable    progress callback invoked while files are copied
 * @param taskAttemptID   its id is used as the suffix of the temporary zip file
 * @param mergedBase      local directory holding the merged index files
 * @param segmentBasePath destination directory of the segment
 *
 * @return a copy of segmentTemplate carrying the resolved loadSpec, the zipped byte count and the binary version
 *
 * @throws IOException if the upload, the rename or the descriptor write fails
 * @throws IAE         if the scheme of the output file system is not supported
 */
public static DataSegment serializeOutIndex(
    final DataSegment segmentTemplate,
    final Configuration configuration,
    final Progressable progressable,
    final TaskAttemptID taskAttemptID,
    final File mergedBase,
    final Path segmentBasePath
)
    throws IOException
{
  final FileSystem outputFS = FileSystem.get(segmentBasePath.toUri(), configuration);
  final FileContext fileContext = FileContext.getFileContext(segmentBasePath.toUri(), configuration);
  final Path tmpPath = new Path(segmentBasePath, String.format("index.zip.%d", taskAttemptID.getId()));
  final Path finalIndexZipFilePath = new Path(segmentBasePath, "index.zip");
  final URI indexOutURI = finalIndexZipFilePath.toUri();

  // Resolve the loadSpec first: it only depends on the destination, and an unsupported
  // scheme should be rejected before any data is pushed.
  final ImmutableMap<String, Object> loadSpec;
  // TODO: Make this a part of Pushers or Pullers
  switch (outputFS.getScheme()) {
    case "hdfs":
      loadSpec = ImmutableMap.<String, Object>of(
          "type", "hdfs",
          "path", indexOutURI.toString()
      );
      break;
    case "s3":
    case "s3n":
      loadSpec = ImmutableMap.<String, Object>of(
          "type", "s3_zip",
          "bucket", indexOutURI.getHost(),
          "key", indexOutURI.getPath().substring(1) // remove the leading "/"
      );
      break;
    case "file":
      loadSpec = ImmutableMap.<String, Object>of(
          "type", "local",
          "path", indexOutURI.getPath()
      );
      break;
    default:
      throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme());
  }

  final AtomicLong size = new AtomicLong(0L);
  final DataPusher zipPusher = (DataPusher) RetryProxy.create(
      DataPusher.class, new DataPusher()
      {
        @Override
        public void push() throws IOException
        {
          // OVERWRITE lets a retried attempt replace whatever a failed attempt left at tmpPath.
          try (OutputStream outputStream = fileContext.create(
              tmpPath,
              EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
              Options.CreateOpts.createParent(),
              Options.CreateOpts.bufferSize(256 * 1024)
          )) {
            // zipAndCopyDir flushes and closes the stream through its ZipOutputStream,
            // so no additional flush is issued here.
            size.set(zipAndCopyDir(mergedBase, outputStream, progressable));
          }
          catch (IOException | RuntimeException exception) {
            log.error(exception, "Exception in retry loop");
            throw exception;
          }
        }
      },
      RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
  );
  zipPusher.push();
  log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri());

  final DataSegment finalSegment = segmentTemplate
      .withLoadSpec(loadSpec)
      .withSize(size.get())
      .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));

  // NOTE(review): unlike the upload and the descriptor write, this rename is not wrapped in a retry.
  fileContext.rename(tmpPath, finalIndexZipFilePath, Options.Rename.OVERWRITE);
  writeSegmentDescriptor(
      outputFS,
      finalSegment,
      new Path(segmentBasePath, "descriptor.json"),
      fileContext,
      progressable
  );
  return finalSegment;
}
/**
 * Writes the JSON form of {@code segment} to {@code descriptorPath}, replacing any descriptor already there.
 * The whole delete-and-write attempt is executed through a Hadoop RetryProxy with a fixed-sleep policy
 * built from NUM_RETRIES and SECONDS_BETWEEN_RETRIES.
 *
 * @param outputFS       file system used to check whether the descriptor already exists
 * @param segment        segment to serialize
 * @param descriptorPath where the descriptor is written
 * @param fileContext    file context used to delete and create the descriptor file
 * @param progressable   progress callback, invoked at the start of every attempt
 *
 * @throws IOException if the descriptor cannot be written
 */
public static void writeSegmentDescriptor(
    final FileSystem outputFS,
    final DataSegment segment,
    final Path descriptorPath,
    final FileContext fileContext,
    final Progressable progressable
)
    throws IOException
{
  // One attempt: drop a stale descriptor if there is one, then write the new one.
  final DataPusher singleAttempt = new DataPusher()
  {
    @Override
    public void push() throws IOException
    {
      try {
        progressable.progress();
        final boolean stale = outputFS.exists(descriptorPath);
        if (stale && !fileContext.delete(descriptorPath, false)) {
          throw new IOException(String.format("Failed to delete descriptor at [%s]", descriptorPath));
        }
        try (final OutputStream descriptorOut = fileContext.create(
            descriptorPath,
            EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
            Options.CreateOpts.bufferSize(256 * 1024),
            Options.CreateOpts.createParent()
        )) {
          HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
          descriptorOut.flush();
        }
      }
      catch (RuntimeException | IOException ex) {
        log.info(ex, "Error in retry loop");
        throw ex;
      }
    }
  };

  final DataPusher retryingPusher = (DataPusher) RetryProxy.create(
      DataPusher.class,
      singleAttempt,
      RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
  );
  retryingPusher.push();
}
/**
 * Simple interface for retry operations.
 *
 * Callers in this class implement {@link #push()} as one complete attempt of an upload and wrap the
 * implementation with a Hadoop RetryProxy, so an implementation should be safe to invoke again after
 * a failed attempt.
 */
public interface DataPusher
{
// Performs one attempt of the operation; a thrown exception reports the attempt as failed.
void push() throws IOException;
}
/**
 * Zips the regular files found directly inside {@code baseDir} into {@code baseOutputStream}.
 * Entries that are not regular files (for example sub-directories) are skipped with a warning.
 *
 * The stream is wrapped in a ZipOutputStream that is closed before this method returns, which also
 * closes {@code baseOutputStream}.
 *
 * @param baseDir          directory whose files are zipped
 * @param baseOutputStream destination of the zip data
 * @param progressable     progress callback handed to the per-file copy
 *
 * @return the sum of the byte counts reported by copyFileToZipStream for the copied files
 *
 * @throws IOException if {@code baseDir} cannot be listed, or if reading a file or writing the zip fails
 */
public static long zipAndCopyDir(
    File baseDir,
    OutputStream baseOutputStream,
    Progressable progressable
) throws IOException
{
  long size = 0L;
  try (ZipOutputStream outputStream = new ZipOutputStream(baseOutputStream)) {
    // File.list() returns null, rather than throwing, when the path is not a directory or cannot
    // be read; report that explicitly instead of failing later with a NullPointerException.
    final String[] fileNames = baseDir.list();
    if (fileNames == null) {
      throw new IOException(
          String.format("Unable to list files in [%s]: not a directory or not readable", baseDir)
      );
    }
    for (String fileName : fileNames) {
      final File fileToCopy = new File(baseDir, fileName);
      if (java.nio.file.Files.isRegularFile(fileToCopy.toPath())) {
        size += copyFileToZipStream(fileToCopy, outputStream, progressable);
      } else {
        log.warn("File at [%s] is not a regular file! skipping as part of zip", fileToCopy.getPath());
      }
    }
    outputStream.flush();
  }
  return size;
}
/**
 * Copies {@code file} into the zip stream as a new entry named after the file.
 * Progress is reported after every read, after every write and once the entry has been closed.
 *
 * @param file            file to add to the zip
 * @param zipOutputStream zip stream receiving the entry
 * @param progressable    progress callback
 *
 * @return number of bytes read from {@code file}
 *
 * @throws IOException if reading the file or writing the entry fails
 */
public static long copyFileToZipStream(
    File file,
    ZipOutputStream zipOutputStream,
    Progressable progressable
) throws IOException
{
  createNewZipEntry(zipOutputStream, file);

  long copied = 0;
  try (FileInputStream source = new FileInputStream(file)) {
    final byte[] chunk = new byte[0x10000];
    int chunkLength;
    // a negative length marks the end of the file; an empty read is only reported as progress
    while ((chunkLength = source.read(chunk)) >= 0) {
      progressable.progress();
      if (chunkLength > 0) {
        zipOutputStream.write(chunk, 0, chunkLength);
        progressable.progress();
        copied += chunkLength;
      }
    }
  }

  zipOutputStream.closeEntry();
  progressable.progress();
  return copied;
}
/**
 * Logs the file name and starts a new zip entry of that name on the given stream.
 */
private static void createNewZipEntry(ZipOutputStream out, File file) throws IOException
{
  final String entryName = file.getName();
  log.info("Creating new ZipEntry[%s]", entryName);
  out.putNextEntry(new ZipEntry(entryName));
}
/**
 * Builds the output directory of a segment:
 * {@code <basePath>/<dataSource>/<start>_<end>/<version>/<partitionNum>}.
 *
 * {@code basePath} is first qualified with the URI of {@code fileSystem} when it carries no scheme. When the
 * file system scheme is "hdfs" the interval bounds use the ISO basic date-time format and every ":" in the
 * version is replaced with "_"; otherwise the default string forms are used unchanged.
 *
 * @param basePath     root directory for segment output
 * @param fileSystem   file system the segment is written to
 * @param dataSource   data source name
 * @param version      segment version
 * @param interval     segment interval
 * @param partitionNum partition number, used as the last path component
 */
public static Path makeSegmentOutputPath(
    Path basePath,
    FileSystem fileSystem,
    String dataSource,
    String version,
    Interval interval,
    int partitionNum
)
{
  // Child names are prefixed with "./" -- presumably so that names containing ":" are not
  // parsed as having a URI scheme; confirm against Hadoop's Path parsing.
  final Path dataSourceDir = new Path(prependFSIfNullScheme(fileSystem, basePath), "./" + dataSource);

  final Path versionDir;
  if ("hdfs".equals(fileSystem.getScheme())) {
    final String intervalDirName = String.format(
        "./%s_%s",
        interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
        interval.getEnd().toString(ISODateTimeFormat.basicDateTime())
    );
    versionDir = new Path(new Path(dataSourceDir, intervalDirName), version.replace(":", "_"));
  } else {
    final String intervalDirName = String.format(
        "./%s_%s",
        interval.getStart().toString(),
        interval.getEnd().toString()
    );
    versionDir = new Path(new Path(dataSourceDir, intervalDirName), String.format("./%s", version));
  }

  return new Path(versionDir, Integer.toString(partitionNum));
}
/**
 * Returns {@code path} unchanged when its URI already has a scheme; otherwise returns a path that
 * resolves it against the URI of {@code fs}.
 *
 * @param fs   file system whose URI is used as the parent when the path has no scheme
 * @param path path to qualify
 */
public static Path prependFSIfNullScheme(FileSystem fs, Path path)
{
  final boolean hasScheme = path.toUri().getScheme() != null;
  if (hasScheme) {
    return path;
  }
  return new Path(fs.getUri().toString(), String.format("./%s", path));
}
} }

View File

@ -96,7 +96,14 @@ public class HadoopDruidIndexerConfigTest
); );
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket); Path path = JobHelper.makeSegmentOutputPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getTuningConfig().getVersion(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
bucket.partitionNum
);
Assert.assertEquals( Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712", "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712",
path.toString() path.toString()
@ -142,9 +149,16 @@ public class HadoopDruidIndexerConfigTest
); );
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket); Path path = JobHelper.makeSegmentOutputPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getTuningConfig().getVersion(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
bucket.partitionNum
);
Assert.assertEquals( Assert.assertEquals(
"/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",
path.toString() path.toString()
); );