remove IndexMaker

fjy 2015-12-27 17:09:53 -08:00
parent d94821998f
commit faf421726b
6 changed files with 0 additions and 2794 deletions
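IndexMerger remains in place and already covers the persist and append operations the deleted tests exercised (see the INDEX_MERGER.persist and INDEX_MERGER.append calls below). A rough sketch of the replacement call path, with incrementalIndex, outDir, and indexSpec standing in for caller-supplied values:

IndexMerger merger = injector.getInstance(IndexMerger.class);
// Persist an in-memory IncrementalIndex to a segment directory, as the deleted
// tests did through INDEX_MAKER.persist(...); the third argument is optional segment metadata.
merger.persist(incrementalIndex, outDir, null, indexSpec);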


@@ -50,7 +50,6 @@ import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
@@ -90,7 +89,6 @@ public class HadoopDruidIndexerConfig
public static final ObjectMapper JSON_MAPPER;
public static final IndexIO INDEX_IO;
public static final IndexMerger INDEX_MERGER;
public static final IndexMaker INDEX_MAKER;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
@@ -114,7 +112,6 @@ public class HadoopDruidIndexerConfig
JSON_MAPPER = injector.getInstance(ObjectMapper.class);
INDEX_IO = injector.getInstance(IndexIO.class);
INDEX_MERGER = injector.getInstance(IndexMerger.class);
INDEX_MAKER = injector.getInstance(IndexMaker.class);
}
public static enum IndexJobCounters


@@ -27,7 +27,6 @@ import com.metamx.common.ISE;
import io.druid.guice.ServerModule;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.column.ColumnConfig;
@@ -40,7 +39,6 @@ public class TestUtils
{
private final ObjectMapper jsonMapper;
private final IndexMerger indexMerger;
private final IndexMaker indexMaker;
private final IndexIO indexIO;
public TestUtils()
@@ -58,7 +56,6 @@ public class TestUtils
}
);
indexMerger = new IndexMerger(jsonMapper, indexIO);
indexMaker = new IndexMaker(jsonMapper, indexIO);
final List<? extends Module> list = new ServerModule().getJacksonModules();
for (Module module : list) {
@@ -69,7 +66,6 @@ public class TestUtils
new InjectableValues.Std()
.addValue(IndexIO.class, indexIO)
.addValue(IndexMerger.class, indexMerger)
.addValue(IndexMaker.class, indexMaker)
.addValue(ObjectMapper.class, jsonMapper)
);
}
@@ -84,11 +80,6 @@ public class TestUtils
return indexMerger;
}
public IndexMaker getTestIndexMaker()
{
return indexMaker;
}
public IndexIO getTestIndexIO()
{
return indexIO;

File diff suppressed because it is too large.
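(The suppressed file is presumably io.druid.segment.IndexMaker itself, which would account for the bulk of the 2,794 deleted lines.)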


@@ -1,629 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/*
* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready.
*/
@Ignore
@RunWith(Parameterized.class)
public class IndexMakerParameterizedTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public final CloserRule closer = new CloserRule(false);
private static final IndexMaker INDEX_MAKER = TestHelper.getTestIndexMaker();
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
@Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}")
public static Collection<Object[]> data()
{
return Collections2.transform(
Sets.cartesianProduct(
ImmutableList.of(
ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()),
ImmutableSet.of(
CompressedObjectStrategy.CompressionStrategy.LZ4,
CompressedObjectStrategy.CompressionStrategy.LZF
),
ImmutableSet.of(
CompressedObjectStrategy.CompressionStrategy.LZ4,
CompressedObjectStrategy.CompressionStrategy.LZF
)
)
), new Function<List<?>, Object[]>()
{
@Nullable
@Override
public Object[] apply(List<?> input)
{
return input.toArray();
}
}
);
}
static IndexSpec makeIndexSpec(
BitmapSerdeFactory bitmapSerdeFactory,
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
)
{
if (bitmapSerdeFactory != null && compressionStrategy != null && dimCompressionStrategy != null) {
return new IndexSpec(
bitmapSerdeFactory,
compressionStrategy.name().toLowerCase(),
dimCompressionStrategy.name().toLowerCase()
);
} else {
return new IndexSpec();
}
}
private final IndexSpec indexSpec;
public IndexMakerParameterizedTest(
BitmapSerdeFactory bitmapSerdeFactory,
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
)
{
this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy);
}
@Test
public void testPersist() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = temporaryFolder.newFolder();
QueryableIndex index = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist,
tempDir,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
Assert.assertEquals(3, index.getColumnNames().size());
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testPersistMerge() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
toPersist2.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2")
)
);
toPersist2.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "5", "dim2", "6")
)
);
final File tempDir1 = temporaryFolder.newFolder();
final File tempDir2 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist2,
tempDir2,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(3, index2.getColumnNames().size());
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.mergeQueryableIndex(
Arrays.asList(index1, index2),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
indexSpec
)
)
);
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size());
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testPersistEmptyColumn() throws Exception
{
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{},
10
);
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{},
10
);
final File tmpDir1 = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDir3 = temporaryFolder.newFolder();
toPersist1.add(
new MapBasedInputRow(
1L,
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", ImmutableList.of(), "dim2", "foo")
)
);
toPersist2.add(
new MapBasedInputRow(
1L,
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", ImmutableList.of(), "dim2", "bar")
)
);
final QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist1,
tmpDir1,
null,
indexSpec
)
)
);
final QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist2,
tmpDir2,
null,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec)
)
);
Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions()));
Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions()));
Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testMergeRetainsValues() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
indexSpec
)
)
);
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size());
INDEX_IO.validateTwoSegments(tempDir1, mergedDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testAppendRetainsValues() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.append(
ImmutableList.<IndexableAdapter>of(incrementalAdapter), tempDir1, indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
indexSpec
)
)
);
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size());
INDEX_IO.validateTwoSegments(tempDir1, mergedDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testMergeSpecChange() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
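// Build a second spec with both compression codecs flipped, so the merge must re-encode under the new spec.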
IndexSpec newSpec = new IndexSpec(
indexSpec.getBitmapSerdeFactory(),
"lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4",
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
);
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir,
newSpec
)
)
);
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(3, merged.getColumnNames().size());
INDEX_IO.validateTwoSegments(tempDir1, mergedDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, newSpec.getDimensionCompressionStrategy());
}
@Test
public void testConvertSame() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File convertDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex converted = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.convert(
tempDir1,
convertDir,
indexSpec
)
)
);
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
Assert.assertEquals(3, converted.getColumnNames().size());
INDEX_IO.validateTwoSegments(tempDir1, convertDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testConvertDifferent() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
final File convertDir = temporaryFolder.newFolder();
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
indexSpec.getBitmapSerdeFactory()
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MAKER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
INDEX_IO.validateTwoSegments(incrementalAdapter, queryableAdapter);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
IndexSpec newSpec = new IndexSpec(
indexSpec.getBitmapSerdeFactory(),
"lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4",
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
);
QueryableIndex converted = closer.closeLater(INDEX_IO.loadIndex(INDEX_MAKER.convert(tempDir1, convertDir, newSpec)));
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
Assert.assertEquals(3, converted.getColumnNames().size());
INDEX_IO.validateTwoSegments(tempDir1, convertDir);
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(converted, newSpec.getDimensionCompressionStrategy());
}
private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy)
throws Exception
{
// The compression strategy is not exposed through any public API, so dig it out with reflection:
// dictionary-encoded column -> private backing column -> enclosing compressed supplier -> "compression" field.
Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding();
Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column");
field.setAccessible(true);
Object obj = field.get(encodedColumn);
// "this$0" is the synthetic reference from the inner column class back to its enclosing supplier.
Field compressedSupplierField = obj.getClass().getDeclaredField("this$0");
compressedSupplierField.setAccessible(true);
Object supplier = compressedSupplierField.get(obj);
Field compressionField = supplier.getClass().getDeclaredField("compression");
compressionField.setAccessible(true);
Object strategy = compressionField.get(supplier);
Assert.assertEquals(expectedStrategy, strategy);
}
}


@@ -1,316 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/*
* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready.
*/
@Ignore
@RunWith(Parameterized.class)
public class IndexMakerTest
{
@Rule
public final CloserRule closer = new CloserRule(false);
private static final long TIMESTAMP = DateTime.parse("2014-01-01").getMillis();
private static final AggregatorFactory[] DEFAULT_AGG_FACTORIES = new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
};
private static final IndexMaker INDEX_MAKER = TestHelper.getTestIndexMaker();
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
new ConciseBitmapSerdeFactory(),
CompressedObjectStrategy.CompressionStrategy.LZ4,
CompressedObjectStrategy.CompressionStrategy.LZ4
);
private static final List<String> DIMS = ImmutableList.of("dim0", "dim1");
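// Converts one collection of raw row maps into a test case: a list of InputRows with strictly increasing timestamps.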
private static final Function<Collection<Map<String, Object>>, Object[]> OBJECT_MAKER = new Function<Collection<Map<String, Object>>, Object[]>()
{
@Nullable
@Override
public Object[] apply(Collection<Map<String, Object>> input)
{
final ArrayList<InputRow> list = new ArrayList<>();
int i = 0;
for (final Map<String, Object> map : input) {
list.add(new MapBasedInputRow(TIMESTAMP + i++, DIMS, map));
}
return new Object[]{list};
}
};
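// Generates one test case per permutation of the given rows, so results must not depend on ingestion order.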
@SafeVarargs
public static Collection<Object[]> permute(Map<String, Object>... maps)
{
if (maps == null) {
return ImmutableList.<Object[]>of();
}
return Collections2.transform(
Collections2.permutations(
Arrays.asList(maps)
),
OBJECT_MAKER
);
}
@Parameterized.Parameters
public static Iterable<Object[]> paramFeeder()
{
final Map<String, Object> map1 = ImmutableMap.<String, Object>of(
DIMS.get(0), ImmutableList.<String>of("dim00", "dim01"),
DIMS.get(1), "dim10"
);
final List<String> nullList = Collections.singletonList(null);
final Map<String, Object> map2 = ImmutableMap.<String, Object>of(
DIMS.get(0), nullList,
DIMS.get(1), "dim10"
);
final Map<String, Object> map3 = ImmutableMap.<String, Object>of(
DIMS.get(0),
ImmutableList.<String>of("dim00", "dim01")
);
final Map<String, Object> map4 = ImmutableMap.<String, Object>of();
final Map<String, Object> map5 = ImmutableMap.<String, Object>of(DIMS.get(1), "dim10");
final Map<String, Object> map6 = new HashMap<>();
map6.put(DIMS.get(1), null); // ImmutableMap cannot take null
return Iterables.<Object[]>concat(
permute(map1)
, permute(map1, map4)
, permute(map1, map5)
, permute(map5, map6)
, permute(map4, map5)
, Iterables.transform(ImmutableList.of(Arrays.asList(map1, map2, map3, map4, map5, map6)), OBJECT_MAKER)
);
}
private final Collection<InputRow> events;
public IndexMakerTest(
final Collection<InputRow> events
)
{
this.events = events;
}
IncrementalIndex toPersist;
File tmpDir;
File persistTmpDir;
@Before
public void setUp() throws IOException
{
toPersist = new OnheapIncrementalIndex(
JodaUtils.MIN_INSTANT,
QueryGranularity.NONE,
DEFAULT_AGG_FACTORIES,
1000000
);
for (InputRow event : events) {
toPersist.add(event);
}
tmpDir = Files.createTempDir();
persistTmpDir = new File(tmpDir, "persistDir");
INDEX_MERGER.persist(toPersist, persistTmpDir, null, INDEX_SPEC);
}
@After
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}
@Test
public void testPersistWithSegmentMetadata() throws IOException
{
File outDir = Files.createTempDir();
QueryableIndex index = null;
try {
Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
index = INDEX_IO.loadIndex(INDEX_MAKER.persist(toPersist, outDir, segmentMetadata, INDEX_SPEC));
Assert.assertEquals(segmentMetadata, index.getMetaData());
}
finally {
if (index != null) {
index.close();
}
if (outDir != null) {
FileUtils.deleteDirectory(outDir);
}
}
}
@Test
public void testSimpleReprocess() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
INDEX_IO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
}
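// Runs IndexMaker.convert on inDir and checks the result against the originally persisted segment.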
private File reprocessAndValidate(File inDir, File tmpDir) throws IOException
{
final File outDir = INDEX_MAKER.convert(
inDir,
tmpDir,
INDEX_SPEC
);
INDEX_IO.validateTwoSegments(persistTmpDir, outDir);
return outDir;
}
private File appendAndValidate(File inDir, File tmpDir) throws IOException
{
final File outDir = INDEX_MERGER.append(
ImmutableList.<IndexableAdapter>of(new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(inDir)))),
tmpDir,
INDEX_SPEC
);
INDEX_IO.validateTwoSegments(persistTmpDir, outDir);
return outDir;
}
@Test
public void testIdempotentReprocess() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
INDEX_IO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
final File tmpDir1 = new File(tmpDir, "reprocessed1");
reprocessAndValidate(persistTmpDir, tmpDir1);
final File tmpDir2 = new File(tmpDir, "reprocessed2");
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir1)));
Assert.assertEquals(events.size(), adapter2.getNumRows());
reprocessAndValidate(tmpDir1, tmpDir2);
final File tmpDir3 = new File(tmpDir, "reprocessed3");
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir2)));
Assert.assertEquals(events.size(), adapter3.getNumRows());
reprocessAndValidate(tmpDir2, tmpDir3);
}
@Test
public void testSimpleAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
INDEX_IO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
}
@Test
public void testIdempotentAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
INDEX_IO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
final File tmpDir1 = new File(tmpDir, "reprocessed1");
appendAndValidate(persistTmpDir, tmpDir1);
final File tmpDir2 = new File(tmpDir, "reprocessed2");
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir1)));
Assert.assertEquals(events.size(), adapter2.getNumRows());
appendAndValidate(tmpDir1, tmpDir2);
final File tmpDir3 = new File(tmpDir, "reprocessed3");
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(tmpDir2)));
Assert.assertEquals(events.size(), adapter3.getNumRows());
appendAndValidate(tmpDir2, tmpDir3);
}
}


@@ -36,7 +36,6 @@ import java.util.Iterator;
public class TestHelper
{
private static final IndexMerger INDEX_MERGER;
private static final IndexMaker INDEX_MAKER;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
@@ -53,7 +52,6 @@ public class TestHelper
}
);
INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MAKER = new IndexMaker(JSON_MAPPER, INDEX_IO);
}
@@ -62,11 +60,6 @@ public class TestHelper
return INDEX_MERGER;
}
public static IndexMaker getTestIndexMaker()
{
return INDEX_MAKER;
}
public static IndexIO getTestIndexIO()
{
return INDEX_IO;