From 351afb8be7da1fe93fc86dddd100974a58be8aba Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 9 Sep 2014 17:04:35 -0700 Subject: [PATCH] allow legacy index generator --- .../indexer/HadoopDruidIndexerConfig.java | 6 + .../druid/indexer/HadoopDruidIndexerJob.java | 26 +- .../io/druid/indexer/HadoopIngestionSpec.java | 14 + .../indexer/LegacyIndexGeneratorJob.java | 609 +++++++++ .../indexing/common/task/TaskSerdeTest.java | 2 +- .../java/io/druid/segment/IndexMerger.java | 1204 +++++++++++++++++ .../segment/data/VSizeIndexedWriter.java | 145 ++ 7 files changed, 1995 insertions(+), 11 deletions(-) create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java create mode 100644 processing/src/main/java/io/druid/segment/IndexMerger.java create mode 100644 processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 1ff6e5a4482..99f243eaf17 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -207,6 +207,12 @@ public class HadoopDruidIndexerConfig } } + // TODO: remove this + public boolean isLegacy() + { + return schema.isLegacy(); + } + @JsonProperty public HadoopIngestionSpec getSchema() { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java index 64bc1267146..4c1292095e0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java @@ -57,7 +57,11 @@ public class HadoopDruidIndexerJob implements Jobby List jobs = Lists.newArrayList(); JobHelper.ensurePaths(config); - indexJob = new IndexGeneratorJob(config); + if (config.isLegacy()) { + indexJob = new LegacyIndexGeneratorJob(config); + } else { + indexJob = new IndexGeneratorJob(config); + } jobs.add(indexJob); if (dbUpdaterJob != null) { @@ -66,15 +70,17 @@ public class HadoopDruidIndexerJob implements Jobby log.info("No updaterJobSpec set, not uploading to database"); } - jobs.add(new Jobby() - { - @Override - public boolean run() - { - publishedSegments = IndexGeneratorJob.getPublishedSegments(config); - return true; - } - }); + jobs.add( + new Jobby() + { + @Override + public boolean run() + { + publishedSegments = IndexGeneratorJob.getPublishedSegments(config); + return true; + } + } + ); JobHelper.runJobs(jobs, config); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index d759ca26074..753ac1884d3 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -50,11 +50,14 @@ public class HadoopIngestionSpec extends IngestionSpec + + { + @Override + protected void innerMap( + InputRow inputRow, + Text text, + Context context + ) throws IOException, InterruptedException + { + // Group by bucket, sort by timestamp + final Optional bucket = getConfig().getBucket(inputRow); + + if (!bucket.isPresent()) { + throw new ISE("WTF?! 
No bucket found for row: %s", inputRow); + } + + context.write( + new SortableBytes( + bucket.get().toGroupKey(), + Longs.toByteArray(inputRow.getTimestampFromEpoch()) + ).toBytesWritable(), + text + ); + } + } + + public static class IndexGeneratorPartitioner extends Partitioner implements Configurable + { + private Configuration config; + + @Override + public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) + { + final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); + bytes.position(4); // Skip length added by SortableBytes + int shardNum = bytes.getInt(); + if (config.get("mapred.job.tracker").equals("local")) { + return shardNum % numPartitions; + } else { + if (shardNum >= numPartitions) { + throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions); + } + return shardNum; + + } + } + + @Override + public Configuration getConf() + { + return config; + } + + @Override + public void setConf(Configuration config) + { + this.config = config; + } + } + + public static class IndexGeneratorReducer extends Reducer + { + private HadoopDruidIndexerConfig config; + private List metricNames = Lists.newArrayList(); + private StringInputRowParser parser; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + + for (AggregatorFactory factory : config.getSchema().getDataSchema().getAggregators()) { + metricNames.add(factory.getName().toLowerCase()); + } + + parser = config.getParser(); + } + + @Override + protected void reduce( + BytesWritable key, Iterable values, final Context context + ) throws IOException, InterruptedException + { + SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); + Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; + + final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); + //final DataRollupSpec rollupSpec = config.getRollupSpec(); + final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators(); + + IncrementalIndex index = makeIncrementalIndex(bucket, aggs); + try { + File baseFlushFile = File.createTempFile("base", "flush"); + baseFlushFile.delete(); + baseFlushFile.mkdirs(); + + Set toMerge = Sets.newTreeSet(); + int indexCount = 0; + int lineCount = 0; + int runningTotalLineCount = 0; + long startTime = System.currentTimeMillis(); + + Set allDimensionNames = Sets.newHashSet(); + final IndexMerger.ProgressIndicator progressIndicator = new IndexMerger.ProgressIndicator() + { + @Override + public void progress() + { + context.progress(); + } + }; + + for (final Text value : values) { + context.progress(); + final InputRow inputRow = index.formatRow(parser.parse(value.toString())); + allDimensionNames.addAll(inputRow.getDimensions()); + + int numRows = index.add(inputRow); + ++lineCount; + + if (index.isFull()) { + log.info( + "%,d lines to %,d rows in %,d millis", + lineCount - runningTotalLineCount, + numRows, + System.currentTimeMillis() - startTime + ); + runningTotalLineCount = lineCount; + + final File file = new File(baseFlushFile, String.format("index%,05d", indexCount)); + toMerge.add(file); + + context.progress(); + IndexMerger.persist( + index, interval, file, progressIndicator + ); + // close this index and make a new one + index.close(); + index = makeIncrementalIndex(bucket, aggs); + + startTime = System.currentTimeMillis(); + ++indexCount; + } + } + + log.info("%,d lines 
completed.", lineCount); + + List indexes = Lists.newArrayListWithCapacity(indexCount); + final File mergedBase; + + if (toMerge.size() == 0) { + if (index.isEmpty()) { + throw new IAE("If you try to persist empty indexes you are going to have a bad time"); + } + + mergedBase = new File(baseFlushFile, "merged"); + IndexMerger.persist( + index, interval, mergedBase, progressIndicator + ); + } else { + if (!index.isEmpty()) { + final File finalFile = new File(baseFlushFile, "final"); + IndexMerger.persist( + index, interval, finalFile, progressIndicator + ); + toMerge.add(finalFile); + } + + for (File file : toMerge) { + indexes.add(IndexIO.loadIndex(file)); + } + mergedBase = IndexMerger.mergeQueryableIndex( + indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator + ); + } + serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames)); + for (File file : toMerge) { + FileUtils.deleteDirectory(file); + } + } + finally { + index.close(); + } + } + + private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List dimensionNames) + throws IOException + { + Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); + + int attemptNumber = context.getTaskAttemptID().getId(); + + FileSystem fileSystem = FileSystem.get(context.getConfiguration()); + Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket); + Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); + final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); + final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); + + outputFS.mkdirs(indexBasePath); + + Exception caughtException = null; + ZipOutputStream out = null; + long size = 0; + try { + out = new ZipOutputStream(new BufferedOutputStream(outputFS.create(indexZipFilePath), 256 * 1024)); + + List filesToCopy = Arrays.asList(mergedBase.list()); + + for (String file : filesToCopy) { + size += copyFile(context, out, mergedBase, file); + } + } + catch (Exception e) { + caughtException = e; + } + finally { + if (caughtException == null) { + Closeables.close(out, false); + } else { + CloseQuietly.close(out); + throw Throwables.propagate(caughtException); + } + } + + Path finalIndexZipFilePath = new Path(indexBasePath, "index.zip"); + final URI indexOutURI = finalIndexZipFilePath.toUri(); + ImmutableMap loadSpec; + if (outputFS instanceof NativeS3FileSystem) { + loadSpec = ImmutableMap.of( + "type", "s3_zip", + "bucket", indexOutURI.getHost(), + "key", indexOutURI.getPath().substring(1) // remove the leading "/" + ); + } else if (outputFS instanceof LocalFileSystem) { + loadSpec = ImmutableMap.of( + "type", "local", + "path", indexOutURI.getPath() + ); + } else if (outputFS instanceof DistributedFileSystem) { + loadSpec = ImmutableMap.of( + "type", "hdfs", + "path", indexOutURI.getPath() + ); + } else { + throw new ISE("Unknown file system[%s]", outputFS.getClass()); + } + + DataSegment segment = new DataSegment( + config.getDataSource(), + interval, + config.getSchema().getTuningConfig().getVersion(), + loadSpec, + dimensionNames, + metricNames, + config.getShardSpec(bucket).getActualSpec(), + SegmentUtils.getVersionFromDir(mergedBase), + size + ); + + // retry 1 minute + boolean success = false; + for (int i = 0; i < 6; i++) { + if (renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { + log.info("Successfully renamed [%s] to [%s]", 
indexZipFilePath, finalIndexZipFilePath); + success = true; + break; + } else { + log.info("Failed to rename [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); + try { + Thread.sleep(10000); + context.progress(); + } + catch (InterruptedException e) { + throw new ISE( + "Thread error in retry loop for renaming [%s] to [%s]", + indexZipFilePath.toUri().getPath(), + finalIndexZipFilePath.toUri().getPath() + ); + } + } + } + + if (!success) { + if (!outputFS.exists(indexZipFilePath)) { + throw new ISE("File [%s] does not exist after retry loop.", indexZipFilePath.toUri().getPath()); + } + + if (outputFS.getFileStatus(indexZipFilePath).getLen() == outputFS.getFileStatus(finalIndexZipFilePath) + .getLen()) { + outputFS.delete(indexZipFilePath, true); + } else { + outputFS.delete(finalIndexZipFilePath, true); + if (!renameIndexFiles(infoFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) { + throw new ISE( + "Files [%s] and [%s] are different, but still cannot rename after retry loop", + indexZipFilePath.toUri().getPath(), + finalIndexZipFilePath.toUri().getPath() + ); + } + } + } + } + + private boolean renameIndexFiles( + FileSystem intermediateFS, + FileSystem outputFS, + Path indexBasePath, + Path indexZipFilePath, + Path finalIndexZipFilePath, + DataSegment segment + ) + throws IOException + { + final boolean needRename; + + if (outputFS.exists(finalIndexZipFilePath)) { + // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first + final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath); + final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath); + + if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime() + || zipFile.getLen() != finalIndexZipFile.getLen()) { + log.info( + "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]", + finalIndexZipFile.getPath(), + new DateTime(finalIndexZipFile.getModificationTime()), + finalIndexZipFile.getLen(), + zipFile.getPath(), + new DateTime(zipFile.getModificationTime()), + zipFile.getLen() + ); + outputFS.delete(finalIndexZipFilePath, false); + needRename = true; + } else { + log.info( + "File[%s / %s / %sB] existed and will be kept", + finalIndexZipFile.getPath(), + new DateTime(finalIndexZipFile.getModificationTime()), + finalIndexZipFile.getLen() + ); + needRename = false; + } + } else { + needRename = true; + } + + if (needRename && !outputFS.rename(indexZipFilePath, finalIndexZipFilePath)) { + return false; + } + + writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json")); + final Path descriptorPath = config.makeDescriptorInfoPath(segment); + log.info("Writing descriptor to path[%s]", descriptorPath); + intermediateFS.mkdirs(descriptorPath.getParent()); + writeSegmentDescriptor(intermediateFS, segment, descriptorPath); + + return true; + } + + private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath) + throws IOException + { + if (outputFS.exists(descriptorPath)) { + outputFS.delete(descriptorPath, false); + } + + final FSDataOutputStream descriptorOut = outputFS.create(descriptorPath); + try { + HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment); + } + finally { + descriptorOut.close(); + } + } + + private long copyFile( + Context context, ZipOutputStream out, File mergedBase, final String filename + ) throws IOException + { + createNewZipEntry(out, filename); + long numRead = 0; + + InputStream in = null; + try { + 
in = new FileInputStream(new File(mergedBase, filename)); + byte[] buf = new byte[0x10000]; + int read; + while (true) { + read = in.read(buf); + if (read == -1) { + break; + } + + out.write(buf, 0, read); + numRead += read; + context.progress(); + } + } + finally { + CloseQuietly.close(in); + } + out.closeEntry(); + context.progress(); + + return numRead; + } + + private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs) + { + return new IncrementalIndex( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(theBucket.time.getMillis()) + .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) + .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) + .withMetrics(aggs) + .build(), + new OffheapBufferPool(config.getSchema().getTuningConfig().getBufferSize()) + ); + } + + private void createNewZipEntry(ZipOutputStream out, String name) throws IOException + { + log.info("Creating new ZipEntry[%s]", name); + out.putNextEntry(new ZipEntry(name)); + } + } + + public static class IndexGeneratorOutputFormat extends TextOutputFormat + { + @Override + public void checkOutputSpecs(JobContext job) throws IOException + { + Path outDir = getOutputPath(job); + if (outDir == null) { + throw new InvalidJobConfException("Output directory not set."); + } + } + } + + public static class IndexGeneratorStats + { + private long invalidRowCount = 0; + + public long getInvalidRowCount() + { + return invalidRowCount; + } + + public void setInvalidRowCount(long invalidRowCount) + { + this.invalidRowCount = invalidRowCount; + } + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index c6bd1658830..6a6736bf807 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -350,7 +350,7 @@ public class TaskSerdeTest null, null, new HadoopIngestionSpec( - null, null, null, + null, null, null, false, // TODO "foo", new TimestampSpec("timestamp", "auto"), new JSONDataSpec(ImmutableList.of("foo"), null), diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java new file mode 100644 index 00000000000..b5ceac4196b --- /dev/null +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -0,0 +1,1204 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.segment; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import com.google.common.io.OutputSupplier; +import com.google.common.primitives.Ints; +import com.metamx.collections.spatial.ImmutableRTree; +import com.metamx.collections.spatial.RTree; +import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.guava.CloseQuietly; +import com.metamx.common.guava.FunctionalIterable; +import com.metamx.common.guava.MergeIterable; +import com.metamx.common.guava.nary.BinaryFn; +import com.metamx.common.io.smoosh.Smoosh; +import com.metamx.common.logger.Logger; +import io.druid.collections.CombiningIterable; +import io.druid.common.guava.FileOutputSupplier; +import io.druid.common.guava.GuavaUtils; +import io.druid.common.utils.JodaUtils; +import io.druid.common.utils.SerializerUtils; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.ToLowerCaseAggregatorFactory; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ColumnCapabilitiesImpl; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.ByteBufferWriter; +import io.druid.segment.data.CompressedLongsSupplierSerializer; +import io.druid.segment.data.CompressedObjectStrategy; +import io.druid.segment.data.ConciseCompressedIndexedInts; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.GenericIndexedWriter; +import io.druid.segment.data.IOPeon; +import io.druid.segment.data.Indexed; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.data.IndexedIterable; +import io.druid.segment.data.IndexedRTree; +import io.druid.segment.data.TmpFileIOPeon; +import io.druid.segment.data.VSizeIndexedWriter; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexAdapter; +import io.druid.segment.serde.ComplexMetricColumnSerializer; +import io.druid.segment.serde.ComplexMetricSerde; +import io.druid.segment.serde.ComplexMetrics; +import it.uniroma3.mat.extendedset.intset.ConciseSet; +import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; +import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + */ +public class IndexMerger +{ + private static final Logger log = new Logger(IndexMerger.class); + + private static final SerializerUtils serializerUtils = new SerializerUtils(); + private static final int INVALID_ROW = -1; + private static final Splitter SPLITTER = Splitter.on(","); + + public static File persist(final IncrementalIndex index, File outDir) throws IOException + { + 
return persist(index, index.getInterval(), outDir); + } + + /** + * This is *not* thread-safe and havok will ensue if this is called and writes are still occurring + * on the IncrementalIndex object. + * + * @param index the IncrementalIndex to persist + * @param dataInterval the Interval that the data represents + * @param outDir the directory to persist the data to + * + * @return the index output directory + * + * @throws java.io.IOException if an IO error occurs persisting the index + */ + public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir) throws IOException + { + return persist(index, dataInterval, outDir, new NoopProgressIndicator()); + } + + public static File persist( + final IncrementalIndex index, final Interval dataInterval, File outDir, ProgressIndicator progress + ) throws IOException + { + if (index.isEmpty()) { + throw new IAE("Trying to persist an empty index!"); + } + + final long firstTimestamp = index.getMinTime().getMillis(); + final long lastTimestamp = index.getMaxTime().getMillis(); + if (!(dataInterval.contains(firstTimestamp) && dataInterval.contains(lastTimestamp))) { + throw new IAE( + "interval[%s] does not encapsulate the full range of timestamps[%s, %s]", + dataInterval, + new DateTime(firstTimestamp), + new DateTime(lastTimestamp) + ); + } + + if (!outDir.exists()) { + outDir.mkdirs(); + } + if (!outDir.isDirectory()) { + throw new ISE("Can only persist to directories, [%s] wasn't a directory", outDir); + } + + log.info("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size()); + return merge( + Arrays.asList(new IncrementalIndexAdapter(dataInterval, index)), + index.getMetricAggs(), + outDir, + progress + ); + } + + public static File mergeQueryableIndex( + List indexes, final AggregatorFactory[] metricAggs, File outDir + ) throws IOException + { + return mergeQueryableIndex(indexes, metricAggs, outDir, new NoopProgressIndicator()); + } + + public static File mergeQueryableIndex( + List indexes, final AggregatorFactory[] metricAggs, File outDir, ProgressIndicator progress + ) throws IOException + { + return merge( + Lists.transform( + indexes, + new Function() + { + @Override + public IndexableAdapter apply(final QueryableIndex input) + { + return new QueryableIndexIndexableAdapter(input); + } + } + ), + metricAggs, + outDir, + progress + ); + } + + public static File merge( + List indexes, final AggregatorFactory[] metricAggs, File outDir + ) throws IOException + { + return merge(indexes, metricAggs, outDir, new NoopProgressIndicator()); + } + + public static File merge( + List indexes, final AggregatorFactory[] metricAggs, File outDir, ProgressIndicator progress + ) throws IOException + { + FileUtils.deleteDirectory(outDir); + if (!outDir.mkdirs()) { + throw new ISE("Couldn't make outdir[%s].", outDir); + } + + final AggregatorFactory[] lowerCaseMetricAggs = new AggregatorFactory[metricAggs.length]; + for (int i = 0; i < metricAggs.length; i++) { + lowerCaseMetricAggs[i] = new ToLowerCaseAggregatorFactory(metricAggs[i]); + } + + final List mergedDimensions = mergeIndexed( + Lists.transform( + indexes, + new Function>() + { + @Override + public Iterable apply(@Nullable IndexableAdapter input) + { + return Iterables.transform( + input.getDimensionNames(), + new Function() + { + @Override + public String apply(@Nullable String input) + { + return input.toLowerCase(); + } + } + ); + } + } + ) + ); + final List mergedMetrics = Lists.transform( + mergeIndexed( + Lists.>newArrayList( + 
FunctionalIterable + .create(indexes) + .transform( + new Function>() + { + @Override + public Iterable apply(@Nullable IndexableAdapter input) + { + return Iterables.transform( + input.getMetricNames(), + new Function() + { + @Override + public String apply(@Nullable String input) + { + return input.toLowerCase(); + } + } + ); + } + } + ) + .concat(Arrays.>asList(new AggFactoryStringIndexed(lowerCaseMetricAggs))) + ) + ), + new Function() + { + @Override + public String apply(@Nullable String input) + { + return input.toLowerCase(); + } + } + ); + if (mergedMetrics.size() != lowerCaseMetricAggs.length) { + throw new IAE("Bad number of metrics[%d], expected [%d]", mergedMetrics.size(), lowerCaseMetricAggs.length); + } + + final AggregatorFactory[] sortedMetricAggs = new AggregatorFactory[mergedMetrics.size()]; + for (int i = 0; i < lowerCaseMetricAggs.length; i++) { + AggregatorFactory metricAgg = lowerCaseMetricAggs[i]; + sortedMetricAggs[mergedMetrics.indexOf(metricAgg.getName())] = metricAgg; + } + + for (int i = 0; i < mergedMetrics.size(); i++) { + if (!sortedMetricAggs[i].getName().equals(mergedMetrics.get(i))) { + throw new IAE( + "Metric mismatch, index[%d] [%s] != [%s]", + i, + lowerCaseMetricAggs[i].getName(), + mergedMetrics.get(i) + ); + } + } + + Function>, Iterable> rowMergerFn = new Function>, Iterable>() + { + @Override + public Iterable apply( + @Nullable ArrayList> boats + ) + { + return CombiningIterable.create( + new MergeIterable( + Ordering.natural().nullsFirst(), + boats + ), + Ordering.natural().nullsFirst(), + new RowboatMergeFunction(sortedMetricAggs) + ); + } + }; + + return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn); + } + + public static File append( + List indexes, File outDir + ) throws IOException + { + return append(indexes, outDir, new NoopProgressIndicator()); + } + + public static File append( + List indexes, File outDir, ProgressIndicator progress + ) throws IOException + { + FileUtils.deleteDirectory(outDir); + if (!outDir.mkdirs()) { + throw new ISE("Couldn't make outdir[%s].", outDir); + } + + final List mergedDimensions = mergeIndexed( + Lists.transform( + indexes, + new Function>() + { + @Override + public Iterable apply(@Nullable IndexableAdapter input) + { + return Iterables.transform( + input.getDimensionNames(), + new Function() + { + @Override + public String apply(@Nullable String input) + { + return input.toLowerCase(); + } + } + ); + } + } + ) + ); + final List mergedMetrics = mergeIndexed( + Lists.transform( + indexes, + new Function>() + { + @Override + public Iterable apply(@Nullable IndexableAdapter input) + { + return Iterables.transform( + input.getMetricNames(), + new Function() + { + @Override + public String apply(@Nullable String input) + { + return input.toLowerCase(); + } + } + ); + } + } + ) + ); + + Function>, Iterable> rowMergerFn = new Function>, Iterable>() + { + @Override + public Iterable apply( + @Nullable final ArrayList> boats + ) + { + return new MergeIterable( + Ordering.natural().nullsFirst(), + boats + ); + } + }; + + return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn); + } + + private static File makeIndexFiles( + final List indexes, + final File outDir, + final ProgressIndicator progress, + final List mergedDimensions, + final List mergedMetrics, + final Function>, Iterable> rowMergerFn + ) throws IOException + { + final Map valueTypes = Maps.newTreeMap(Ordering.natural().nullsFirst()); + final Map metricTypeNames = 
Maps.newTreeMap(Ordering.natural().nullsFirst()); + final Map columnCapabilities = Maps.newHashMap(); + + for (IndexableAdapter adapter : indexes) { + for (String dimension : adapter.getDimensionNames()) { + ColumnCapabilitiesImpl mergedCapabilities = columnCapabilities.get(dimension); + ColumnCapabilities capabilities = adapter.getCapabilities(dimension); + if (mergedCapabilities == null) { + mergedCapabilities = new ColumnCapabilitiesImpl(); + mergedCapabilities.setType(ValueType.STRING); + } + columnCapabilities.put(dimension, mergedCapabilities.merge(capabilities)); + } + for (String metric : adapter.getMetricNames()) { + ColumnCapabilitiesImpl mergedCapabilities = columnCapabilities.get(metric); + ColumnCapabilities capabilities = adapter.getCapabilities(metric); + if (mergedCapabilities == null) { + mergedCapabilities = new ColumnCapabilitiesImpl(); + } + columnCapabilities.put(metric, mergedCapabilities.merge(capabilities)); + + valueTypes.put(metric, capabilities.getType()); + metricTypeNames.put(metric, adapter.getMetricType(metric)); + } + } + + + final Interval dataInterval; + File v8OutDir = new File(outDir, "v8-tmp"); + v8OutDir.mkdirs(); + + /************* Main index.drd file **************/ + progress.progress(); + long startTime = System.currentTimeMillis(); + File indexFile = new File(v8OutDir, "index.drd"); + + FileOutputStream fileOutputStream = null; + FileChannel channel = null; + try { + fileOutputStream = new FileOutputStream(indexFile); + channel = fileOutputStream.getChannel(); + channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION})); + + GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy).writeToChannel(channel); + GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy).writeToChannel(channel); + + DateTime minTime = new DateTime(Long.MAX_VALUE); + DateTime maxTime = new DateTime(0l); + + for (IndexableAdapter index : indexes) { + minTime = JodaUtils.minDateTime(minTime, index.getDataInterval().getStart()); + maxTime = JodaUtils.maxDateTime(maxTime, index.getDataInterval().getEnd()); + } + + dataInterval = new Interval(minTime, maxTime); + serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime)); + } + finally { + CloseQuietly.close(channel); + channel = null; + CloseQuietly.close(fileOutputStream); + fileOutputStream = null; + } + IndexIO.checkFileSize(indexFile); + log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); + + /************* Setup Dim Conversions **************/ + progress.progress(); + startTime = System.currentTimeMillis(); + + IOPeon ioPeon = new TmpFileIOPeon(); + ArrayList dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size()); + Map dimensionCardinalities = Maps.newHashMap(); + ArrayList> dimConversions = Lists.newArrayListWithCapacity(indexes.size()); + + for (IndexableAdapter index : indexes) { + dimConversions.add(Maps.newHashMap()); + } + + for (String dimension : mergedDimensions) { + final GenericIndexedWriter writer = new GenericIndexedWriter( + ioPeon, dimension, GenericIndexed.stringStrategy + ); + writer.open(); + + List> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size()); + DimValueConverter[] converters = new DimValueConverter[indexes.size()]; + for (int i = 0; i < indexes.size(); i++) { + Indexed dimValues = indexes.get(i).getDimValueLookup(dimension); + if (dimValues != null) { + dimValueLookups.add(dimValues); + converters[i] = new DimValueConverter(dimValues); + } + } + + 
Iterable dimensionValues = CombiningIterable.createSplatted( + Iterables.transform( + dimValueLookups, + new Function, Iterable>() + { + @Override + public Iterable apply(@Nullable Indexed indexed) + { + return Iterables.transform( + indexed, + new Function() + { + @Override + public String apply(@Nullable String input) + { + return (input == null) ? "" : input; + } + } + ); + } + } + ) + , + Ordering.natural().nullsFirst() + ); + + int count = 0; + for (String value : dimensionValues) { + value = value == null ? "" : value; + writer.write(value); + + for (int i = 0; i < indexes.size(); i++) { + DimValueConverter converter = converters[i]; + if (converter != null) { + converter.convert(value, count); + } + } + + ++count; + } + dimensionCardinalities.put(dimension, count); + + FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true); + dimOuts.add(dimOut); + + writer.close(); + serializerUtils.writeString(dimOut, dimension); + ByteStreams.copy(writer.combineStreams(), dimOut); + for (int i = 0; i < indexes.size(); ++i) { + DimValueConverter converter = converters[i]; + if (converter != null) { + dimConversions.get(i).put(dimension, converters[i].getConversionBuffer()); + } + } + + ioPeon.cleanup(); + } + log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); + + /************* Walk through data sets and merge them *************/ + progress.progress(); + startTime = System.currentTimeMillis(); + + ArrayList> boats = Lists.newArrayListWithCapacity(indexes.size()); + + for (int i = 0; i < indexes.size(); ++i) { + final IndexableAdapter adapter = indexes.get(i); + + final int[] dimLookup = new int[mergedDimensions.size()]; + int count = 0; + for (String dim : adapter.getDimensionNames()) { + dimLookup[count] = mergedDimensions.indexOf(dim.toLowerCase()); + count++; + } + + final int[] metricLookup = new int[mergedMetrics.size()]; + count = 0; + for (String metric : adapter.getMetricNames()) { + metricLookup[count] = mergedMetrics.indexOf(metric); + count++; + } + + boats.add( + new MMappedIndexRowIterable( + Iterables.transform( + indexes.get(i).getRows(), + new Function() + { + @Override + public Rowboat apply(@Nullable Rowboat input) + { + int[][] newDims = new int[mergedDimensions.size()][]; + int j = 0; + for (int[] dim : input.getDims()) { + newDims[dimLookup[j]] = dim; + j++; + } + + Object[] newMetrics = new Object[mergedMetrics.size()]; + j = 0; + for (Object met : input.getMetrics()) { + newMetrics[metricLookup[j]] = met; + j++; + } + + return new Rowboat( + input.getTimestamp(), + newDims, + newMetrics, + input.getRowNum() + ); + } + } + ) + , mergedDimensions, dimConversions.get(i), i + ) + ); + } + + Iterable theRows = rowMergerFn.apply(boats); + + CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create( + ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY + ); + + timeWriter.open(); + + ArrayList forwardDimWriters = Lists.newArrayListWithCapacity(mergedDimensions.size()); + for (String dimension : mergedDimensions) { + VSizeIndexedWriter writer = new VSizeIndexedWriter(ioPeon, dimension, dimensionCardinalities.get(dimension)); + writer.open(); + forwardDimWriters.add(writer); + } + + ArrayList metWriters = Lists.newArrayListWithCapacity(mergedMetrics.size()); + for (String metric : mergedMetrics) { + ValueType type = valueTypes.get(metric); + switch (type) { + case FLOAT: + metWriters.add(new 
FloatMetricColumnSerializer(metric, v8OutDir, ioPeon)); + break; + case COMPLEX: + final String typeName = metricTypeNames.get(metric); + ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + + if (serde == null) { + throw new ISE("Unknown type[%s]", typeName); + } + + metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde)); + break; + default: + throw new ISE("Unknown type[%s]", type); + } + } + + for (MetricColumnSerializer metWriter : metWriters) { + metWriter.open(); + } + + int rowCount = 0; + long time = System.currentTimeMillis(); + List rowNumConversions = Lists.newArrayListWithCapacity(indexes.size()); + for (IndexableAdapter index : indexes) { + int[] arr = new int[index.getNumRows()]; + Arrays.fill(arr, INVALID_ROW); + rowNumConversions.add(IntBuffer.wrap(arr)); + } + + for (Rowboat theRow : theRows) { + progress.progress(); + timeWriter.add(theRow.getTimestamp()); + + final Object[] metrics = theRow.getMetrics(); + for (int i = 0; i < metrics.length; ++i) { + metWriters.get(i).serialize(metrics[i]); + } + + int[][] dims = theRow.getDims(); + for (int i = 0; i < dims.length; ++i) { + List listToWrite = (i >= dims.length || dims[i] == null) + ? null + : Ints.asList(dims[i]); + forwardDimWriters.get(i).write(listToWrite); + } + + for (Map.Entry> comprisedRow : theRow.getComprisedRows().entrySet()) { + final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey()); + + for (Integer rowNum : comprisedRow.getValue()) { + while (conversionBuffer.position() < rowNum) { + conversionBuffer.put(INVALID_ROW); + } + conversionBuffer.put(rowCount); + } + } + + if ((++rowCount % 500000) == 0) { + log.info( + "outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time + ); + time = System.currentTimeMillis(); + } + } + + for (IntBuffer rowNumConversion : rowNumConversions) { + rowNumConversion.rewind(); + } + + final File timeFile = IndexIO.makeTimeFile(v8OutDir, IndexIO.BYTE_ORDER); + timeFile.delete(); + OutputSupplier out = Files.newOutputStreamSupplier(timeFile, true); + timeWriter.closeAndConsolidate(out); + IndexIO.checkFileSize(timeFile); + + for (int i = 0; i < mergedDimensions.size(); ++i) { + forwardDimWriters.get(i).close(); + ByteStreams.copy(forwardDimWriters.get(i).combineStreams(), dimOuts.get(i)); + } + + for (MetricColumnSerializer metWriter : metWriters) { + metWriter.close(); + } + + ioPeon.cleanup(); + log.info( + "outDir[%s] completed walk through of %,d rows in %,d millis.", + v8OutDir, + rowCount, + System.currentTimeMillis() - startTime + ); + + /************ Create Inverted Indexes *************/ + startTime = System.currentTimeMillis(); + + final File invertedFile = new File(v8OutDir, "inverted.drd"); + Files.touch(invertedFile); + out = Files.newOutputStreamSupplier(invertedFile, true); + + final File geoFile = new File(v8OutDir, "spatial.drd"); + Files.touch(geoFile); + OutputSupplier spatialOut = Files.newOutputStreamSupplier(geoFile, true); + + for (int i = 0; i < mergedDimensions.size(); ++i) { + long dimStartTime = System.currentTimeMillis(); + String dimension = mergedDimensions.get(i); + + File dimOutFile = dimOuts.get(i).getFile(); + final MappedByteBuffer dimValsMapped = Files.map(dimOutFile); + + if (!dimension.equals(serializerUtils.readString(dimValsMapped))) { + throw new ISE("dimensions[%s] didn't equate!? 
This is a major WTF moment.", dimension); + } + Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy); + log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size()); + + GenericIndexedWriter writer = new GenericIndexedWriter( + ioPeon, dimension, ConciseCompressedIndexedInts.objectStrategy + ); + writer.open(); + + boolean isSpatialDim = columnCapabilities.get(dimension).hasSpatialIndexes(); + ByteBufferWriter spatialWriter = null; + RTree tree = null; + IOPeon spatialIoPeon = new TmpFileIOPeon(); + if (isSpatialDim) { + spatialWriter = new ByteBufferWriter( + spatialIoPeon, dimension, IndexedRTree.objectStrategy + ); + spatialWriter.open(); + tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50)); + } + + for (String dimVal : IndexedIterable.create(dimVals)) { + progress.progress(); + List> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size()); + for (int j = 0; j < indexes.size(); ++j) { + convertedInverteds.add( + new ConvertingIndexedInts( + indexes.get(j).getInverteds(dimension, dimVal), rowNumConversions.get(j) + ) + ); + } + + ConciseSet bitset = new ConciseSet(); + for (Integer row : CombiningIterable.createSplatted( + convertedInverteds, + Ordering.natural().nullsFirst() + )) { + if (row != INVALID_ROW) { + bitset.add(row); + } + } + + writer.write(ImmutableConciseSet.newImmutableFromMutable(bitset)); + + if (isSpatialDim && dimVal != null) { + List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); + float[] coords = new float[stringCoords.size()]; + for (int j = 0; j < coords.length; j++) { + coords[j] = Float.valueOf(stringCoords.get(j)); + } + tree.insert(coords, bitset); + } + } + writer.close(); + + serializerUtils.writeString(out, dimension); + ByteStreams.copy(writer.combineStreams(), out); + ioPeon.cleanup(); + + log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime); + + if (isSpatialDim) { + spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree)); + spatialWriter.close(); + + serializerUtils.writeString(spatialOut, dimension); + ByteStreams.copy(spatialWriter.combineStreams(), spatialOut); + spatialIoPeon.cleanup(); + } + + } + + log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); + + final ArrayList expectedFiles = Lists.newArrayList( + Iterables.concat( + Arrays.asList( + "index.drd", "inverted.drd", "spatial.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER) + ), + Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")), + Iterables.transform( + mergedMetrics, GuavaUtils.formatFunction(String.format("met_%%s_%s.drd", IndexIO.BYTE_ORDER)) + ) + ) + ); + + Map files = Maps.newLinkedHashMap(); + for (String fileName : expectedFiles) { + files.put(fileName, new File(v8OutDir, fileName)); + } + + File smooshDir = new File(v8OutDir, "smoosher"); + smooshDir.mkdir(); + + for (Map.Entry entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) { + entry.getValue().delete(); + } + + for (File file : smooshDir.listFiles()) { + Files.move(file, new File(v8OutDir, file.getName())); + } + + if (!smooshDir.delete()) { + log.info("Unable to delete temporary dir[%s], contains[%s]", smooshDir, Arrays.asList(smooshDir.listFiles())); + throw new IOException(String.format("Unable to delete temporary dir[%s]", smooshDir)); + } + + createIndexDrdFile( + IndexIO.V8_VERSION, + v8OutDir, + GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy), 
+ GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy), + dataInterval + ); + + IndexIO.DefaultIndexIOHandler.convertV8toV9(v8OutDir, outDir); + FileUtils.deleteDirectory(v8OutDir); + + return outDir; + } + + private static ArrayList mergeIndexed(final List> indexedLists) + { + Set retVal = Sets.newTreeSet(Ordering.natural().nullsFirst()); + + for (Iterable indexedList : indexedLists) { + for (T val : indexedList) { + retVal.add(val); + } + } + + return Lists.newArrayList(retVal); + } + + public static void createIndexDrdFile( + byte versionId, + File inDir, + GenericIndexed availableDimensions, + GenericIndexed availableMetrics, + Interval dataInterval + ) throws IOException + { + File indexFile = new File(inDir, "index.drd"); + + FileChannel channel = null; + try { + channel = new FileOutputStream(indexFile).getChannel(); + channel.write(ByteBuffer.wrap(new byte[]{versionId})); + + availableDimensions.writeToChannel(channel); + availableMetrics.writeToChannel(channel); + serializerUtils.writeString( + channel, String.format("%s/%s", dataInterval.getStart(), dataInterval.getEnd()) + ); + } + finally { + CloseQuietly.close(channel); + channel = null; + } + IndexIO.checkFileSize(indexFile); + } + + private static class DimValueConverter + { + private final Indexed dimSet; + private final IntBuffer conversionBuf; + + private int currIndex; + private String lastVal = null; + + DimValueConverter( + Indexed dimSet + ) + { + this.dimSet = dimSet; + conversionBuf = ByteBuffer.allocateDirect(dimSet.size() * Ints.BYTES).asIntBuffer(); + + currIndex = 0; + } + + public void convert(String value, int index) + { + if (dimSet.size() == 0) { + return; + } + if (lastVal != null) { + if (value.compareTo(lastVal) <= 0) { + throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", value, lastVal); + } + return; + } + String currValue = dimSet.get(currIndex); + + while (currValue == null) { + conversionBuf.position(conversionBuf.position() + 1); + ++currIndex; + if (currIndex == dimSet.size()) { + lastVal = value; + return; + } + currValue = dimSet.get(currIndex); + } + + if (Objects.equal(currValue, value)) { + conversionBuf.put(index); + ++currIndex; + if (currIndex == dimSet.size()) { + lastVal = value; + } + } else if (currValue.compareTo(value) < 0) { + throw new ISE( + "Skipped currValue[%s], currIndex[%,d]; incoming value[%s], index[%,d]", currValue, currIndex, value, index + ); + } + } + + public IntBuffer getConversionBuffer() + { + if (currIndex != conversionBuf.limit() || conversionBuf.hasRemaining()) { + throw new ISE( + "Asked for incomplete buffer.
currIndex[%,d] != buf.limit[%,d]", currIndex, conversionBuf.limit() + ); + } + return (IntBuffer) conversionBuf.asReadOnlyBuffer().rewind(); + } + } + + private static class ConvertingIndexedInts implements Iterable + { + private final IndexedInts baseIndex; + private final IntBuffer conversionBuffer; + + public ConvertingIndexedInts( + IndexedInts baseIndex, + IntBuffer conversionBuffer + ) + { + this.baseIndex = baseIndex; + this.conversionBuffer = conversionBuffer; + } + + public int size() + { + return baseIndex.size(); + } + + public int get(int index) + { + return conversionBuffer.get(baseIndex.get(index)); + } + + @Override + public Iterator iterator() + { + return Iterators.transform( + baseIndex.iterator(), + new Function() + { + @Override + public Integer apply(@Nullable Integer input) + { + return conversionBuffer.get(input); + } + } + ); + } + } + + private static class MMappedIndexRowIterable implements Iterable + { + private final Iterable index; + private final List convertedDims; + private final Map converters; + private final int indexNumber; + + MMappedIndexRowIterable( + Iterable index, + List convertedDims, + Map converters, + int indexNumber + ) + { + this.index = index; + this.convertedDims = convertedDims; + this.converters = converters; + this.indexNumber = indexNumber; + } + + public Iterable getIndex() + { + return index; + } + + public List getConvertedDims() + { + return convertedDims; + } + + public Map getConverters() + { + return converters; + } + + public int getIndexNumber() + { + return indexNumber; + } + + @Override + public Iterator iterator() + { + return Iterators.transform( + index.iterator(), + new Function() + { + int rowCount = 0; + + @Override + public Rowboat apply(@Nullable Rowboat input) + { + int[][] dims = input.getDims(); + int[][] newDims = new int[convertedDims.size()][]; + for (int i = 0; i < convertedDims.size(); ++i) { + IntBuffer converter = converters.get(convertedDims.get(i)); + + if (converter == null) { + continue; + } + + if (i >= dims.length || dims[i] == null) { + continue; + } + + newDims[i] = new int[dims[i].length]; + + for (int j = 0; j < dims[i].length; ++j) { + if (!converter.hasRemaining()) { + log.error("Converter mismatch! 
wtfbbq!"); + } + newDims[i][j] = converter.get(dims[i][j]); + } + } + + final Rowboat retVal = new Rowboat( + input.getTimestamp(), + newDims, + input.getMetrics(), + input.getRowNum() + ); + + retVal.addRow(indexNumber, input.getRowNum()); + + return retVal; + } + } + ); + } + } + + private static class AggFactoryStringIndexed implements Indexed + { + private final AggregatorFactory[] metricAggs; + + public AggFactoryStringIndexed(AggregatorFactory[] metricAggs) {this.metricAggs = metricAggs;} + + @Override + public Class getClazz() + { + return String.class; + } + + @Override + public int size() + { + return metricAggs.length; + } + + @Override + public String get(int index) + { + return metricAggs[index].getName(); + } + + @Override + public int indexOf(String value) + { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() + { + return IndexedIterable.create(this).iterator(); + } + } + + private static class RowboatMergeFunction implements BinaryFn + { + private final AggregatorFactory[] metricAggs; + + public RowboatMergeFunction(AggregatorFactory[] metricAggs) + { + this.metricAggs = metricAggs; + } + + @Override + public Rowboat apply(Rowboat lhs, Rowboat rhs) + { + if (lhs == null) { + return rhs; + } + if (rhs == null) { + return lhs; + } + + Object[] metrics = new Object[metricAggs.length]; + Object[] lhsMetrics = lhs.getMetrics(); + Object[] rhsMetrics = rhs.getMetrics(); + + for (int i = 0; i < metrics.length; ++i) { + metrics[i] = metricAggs[i].combine(lhsMetrics[i], rhsMetrics[i]); + } + + final Rowboat retVal = new Rowboat( + lhs.getTimestamp(), + lhs.getDims(), + metrics, + lhs.getRowNum() + ); + + for (Rowboat rowboat : Arrays.asList(lhs, rhs)) { + for (Map.Entry> entry : rowboat.getComprisedRows().entrySet()) { + for (Integer rowNum : entry.getValue()) { + retVal.addRow(entry.getKey(), rowNum); + } + } + } + + return retVal; + } + } + + public static interface ProgressIndicator + { + public void progress(); + } + + private static class NoopProgressIndicator implements ProgressIndicator + { + @Override + public void progress() {} + } +} \ No newline at end of file diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java new file mode 100644 index 00000000000..0de682f8c23 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java @@ -0,0 +1,145 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.segment.data; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteStreams; +import com.google.common.io.Closeables; +import com.google.common.io.CountingOutputStream; +import com.google.common.io.InputSupplier; +import com.google.common.primitives.Ints; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; + +/** + * Streams arrays of objects out in the binary format described by VSizeIndexed + */ +public class VSizeIndexedWriter implements Closeable +{ + private static final byte version = 0x1; + private static final byte[] EMPTY_ARRAY = new byte[]{}; + + private final int maxId; + + private CountingOutputStream headerOut = null; + private CountingOutputStream valuesOut = null; + int numWritten = 0; + private final IOPeon ioPeon; + private final String filenameBase; + + public VSizeIndexedWriter( + IOPeon ioPeon, + String filenameBase, + int maxId + ) + { + this.ioPeon = ioPeon; + this.filenameBase = filenameBase; + this.maxId = maxId; + } + + public void open() throws IOException + { + headerOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("header"))); + valuesOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("values"))); + } + + public void write(List ints) throws IOException + { + byte[] bytesToWrite = ints == null ? EMPTY_ARRAY : VSizeIndexedInts.fromList(ints, maxId).getBytesNoPadding(); + + valuesOut.write(bytesToWrite); + + headerOut.write(Ints.toByteArray((int) valuesOut.getCount())); + + ++numWritten; + } + + private String makeFilename(String suffix) + { + return String.format("%s.%s", filenameBase, suffix); + } + + @Override + public void close() throws IOException + { + final byte numBytesForMax = VSizeIndexedInts.getNumBytesForMax(maxId); + + valuesOut.write(new byte[4 - numBytesForMax]); + + Closeables.close(headerOut, false); + Closeables.close(valuesOut, false); + + final long numBytesWritten = headerOut.getCount() + valuesOut.getCount(); + + Preconditions.checkState( + headerOut.getCount() == (numWritten * 4), + "numWritten[%s] number of rows should have [%s] bytes written to headerOut, had[%s]", + numWritten, + numWritten * 4, + headerOut.getCount() + ); + Preconditions.checkState( + numBytesWritten < Integer.MAX_VALUE, "Wrote[%s] bytes, which is too many.", numBytesWritten + ); + + OutputStream metaOut = ioPeon.makeOutputStream(makeFilename("meta")); + + try { + metaOut.write(new byte[]{version, numBytesForMax}); + metaOut.write(Ints.toByteArray((int) numBytesWritten + 4)); + metaOut.write(Ints.toByteArray(numWritten)); + } + finally { + metaOut.close(); + } + } + + public InputSupplier combineStreams() + { + return ByteStreams.join( + Iterables.transform( + Arrays.asList("meta", "header", "values"), + new Function>() { + + @Override + public InputSupplier apply(final String input) + { + return new InputSupplier() + { + @Override + public InputStream getInput() throws IOException + { + return ioPeon.makeInputStream(makeFilename(input)); + } + }; + } + } + ) + ); + } +} \ No newline at end of file
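Usage note (not part of the patch): the sketch below shows the intended lifecycle of the new VSizeIndexedWriter, mirroring how IndexMerger drives it in this change: open(), one write() per row with that row's list of value ids, close(), then combineStreams() to stream out the finished column. The standalone class name, the "exampleDim" dimension, the output file name and the cardinality value are made up for illustration, and this is an untested sketch against the Guava InputSupplier/OutputSupplier helpers this code already uses, not something included in the build.

import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.data.VSizeIndexedWriter;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

public class VSizeIndexedWriterExample
{
  public static void main(String[] args) throws IOException
  {
    // Temp-file backed IOPeon, the same kind IndexMerger hands to the writer.
    final IOPeon ioPeon = new TmpFileIOPeon();
    try {
      // maxId is the dimension cardinality, as IndexMerger passes it (value ids run from 0 to cardinality - 1).
      final VSizeIndexedWriter writer = new VSizeIndexedWriter(ioPeon, "exampleDim", 3);
      writer.open();

      // One call per row: the row's value ids; a null row is stored as an empty array.
      writer.write(Arrays.asList(0, 2));
      writer.write(null);
      writer.write(Arrays.asList(1));

      // close() pads the values part and writes the meta part (version, numBytesForMax, sizes).
      writer.close();

      // combineStreams() concatenates the meta, header and values temp files into the
      // VSizeIndexed binary layout; IndexMerger copies this into each dimension's dim_*.drd file.
      ByteStreams.copy(writer.combineStreams(), Files.newOutputStreamSupplier(new File("exampleDim.bin")));
    }
    finally {
      ioPeon.cleanup();
    }
  }
}

Because the writer keeps the three sections in separate IOPeon temp files and only joins them when combineStreams() is consumed, nothing has to be buffered in memory while rows are appended.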