diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 99f243eaf17..fcf24454118 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -47,7 +47,6 @@ import io.druid.guice.annotations.Self; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.path.PathSpec; import io.druid.initialization.Initialization; -import io.druid.segment.column.ColumnConfig; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.server.DruidNode; import io.druid.timeline.DataSegment; @@ -170,7 +169,6 @@ public class HadoopDruidIndexerConfig private volatile HadoopIngestionSpec schema; private volatile PathSpec pathSpec; - private volatile ColumnConfig columnConfig; private volatile Map shardSpecLookups = Maps.newHashMap(); private volatile Map hadoopShardSpecLookup = Maps.newHashMap(); @@ -179,7 +177,6 @@ public class HadoopDruidIndexerConfig final @JsonProperty("schema") HadoopIngestionSpec schema ) { - this.columnConfig = columnConfig; this.schema = schema; this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class); for (Map.Entry> entry : schema.getTuningConfig().getShardSpecs().entrySet()) { @@ -207,23 +204,12 @@ public class HadoopDruidIndexerConfig } } - // TODO: remove this - public boolean isLegacy() - { - return schema.isLegacy(); - } - @JsonProperty public HadoopIngestionSpec getSchema() { return schema; } - public ColumnConfig getColumnConfig() - { - return columnConfig; - } - public String getDataSource() { return schema.getDataSchema().getDataSource(); @@ -404,6 +390,11 @@ public class HadoopDruidIndexerConfig } } + public boolean isPersistInHeap() + { + return schema.getTuningConfig().isPersistInHeap(); + } + /****************************************** Path helper logic ******************************************/ diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java index 4c1292095e0..4f41cc52df8 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java @@ -57,10 +57,10 @@ public class HadoopDruidIndexerJob implements Jobby List jobs = Lists.newArrayList(); JobHelper.ensurePaths(config); - if (config.isLegacy()) { - indexJob = new LegacyIndexGeneratorJob(config); - } else { + if (config.isPersistInHeap()) { indexJob = new IndexGeneratorJob(config); + } else { + indexJob = new LegacyIndexGeneratorJob(config); } jobs.add(indexJob); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index 753ac1884d3..028f97d7db5 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -50,14 +50,11 @@ public class HadoopIngestionSpec extends IngestionSpec jobProperties; private final boolean combineText; + private final boolean persistInHeap; @JsonCreator public HadoopTuningConfig( @@ -81,7 +83,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("overwriteFiles") boolean overwriteFiles, final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, 
final @JsonProperty("jobProperties") Map jobProperties, - final @JsonProperty("combineText") boolean combineText + final @JsonProperty("combineText") boolean combineText, + final @JsonProperty("persistInHeap") boolean persistInHeap ) { this.workingPath = workingPath == null ? null : workingPath; @@ -97,6 +100,7 @@ public class HadoopTuningConfig implements TuningConfig ? ImmutableMap.of() : ImmutableMap.copyOf(jobProperties)); this.combineText = combineText; + this.persistInHeap = persistInHeap; } @JsonProperty @@ -165,6 +169,12 @@ public class HadoopTuningConfig implements TuningConfig return combineText; } + @JsonProperty + public boolean isPersistInHeap() + { + return persistInHeap; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -178,7 +188,8 @@ public class HadoopTuningConfig implements TuningConfig overwriteFiles, ignoreInvalidRows, jobProperties, - combineText + combineText, + persistInHeap ); } @@ -195,7 +206,8 @@ public class HadoopTuningConfig implements TuningConfig overwriteFiles, ignoreInvalidRows, jobProperties, - combineText + combineText, + persistInHeap ); } @@ -212,7 +224,8 @@ public class HadoopTuningConfig implements TuningConfig overwriteFiles, ignoreInvalidRows, jobProperties, - combineText + combineText, + persistInHeap ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index bdd25f8e78d..13b9cb9b2cf 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -37,7 +37,6 @@ import io.druid.data.input.InputRow; import io.druid.data.input.impl.StringInputRowParser; import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.BaseProgressIndicator; import io.druid.segment.IndexIO; import io.druid.segment.IndexMaker; import io.druid.segment.LoggingProgressIndicator; @@ -90,16 +89,6 @@ import java.util.zip.ZipOutputStream; public class IndexGeneratorJob implements Jobby { private static final Logger log = new Logger(IndexGeneratorJob.class); - private final HadoopDruidIndexerConfig config; - private IndexGeneratorStats jobStats; - - public IndexGeneratorJob( - HadoopDruidIndexerConfig config - ) - { - this.config = config; - this.jobStats = new IndexGeneratorStats(); - } public static List getPublishedSegments(HadoopDruidIndexerConfig config) { @@ -133,6 +122,22 @@ public class IndexGeneratorJob implements Jobby return publishedSegments; } + private final HadoopDruidIndexerConfig config; + private IndexGeneratorStats jobStats; + + public IndexGeneratorJob( + HadoopDruidIndexerConfig config + ) + { + this.config = config; + this.jobStats = new IndexGeneratorStats(); + } + + protected void setReducerClass(final Job job) + { + job.setReducerClass(IndexGeneratorReducer.class); + } + public IndexGeneratorStats getJobStats() { return jobStats; @@ -164,7 +169,7 @@ public class IndexGeneratorJob implements Jobby job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get())); job.setPartitionerClass(IndexGeneratorPartitioner.class); - job.setReducerClass(IndexGeneratorReducer.class); + setReducerClass(job); job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(IndexGeneratorOutputFormat.class); @@ -193,7 +198,6 @@ public class IndexGeneratorJob implements Jobby } public static class IndexGeneratorMapper 
extends HadoopDruidIndexerMapper - { @Override protected void innerMap( @@ -259,6 +263,42 @@ public class IndexGeneratorJob implements Jobby private List metricNames = Lists.newArrayList(); private StringInputRowParser parser; + protected ProgressIndicator makeProgressIndicator(final Context context) + { + return new LoggingProgressIndicator("IndexGeneratorJob") + { + @Override + public void progress() + { + context.progress(); + } + }; + } + + protected File persist( + final IncrementalIndex index, + final Interval interval, + final File file, + final ProgressIndicator progressIndicator + ) throws IOException + { + return IndexMaker.persist( + index, interval, file, progressIndicator + ); + } + + protected File mergeQueryableIndex( + final List indexes, + final AggregatorFactory[] aggs, + final File file, + ProgressIndicator progressIndicator + ) throws IOException + { + return IndexMaker.mergeQueryableIndex( + indexes, aggs, file, progressIndicator + ); + } + @Override protected void setup(Context context) throws IOException, InterruptedException @@ -297,14 +337,7 @@ public class IndexGeneratorJob implements Jobby long startTime = System.currentTimeMillis(); Set allDimensionNames = Sets.newHashSet(); - final ProgressIndicator progressIndicator = new LoggingProgressIndicator("IndexGeneratorJob") - { - @Override - public void progress() - { - context.progress(); - } - }; + final ProgressIndicator progressIndicator = makeProgressIndicator(context); for (final Text value : values) { context.progress(); @@ -327,9 +360,7 @@ public class IndexGeneratorJob implements Jobby toMerge.add(file); context.progress(); - IndexMaker.persist( - index, interval, file, progressIndicator - ); + persist(index, interval, file, progressIndicator); // close this index and make a new one index.close(); index = makeIncrementalIndex(bucket, aggs); @@ -350,22 +381,18 @@ public class IndexGeneratorJob implements Jobby } mergedBase = new File(baseFlushFile, "merged"); - IndexMaker.persist( - index, interval, mergedBase, progressIndicator - ); + persist(index, interval, mergedBase, progressIndicator); } else { if (!index.isEmpty()) { final File finalFile = new File(baseFlushFile, "final"); - IndexMaker.persist( - index, interval, finalFile, progressIndicator - ); + persist(index, interval, finalFile, progressIndicator); toMerge.add(finalFile); } for (File file : toMerge) { indexes.add(IndexIO.loadIndex(file)); } - mergedBase = IndexMaker.mergeQueryableIndex( + mergedBase = mergeQueryableIndex( indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator ); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java index ccfb555d998..876492ba8f2 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java @@ -19,591 +19,68 @@ package io.druid.indexer; -import com.google.common.base.Optional; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.io.Closeables; -import com.google.common.primitives.Longs; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import com.metamx.common.guava.CloseQuietly; -import com.metamx.common.logger.Logger; -import io.druid.data.input.InputRow; -import 
io.druid.data.input.impl.StringInputRowParser; -import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.IndexIO; +import io.druid.segment.BaseProgressIndicator; import io.druid.segment.IndexMerger; +import io.druid.segment.ProgressIndicator; import io.druid.segment.QueryableIndex; -import io.druid.segment.SegmentUtils; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.timeline.DataSegment; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3native.NativeS3FileSystem; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.InvalidJobConfException; -import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.joda.time.DateTime; import org.joda.time.Interval; -import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.List; -import java.util.Set; -import java.util.zip.ZipEntry; -import java.util.zip.ZipOutputStream; /** */ public class LegacyIndexGeneratorJob extends IndexGeneratorJob { - private static final Logger log = new Logger(IndexGeneratorJob.class); - private final HadoopDruidIndexerConfig config; - private IndexGeneratorStats jobStats; - public LegacyIndexGeneratorJob( HadoopDruidIndexerConfig config ) { super(config); - this.config = config; - this.jobStats = new IndexGeneratorStats(); } - public boolean run() + @Override + protected void setReducerClass(Job job) { - try { - Job job = new Job( - new Configuration(), - String.format("%s-index-generator-%s", config.getDataSource(), config.getIntervals()) - ); - - job.getConfiguration().set("io.sort.record.percent", "0.23"); - - JobHelper.injectSystemProperties(job); - - if (config.isCombineText()) { - job.setInputFormatClass(CombineTextInputFormat.class); - } else { - job.setInputFormatClass(TextInputFormat.class); - } - - job.setMapperClass(IndexGeneratorMapper.class); - job.setMapOutputValueClass(Text.class); - - SortableBytes.useSortableBytesAsMapOutputKey(job); - - job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get())); - job.setPartitionerClass(IndexGeneratorPartitioner.class); - - job.setReducerClass(IndexGeneratorReducer.class); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(Text.class); - job.setOutputFormatClass(IndexGeneratorOutputFormat.class); - FileOutputFormat.setOutputPath(job, config.makeIntermediatePath()); - - config.addInputPaths(job); - config.addJobProperties(job); - 
config.intoConfiguration(job); - - JobHelper.setupClasspath(config, job); - - job.submit(); - log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL()); - - boolean success = job.waitForCompletion(true); - - Counter invalidRowCount = job.getCounters() - .findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER); - jobStats.setInvalidRowCount(invalidRowCount.getValue()); - - return success; - } - catch (Exception e) { - throw new RuntimeException(e); - } + job.setReducerClass(LegacyIndexGeneratorReducer.class); } - public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper - + public static class LegacyIndexGeneratorReducer extends IndexGeneratorJob.IndexGeneratorReducer { @Override - protected void innerMap( - InputRow inputRow, - Text text, - Context context - ) throws IOException, InterruptedException + protected ProgressIndicator makeProgressIndicator(final Context context) { - // Group by bucket, sort by timestamp - final Optional bucket = getConfig().getBucket(inputRow); - - if (!bucket.isPresent()) { - throw new ISE("WTF?! No bucket found for row: %s", inputRow); - } - - context.write( - new SortableBytes( - bucket.get().toGroupKey(), - Longs.toByteArray(inputRow.getTimestampFromEpoch()) - ).toBytesWritable(), - text - ); - } - } - - public static class IndexGeneratorPartitioner extends Partitioner implements Configurable - { - private Configuration config; - - @Override - public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) - { - final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); - bytes.position(4); // Skip length added by SortableBytes - int shardNum = bytes.getInt(); - if (config.get("mapred.job.tracker").equals("local")) { - return shardNum % numPartitions; - } else { - if (shardNum >= numPartitions) { - throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions); - } - return shardNum; - - } - } - - @Override - public Configuration getConf() - { - return config; - } - - @Override - public void setConf(Configuration config) - { - this.config = config; - } - } - - public static class IndexGeneratorReducer extends Reducer - { - private HadoopDruidIndexerConfig config; - private List metricNames = Lists.newArrayList(); - private StringInputRowParser parser; - - @Override - protected void setup(Context context) - throws IOException, InterruptedException - { - config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - - for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) { - metricNames.add(factory.getName().toLowerCase()); - } - - parser = config.getParser(); - } - - @Override - protected void reduce( - BytesWritable key, Iterable values, final Context context - ) throws IOException, InterruptedException - { - SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); - Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; - - final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); - //final DataRollupSpec rollupSpec = config.getRollupSpec(); - final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators(); - - IncrementalIndex index = makeIncrementalIndex(bucket, aggs); - try { - File baseFlushFile = File.createTempFile("base", "flush"); - baseFlushFile.delete(); - baseFlushFile.mkdirs(); - - Set toMerge = Sets.newTreeSet(); - int indexCount = 0; - int lineCount = 0; - int runningTotalLineCount = 0; - long 
startTime = System.currentTimeMillis(); - - Set allDimensionNames = Sets.newHashSet(); - final IndexMerger.ProgressIndicator progressIndicator = new IndexMerger.ProgressIndicator() + return new BaseProgressIndicator() + { + @Override + public void progress() { - @Override - public void progress() - { - context.progress(); - } - }; - - for (final Text value : values) { context.progress(); - final InputRow inputRow = index.formatRow(parser.parse(value.toString())); - allDimensionNames.addAll(inputRow.getDimensions()); - - int numRows = index.add(inputRow); - ++lineCount; - - if (index.isFull()) { - log.info( - "%,d lines to %,d rows in %,d millis", - lineCount - runningTotalLineCount, - numRows, - System.currentTimeMillis() - startTime - ); - runningTotalLineCount = lineCount; - - final File file = new File(baseFlushFile, String.format("index%,05d", indexCount)); - toMerge.add(file); - - context.progress(); - IndexMerger.persist( - index, interval, file, progressIndicator - ); - // close this index and make a new one - index.close(); - index = makeIncrementalIndex(bucket, aggs); - - startTime = System.currentTimeMillis(); - ++indexCount; - } } - - log.info("%,d lines completed.", lineCount); - - List indexes = Lists.newArrayListWithCapacity(indexCount); - final File mergedBase; - - if (toMerge.size() == 0) { - if (index.isEmpty()) { - throw new IAE("If you try to persist empty indexes you are going to have a bad time"); - } - - mergedBase = new File(baseFlushFile, "merged"); - IndexMerger.persist( - index, interval, mergedBase, progressIndicator - ); - } else { - if (!index.isEmpty()) { - final File finalFile = new File(baseFlushFile, "final"); - IndexMerger.persist( - index, interval, finalFile, progressIndicator - ); - toMerge.add(finalFile); - } - - for (File file : toMerge) { - indexes.add(IndexIO.loadIndex(file)); - } - mergedBase = IndexMerger.mergeQueryableIndex( - indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator - ); - } - serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames)); - for (File file : toMerge) { - FileUtils.deleteDirectory(file); - } - } - finally { - index.close(); - } + }; } - private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List dimensionNames) - throws IOException - { - Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); - - int attemptNumber = context.getTaskAttemptID().getId(); - - FileSystem fileSystem = FileSystem.get(context.getConfiguration()); - Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket); - Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); - final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); - final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); - - outputFS.mkdirs(indexBasePath); - - Exception caughtException = null; - ZipOutputStream out = null; - long size = 0; - try { - out = new ZipOutputStream(new BufferedOutputStream(outputFS.create(indexZipFilePath), 256 * 1024)); - - List filesToCopy = Arrays.asList(mergedBase.list()); - - for (String file : filesToCopy) { - size += copyFile(context, out, mergedBase, file); - } - } - catch (Exception e) { - caughtException = e; - } - finally { - if (caughtException == null) { - Closeables.close(out, false); - } else { - CloseQuietly.close(out); - throw Throwables.propagate(caughtException); - } - } - - Path finalIndexZipFilePath = new Path(indexBasePath, 
"index.zip"); - final URI indexOutURI = finalIndexZipFilePath.toUri(); - ImmutableMap loadSpec; - if (outputFS instanceof NativeS3FileSystem) { - loadSpec = ImmutableMap.of( - "type", "s3_zip", - "bucket", indexOutURI.getHost(), - "key", indexOutURI.getPath().substring(1) // remove the leading "/" - ); - } else if (outputFS instanceof LocalFileSystem) { - loadSpec = ImmutableMap.of( - "type", "local", - "path", indexOutURI.getPath() - ); - } else if (outputFS instanceof DistributedFileSystem) { - loadSpec = ImmutableMap.of( - "type", "hdfs", - "path", indexOutURI.getPath() - ); - } else { - throw new ISE("Unknown file system[%s]", outputFS.getClass()); - } - - DataSegment segment = new DataSegment( - config.getDataSource(), - interval, - config.getSchema().getTuningConfig().getVersion(), - loadSpec, - dimensionNames, - metricNames, - config.getShardSpec(bucket).getActualSpec(), - SegmentUtils.getVersionFromDir(mergedBase), - size - ); - - // retry 1 minute - boolean success = false; - for (int i = 0; i < 6; i++) { - if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { - log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); - success = true; - break; - } else { - log.info("Failed to rename [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); - try { - Thread.sleep(10000); - context.progress(); - } - catch (InterruptedException e) { - throw new ISE( - "Thread error in retry loop for renaming [%s] to [%s]", - indexZipFilePath.toUri().getPath(), - finalIndexZipFilePath.toUri().getPath() - ); - } - } - } - - if (!success) { - if (!outputFS.exists(indexZipFilePath)) { - throw new ISE("File [%s] does not exist after retry loop.", indexZipFilePath.toUri().getPath()); - } - - if (outputFS.getFileStatus(indexZipFilePath).getLen() == outputFS.getFileStatus(finalIndexZipFilePath) - .getLen()) { - outputFS.delete(indexZipFilePath, true); - } else { - outputFS.delete(finalIndexZipFilePath, true); - if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { - throw new ISE( - "Files [%s] and [%s] are different, but still cannot rename after retry loop", - indexZipFilePath.toUri().getPath(), - finalIndexZipFilePath.toUri().getPath() - ); - } - } - } - } - - private boolean renameIndexFiles( - FileSystem intermediateFS, - FileSystem outputFS, - Path indexBasePath, - Path indexZipFilePath, - Path finalIndexZipFilePath, - DataSegment segment - ) - throws IOException - { - final boolean needRename; - - if (outputFS.exists(finalIndexZipFilePath)) { - // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first - final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath); - final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath); - - if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime() - || zipFile.getLen() != finalIndexZipFile.getLen()) { - log.info( - "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]", - finalIndexZipFile.getPath(), - new DateTime(finalIndexZipFile.getModificationTime()), - finalIndexZipFile.getLen(), - zipFile.getPath(), - new DateTime(zipFile.getModificationTime()), - zipFile.getLen() - ); - outputFS.delete(finalIndexZipFilePath, false); - needRename = true; - } else { - log.info( - "File[%s / %s / %sB] existed and will be kept", - finalIndexZipFile.getPath(), - new DateTime(finalIndexZipFile.getModificationTime()), - 
finalIndexZipFile.getLen() - ); - needRename = false; - } - } else { - needRename = true; - } - - if (needRename && !outputFS.rename(indexZipFilePath, finalIndexZipFilePath)) { - return false; - } - - writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json")); - final Path descriptorPath = config.makeDescriptorInfoPath(segment); - log.info("Writing descriptor to path[%s]", descriptorPath); - intermediateFS.mkdirs(descriptorPath.getParent()); - writeSegmentDescriptor(intermediateFS, segment, descriptorPath); - - return true; - } - - private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath) - throws IOException - { - if (outputFS.exists(descriptorPath)) { - outputFS.delete(descriptorPath, false); - } - - final FSDataOutputStream descriptorOut = outputFS.create(descriptorPath); - try { - HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment); - } - finally { - descriptorOut.close(); - } - } - - private long copyFile( - Context context, ZipOutputStream out, File mergedBase, final String filename + @Override + protected File persist( + IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator ) throws IOException { - createNewZipEntry(out, filename); - long numRead = 0; - - InputStream in = null; - try { - in = new FileInputStream(new File(mergedBase, filename)); - byte[] buf = new byte[0x10000]; - int read; - while (true) { - read = in.read(buf); - if (read == -1) { - break; - } - - out.write(buf, 0, read); - numRead += read; - context.progress(); - } - } - finally { - CloseQuietly.close(in); - } - out.closeEntry(); - context.progress(); - - return numRead; + return IndexMerger.persist(index, interval, file, progressIndicator); } - private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs) - { - return new IncrementalIndex( - new IncrementalIndexSchema.Builder() - .withMinTimestamp(theBucket.time.getMillis()) - .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) - .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) - .withMetrics(aggs) - .build(), - new OffheapBufferPool(config.getSchema().getTuningConfig().getBufferSize()) - ); - } - - private void createNewZipEntry(ZipOutputStream out, String name) throws IOException - { - log.info("Creating new ZipEntry[%s]", name); - out.putNextEntry(new ZipEntry(name)); - } - } - - public static class IndexGeneratorOutputFormat extends TextOutputFormat - { @Override - public void checkOutputSpecs(JobContext job) throws IOException + protected File mergeQueryableIndex( + List indexes, + AggregatorFactory[] aggs, + File file, + ProgressIndicator progressIndicator + ) throws IOException { - Path outDir = getOutputPath(job); - if (outDir == null) { - throw new InvalidJobConfException("Output directory not set."); - } - } - } - - public static class IndexGeneratorStats - { - private long invalidRowCount = 0; - - public long getInvalidRowCount() - { - return invalidRowCount; - } - - public void setInvalidRowCount(long invalidRowCount) - { - this.invalidRowCount = invalidRowCount; + return IndexMerger.mergeQueryableIndex(indexes, aggs, file, progressIndicator); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index dc6e334aece..48f93214303 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -403,7 +403,17 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - new RealtimeTuningConfig(ingestionSchema.getTuningConfig().getBufferSize(), null, null, null, null, null, null, shardSpec), + new RealtimeTuningConfig( + ingestionSchema.getTuningConfig().getBufferSize(), + null, + null, + null, + null, + null, + null, + shardSpec, + null + ), metrics ); @@ -554,7 +564,7 @@ public class IndexTask extends AbstractFixedIntervalTask @JsonProperty("targetPartitionSize") int targetPartitionSize, @JsonProperty("bufferSize") @Nullable Integer bufferSize, @JsonProperty("numShards") @Nullable Integer numShards - ) + ) { this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize; this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index a1b737da7e1..34a1c5c9bff 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -118,7 +118,8 @@ public class RealtimeIndexTask extends AbstractTask null, rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy, maxPendingPersists, - spec.getShardSpec() + spec.getShardSpec(), + false ), null, null, null, null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 6a6736bf807..c6bd1658830 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -350,7 +350,7 @@ public class TaskSerdeTest null, null, new HadoopIngestionSpec( - null, null, null, false, // TODO + null, null, null, "foo", new TimestampSpec("timestamp", "auto"), new JSONDataSpec(ImmutableList.of("foo"), null), diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index b5ceac4196b..0a4ef673df8 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -122,7 +122,7 @@ public class IndexMerger */ public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir) throws IOException { - return persist(index, dataInterval, outDir, new NoopProgressIndicator()); + return persist(index, dataInterval, outDir, new BaseProgressIndicator()); } public static File persist( @@ -164,7 +164,7 @@ public class IndexMerger List indexes, final AggregatorFactory[] metricAggs, File outDir ) throws IOException { - return mergeQueryableIndex(indexes, metricAggs, outDir, new NoopProgressIndicator()); + return mergeQueryableIndex(indexes, metricAggs, outDir, new BaseProgressIndicator()); } public static File mergeQueryableIndex( @@ -193,7 +193,7 @@ public class IndexMerger List indexes, final AggregatorFactory[] metricAggs, File outDir ) throws IOException { - return merge(indexes, metricAggs, outDir, new NoopProgressIndicator()); + return merge(indexes, metricAggs, outDir, new BaseProgressIndicator()); } public static File merge( @@ -316,7 +316,7 @@ public class IndexMerger List 
indexes, File outDir ) throws IOException { - return append(indexes, outDir, new NoopProgressIndicator()); + return append(indexes, outDir, new BaseProgressIndicator()); } public static File append( @@ -1190,15 +1190,4 @@ public class IndexMerger return retVal; } } - - public static interface ProgressIndicator - { - public void progress(); - } - - private static class NoopProgressIndicator implements ProgressIndicator - { - @Override - public void progress() {} - } } \ No newline at end of file diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 52df7cafbe4..60c19308650 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -44,6 +44,7 @@ public class RealtimeTuningConfig implements TuningConfig private static final RejectionPolicyFactory defaultRejectionPolicyFactory = new ServerTimeRejectionPolicyFactory(); private static final int defaultMaxPendingPersists = 0; private static final ShardSpec defaultShardSpec = new NoneShardSpec(); + private static final boolean defaultPersistInHeap = false; // Might make sense for this to be a builder public static RealtimeTuningConfig makeDefaultTuningConfig() @@ -56,7 +57,8 @@ public class RealtimeTuningConfig implements TuningConfig defaultVersioningPolicy, defaultRejectionPolicyFactory, defaultMaxPendingPersists, - defaultShardSpec + defaultShardSpec, + defaultPersistInHeap ); } @@ -68,6 +70,7 @@ public class RealtimeTuningConfig implements TuningConfig private final RejectionPolicyFactory rejectionPolicyFactory; private final int maxPendingPersists; private final ShardSpec shardSpec; + private final boolean persistInHeap; @JsonCreator public RealtimeTuningConfig( @@ -78,7 +81,8 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy, @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, - @JsonProperty("shardSpec") ShardSpec shardSpec + @JsonProperty("shardSpec") ShardSpec shardSpec, + @JsonProperty("persistInHeap") Boolean persistInHeap ) { this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize; @@ -93,6 +97,7 @@ public class RealtimeTuningConfig implements TuningConfig : rejectionPolicyFactory; this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; + this.persistInHeap = persistInHeap == null ? 
defaultPersistInHeap : persistInHeap; } @JsonProperty @@ -143,6 +148,12 @@ public class RealtimeTuningConfig implements TuningConfig return shardSpec; } + @JsonProperty + public boolean isPersistInHeap() + { + return persistInHeap; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -153,7 +164,8 @@ public class RealtimeTuningConfig implements TuningConfig policy, rejectionPolicyFactory, maxPendingPersists, - shardSpec + shardSpec, + persistInHeap ); } @@ -167,7 +179,8 @@ public class RealtimeTuningConfig implements TuningConfig versioningPolicy, rejectionPolicyFactory, maxPendingPersists, - shardSpec + shardSpec, + persistInHeap ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java index 3172bcb6f44..c4173877c5e 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java @@ -96,7 +96,8 @@ public class FireDepartment extends IngestionSpec() - { - @Override - public boolean apply(Long sinkKey) - { - return segment.getInterval().contains(sinkKey); - } - } + { + @Override + public boolean apply(Long sinkKey) + { + return segment.getInterval().contains(sinkKey); + } + } ); } } diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 5fca643fc08..4f9cb51ecc9 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -77,7 +77,7 @@ public class FireDepartmentTest ) ), new RealtimeTuningConfig( - null, null, null, null, null, null, null, null + null, null, null, null, null, null, null, null, false ), null, null, null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 53e24e9c4a6..669c25ca290 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -121,6 +121,7 @@ public class RealtimeManagerTest null, null, null, + null, null ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index b580e83d3c5..734bfff8c0d 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -133,6 +133,7 @@ public class RealtimePlumberSchoolTest new IntervalStartVersioningPolicy(), new NoopRejectionPolicyFactory(), null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 47f41e7dd7d..227f753b114 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -63,7 +63,8 @@ public class SinkTest null, null, null, - null + null, + false ); final Sink sink = new Sink(interval, schema, tuningConfig, version);
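
The net effect of this patch is a template-method refactor plus a new tuning flag: IndexGeneratorJob keeps all of the Hadoop wiring and exposes protected hooks (setReducerClass, makeProgressIndicator, persist, mergeQueryableIndex), LegacyIndexGeneratorJob shrinks to a subclass that overrides those hooks to keep using IndexMerger instead of IndexMaker, and HadoopDruidIndexerJob selects between the two jobs with the new persistInHeap flag carried by HadoopTuningConfig and RealtimeTuningConfig (defaulting to false). The sketch below illustrates that shape only; GeneratorJob, LegacyGeneratorJob, and PersistInHeapSketch are simplified hypothetical stand-ins, not the Druid classes touched by this diff.

import java.io.File;
import java.io.IOException;

/** Base job: owns the common wiring, exposes hooks for subclasses (cf. IndexGeneratorJob). */
class GeneratorJob
{
  // Hook: which reducer the job configures (cf. the protected setReducerClass in the patch).
  protected String reducerClass()
  {
    return "IndexGeneratorReducer";
  }

  // Hook: how intermediate indexes are persisted (cf. the protected persist(...) hook;
  // the default path in the patch goes through IndexMaker).
  protected File persist(Object index, File dir) throws IOException
  {
    return new File(dir, "maker-segment");
  }

  public void run(File workDir) throws IOException
  {
    // All shared setup lives here once, instead of being duplicated in the legacy job.
    System.out.println("reducer class: " + reducerClass());
    System.out.println("persisted to:  " + persist(new Object(), workDir));
  }
}

/** Legacy variant: overrides only the hooks (cf. LegacyIndexGeneratorJob / LegacyIndexGeneratorReducer). */
class LegacyGeneratorJob extends GeneratorJob
{
  @Override
  protected String reducerClass()
  {
    return "LegacyIndexGeneratorReducer";
  }

  @Override
  protected File persist(Object index, File dir) throws IOException
  {
    // Legacy path: the old persist implementation (IndexMerger in the patch).
    return new File(dir, "merger-segment");
  }
}

public class PersistInHeapSketch
{
  public static void main(String[] args) throws IOException
  {
    // Hypothetical stand-in for tuningConfig.isPersistInHeap(); the patch defaults it to false,
    // so the legacy (IndexMerger-based) job remains the default behaviour.
    boolean persistInHeap = false;

    GeneratorJob job = persistInHeap ? new GeneratorJob() : new LegacyGeneratorJob();
    job.run(new File(System.getProperty("java.io.tmpdir")));
  }
}

The same inversion appears in HadoopDruidIndexerJob above: the new IndexGeneratorJob is only chosen when isPersistInHeap() is true, so existing ingestion specs that omit the flag keep their previous behaviour.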