From ed8eb5c99120e744b30090c2ae63e173c9ba1137 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 29 May 2015 09:26:39 -0700 Subject: [PATCH] Improvements around resource handling in IndexMerger / IndexIO / QueryableIndex * Fix resource leak in `io.druid.segment.IndexIO.DefaultIndexIOHandler#validateTwoSegments(java.io.File, java.io.File)` * Un-deprecate `close()` in `QueryableIndex` and make it inherit `Closeable` * Fix resource leaks in various unit tests * Add `CloserRule` for closing out resources --- .../main/java/io/druid/segment/IndexIO.java | 12 +- .../java/io/druid/segment/IndexMaker.java | 42 +- .../java/io/druid/segment/IndexMerger.java | 33 +- .../java/io/druid/segment/QueryableIndex.java | 5 +- .../java/io/druid/segment/CloserRule.java | 103 ++++ .../java/io/druid/segment/CloserRuleTest.java | 364 ++++++++++++ .../segment/IndexMakerParameterizedTest.java | 554 ++++++++++++++++++ .../java/io/druid/segment/IndexMakerTest.java | 21 +- .../io/druid/segment/IndexMergerTest.java | 231 ++++++-- 9 files changed, 1293 insertions(+), 72 deletions(-) create mode 100644 processing/src/test/java/io/druid/segment/CloserRule.java create mode 100644 processing/src/test/java/io/druid/segment/CloserRuleTest.java create mode 100644 processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 8fde6d90508..d46d0f9937b 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -466,10 +466,14 @@ public class IndexIO public static void validateTwoSegments(File dir1, File dir2) throws IOException { - validateTwoSegments( - new QueryableIndexIndexableAdapter(loadIndex(dir1)), - new QueryableIndexIndexableAdapter(loadIndex(dir2)) - ); + try(QueryableIndex queryableIndex1 = loadIndex(dir1)) { + try(QueryableIndex queryableIndex2 = loadIndex(dir2)) { + validateTwoSegments( + new QueryableIndexIndexableAdapter(queryableIndex1), + new QueryableIndexIndexableAdapter(queryableIndex2) + ); + } + } } public static void validateTwoSegments(final IndexableAdapter adapter1, final IndexableAdapter adapter2) diff --git a/processing/src/main/java/io/druid/segment/IndexMaker.java b/processing/src/main/java/io/druid/segment/IndexMaker.java index 06a525039c7..069a74e2d5f 100644 --- a/processing/src/main/java/io/druid/segment/IndexMaker.java +++ b/processing/src/main/java/io/druid/segment/IndexMaker.java @@ -333,24 +333,26 @@ public class IndexMaker final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress ) throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir)); - return makeIndexFiles( - ImmutableList.of(adapter), - outDir, - progress, - Lists.newArrayList(adapter.getDimensionNames()), - Lists.newArrayList(adapter.getMetricNames()), - new Function>, Iterable>() - { - @Nullable - @Override - public Iterable apply(ArrayList> input) + try (QueryableIndex index = IndexIO.loadIndex(inDir)) { + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); + return makeIndexFiles( + ImmutableList.of(adapter), + outDir, + progress, + Lists.newArrayList(adapter.getDimensionNames()), + Lists.newArrayList(adapter.getMetricNames()), + new Function>, Iterable>() { - return input.get(0); - } - }, - indexSpec - ); + @Nullable + @Override + public Iterable apply(ArrayList> input) + { + return input.get(0); + } + }, + indexSpec + ); + } } @@ -842,6 +844,7 @@ public class IndexMaker { private final List delegate; private final boolean delegateHasNullAtZero; + NullsAtZeroConvertingIntList(List delegate, final boolean delegateHasNullAtZero) { this.delegate = delegate; @@ -961,7 +964,10 @@ public class IndexMaker if (input == null) { return VSizeIndexedInts.fromList(ImmutableList.of(0), dictionarySize); } else { - return VSizeIndexedInts.fromList(new NullsAtZeroConvertingIntList(input, false), dictionarySize); + return VSizeIndexedInts.fromList( + new NullsAtZeroConvertingIntList(input, false), + dictionarySize + ); } } } diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index c645b146eee..26eb29f60f5 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.base.Splitter; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; @@ -337,6 +336,38 @@ public class IndexMerger return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec); } + // Faster than IndexMaker + public static File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException + { + return convert(inDir, outDir, indexSpec, new BaseProgressIndicator()); + } + + public static File convert( + final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress + ) throws IOException + { + try (QueryableIndex index = IndexIO.loadIndex(inDir)) { + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); + return makeIndexFiles( + ImmutableList.of(adapter), + outDir, + progress, + Lists.newArrayList(adapter.getDimensionNames()), + Lists.newArrayList(adapter.getMetricNames()), + new Function>, Iterable>() + { + @Nullable + @Override + public Iterable apply(ArrayList> input) + { + return input.get(0); + } + }, + indexSpec + ); + } + } + public static File append( List indexes, File outDir, IndexSpec indexSpec ) throws IOException diff --git a/processing/src/main/java/io/druid/segment/QueryableIndex.java b/processing/src/main/java/io/druid/segment/QueryableIndex.java index 82d96bc2322..1d05dddf621 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndex.java @@ -21,11 +21,12 @@ import com.metamx.collections.bitmap.BitmapFactory; import io.druid.segment.data.Indexed; import org.joda.time.Interval; +import java.io.Closeable; import java.io.IOException; /** */ -public interface QueryableIndex extends ColumnSelector +public interface QueryableIndex extends ColumnSelector, Closeable { public Interval getDataInterval(); public int getNumRows(); @@ -37,6 +38,6 @@ public interface QueryableIndex extends ColumnSelector * The close method shouldn't actually be here as this is nasty. We will adjust it in the future. * @throws java.io.IOException if an exception was thrown closing the index */ - @Deprecated + //@Deprecated // This is still required for SimpleQueryableIndex. It should not go away unitl SimpleQueryableIndex is fixed public void close() throws IOException; } diff --git a/processing/src/test/java/io/druid/segment/CloserRule.java b/processing/src/test/java/io/druid/segment/CloserRule.java new file mode 100644 index 00000000000..4d7340b51af --- /dev/null +++ b/processing/src/test/java/io/druid/segment/CloserRule.java @@ -0,0 +1,103 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment; + +import com.metamx.common.logger.Logger; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class CloserRule implements TestRule +{ + private final boolean throwException; + + public CloserRule(boolean throwException) + { + this.throwException = throwException; + } + + private static final Logger log = new Logger(CloserRule.class); + private final List autoCloseables = new LinkedList<>(); + + @Override + public Statement apply( + final Statement base, Description description + ) + { + return new Statement() + { + @Override + public void evaluate() throws Throwable + { + Throwable baseThrown = null; + try { + base.evaluate(); + } + catch (Throwable e) { + baseThrown = e; + } + finally { + Throwable exception = null; + for (AutoCloseable autoCloseable : autoCloseables) { + try { + autoCloseable.close(); + } + catch (Exception e) { + exception = suppressOrSet(exception, e); + } + } + autoCloseables.clear(); + if (exception != null) { + if (throwException && baseThrown == null) { + throw exception; + } else if (baseThrown != null) { + baseThrown.addSuppressed(exception); + } else { + log.error(exception, "Exception closing resources"); + } + } + if (baseThrown != null) { + throw baseThrown; + } + } + } + }; + } + + private static Throwable suppressOrSet(Throwable prior, Throwable other) + { + if (prior == null) { + prior = new IOException("Error closing resources"); + } + prior.addSuppressed(other); + return prior; + } + + public T closeLater(T autoCloseable) + { + autoCloseables.add(autoCloseable); + return autoCloseable; + } +} diff --git a/processing/src/test/java/io/druid/segment/CloserRuleTest.java b/processing/src/test/java/io/druid/segment/CloserRuleTest.java new file mode 100644 index 00000000000..f25d0cf7850 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/CloserRuleTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Runnables; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class CloserRuleTest +{ + @Rule + public final CloserRule closer = new CloserRule(true); + @Test + public void testCloses() throws Throwable + { + final CloserRule closer = new CloserRule(false); + final AtomicBoolean closed = new AtomicBoolean(false); + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + closed.set(true); + } + } + ); + run(closer, Runnables.doNothing()); + Assert.assertTrue(closed.get()); + } + + @Test + public void testAutoCloses() throws Throwable + { + final CloserRule closer = new CloserRule(false); + final AtomicBoolean closed = new AtomicBoolean(false); + closer.closeLater( + new AutoCloseable() + { + @Override + public void close() throws Exception + { + closed.set(true); + } + } + ); + run(closer, Runnables.doNothing()); + Assert.assertTrue(closed.get()); + } + + @Test + public void testPreservesException() throws Throwable + { + final CloserRule closer = new CloserRule(false); + final AtomicBoolean closed = new AtomicBoolean(false); + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + closed.set(true); + } + } + ); + + final String msg = "You can't divide by zero, you can only take the limit of such!"; + Exception ex = null; + try { + run( + closer, new Runnable() + { + @Override + public void run() + { + throw new ArithmeticException(msg); + } + } + ); + } + catch (Exception e) { + ex = e; + } + Assert.assertTrue(closed.get()); + Assert.assertNotNull(ex); + Assert.assertTrue(ex instanceof ArithmeticException); + Assert.assertEquals(msg, ex.getMessage()); + } + + + @Test + public void testAddsSuppressed() throws Throwable + { + final CloserRule closer = new CloserRule(false); + final AtomicBoolean closed = new AtomicBoolean(false); + final String ioExceptionMsg = "You can't triple stamp a double stamp!"; + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + throw new IOException(ioExceptionMsg); + } + } + ); + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + closed.set(true); + } + } + ); + + final String msg = "You can't divide by zero, you can only take the limit of such!"; + Throwable ex = null; + try { + run( + closer, new Runnable() + { + @Override + public void run() + { + throw new ArithmeticException(msg); + } + } + ); + } + catch (Throwable e) { + ex = e; + } + Assert.assertTrue(closed.get()); + Assert.assertNotNull(ex); + Assert.assertTrue(ex instanceof ArithmeticException); + Assert.assertEquals(msg, ex.getMessage()); + Assert.assertEquals( + ImmutableList.of(ioExceptionMsg), + Lists.transform( + Arrays.asList(ex.getSuppressed()), + new Function() + { + @Nullable + @Override + public String apply(@Nullable Throwable input) + { + if (input == null) { + return null; + } + return input.getSuppressed()[0].getMessage(); + } + } + ) + ); + } + + @Test + public void testThrowsCloseException() + { + final CloserRule closer = new CloserRule(true); + final String ioExceptionMsg = "You can't triple stamp a double stamp!"; + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + throw new IOException(ioExceptionMsg); + } + } + ); + Throwable ex = null; + try { + run(closer, Runnables.doNothing()); + } + catch (Throwable throwable) { + ex = throwable; + } + Assert.assertNotNull(ex); + ex = ex.getSuppressed()[0]; + Assert.assertNotNull(ex); + Assert.assertTrue(ex instanceof IOException); + Assert.assertEquals(ioExceptionMsg, ex.getMessage()); + } + + + @Test + public void testJustLogs() throws Throwable + { + final CloserRule closer = new CloserRule(false); + final String ioExceptionMsg = "You can't triple stamp a double stamp!"; + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + throw new IOException(ioExceptionMsg); + } + } + ); + run(closer, Runnables.doNothing()); + } + + @Test + public void testJustLogsAnything() throws Throwable + { + final CloserRule closer = new CloserRule(false); + final String ioExceptionMsg = "You can't triple stamp a double stamp!"; + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + throw new IOException(ioExceptionMsg); + } + } + ); + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + throw new IOException(ioExceptionMsg); + } + } + ); + closer.closeLater( + new AutoCloseable() + { + @Override + public void close() throws Exception + { + throw new IOException(ioExceptionMsg); + } + } + ); + run(closer, Runnables.doNothing()); + } + + @Test + public void testClosesEverything() + { + final AtomicLong counter = new AtomicLong(0L); + final CloserRule closer = new CloserRule(true); + final String ioExceptionMsg = "You can't triple stamp a double stamp!"; + final List ioExceptions = Arrays.asList( + new IOException(ioExceptionMsg), + null, + new IOException(ioExceptionMsg), + null, + new IOException(ioExceptionMsg), + null + ); + for(final IOException throwable : ioExceptions){ + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + counter.incrementAndGet(); + if(throwable != null){ + throw throwable; + } + } + } + ); + } + for(final IOException throwable : ioExceptions){ + closer.closeLater( + new AutoCloseable() + { + @Override + public void close() throws Exception + { + counter.incrementAndGet(); + if(throwable != null){ + throw throwable; + } + } + } + ); + } + Throwable ex = null; + try { + run(closer, Runnables.doNothing()); + }catch (Throwable throwable) { + ex = throwable; + } + Assert.assertNotNull(ex); + Assert.assertEquals(ioExceptions.size() * 2, counter.get()); + Assert.assertEquals(ioExceptions.size(), ex.getSuppressed().length); + } + + @Ignore // This one doesn't quite work right, it will throw the IOException, but JUnit doesn't detect it properly and treats it as suppressed instead + @Test(expected = IOException.class) + public void testCloserException() + { + closer.closeLater( + new Closeable() + { + @Override + public void close() throws IOException + { + throw new IOException("This is a test"); + } + } + ); + } + + private void run(CloserRule closer, final Runnable runnable) throws Throwable + { + closer.apply( + new Statement() + { + @Override + public void evaluate() throws Throwable + { + runnable.run(); + } + }, Description.createTestDescription( + CloserRuleTest.class.getCanonicalName(), "baseRunner", UUID.randomUUID() + ) + ).evaluate(); + } +} diff --git a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java new file mode 100644 index 00000000000..4dec2230e17 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java @@ -0,0 +1,554 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.druid.data.input.MapBasedInputRow; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.segment.column.Column; +import io.druid.segment.column.SimpleDictionaryEncodedColumn; +import io.druid.segment.data.BitmapSerdeFactory; +import io.druid.segment.data.CompressedObjectStrategy; +import io.druid.segment.data.ConciseBitmapSerdeFactory; +import io.druid.segment.data.IncrementalIndexTest; +import io.druid.segment.data.RoaringBitmapSerdeFactory; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexAdapter; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.io.File; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +@RunWith(Parameterized.class) +public class IndexMakerParameterizedTest +{ + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public final CloserRule closer = new CloserRule(false); + + @Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}") + public static Collection data() + { + return Collections2.transform( + Sets.cartesianProduct( + ImmutableList.of( + ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()), + ImmutableSet.of( + CompressedObjectStrategy.CompressionStrategy.LZ4, + CompressedObjectStrategy.CompressionStrategy.LZF + ), + ImmutableSet.of( + CompressedObjectStrategy.CompressionStrategy.LZ4, + CompressedObjectStrategy.CompressionStrategy.LZF + ) + ) + ), new Function, Object[]>() + { + @Nullable + @Override + public Object[] apply(List input) + { + return input.toArray(); + } + } + ); + } + + static IndexSpec makeIndexSpec( + BitmapSerdeFactory bitmapSerdeFactory, + CompressedObjectStrategy.CompressionStrategy compressionStrategy, + CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy + ) + { + if (bitmapSerdeFactory != null || compressionStrategy != null) { + return new IndexSpec( + bitmapSerdeFactory, + compressionStrategy.name().toLowerCase(), + dimCompressionStrategy.name().toLowerCase() + ); + } else { + return new IndexSpec(); + } + } + + private final IndexSpec indexSpec; + + public IndexMakerParameterizedTest( + BitmapSerdeFactory bitmapSerdeFactory, + CompressedObjectStrategy.CompressionStrategy compressionStrategy, + CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy + ) + { + this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy); + } + + @Test + public void testPersist() throws Exception + { + final long timestamp = System.currentTimeMillis(); + + IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist); + + final File tempDir = temporaryFolder.newFolder(); + QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, indexSpec))); + + Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); + Assert.assertEquals(3, index.getColumnNames().size()); + + assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); + } + + @Test + public void testPersistMerge() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + IncrementalIndex toPersist2 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + 1000 + ); + + toPersist2.add( + new MapBasedInputRow( + timestamp, + Arrays.asList("dim1", "dim2"), + ImmutableMap.of("dim1", "1", "dim2", "2") + ) + ); + + toPersist2.add( + new MapBasedInputRow( + timestamp, + Arrays.asList("dim1", "dim2"), + ImmutableMap.of("dim1", "5", "dim2", "6") + ) + ); + + final File tempDir1 = temporaryFolder.newFolder(); + final File tempDir2 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); + + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, indexSpec))); + + Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); + Assert.assertEquals(3, index2.getColumnNames().size()); + + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.mergeQueryableIndex( + Arrays.asList(index1, index2), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) + ) + ); + + Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } + + @Test + public void testPersistEmptyColumn() throws Exception + { + final IncrementalIndex toPersist1 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{}, + 10 + ); + final IncrementalIndex toPersist2 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{}, + 10 + ); + final File tmpDir1 = temporaryFolder.newFolder(); + final File tmpDir2 = temporaryFolder.newFolder(); + final File tmpDir3 = temporaryFolder.newFolder(); + + toPersist1.add( + new MapBasedInputRow( + 1L, + ImmutableList.of("dim1", "dim2"), + ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "foo") + ) + ); + + toPersist2.add( + new MapBasedInputRow( + 1L, + ImmutableList.of("dim1", "dim2"), + ImmutableMap.of("dim1", ImmutableList.of(), "dim2", "bar") + ) + ); + + final QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist1, + tmpDir1, + indexSpec + ) + ) + ); + final QueryableIndex index2 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist1, + tmpDir2, + indexSpec + ) + ) + ); + final QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec) + ) + ); + + Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions())); + + Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); + + Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } + + @Test + public void testMergeRetainsValues() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) + ) + ); + + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } + + + @Test + public void testAppendRetainsValues() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.append( + ImmutableList.of(incrementalAdapter), tempDir1, indexSpec + ) + ) + ); + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) + ) + ); + + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + } + + @Test + public void testMergeSpecChange() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File mergedDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + IndexSpec newSpec = new IndexSpec( + indexSpec.getBitmapSerdeFactory(), + "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", + "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" + ); + + + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + newSpec + ) + ) + ); + + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); + Assert.assertEquals(3, merged.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(merged, newSpec.getDimensionCompressionStrategy()); + } + + + @Test + public void testConvertSame() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File convertDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + QueryableIndex converted = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.convert( + tempDir1, + convertDir, + indexSpec + ) + ) + ); + + Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); + Assert.assertEquals(3, converted.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy()); + } + + + @Test + public void testConvertDifferent() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File convertDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(3, index1.getColumnNames().size()); + + + IndexSpec newSpec = new IndexSpec( + indexSpec.getBitmapSerdeFactory(), + "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", + "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" + ); + + QueryableIndex converted = closer.closeLater(IndexIO.loadIndex(IndexMaker.convert(tempDir1, convertDir, newSpec))); + + Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); + Assert.assertEquals(3, converted.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(converted, newSpec.getDimensionCompressionStrategy()); + } + + private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) + throws Exception + { + // Java voodoo + + Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding(); + Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column"); + field.setAccessible(true); + + Object obj = field.get(encodedColumn); + Field compressedSupplierField = obj.getClass().getDeclaredField("this$0"); + compressedSupplierField.setAccessible(true); + + Object supplier = compressedSupplierField.get(obj); + Field compressionField = supplier.getClass().getDeclaredField("compression"); + compressionField.setAccessible(true); + + Object strategy = compressionField.get(supplier); + + Assert.assertEquals(expectedStrategy, strategy); + } +} diff --git a/processing/src/test/java/io/druid/segment/IndexMakerTest.java b/processing/src/test/java/io/druid/segment/IndexMakerTest.java index ba131d92132..5da3098f105 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMakerTest.java @@ -40,6 +40,7 @@ import org.joda.time.DateTime; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -58,6 +59,8 @@ import java.util.Map; @RunWith(Parameterized.class) public class IndexMakerTest { + @Rule + public final CloserRule closer = new CloserRule(false); private static final long TIMESTAMP = DateTime.parse("2014-01-01").getMillis(); private static final AggregatorFactory[] DEFAULT_AGG_FACTORIES = new AggregatorFactory[]{ new CountAggregatorFactory( @@ -179,7 +182,7 @@ public class IndexMakerTest @Test public void testSimpleReprocess() throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir)); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); Assert.assertEquals(events.size(), adapter.getNumRows()); reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed")); } @@ -198,7 +201,7 @@ public class IndexMakerTest private File appendAndValidate(File inDir, File tmpDir) throws IOException { final File outDir = IndexMerger.append( - ImmutableList.of(new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir))), + ImmutableList.of(new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(inDir)))), tmpDir, INDEX_SPEC ); @@ -209,18 +212,18 @@ public class IndexMakerTest @Test public void testIdempotentReprocess() throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir)); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); Assert.assertEquals(events.size(), adapter.getNumRows()); final File tmpDir1 = new File(tmpDir, "reprocessed1"); reprocessAndValidate(persistTmpDir, tmpDir1); final File tmpDir2 = new File(tmpDir, "reprocessed2"); - final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1)); + final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir1))); Assert.assertEquals(events.size(), adapter2.getNumRows()); reprocessAndValidate(tmpDir1, tmpDir2); final File tmpDir3 = new File(tmpDir, "reprocessed3"); - final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2)); + final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir2))); Assert.assertEquals(events.size(), adapter3.getNumRows()); reprocessAndValidate(tmpDir2, tmpDir3); } @@ -228,7 +231,7 @@ public class IndexMakerTest @Test public void testSimpleAppend() throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir)); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); Assert.assertEquals(events.size(), adapter.getNumRows()); appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed")); } @@ -236,18 +239,18 @@ public class IndexMakerTest @Test public void testIdempotentAppend() throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir)); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); Assert.assertEquals(events.size(), adapter.getNumRows()); final File tmpDir1 = new File(tmpDir, "reprocessed1"); appendAndValidate(persistTmpDir, tmpDir1); final File tmpDir2 = new File(tmpDir, "reprocessed2"); - final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1)); + final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir1))); Assert.assertEquals(events.size(), adapter2.getNumRows()); appendAndValidate(tmpDir1, tmpDir2); final File tmpDir3 = new File(tmpDir, "reprocessed3"); - final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2)); + final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir2))); Assert.assertEquals(events.size(), adapter3.getNumRows()); appendAndValidate(tmpDir2, tmpDir3); } diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 07a5b0e4692..4b246187dcb 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -28,6 +28,7 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.column.Column; import io.druid.segment.column.SimpleDictionaryEncodedColumn; import io.druid.segment.data.BitmapSerdeFactory; @@ -104,6 +105,8 @@ public class IndexMergerTest } private final IndexSpec indexSpec; + @Rule + public final CloserRule closer = new CloserRule(false); public IndexMergerTest( BitmapSerdeFactory bitmapSerdeFactory, @@ -123,7 +126,7 @@ public class IndexMergerTest IncrementalIndexTest.populateIndex(timestamp, toPersist); final File tempDir = temporaryFolder.newFolder(); - QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec)); + QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec))); Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); @@ -166,24 +169,26 @@ public class IndexMergerTest final File tempDir2 = temporaryFolder.newFolder(); final File mergedDir = temporaryFolder.newFolder(); - QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(3, index1.getColumnNames().size()); - QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec)); + QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec))); Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); Assert.assertEquals(3, index2.getColumnNames().size()); - QueryableIndex merged = IndexIO.loadIndex( - IndexMerger.mergeQueryableIndex( - Arrays.asList(index1, index2), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - indexSpec + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + Arrays.asList(index1, index2), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) ) ); @@ -230,10 +235,33 @@ public class IndexMergerTest ) ); - final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1, indexSpec)); - final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2, indexSpec)); - final QueryableIndex merged = IndexIO.loadIndex( - IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec) + final QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist1, + tmpDir1, + indexSpec + ) + ) + ); + final QueryableIndex index2 = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist1, + tmpDir2, + indexSpec + ) + ) + ); + final QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + Arrays.asList(index1, index2), + new AggregatorFactory[]{}, + tmpDir3, + indexSpec + ) + ) ); Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); @@ -266,7 +294,7 @@ public class IndexMergerTest .getBitmapFactory() ); - QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -278,12 +306,14 @@ public class IndexMergerTest Assert.assertEquals(3, index1.getColumnNames().size()); - QueryableIndex merged = IndexIO.loadIndex( - IndexMerger.mergeQueryableIndex( - ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - indexSpec + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) ) ); @@ -314,9 +344,11 @@ public class IndexMergerTest .getBitmapFactory() ); - QueryableIndex index1 = IndexIO.loadIndex( - IndexMerger.append( - ImmutableList.of(incrementalAdapter), tempDir1, indexSpec + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.append( + ImmutableList.of(incrementalAdapter), tempDir1, indexSpec + ) ) ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -328,12 +360,14 @@ public class IndexMergerTest Assert.assertEquals(3, index1.getColumnNames().size()); - QueryableIndex merged = IndexIO.loadIndex( - IndexMerger.mergeQueryableIndex( - ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - indexSpec + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + indexSpec + ) ) ); @@ -363,7 +397,7 @@ public class IndexMergerTest .getBitmapFactory() ); - QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -375,15 +409,21 @@ public class IndexMergerTest Assert.assertEquals(3, index1.getColumnNames().size()); - IndexSpec newSpec = new IndexSpec(indexSpec.getBitmapSerdeFactory(), "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"); + IndexSpec newSpec = new IndexSpec( + indexSpec.getBitmapSerdeFactory(), + "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", + "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" + ); - QueryableIndex merged = IndexIO.loadIndex( - IndexMerger.mergeQueryableIndex( - ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - mergedDir, - newSpec + QueryableIndex merged = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + ImmutableList.of(index1), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedDir, + newSpec + ) ) ); @@ -397,7 +437,122 @@ public class IndexMergerTest assertDimCompression(merged, newSpec.getDimensionCompressionStrategy()); } - private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) throws Exception + + @Test + public void testConvertSame() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex( + true, + new AggregatorFactory[]{ + new LongSumAggregatorFactory( + "longSum1", + "dim1" + ), + new LongSumAggregatorFactory("longSum2", "dim2") + } + ); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File convertDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)) + ); + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(4, index1.getColumnNames().size()); + + + QueryableIndex converted = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.convert( + tempDir1, + convertDir, + indexSpec + ) + ) + ); + + Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); + Assert.assertEquals(4, converted.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy()); + } + + + @Test + public void testConvertDifferent() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex( + true, new AggregatorFactory[]{ + new LongSumAggregatorFactory( + "longSum1", + "dim1" + ), + new LongSumAggregatorFactory("longSum2", "dim2") + } + ); + IncrementalIndexTest.populateIndex(timestamp, toPersist1); + + final File tempDir1 = temporaryFolder.newFolder(); + final File convertDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist1.getInterval(), + toPersist1, + indexSpec.getBitmapSerdeFactory() + .getBitmapFactory() + ); + + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); + + + final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter); + + Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); + Assert.assertEquals(4, index1.getColumnNames().size()); + + + IndexSpec newSpec = new IndexSpec( + indexSpec.getBitmapSerdeFactory(), + "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", + "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" + ); + + QueryableIndex converted = closer.closeLater(IndexIO.loadIndex(IndexMerger.convert(tempDir1, convertDir, newSpec))); + + Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); + Assert.assertEquals(4, converted.getColumnNames().size()); + + IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir); + + assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); + assertDimCompression(converted, newSpec.getDimensionCompressionStrategy()); + } + + private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) + throws Exception { // Java voodoo