diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 31ee956c09a..fd3a1f9c97c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -50,7 +50,6 @@ import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.path.PathSpec; import io.druid.initialization.Initialization; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.granularity.GranularitySpec; @@ -90,7 +89,6 @@ public class HadoopDruidIndexerConfig public static final ObjectMapper JSON_MAPPER; public static final IndexIO INDEX_IO; public static final IndexMerger INDEX_MERGER; - public static final IndexMaker INDEX_MAKER; private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing"; @@ -114,7 +112,6 @@ public class HadoopDruidIndexerConfig JSON_MAPPER = injector.getInstance(ObjectMapper.class); INDEX_IO = injector.getInstance(IndexIO.class); INDEX_MERGER = injector.getInstance(IndexMerger.class); - INDEX_MAKER = injector.getInstance(IndexMaker.class); } public static enum IndexJobCounters diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java index af05058104c..ec323598714 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java @@ -27,7 +27,6 @@ import com.metamx.common.ISE; import io.druid.guice.ServerModule; import io.druid.jackson.DefaultObjectMapper; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.column.ColumnConfig; @@ -40,7 +39,6 @@ public class TestUtils { private final ObjectMapper jsonMapper; private final IndexMerger indexMerger; - private final IndexMaker indexMaker; private final IndexIO indexIO; public TestUtils() @@ -58,7 +56,6 @@ public class TestUtils } ); indexMerger = new IndexMerger(jsonMapper, indexIO); - indexMaker = new IndexMaker(jsonMapper, indexIO); final List list = new ServerModule().getJacksonModules(); for (Module module : list) { @@ -69,7 +66,6 @@ public class TestUtils new InjectableValues.Std() .addValue(IndexIO.class, indexIO) .addValue(IndexMerger.class, indexMerger) - .addValue(IndexMaker.class, indexMaker) .addValue(ObjectMapper.class, jsonMapper) ); } @@ -84,11 +80,6 @@ public class TestUtils return indexMerger; } - public IndexMaker getTestIndexMaker() - { - return indexMaker; - } - public IndexIO getTestIndexIO() { return indexIO; diff --git a/processing/src/main/java/io/druid/segment/IndexMaker.java b/processing/src/main/java/io/druid/segment/IndexMaker.java deleted file mode 100644 index 095921371d5..00000000000 --- a/processing/src/main/java/io/druid/segment/IndexMaker.java +++ /dev/null @@ -1,1830 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Splitter; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; -import com.google.common.io.ByteStreams; -import com.google.common.io.Files; -import com.google.common.primitives.Ints; -import com.google.inject.Inject; -import com.metamx.collections.bitmap.BitmapFactory; -import com.metamx.collections.bitmap.ImmutableBitmap; -import com.metamx.collections.bitmap.MutableBitmap; -import com.metamx.collections.spatial.ImmutableRTree; -import com.metamx.collections.spatial.RTree; -import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import com.metamx.common.guava.FunctionalIterable; -import com.metamx.common.guava.MergeIterable; -import com.metamx.common.guava.nary.BinaryFn; -import com.metamx.common.io.smoosh.FileSmoosher; -import com.metamx.common.io.smoosh.SmooshedWriter; -import com.metamx.common.logger.Logger; -import io.druid.collections.CombiningIterable; -import io.druid.common.utils.JodaUtils; -import io.druid.common.utils.SerializerUtils; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ColumnCapabilitiesImpl; -import io.druid.segment.column.ColumnDescriptor; -import io.druid.segment.column.ValueType; -import io.druid.segment.data.BitmapSerdeFactory; -import io.druid.segment.data.CompressedFloatsIndexedSupplier; -import io.druid.segment.data.CompressedLongsIndexedSupplier; -import io.druid.segment.data.CompressedObjectStrategy; -import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier; -import io.druid.segment.data.GenericIndexed; -import io.druid.segment.data.Indexed; -import io.druid.segment.data.IndexedInts; -import io.druid.segment.data.IndexedIterable; -import io.druid.segment.data.VSizeIndexed; -import io.druid.segment.data.VSizeIndexedInts; -import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexAdapter; -import io.druid.segment.serde.ColumnPartSerde; -import io.druid.segment.serde.ComplexColumnPartSerde; -import io.druid.segment.serde.ComplexMetricSerde; -import io.druid.segment.serde.ComplexMetrics; -import io.druid.segment.serde.DictionaryEncodedColumnPartSerde; -import io.druid.segment.serde.FloatGenericColumnPartSerde; -import io.druid.segment.serde.LongGenericColumnPartSerde; -import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import javax.annotation.Nullable; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.FloatBuffer; -import java.nio.IntBuffer; -import java.nio.LongBuffer; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -@Deprecated -/** - * This class is not yet ready for production use and requires more work. This class provides a demonstration of how - * to build v9 segments directly. - */ -public class IndexMaker -{ - private static final Logger log = new Logger(IndexMaker.class); - private static final SerializerUtils serializerUtils = new SerializerUtils(); - private static final int INVALID_ROW = -1; - private static final Splitter SPLITTER = Splitter.on(","); - private final ObjectMapper mapper; - private final IndexIO indexIO; - - @Inject - public IndexMaker( - ObjectMapper mapper, - IndexIO indexIO - ) - { - this.mapper = Preconditions.checkNotNull(mapper, "null ObjectMapper"); - this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO"); - } - - public File persist( - final IncrementalIndex index, - File outDir, - final Map segmentMetadata, - final IndexSpec indexSpec - ) throws IOException - { - return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec); - } - - /** - * This is *not* thread-safe and havok will ensue if this is called and writes are still occurring - * on the IncrementalIndex object. - * - * @param index the IncrementalIndex to persist - * @param dataInterval the Interval that the data represents - * @param outDir the directory to persist the data to - * - * @throws java.io.IOException - */ - public File persist( - final IncrementalIndex index, - final Interval dataInterval, - File outDir, - final Map segmentMetadata, - final IndexSpec indexSpec - ) throws IOException - { - return persist( - index, dataInterval, outDir, segmentMetadata, indexSpec, new LoggingProgressIndicator(outDir.toString()) - ); - } - - public File persist( - final IncrementalIndex index, - final Interval dataInterval, - File outDir, - final Map segmentMetadata, - final IndexSpec indexSpec, - ProgressIndicator progress - ) throws IOException - { - if (index.isEmpty()) { - throw new IAE("Trying to persist an empty index!"); - } - - final long firstTimestamp = index.getMinTime().getMillis(); - final long lastTimestamp = index.getMaxTime().getMillis(); - if (!(dataInterval.contains(firstTimestamp) && dataInterval.contains(lastTimestamp))) { - throw new IAE( - "interval[%s] does not encapsulate the full range of timestamps[%s, %s]", - dataInterval, - new DateTime(firstTimestamp), - new DateTime(lastTimestamp) - ); - } - - if (!outDir.exists()) { - outDir.mkdirs(); - } - if (!outDir.isDirectory()) { - throw new ISE("Can only persist to directories, [%s] wasn't a directory", outDir); - } - - log.info("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size()); - return merge( - Arrays.asList( - new IncrementalIndexAdapter( - dataInterval, - index, - indexSpec.getBitmapSerdeFactory().getBitmapFactory() - ) - ), - index.getMetricAggs(), - outDir, - segmentMetadata, - indexSpec, - progress - ); - } - - public File mergeQueryableIndex( - List indexes, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec - ) throws IOException - { - return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString())); - } - - public File mergeQueryableIndex( - List indexes, - final AggregatorFactory[] metricAggs, - File outDir, - final IndexSpec indexSpec, - ProgressIndicator progress - ) throws IOException - { - return merge( - Lists.transform( - indexes, - new Function() - { - @Override - public IndexableAdapter apply(final QueryableIndex input) - { - return new QueryableIndexIndexableAdapter(input); - } - } - ), - metricAggs, - outDir, - null, - indexSpec, - progress - ); - } - - public File merge( - List adapters, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec - ) throws IOException - { - return merge( - adapters, metricAggs, outDir, null, indexSpec, new LoggingProgressIndicator(outDir.toString()) - ); - } - - public File merge( - List adapters, - final AggregatorFactory[] metricAggs, - File outDir, - final Map segmentMetaData, - final IndexSpec indexSpec, - ProgressIndicator progress - ) throws IOException - { - FileUtils.deleteDirectory(outDir); - if (!outDir.mkdirs()) { - throw new ISE("Couldn't make outdir[%s].", outDir); - } - - final List mergedDimensions = mergeIndexed( - Lists.transform( - adapters, - new Function>() - { - @Override - public Iterable apply(IndexableAdapter input) - { - return input.getDimensionNames(); - } - } - ) - ); - - final List mergedMetrics = Lists.transform( - mergeIndexed( - Lists.newArrayList( - FunctionalIterable - .create(adapters) - .transform( - new Function>() - { - @Override - public Iterable apply(IndexableAdapter input) - { - return input.getMetricNames(); - } - } - ) - .concat(Arrays.>asList(new AggFactoryStringIndexed(metricAggs))) - ) - ), - new Function() - { - @Override - public String apply(String input) - { - return input; - } - } - ); - if (mergedMetrics.size() != metricAggs.length) { - throw new IAE("Bad number of metrics[%d], expected [%d]", mergedMetrics.size(), metricAggs.length); - } - - final AggregatorFactory[] sortedMetricAggs = new AggregatorFactory[mergedMetrics.size()]; - for (int i = 0; i < metricAggs.length; i++) { - AggregatorFactory metricAgg = metricAggs[i]; - sortedMetricAggs[mergedMetrics.indexOf(metricAgg.getName())] = metricAgg; - } - - for (int i = 0; i < mergedMetrics.size(); i++) { - if (!sortedMetricAggs[i].getName().equals(mergedMetrics.get(i))) { - throw new IAE( - "Metric mismatch, index[%d] [%s] != [%s]", - i, - metricAggs[i].getName(), - mergedMetrics.get(i) - ); - } - } - - Function>, Iterable> rowMergerFn = new Function>, Iterable>() - { - @Override - public Iterable apply( - ArrayList> boats - ) - { - return CombiningIterable.create( - new MergeIterable<>( - Ordering.natural().nullsFirst(), - boats - ), - Ordering.natural().nullsFirst(), - new RowboatMergeFunction(sortedMetricAggs) - ); - } - }; - - return makeIndexFiles( - adapters, outDir, progress, mergedDimensions, mergedMetrics, segmentMetaData, rowMergerFn, indexSpec - ); - } - - - public File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException - { - return convert(inDir, outDir, indexSpec, new BaseProgressIndicator()); - } - - public File convert( - final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress - ) throws IOException - { - try (QueryableIndex index = indexIO.loadIndex(inDir)) { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); - return makeIndexFiles( - ImmutableList.of(adapter), - outDir, - progress, - Lists.newArrayList(adapter.getDimensionNames()), - Lists.newArrayList(adapter.getMetricNames()), - null, - new Function>, Iterable>() - { - @Nullable - @Override - public Iterable apply(ArrayList> input) - { - return input.get(0); - } - }, - indexSpec - ); - } - } - - public File append( - final List adapters, - final File outDir, - final IndexSpec indexSpec - ) throws IOException - { - return append(adapters, outDir, new LoggingProgressIndicator(outDir.toString()), indexSpec); - } - - public File append( - final List adapters, - final File outDir, - final ProgressIndicator progress, - final IndexSpec indexSpec - ) throws IOException - { - FileUtils.deleteDirectory(outDir); - if (!outDir.mkdirs()) { - throw new ISE("Couldn't make outdir[%s].", outDir); - } - - final List mergedDimensions = mergeIndexed( - Lists.transform( - adapters, - new Function>() - { - @Override - public Iterable apply(IndexableAdapter input) - { - return Iterables.transform( - input.getDimensionNames(), - new Function() - { - @Override - public String apply(String input) - { - return input; - } - } - ); - } - } - ) - ); - final List mergedMetrics = mergeIndexed( - Lists.transform( - adapters, - new Function>() - { - @Override - public Iterable apply(IndexableAdapter input) - { - return Iterables.transform( - input.getMetricNames(), - new Function() - { - @Override - public String apply(String input) - { - return input; - } - } - ); - } - } - ) - ); - - Function>, Iterable> rowMergerFn = new Function>, Iterable>() - { - @Override - public Iterable apply( - final ArrayList> boats - ) - { - return new MergeIterable<>( - Ordering.natural().nullsFirst(), - boats - ); - } - }; - - return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec); - } - - private File makeIndexFiles( - final List adapters, - final File outDir, - final ProgressIndicator progress, - final List mergedDimensions, - final List mergedMetrics, - final Map segmentMetadata, - final Function>, Iterable> rowMergerFn, - final IndexSpec indexSpec - ) throws IOException - { - progress.start(); - progress.progress(); - - final Map valueTypes = Maps.newTreeMap(Ordering.natural().nullsFirst()); - final Map metricTypeNames = Maps.newTreeMap(Ordering.natural().nullsFirst()); - final Map columnCapabilities = Maps.newHashMap(); - - for (IndexableAdapter adapter : adapters) { - for (String dimension : adapter.getDimensionNames()) { - ColumnCapabilitiesImpl mergedCapabilities = columnCapabilities.get(dimension); - ColumnCapabilities capabilities = adapter.getCapabilities(dimension); - if (mergedCapabilities == null) { - mergedCapabilities = new ColumnCapabilitiesImpl(); - mergedCapabilities.setType(ValueType.STRING); - } - columnCapabilities.put(dimension, mergedCapabilities.merge(capabilities)); - } - for (String metric : adapter.getMetricNames()) { - ColumnCapabilitiesImpl mergedCapabilities = columnCapabilities.get(metric); - ColumnCapabilities capabilities = adapter.getCapabilities(metric); - if (mergedCapabilities == null) { - mergedCapabilities = new ColumnCapabilitiesImpl(); - } - columnCapabilities.put(metric, mergedCapabilities.merge(capabilities)); - - valueTypes.put(metric, capabilities.getType()); - metricTypeNames.put(metric, adapter.getMetricType(metric)); - } - } - - outDir.mkdirs(); - final FileSmoosher v9Smoosher = new FileSmoosher(outDir); - - ByteStreams.write( - Ints.toByteArray(IndexIO.V9_VERSION), - Files.newOutputStreamSupplier(new File(outDir, "version.bin")) - ); - - final Map dimIndexes = Maps.newHashMap(); - final Map> dimensionValuesLookup = Maps.newHashMap(); - final ArrayList> dimConversions = Lists.newArrayListWithCapacity(adapters.size()); - final Set skippedDimensions = Sets.newHashSet(); - final List rowNumConversions = Lists.newArrayListWithCapacity(adapters.size()); - - progress.progress(); - setupDimConversion( - adapters, - progress, - mergedDimensions, - dimConversions, - dimIndexes, - skippedDimensions, - dimensionValuesLookup - ); - - progress.progress(); - final Iterable theRows = makeRowIterable( - adapters, - mergedDimensions, - mergedMetrics, - dimConversions, - rowMergerFn - ); - - progress.progress(); - final int rowCount = convertDims(adapters, progress, theRows, rowNumConversions); - - progress.progress(); - makeTimeColumn(v9Smoosher, progress, theRows, rowCount); - - progress.progress(); - makeDimColumns( - v9Smoosher, - adapters, - progress, - mergedDimensions, - skippedDimensions, - theRows, - columnCapabilities, - dimensionValuesLookup, - rowNumConversions, - indexSpec - ); - - progress.progress(); - makeMetricColumns(v9Smoosher, progress, theRows, mergedMetrics, valueTypes, metricTypeNames, rowCount, indexSpec); - - progress.progress(); - makeIndexBinary( - v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec - ); - makeMetadataBinary(v9Smoosher, progress, segmentMetadata); - - v9Smoosher.close(); - - progress.stop(); - - return outDir; - } - - private void setupDimConversion( - final List adapters, - final ProgressIndicator progress, - final List mergedDimensions, - final List> dimConversions, - final Map dimIndexes, - final Set skippedDimensions, - final Map> dimensionValuesLookup - ) - { - final String section = "setup dimension conversions"; - progress.startSection(section); - - for (IndexableAdapter adapter : adapters) { - dimConversions.add(Maps.newHashMap()); - } - - int dimIndex = 0; - for (String dimension : mergedDimensions) { - dimIndexes.put(dimension, dimIndex++); - - // lookups for all dimension values of this dimension - final List> dimValueLookups = Lists.newArrayListWithCapacity(adapters.size()); - - // each converter converts dim values of this dimension to global dictionary - final DimValueConverter[] converters = new DimValueConverter[adapters.size()]; - - for (int i = 0; i < adapters.size(); i++) { - Indexed dimValues = adapters.get(i).getDimValueLookup(dimension); - if (!IndexMerger.isNullColumn(dimValues)) { - dimValueLookups.add(dimValues); - converters[i] = new DimValueConverter(dimValues); - } - } - - // sort all dimension values and treat all null values as empty strings - final Iterable dimensionValues = CombiningIterable.createSplatted( - Iterables.transform( - dimValueLookups, - new Function, Iterable>() - { - @Override - public Iterable apply(Indexed indexed) - { - return Iterables.transform( - indexed, - new Function() - { - @Override - public String apply(@Nullable String input) - { - return (input == null) ? "" : input; - } - } - ); - } - } - ), - Ordering.natural() - ); - - int cardinality = 0; - for (String value : dimensionValues) { - for (int i = 0; i < adapters.size(); i++) { - DimValueConverter converter = converters[i]; - if (converter != null) { - converter.convert(value, cardinality); - } - } - - ++cardinality; - } - - if (cardinality == 0) { - log.info("Skipping [%s], it is empty!", dimension); - skippedDimensions.add(dimension); - continue; - } - - dimensionValuesLookup.put(dimension, dimensionValues); - - // make the dictionary - for (int i = 0; i < adapters.size(); ++i) { - DimValueConverter converter = converters[i]; - if (converter != null) { - dimConversions.get(i).put(dimension, converters[i].getConversionBuffer()); - } - } - } - - progress.stopSection(section); - } - - private Iterable makeRowIterable( - final List adapters, - final List mergedDimensions, - final List mergedMetrics, - final ArrayList> dimConversions, - final Function>, Iterable> rowMergerFn - ) - { - ArrayList> boats = Lists.newArrayListWithCapacity(adapters.size()); - - for (int i = 0; i < adapters.size(); ++i) { - final IndexableAdapter adapter = adapters.get(i); - - final int[] dimLookup = new int[mergedDimensions.size()]; - int count = 0; - for (String dim : adapter.getDimensionNames()) { - dimLookup[count] = mergedDimensions.indexOf(dim); - count++; - } - - final int[] metricLookup = new int[mergedMetrics.size()]; - count = 0; - for (String metric : adapter.getMetricNames()) { - metricLookup[count] = mergedMetrics.indexOf(metric); - count++; - } - - boats.add( - new MMappedIndexRowIterable( - Iterables.transform( - adapters.get(i).getRows(), - new Function() - { - @Override - public Rowboat apply(Rowboat input) - { - int[][] newDims = new int[mergedDimensions.size()][]; - int j = 0; - for (int[] dim : input.getDims()) { - newDims[dimLookup[j]] = dim; - j++; - } - - Object[] newMetrics = new Object[mergedMetrics.size()]; - j = 0; - for (Object met : input.getMetrics()) { - newMetrics[metricLookup[j]] = met; - j++; - } - - return new Rowboat( - input.getTimestamp(), - newDims, - newMetrics, - input.getRowNum() - ); - } - } - ), - mergedDimensions, - dimConversions.get(i), - i - ) - ); - } - - return rowMergerFn.apply(boats); - } - - private int convertDims( - final List adapters, - final ProgressIndicator progress, - final Iterable theRows, - final List rowNumConversions - ) throws IOException - { - final String section = "convert dims"; - progress.startSection(section); - - for (IndexableAdapter index : adapters) { - int[] arr = new int[index.getNumRows()]; - Arrays.fill(arr, INVALID_ROW); - rowNumConversions.add(IntBuffer.wrap(arr)); - } - - int rowCount = 0; - for (Rowboat theRow : theRows) { - for (Map.Entry> comprisedRow : theRow.getComprisedRows().entrySet()) { - final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey()); - - for (Integer rowNum : comprisedRow.getValue()) { - while (conversionBuffer.position() < rowNum) { - conversionBuffer.put(INVALID_ROW); - } - conversionBuffer.put(rowCount); - } - } - - if ((++rowCount % 500000) == 0) { - progress.progressSection(section, String.format("Walked 500,000/%,d rows", rowCount)); - } - } - - for (IntBuffer rowNumConversion : rowNumConversions) { - rowNumConversion.rewind(); - } - - progress.stopSection(section); - - return rowCount; - } - - private void makeTimeColumn( - final FileSmoosher v9Smoosher, - final ProgressIndicator progress, - final Iterable theRows, - final int rowCount - ) throws IOException - { - final String section = "make time column"; - progress.startSection(section); - - long[] longs = new long[rowCount]; - - int rowNum = 0; - for (Rowboat theRow : theRows) { - longs[rowNum++] = theRow.getTimestamp(); - } - - CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromLongBuffer( - LongBuffer.wrap(longs), - IndexIO.BYTE_ORDER, - CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY - - ); - - final ColumnDescriptor.Builder timeBuilder = ColumnDescriptor.builder(); - timeBuilder.setValueType(ValueType.LONG); - - writeColumn( - v9Smoosher, - new LongGenericColumnPartSerde(timestamps, IndexIO.BYTE_ORDER), - timeBuilder, - "__time" - ); - - progress.stopSection(section); - } - - private void makeDimColumns( - final FileSmoosher v9Smoosher, - final List adapters, - final ProgressIndicator progress, - final List mergedDimensions, - final Set skippedDimensions, - final Iterable theRows, - final Map columnCapabilities, - final Map> dimensionValuesLookup, - final List rowNumConversions, - final IndexSpec indexSpec - ) throws IOException - { - final String dimSection = "make dimension columns"; - progress.startSection(dimSection); - - int dimIndex = 0; - for (String dimension : mergedDimensions) { - if (skippedDimensions.contains(dimension)) { - dimIndex++; - continue; - } - - makeDimColumn( - v9Smoosher, - adapters, - progress, - theRows, - dimIndex, - dimension, - columnCapabilities, - dimensionValuesLookup, - rowNumConversions, - indexSpec.getBitmapSerdeFactory(), - indexSpec.getDimensionCompressionStrategy() - ); - dimIndex++; - } - progress.stopSection(dimSection); - } - - private static class NullsAtZeroConvertingIntList extends AbstractList - { - private final List delegate; - private final boolean delegateHasNullAtZero; - - NullsAtZeroConvertingIntList(List delegate, final boolean delegateHasNullAtZero) - { - this.delegate = delegate; - this.delegateHasNullAtZero = delegateHasNullAtZero; - } - - @Override - public Integer get(int index) - { - Integer val = delegate.get(index); - if (val == null) { - return 0; - } - return delegateHasNullAtZero ? val : val + 1; - } - - @Override - public int size() - { - return delegate.size(); - } - } - - private void makeDimColumn( - final FileSmoosher v9Smoosher, - final List adapters, - final ProgressIndicator progress, - final Iterable theRows, - final int dimIndex, - final String dimension, - final Map columnCapabilities, - final Map> dimensionValuesLookup, - final List rowNumConversions, - final BitmapSerdeFactory bitmapSerdeFactory, - final CompressedObjectStrategy.CompressionStrategy compressionStrategy - ) throws IOException - { - final String section = String.format("make %s", dimension); - progress.startSection(section); - - final ColumnDescriptor.Builder dimBuilder = ColumnDescriptor.builder(); - dimBuilder.setValueType(ValueType.STRING); - - final List outParts = Lists.newArrayList(); - - ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream(); - serializerUtils.writeString(nameBAOS, dimension); - outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray())); - - boolean hasMultipleValues = columnCapabilities.get(dimension).hasMultipleValues(); - dimBuilder.setHasMultipleValues(hasMultipleValues); - - // make dimension columns - List singleValCol; - final VSizeIndexed multiValCol; - - ColumnDictionaryEntryStore adder = hasMultipleValues - ? new MultiValColumnDictionaryEntryStore() - : new SingleValColumnDictionaryEntryStore(); - - final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); - MutableBitmap nullSet = null; - int rowCount = 0; - - for (Rowboat theRow : theRows) { - if (dimIndex > theRow.getDims().length) { - if (nullSet == null) { - nullSet = bitmapFactory.makeEmptyMutableBitmap(); - } - nullSet.add(rowCount); - adder.add(null); - } else { - int[] dimVals = theRow.getDims()[dimIndex]; - if (dimVals == null || dimVals.length == 0) { - if (nullSet == null) { - nullSet = bitmapFactory.makeEmptyMutableBitmap(); - } - nullSet.add(rowCount); - } - adder.add(dimVals); - } - rowCount++; - } - - final Iterable dimensionValues = dimensionValuesLookup.get(dimension); - GenericIndexed dictionary = GenericIndexed.fromIterable( - dimensionValues, - GenericIndexed.STRING_STRATEGY - ); - boolean bumpDictionary = false; - - if (hasMultipleValues) { - final List> vals = ((MultiValColumnDictionaryEntryStore) adder).get(); - if (nullSet != null) { - log.info("Dimension[%s] has null rows.", dimension); - - if (Iterables.getFirst(dimensionValues, "") != null) { - bumpDictionary = true; - log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); - - dictionary = GenericIndexed.fromIterable( - Iterables.concat(Collections.singleton(null), dimensionValues), - GenericIndexed.STRING_STRATEGY - ); - - final int dictionarySize = dictionary.size(); - - singleValCol = null; - multiValCol = VSizeIndexed.fromIterable( - Iterables.transform( - vals, - new Function, VSizeIndexedInts>() - { - @Override - public VSizeIndexedInts apply(final List input) - { - if (input == null) { - return VSizeIndexedInts.fromList(ImmutableList.of(0), dictionarySize); - } else { - return VSizeIndexedInts.fromList( - new NullsAtZeroConvertingIntList(input, false), - dictionarySize - ); - } - } - } - ) - ); - } else { - final int dictionarySize = dictionary.size(); - singleValCol = null; - multiValCol = VSizeIndexed.fromIterable( - Iterables.transform( - vals, - new Function, VSizeIndexedInts>() - { - @Override - public VSizeIndexedInts apply(List input) - { - if (input == null) { - return VSizeIndexedInts.fromList(ImmutableList.of(0), dictionarySize); - } else { - return VSizeIndexedInts.fromList(input, dictionarySize); - } - } - } - ) - ); - } - } else { - final int dictionarySize = dictionary.size(); - singleValCol = null; - multiValCol = VSizeIndexed.fromIterable( - Iterables.transform( - vals, - new Function, VSizeIndexedInts>() - { - @Override - public VSizeIndexedInts apply(List input) - { - return VSizeIndexedInts.fromList(input, dictionarySize); - } - } - ) - ); - } - } else { - final List vals = ((SingleValColumnDictionaryEntryStore) adder).get(); - - if (nullSet != null) { - log.info("Dimension[%s] has null rows.", dimension); - - if (Iterables.getFirst(dimensionValues, "") != null) { - bumpDictionary = true; - log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); - - final List nullList = Lists.newArrayList(); - nullList.add(null); - - dictionary = GenericIndexed.fromIterable( - Iterables.concat(nullList, dimensionValues), - GenericIndexed.STRING_STRATEGY - ); - multiValCol = null; - singleValCol = new NullsAtZeroConvertingIntList(vals, false); - } else { - multiValCol = null; - singleValCol = new NullsAtZeroConvertingIntList(vals, true); - } - } else { - multiValCol = null; - singleValCol = new AbstractList() - { - @Override - public Integer get(int index) - { - return vals.get(index); - } - - @Override - public int size() - { - return vals.size(); - } - }; - } - } - - // Make bitmap indexes - List mutableBitmaps = Lists.newArrayList(); - for (String dimVal : dimensionValues) { - List> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size()); - for (int j = 0; j < adapters.size(); ++j) { - convertedInverteds.add( - new ConvertingIndexedInts( - adapters.get(j).getBitmapIndex(dimension, dimVal), rowNumConversions.get(j) - ) - ); - } - - MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap(); - for (Integer row : CombiningIterable.createSplatted( - convertedInverteds, - Ordering.natural().nullsFirst() - )) { - if (row != INVALID_ROW) { - bitset.add(row); - } - } - - mutableBitmaps.add(bitset); - } - - GenericIndexed bitmaps; - - if (nullSet != null) { - final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullSet); - if (bumpDictionary) { - bitmaps = GenericIndexed.fromIterable( - Iterables.concat( - Arrays.asList(theNullSet), - Iterables.transform( - mutableBitmaps, - new Function() - { - @Override - public ImmutableBitmap apply(MutableBitmap input) - { - return bitmapFactory.makeImmutableBitmap(input); - } - } - ) - ), - bitmapSerdeFactory.getObjectStrategy() - ); - } else { - Iterable immutableBitmaps = Iterables.transform( - mutableBitmaps, - new Function() - { - @Override - public ImmutableBitmap apply(MutableBitmap input) - { - return bitmapFactory.makeImmutableBitmap(input); - } - } - ); - - bitmaps = GenericIndexed.fromIterable( - Iterables.concat( - Arrays.asList( - theNullSet.union(Iterables.getFirst(immutableBitmaps, null)) - ), - Iterables.skip(immutableBitmaps, 1) - ), - bitmapSerdeFactory.getObjectStrategy() - ); - } - } else { - bitmaps = GenericIndexed.fromIterable( - Iterables.transform( - mutableBitmaps, - new Function() - { - @Override - public ImmutableBitmap apply(MutableBitmap input) - { - return bitmapFactory.makeImmutableBitmap(input); - } - } - ), - bitmapSerdeFactory.getObjectStrategy() - ); - } - - // Make spatial indexes - ImmutableRTree spatialIndex = null; - boolean hasSpatialIndexes = columnCapabilities.get(dimension).hasSpatialIndexes(); - RTree tree = null; - if (hasSpatialIndexes) { - tree = new RTree( - 2, - new LinearGutmanSplitStrategy(0, 50, bitmapSerdeFactory.getBitmapFactory()), - bitmapSerdeFactory.getBitmapFactory() - ); - } - - int dimValIndex = 0; - for (String dimVal : dimensionValuesLookup.get(dimension)) { - if (hasSpatialIndexes) { - if (dimVal != null && !dimVal.isEmpty()) { - List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); - float[] coords = new float[stringCoords.size()]; - for (int j = 0; j < coords.length; j++) { - coords[j] = Float.valueOf(stringCoords.get(j)); - } - tree.insert(coords, mutableBitmaps.get(dimValIndex)); - } - dimValIndex++; - } - } - if (hasSpatialIndexes) { - spatialIndex = ImmutableRTree.newImmutableFromMutable(tree); - } - - log.info("Completed dimension[%s] with cardinality[%,d]. Starting write.", dimension, dictionary.size()); - - final DictionaryEncodedColumnPartSerde.Builder dimPartBuilder = DictionaryEncodedColumnPartSerde - .builder() - .withDictionary(dictionary) - .withBitmapSerdeFactory(bitmapSerdeFactory) - .withBitmaps(bitmaps) - .withSpatialIndex(spatialIndex) - .withByteOrder(IndexIO.BYTE_ORDER); - - if (singleValCol != null) { - if (compressionStrategy != null) { - dimPartBuilder.withSingleValuedColumn( - CompressedVSizeIntsIndexedSupplier.fromList( - singleValCol, - dictionary.size(), - CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()), - IndexIO.BYTE_ORDER, - compressionStrategy - ) - ); - } else { - dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); - } - } else if (compressionStrategy != null) { - dimPartBuilder.withMultiValuedColumn( - CompressedVSizeIndexedSupplier.fromIterable( - multiValCol, - dictionary.size(), - IndexIO.BYTE_ORDER, - compressionStrategy - ) - ); - } else { - dimPartBuilder.withMultiValuedColumn(multiValCol); - } - - - writeColumn( - v9Smoosher, - dimPartBuilder.build(), - dimBuilder, - dimension - ); - - progress.stopSection(section); - } - - private void makeMetricColumns( - final FileSmoosher v9Smoosher, - final ProgressIndicator progress, - final Iterable theRows, - final List mergedMetrics, - final Map valueTypes, - final Map metricTypeNames, - final int rowCount, - final IndexSpec indexSpec - ) throws IOException - { - final String metSection = "make metric columns"; - progress.startSection(metSection); - - int metIndex = 0; - for (String metric : mergedMetrics) { - makeMetricColumn( - v9Smoosher, - progress, - theRows, - metIndex, - metric, - valueTypes, - metricTypeNames, - rowCount, - indexSpec.getMetricCompressionStrategy() - ); - metIndex++; - } - progress.stopSection(metSection); - } - - private void makeMetricColumn( - final FileSmoosher v9Smoosher, - final ProgressIndicator progress, - final Iterable theRows, - final int metricIndex, - final String metric, - final Map valueTypes, - final Map metricTypeNames, - final int rowCount, - final CompressedObjectStrategy.CompressionStrategy compressionStrategy - ) throws IOException - { - final String section = String.format("make column[%s]", metric); - progress.startSection(section); - - final ColumnDescriptor.Builder metBuilder = ColumnDescriptor.builder(); - ValueType type = valueTypes.get(metric); - - switch (type) { - case FLOAT: { - metBuilder.setValueType(ValueType.FLOAT); - - float[] arr = new float[rowCount]; - int rowNum = 0; - for (Rowboat theRow : theRows) { - Object obj = theRow.getMetrics()[metricIndex]; - arr[rowNum++] = (obj == null) ? 0 : ((Number) obj).floatValue(); - } - - CompressedFloatsIndexedSupplier compressedFloats = CompressedFloatsIndexedSupplier.fromFloatBuffer( - FloatBuffer.wrap(arr), - IndexIO.BYTE_ORDER, - compressionStrategy - ); - - writeColumn( - v9Smoosher, - new FloatGenericColumnPartSerde(compressedFloats, IndexIO.BYTE_ORDER), - metBuilder, - metric - ); - break; - } - case LONG: { - metBuilder.setValueType(ValueType.LONG); - - long[] arr = new long[rowCount]; - int rowNum = 0; - for (Rowboat theRow : theRows) { - Object obj = theRow.getMetrics()[metricIndex]; - arr[rowNum++] = (obj == null) ? 0 : ((Number) obj).longValue(); - } - - CompressedLongsIndexedSupplier compressedLongs = CompressedLongsIndexedSupplier.fromLongBuffer( - LongBuffer.wrap(arr), - IndexIO.BYTE_ORDER, - compressionStrategy - ); - - writeColumn( - v9Smoosher, - new LongGenericColumnPartSerde(compressedLongs, IndexIO.BYTE_ORDER), - metBuilder, - metric - ); - break; - } - case COMPLEX: - String complexType = metricTypeNames.get(metric); - - ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(complexType); - - if (serde == null) { - throw new ISE("Unknown type[%s]", complexType); - } - - final GenericIndexed metricColumn = GenericIndexed.fromIterable( - Iterables.transform( - theRows, - new Function() - { - @Override - public Object apply(Rowboat input) - { - return input.getMetrics()[metricIndex]; - } - } - ), - serde.getObjectStrategy() - ); - - metBuilder.setValueType(ValueType.COMPLEX); - writeColumn( - v9Smoosher, - new ComplexColumnPartSerde(metricColumn, complexType), - metBuilder, - metric - ); - break; - default: - throw new ISE("Unknown type[%s]", type); - } - - progress.stopSection(section); - } - - private void makeIndexBinary( - final FileSmoosher v9Smoosher, - final List adapters, - final File outDir, - final List mergedDimensions, - final List mergedMetrics, - final Set skippedDimensions, - final ProgressIndicator progress, - final IndexSpec indexSpec - ) throws IOException - { - final String section = "building index.drd"; - progress.startSection(section); - - final Set finalColumns = Sets.newTreeSet(); - finalColumns.addAll(mergedDimensions); - finalColumns.addAll(mergedMetrics); - finalColumns.removeAll(skippedDimensions); - - final Iterable finalDimensions = Iterables.filter( - mergedDimensions, - new Predicate() - { - @Override - public boolean apply(String input) - { - return !skippedDimensions.contains(input); - } - } - ); - - GenericIndexed cols = GenericIndexed.fromIterable(finalColumns, GenericIndexed.STRING_STRATEGY); - GenericIndexed dims = GenericIndexed.fromIterable(finalDimensions, GenericIndexed.STRING_STRATEGY); - - final String bitmapSerdeFactoryType = mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()); - final long numBytes = cols.getSerializedSize() - + dims.getSerializedSize() - + 16 - + serializerUtils.getSerializedStringByteSize(bitmapSerdeFactoryType); - - final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); - cols.writeToChannel(writer); - dims.writeToChannel(writer); - - DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT); - DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT); - - for (IndexableAdapter index : adapters) { - minTime = JodaUtils.minDateTime(minTime, index.getDataInterval().getStart()); - maxTime = JodaUtils.maxDateTime(maxTime, index.getDataInterval().getEnd()); - } - final Interval dataInterval = new Interval(minTime, maxTime); - - serializerUtils.writeLong(writer, dataInterval.getStartMillis()); - serializerUtils.writeLong(writer, dataInterval.getEndMillis()); - - serializerUtils.writeString( - writer, bitmapSerdeFactoryType - ); - writer.close(); - - IndexIO.checkFileSize(new File(outDir, "index.drd")); - - progress.stopSection(section); - } - - private void makeMetadataBinary( - final FileSmoosher v9Smoosher, - final ProgressIndicator progress, - final Map segmentMetadata - ) throws IOException - { - if (segmentMetadata != null && !segmentMetadata.isEmpty()) { - progress.startSection("metadata.drd"); - v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(segmentMetadata))); - progress.stopSection("metadata.drd"); - } - } - - private void writeColumn( - FileSmoosher v9Smoosher, - ColumnPartSerde serde, - ColumnDescriptor.Builder builder, - String name - ) throws IOException - { - builder.addSerde(serde); - - final ColumnDescriptor descriptor = builder.build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(descriptor)); - byte[] specBytes = baos.toByteArray(); - - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - name, descriptor.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - descriptor.write(channel); - channel.close(); - } - - private ArrayList mergeIndexed(final List> indexedLists) - { - Set retVal = Sets.newTreeSet(Ordering.natural().nullsFirst()); - - for (Iterable indexedList : indexedLists) { - for (T val : indexedList) { - retVal.add(val); - } - } - - return Lists.newArrayList(retVal); - } - - private static interface ColumnDictionaryEntryStore - { - public void add(int[] vals); - } - - private static class DimValueConverter - { - private final Indexed dimSet; - private final IntBuffer conversionBuf; - private int currIndex; - private String lastVal = null; - - DimValueConverter( - Indexed dimSet - ) - { - this.dimSet = dimSet; - final int bufferSize = dimSet.size() * Ints.BYTES; - log.info("Allocating new dimension conversion buffer of size[%,d]", bufferSize); - this.conversionBuf = ByteBuffer.allocateDirect(bufferSize).asIntBuffer(); - - this.currIndex = 0; - } - - public void convert(String value, int index) - { - if (dimSet.size() == 0) { - return; - } - if (lastVal != null) { - if (value.compareTo(lastVal) <= 0) { - throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", value, lastVal); - } - return; - } - String currValue = dimSet.get(currIndex); - - while (currValue == null) { - conversionBuf.position(conversionBuf.position() + 1); - ++currIndex; - if (currIndex == dimSet.size()) { - lastVal = value; - return; - } - currValue = dimSet.get(currIndex); - } - - if (Objects.equal(currValue, value)) { - conversionBuf.put(index); - ++currIndex; - if (currIndex == dimSet.size()) { - lastVal = value; - } - } else if (currValue.compareTo(value) < 0) { - throw new ISE( - "Skipped currValue[%s], currIndex[%,d]; incoming value[%s], index[%,d]", currValue, currIndex, value, index - ); - } - } - - public IntBuffer getConversionBuffer() - { - if (currIndex != conversionBuf.limit() || conversionBuf.hasRemaining()) { - throw new ISE( - "Asked for incomplete buffer. currIndex[%,d] != buf.limit[%,d]", currIndex, conversionBuf.limit() - ); - } - return (IntBuffer) conversionBuf.asReadOnlyBuffer().rewind(); - } - } - - private static class ConvertingIndexedInts implements Iterable - { - private final IndexedInts baseIndex; - private final IntBuffer conversionBuffer; - - public ConvertingIndexedInts( - IndexedInts baseIndex, - IntBuffer conversionBuffer - ) - { - this.baseIndex = baseIndex; - this.conversionBuffer = conversionBuffer; - } - - public int size() - { - return baseIndex.size(); - } - - public int get(int index) - { - return conversionBuffer.get(baseIndex.get(index)); - } - - @Override - public Iterator iterator() - { - return Iterators.transform( - baseIndex.iterator(), - new Function() - { - @Override - public Integer apply(Integer input) - { - return conversionBuffer.get(input); - } - } - ); - } - } - - private static class MMappedIndexRowIterable implements Iterable - { - private final Iterable index; - private final List convertedDims; - private final Map converters; - private final int indexNumber; - - - MMappedIndexRowIterable( - Iterable index, - List convertedDims, - Map converters, - int indexNumber - ) - { - this.index = index; - this.convertedDims = convertedDims; - this.converters = converters; - this.indexNumber = indexNumber; - } - - public Iterable getIndex() - { - return index; - } - - public List getConvertedDims() - { - return convertedDims; - } - - public Map getConverters() - { - return converters; - } - - public int getIndexNumber() - { - return indexNumber; - } - - @Override - public Iterator iterator() - { - final IntBuffer[] converterArray = FluentIterable - .from(convertedDims) - .transform( - new Function() - { - @Override - public IntBuffer apply(String input) - { - return converters.get(input); - } - } - ).toArray(IntBuffer.class); - return Iterators.transform( - index.iterator(), - new Function() - { - @Override - public Rowboat apply(Rowboat input) - { - final int[][] dims = input.getDims(); - final int[][] newDims = new int[convertedDims.size()][]; - for (int i = 0; i < newDims.length; ++i) { - final IntBuffer converter = converterArray[i]; - - if (converter == null) { - continue; - } - - if (i >= dims.length || dims[i] == null) { - continue; - } - - newDims[i] = new int[dims[i].length]; - - for (int j = 0; j < dims[i].length; ++j) { - if (!converter.hasRemaining()) { - throw new ISE("Converter mismatch! wtfbbq!"); - } - newDims[i][j] = converter.get(dims[i][j]); - } - } - - final Rowboat retVal = new Rowboat( - input.getTimestamp(), - newDims, - input.getMetrics(), - input.getRowNum() - ); - - retVal.addRow(indexNumber, input.getRowNum()); - - return retVal; - } - } - ); - } - } - - private static class AggFactoryStringIndexed implements Indexed - { - private final AggregatorFactory[] metricAggs; - - public AggFactoryStringIndexed(AggregatorFactory[] metricAggs) {this.metricAggs = metricAggs;} - - @Override - public Class getClazz() - { - return String.class; - } - - @Override - public int size() - { - return metricAggs.length; - } - - @Override - public String get(int index) - { - return metricAggs[index].getName(); - } - - @Override - public int indexOf(String value) - { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() - { - return IndexedIterable.create(this).iterator(); - } - } - - private static class RowboatMergeFunction implements BinaryFn - { - private final AggregatorFactory[] metricAggs; - - public RowboatMergeFunction(AggregatorFactory[] metricAggs) - { - this.metricAggs = metricAggs; - } - - @Override - public Rowboat apply(Rowboat lhs, Rowboat rhs) - { - if (lhs == null) { - return rhs; - } - if (rhs == null) { - return lhs; - } - - Object[] metrics = new Object[metricAggs.length]; - Object[] lhsMetrics = lhs.getMetrics(); - Object[] rhsMetrics = rhs.getMetrics(); - - for (int i = 0; i < metrics.length; ++i) { - metrics[i] = metricAggs[i].combine(lhsMetrics[i], rhsMetrics[i]); - } - - final Rowboat retVal = new Rowboat( - lhs.getTimestamp(), - lhs.getDims(), - metrics, - lhs.getRowNum() - ); - - for (Rowboat rowboat : Arrays.asList(lhs, rhs)) { - for (Map.Entry> entry : rowboat.getComprisedRows().entrySet()) { - for (Integer rowNum : entry.getValue()) { - retVal.addRow(entry.getKey(), rowNum); - } - } - } - - return retVal; - } - } - - private static class SingleValColumnDictionaryEntryStore implements ColumnDictionaryEntryStore - { - private final List data = Lists.newArrayList(); - - @Override - public void add(int[] vals) - { - if (vals == null || vals.length == 0) { - data.add(null); - } else { - data.add(vals[0]); - } - } - - public List get() - { - return data; - } - } - - private static class MultiValColumnDictionaryEntryStore implements ColumnDictionaryEntryStore - { - private final List> data = Lists.newArrayList(); - - public void add(int[] vals) - { - if (vals == null || vals.length == 0) { - data.add(null); - } else { - data.add(Ints.asList(vals)); - } - } - - public List> get() - { - return data; - } - } -} diff --git a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java deleted file mode 100644 index fea31643e1b..00000000000 --- a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java +++ /dev/null @@ -1,629 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment; - -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import io.druid.data.input.MapBasedInputRow; -import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.CountAggregatorFactory; -import io.druid.segment.column.Column; -import io.druid.segment.column.SimpleDictionaryEncodedColumn; -import io.druid.segment.data.BitmapSerdeFactory; -import io.druid.segment.data.CompressedObjectStrategy; -import io.druid.segment.data.ConciseBitmapSerdeFactory; -import io.druid.segment.data.IncrementalIndexTest; -import io.druid.segment.data.RoaringBitmapSerdeFactory; -import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexAdapter; -import io.druid.segment.incremental.OnheapIncrementalIndex; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import javax.annotation.Nullable; -import java.io.File; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -@Ignore -/* -* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready. -*/ - -@RunWith(Parameterized.class) -public class IndexMakerParameterizedTest -{ - - @Rule - public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - @Rule - public final CloserRule closer = new CloserRule(false); - - private final static IndexMaker INDEX_MAKER = TestHelper.getTestIndexMaker(); - private final static IndexIO INDEX_IO = TestHelper.getTestIndexIO(); - - - @Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}") - public static Collection data() - { - return Collections2.transform( - Sets.cartesianProduct( - ImmutableList.of( - ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()), - ImmutableSet.of( - CompressedObjectStrategy.CompressionStrategy.LZ4, - CompressedObjectStrategy.CompressionStrategy.LZF - ), - ImmutableSet.of( - CompressedObjectStrategy.CompressionStrategy.LZ4, - CompressedObjectStrategy.CompressionStrategy.LZF - ) - ) - ), new Function, Object[]>() - { - @Nullable - @Override - public Object[] apply(List input) - { - return input.toArray(); - } - } - ); - } - - static IndexSpec makeIndexSpec( - BitmapSerdeFactory bitmapSerdeFactory, - CompressedObjectStrategy.CompressionStrategy compressionStrategy, - CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy - ) - { - if (bitmapSerdeFactory != null || compressionStrategy != null) { - return new IndexSpec( - bitmapSerdeFactory, - compressionStrategy.name().toLowerCase(), - dimCompressionStrategy.name().toLowerCase() - ); - } else { - return new IndexSpec(); - } - } - - private final IndexSpec indexSpec; - - public IndexMakerParameterizedTest( - BitmapSerdeFactory bitmapSerdeFactory, - CompressedObjectStrategy.CompressionStrategy compressionStrategy, - CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy - ) - { - this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy); - } - - @Test - public void testPersist() throws Exception - { - final long timestamp = System.currentTimeMillis(); - - IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist); - - final File tempDir = temporaryFolder.newFolder(); - QueryableIndex index = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist, - tempDir, - null, - indexSpec - ) - ) - ); - - Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); - Assert.assertEquals(3, index.getColumnNames().size()); - - assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); - } - - @Test - public void testPersistMerge() throws Exception - { - final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - IncrementalIndex toPersist2 = new OnheapIncrementalIndex( - 0L, - QueryGranularity.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - 1000 - ); - - toPersist2.add( - new MapBasedInputRow( - timestamp, - Arrays.asList("dim1", "dim2"), - ImmutableMap.of("dim1", "1", "dim2", "2") - ) - ); - - toPersist2.add( - new MapBasedInputRow( - timestamp, - Arrays.asList("dim1", "dim2"), - ImmutableMap.of("dim1", "5", "dim2", "6") - ) - ); - - final File tempDir1 = temporaryFolder.newFolder(); - final File tempDir2 = temporaryFolder.newFolder(); - final File mergedDir = temporaryFolder.newFolder(); - - QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist1, - tempDir1, - null, - indexSpec - ) - ) - ); - - Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); - - QueryableIndex index2 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist2, - tempDir2, - null, - indexSpec - ) - ) - ); - - Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); - Assert.assertEquals(3, index2.getColumnNames().size()); - - QueryableIndex merged = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.mergeQueryableIndex( - Arrays.asList(index1, index2), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - indexSpec - ) - ) - ); - - Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); - Assert.assertEquals(3, merged.getColumnNames().size()); - assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); - } - - @Test - public void testPersistEmptyColumn() throws Exception - { - final IncrementalIndex toPersist1 = new OnheapIncrementalIndex( - 0L, - QueryGranularity.NONE, - new AggregatorFactory[]{}, - 10 - ); - final IncrementalIndex toPersist2 = new OnheapIncrementalIndex( - 0L, - QueryGranularity.NONE, - new AggregatorFactory[]{}, - 10 - ); - final File tmpDir1 = temporaryFolder.newFolder(); - final File tmpDir2 = temporaryFolder.newFolder(); - final File tmpDir3 = temporaryFolder.newFolder(); - - toPersist1.add( - new MapBasedInputRow( - 1L, - ImmutableList.of("dim1", "dim2"), - ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "foo") - ) - ); - - toPersist2.add( - new MapBasedInputRow( - 1L, - ImmutableList.of("dim1", "dim2"), - ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "bar") - ) - ); - - final QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist1, - tmpDir1, - null, - indexSpec - ) - ) - ); - final QueryableIndex index2 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist1, - tmpDir2, - null, - indexSpec - ) - ) - ); - final QueryableIndex merged = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec) - ) - ); - - Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions())); - - Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); - - Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); - - assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); - } - - @Test - public void testMergeRetainsValues() throws Exception - { - final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File mergedDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist1, - tempDir1, - null, - indexSpec - ) - ) - ); - - - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); - - - QueryableIndex merged = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.mergeQueryableIndex( - ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - indexSpec - ) - ) - ); - - Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); - Assert.assertEquals(3, merged.getColumnNames().size()); - - INDEX_IO.validateTwoSegments(tempDir1, mergedDir); - - assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); - } - - - @Test - public void testAppendRetainsValues() throws Exception - { - final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File mergedDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.append( - ImmutableList.of(incrementalAdapter), tempDir1, indexSpec - ) - ) - ); - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); - - - QueryableIndex merged = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.mergeQueryableIndex( - ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - indexSpec - ) - ) - ); - - Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); - Assert.assertEquals(3, merged.getColumnNames().size()); - - INDEX_IO.validateTwoSegments(tempDir1, mergedDir); - - assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); - } - - @Test - public void testMergeSpecChange() throws Exception - { - final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File mergedDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist1, - tempDir1, - null, - indexSpec - ) - ) - ); - - - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); - - - IndexSpec newSpec = new IndexSpec( - indexSpec.getBitmapSerdeFactory(), - "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", - "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" - ); - - - QueryableIndex merged = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.mergeQueryableIndex( - ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - newSpec - ) - ) - ); - - Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); - Assert.assertEquals(3, merged.getColumnNames().size()); - - INDEX_IO.validateTwoSegments(tempDir1, mergedDir); - - assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(merged, newSpec.getDimensionCompressionStrategy()); - } - - - @Test - public void testConvertSame() throws Exception - { - final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File convertDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist1, - tempDir1, - null, - indexSpec - ) - ) - ); - - - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); - - - QueryableIndex converted = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.convert( - tempDir1, - convertDir, - indexSpec - ) - ) - ); - - Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); - Assert.assertEquals(3, converted.getColumnNames().size()); - - INDEX_IO.validateTwoSegments(tempDir1, convertDir); - - assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy()); - } - - - @Test - public void testConvertDifferent() throws Exception - { - final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File convertDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MAKER.persist( - toPersist1, - tempDir1, - null, - indexSpec - ) - ) - ); - - - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); - - - IndexSpec newSpec = new IndexSpec( - indexSpec.getBitmapSerdeFactory(), - "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", - "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" - ); - - QueryableIndex converted = closer.closeLater(INDEX_IO.loadIndex(INDEX_MAKER.convert(tempDir1, convertDir, newSpec))); - - Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); - Assert.assertEquals(3, converted.getColumnNames().size()); - - INDEX_IO.validateTwoSegments(tempDir1, convertDir); - - assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); - assertDimCompression(converted, newSpec.getDimensionCompressionStrategy()); - } - - private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) - throws Exception - { - // Java voodoo - - Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding(); - Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column"); - field.setAccessible(true); - - Object obj = field.get(encodedColumn); - Field compressedSupplierField = obj.getClass().getDeclaredField("this$0"); - compressedSupplierField.setAccessible(true); - - Object supplier = compressedSupplierField.get(obj); - Field compressionField = supplier.getClass().getDeclaredField("compression"); - compressionField.setAccessible(true); - - Object strategy = compressionField.get(supplier); - - Assert.assertEquals(expectedStrategy, strategy); - } -} diff --git a/processing/src/test/java/io/druid/segment/IndexMakerTest.java b/processing/src/test/java/io/druid/segment/IndexMakerTest.java deleted file mode 100644 index 552870869dc..00000000000 --- a/processing/src/test/java/io/druid/segment/IndexMakerTest.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment; - -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.io.Files; -import io.druid.common.utils.JodaUtils; -import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; -import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.CountAggregatorFactory; -import io.druid.segment.data.CompressedObjectStrategy; -import io.druid.segment.data.ConciseBitmapSerdeFactory; -import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; -import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Ignore -/* -* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready. -*/ - -@RunWith(Parameterized.class) -public class IndexMakerTest -{ - @Rule - public final CloserRule closer = new CloserRule(false); - private static final long TIMESTAMP = DateTime.parse("2014-01-01").getMillis(); - private static final AggregatorFactory[] DEFAULT_AGG_FACTORIES = new AggregatorFactory[]{ - new CountAggregatorFactory( - "count" - ) - }; - - private static final IndexMaker INDEX_MAKER = TestHelper.getTestIndexMaker(); - private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO(); - private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger(); - - private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec( - new ConciseBitmapSerdeFactory(), - CompressedObjectStrategy.CompressionStrategy.LZ4, - CompressedObjectStrategy.CompressionStrategy.LZ4 - ); - private static final List DIMS = ImmutableList.of("dim0", "dim1"); - - private static final Function>, Object[]> OBJECT_MAKER = new Function>, Object[]>() - { - @Nullable - @Override - public Object[] apply(Collection> input) - { - final ArrayList list = new ArrayList<>(); - int i = 0; - for (final Map map : input) { - list.add(new MapBasedInputRow(TIMESTAMP + i++, DIMS, map)); - } - return new Object[]{list}; - } - }; - - @SafeVarargs - public static Collection permute(Map... maps) - { - if (maps == null) { - return ImmutableList.of(); - } - return Collections2.transform( - Collections2.permutations( - Arrays.asList(maps) - ), - OBJECT_MAKER - ); - } - - @Parameterized.Parameters - public static Iterable paramFeeder() - { - final Map map1 = ImmutableMap.of( - DIMS.get(0), ImmutableList.of("dim00", "dim01"), - DIMS.get(1), "dim10" - ); - - final List nullList = Collections.singletonList(null); - - final Map map2 = ImmutableMap.of( - DIMS.get(0), nullList, - DIMS.get(1), "dim10" - ); - - - final Map map3 = ImmutableMap.of( - DIMS.get(0), - ImmutableList.of("dim00", "dim01") - ); - - final Map map4 = ImmutableMap.of(); - - final Map map5 = ImmutableMap.of(DIMS.get(1), "dim10"); - - final Map map6 = new HashMap<>(); - map6.put(DIMS.get(1), null); // ImmutableMap cannot take null - - - return Iterables.concat( - permute(map1) - , permute(map1, map4) - , permute(map1, map5) - , permute(map5, map6) - , permute(map4, map5) - , Iterables.transform(ImmutableList.of(Arrays.asList(map1, map2, map3, map4, map5, map6)), OBJECT_MAKER) - ); - - } - - private final Collection events; - - public IndexMakerTest( - final Collection events - ) - { - this.events = events; - } - - IncrementalIndex toPersist; - File tmpDir; - File persistTmpDir; - - @Before - public void setUp() throws IOException - { - toPersist = new OnheapIncrementalIndex( - JodaUtils.MIN_INSTANT, - QueryGranularity.NONE, - DEFAULT_AGG_FACTORIES, - 1000000 - ); - for (InputRow event : events) { - toPersist.add(event); - } - tmpDir = Files.createTempDir(); - persistTmpDir = new File(tmpDir, "persistDir"); - INDEX_MERGER.persist(toPersist, persistTmpDir, null, INDEX_SPEC); - } - - @After - public void tearDown() throws IOException - { - FileUtils.deleteDirectory(tmpDir); - } - - @Test - public void testPersistWithSegmentMetadata() throws IOException - { - File outDir = Files.createTempDir(); - QueryableIndex index = null; - try { - outDir = Files.createTempDir(); - Map segmentMetadata = ImmutableMap.of("key", "value"); - index = INDEX_IO.loadIndex(INDEX_MAKER.persist(toPersist, outDir, segmentMetadata, INDEX_SPEC)); - - Assert.assertEquals(segmentMetadata, index.getMetaData()); - } - finally { - if (index != null) { - index.close(); - ; - } - - if (outDir != null) { - FileUtils.deleteDirectory(outDir); - } - } - } - - @Test - public void testSimpleReprocess() throws IOException - { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( - closer.closeLater( - INDEX_IO.loadIndex( - persistTmpDir - ) - ) - ); - Assert.assertEquals(events.size(), adapter.getNumRows()); - reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed")); - } - - private File reprocessAndValidate(File inDir, File tmpDir) throws IOException - { - final File outDir = INDEX_MAKER.convert( - inDir, - tmpDir, - INDEX_SPEC - ); - INDEX_IO.validateTwoSegments(persistTmpDir, outDir); - return outDir; - } - - private File appendAndValidate(File inDir, File tmpDir) throws IOException - { - final File outDir = INDEX_MERGER.append( - ImmutableList.of(new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(inDir)))), - tmpDir, - INDEX_SPEC - ); - INDEX_IO.validateTwoSegments(persistTmpDir, outDir); - return outDir; - } - - @Test - public void testIdempotentReprocess() throws IOException - { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( - closer.closeLater( - INDEX_IO.loadIndex( - persistTmpDir - ) - ) - ); - Assert.assertEquals(events.size(), adapter.getNumRows()); - final File tmpDir1 = new File(tmpDir, "reprocessed1"); - reprocessAndValidate(persistTmpDir, tmpDir1); - - final File tmpDir2 = new File(tmpDir, "reprocessed2"); - final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir1))); - Assert.assertEquals(events.size(), adapter2.getNumRows()); - reprocessAndValidate(tmpDir1, tmpDir2); - - final File tmpDir3 = new File(tmpDir, "reprocessed3"); - final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir2))); - Assert.assertEquals(events.size(), adapter3.getNumRows()); - reprocessAndValidate(tmpDir2, tmpDir3); - } - - @Test - public void testSimpleAppend() throws IOException - { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( - closer.closeLater( - INDEX_IO.loadIndex( - persistTmpDir - ) - ) - ); - Assert.assertEquals(events.size(), adapter.getNumRows()); - appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed")); - } - - @Test - public void testIdempotentAppend() throws IOException - { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( - closer.closeLater( - INDEX_IO.loadIndex( - persistTmpDir - ) - ) - ); - Assert.assertEquals(events.size(), adapter.getNumRows()); - final File tmpDir1 = new File(tmpDir, "reprocessed1"); - appendAndValidate(persistTmpDir, tmpDir1); - - final File tmpDir2 = new File(tmpDir, "reprocessed2"); - final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir1))); - Assert.assertEquals(events.size(), adapter2.getNumRows()); - appendAndValidate(tmpDir1, tmpDir2); - - final File tmpDir3 = new File(tmpDir, "reprocessed3"); - final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir2))); - Assert.assertEquals(events.size(), adapter3.getNumRows()); - appendAndValidate(tmpDir2, tmpDir3); - } -} diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java index 426bd6e5ec5..6875caf9076 100644 --- a/processing/src/test/java/io/druid/segment/TestHelper.java +++ b/processing/src/test/java/io/druid/segment/TestHelper.java @@ -36,7 +36,6 @@ import java.util.Iterator; public class TestHelper { private static final IndexMerger INDEX_MERGER; - private static final IndexMaker INDEX_MAKER; private static final IndexIO INDEX_IO; public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); @@ -53,7 +52,6 @@ public class TestHelper } ); INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO); - INDEX_MAKER = new IndexMaker(JSON_MAPPER, INDEX_IO); } @@ -62,11 +60,6 @@ public class TestHelper return INDEX_MERGER; } - public static IndexMaker getTestIndexMaker() - { - return INDEX_MAKER; - } - public static IndexIO getTestIndexIO() { return INDEX_IO;