mirror of https://github.com/apache/druid.git
Improvements around resource handling in IndexMerger / IndexIO / QueryableIndex
* Fix resource leak in `io.druid.segment.IndexIO.DefaultIndexIOHandler#validateTwoSegments(java.io.File, java.io.File)` * Un-deprecate `close()` in `QueryableIndex` and make it inherit `Closeable` * Fix resource leaks in various unit tests * Add `CloserRule` for closing out resources
This commit is contained in:
parent
069fc25528
commit
ed8eb5c991
|
@ -466,11 +466,15 @@ public class IndexIO
|
|||
|
||||
public static void validateTwoSegments(File dir1, File dir2) throws IOException
|
||||
{
|
||||
try(QueryableIndex queryableIndex1 = loadIndex(dir1)) {
|
||||
try(QueryableIndex queryableIndex2 = loadIndex(dir2)) {
|
||||
validateTwoSegments(
|
||||
new QueryableIndexIndexableAdapter(loadIndex(dir1)),
|
||||
new QueryableIndexIndexableAdapter(loadIndex(dir2))
|
||||
new QueryableIndexIndexableAdapter(queryableIndex1),
|
||||
new QueryableIndexIndexableAdapter(queryableIndex2)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void validateTwoSegments(final IndexableAdapter adapter1, final IndexableAdapter adapter2)
|
||||
{
|
||||
|
|
|
@ -333,7 +333,8 @@ public class IndexMaker
|
|||
final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress
|
||||
) throws IOException
|
||||
{
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir));
|
||||
try (QueryableIndex index = IndexIO.loadIndex(inDir)) {
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
|
||||
return makeIndexFiles(
|
||||
ImmutableList.of(adapter),
|
||||
outDir,
|
||||
|
@ -352,6 +353,7 @@ public class IndexMaker
|
|||
indexSpec
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static File append(
|
||||
|
@ -842,6 +844,7 @@ public class IndexMaker
|
|||
{
|
||||
private final List<Integer> delegate;
|
||||
private final boolean delegateHasNullAtZero;
|
||||
|
||||
NullsAtZeroConvertingIntList(List<Integer> delegate, final boolean delegateHasNullAtZero)
|
||||
{
|
||||
this.delegate = delegate;
|
||||
|
@ -961,7 +964,10 @@ public class IndexMaker
|
|||
if (input == null) {
|
||||
return VSizeIndexedInts.fromList(ImmutableList.<Integer>of(0), dictionarySize);
|
||||
} else {
|
||||
return VSizeIndexedInts.fromList(new NullsAtZeroConvertingIntList(input, false), dictionarySize);
|
||||
return VSizeIndexedInts.fromList(
|
||||
new NullsAtZeroConvertingIntList(input, false),
|
||||
dictionarySize
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Iterators;
|
||||
|
@ -337,6 +336,38 @@ public class IndexMerger
|
|||
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
|
||||
}
|
||||
|
||||
// Faster than IndexMaker
|
||||
public static File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException
|
||||
{
|
||||
return convert(inDir, outDir, indexSpec, new BaseProgressIndicator());
|
||||
}
|
||||
|
||||
public static File convert(
|
||||
final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress
|
||||
) throws IOException
|
||||
{
|
||||
try (QueryableIndex index = IndexIO.loadIndex(inDir)) {
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
|
||||
return makeIndexFiles(
|
||||
ImmutableList.of(adapter),
|
||||
outDir,
|
||||
progress,
|
||||
Lists.newArrayList(adapter.getDimensionNames()),
|
||||
Lists.newArrayList(adapter.getMetricNames()),
|
||||
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
|
||||
{
|
||||
return input.get(0);
|
||||
}
|
||||
},
|
||||
indexSpec
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public static File append(
|
||||
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec
|
||||
) throws IOException
|
||||
|
|
|
@ -21,11 +21,12 @@ import com.metamx.collections.bitmap.BitmapFactory;
|
|||
import io.druid.segment.data.Indexed;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface QueryableIndex extends ColumnSelector
|
||||
public interface QueryableIndex extends ColumnSelector, Closeable
|
||||
{
|
||||
public Interval getDataInterval();
|
||||
public int getNumRows();
|
||||
|
@ -37,6 +38,6 @@ public interface QueryableIndex extends ColumnSelector
|
|||
* The close method shouldn't actually be here as this is nasty. We will adjust it in the future.
|
||||
* @throws java.io.IOException if an exception was thrown closing the index
|
||||
*/
|
||||
@Deprecated
|
||||
//@Deprecated // This is still required for SimpleQueryableIndex. It should not go away unitl SimpleQueryableIndex is fixed
|
||||
public void close() throws IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.metamx.common.logger.Logger;
|
||||
import org.junit.rules.TestRule;
|
||||
import org.junit.runner.Description;
|
||||
import org.junit.runners.model.Statement;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
public class CloserRule implements TestRule
|
||||
{
|
||||
private final boolean throwException;
|
||||
|
||||
public CloserRule(boolean throwException)
|
||||
{
|
||||
this.throwException = throwException;
|
||||
}
|
||||
|
||||
private static final Logger log = new Logger(CloserRule.class);
|
||||
private final List<AutoCloseable> autoCloseables = new LinkedList<>();
|
||||
|
||||
@Override
|
||||
public Statement apply(
|
||||
final Statement base, Description description
|
||||
)
|
||||
{
|
||||
return new Statement()
|
||||
{
|
||||
@Override
|
||||
public void evaluate() throws Throwable
|
||||
{
|
||||
Throwable baseThrown = null;
|
||||
try {
|
||||
base.evaluate();
|
||||
}
|
||||
catch (Throwable e) {
|
||||
baseThrown = e;
|
||||
}
|
||||
finally {
|
||||
Throwable exception = null;
|
||||
for (AutoCloseable autoCloseable : autoCloseables) {
|
||||
try {
|
||||
autoCloseable.close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
exception = suppressOrSet(exception, e);
|
||||
}
|
||||
}
|
||||
autoCloseables.clear();
|
||||
if (exception != null) {
|
||||
if (throwException && baseThrown == null) {
|
||||
throw exception;
|
||||
} else if (baseThrown != null) {
|
||||
baseThrown.addSuppressed(exception);
|
||||
} else {
|
||||
log.error(exception, "Exception closing resources");
|
||||
}
|
||||
}
|
||||
if (baseThrown != null) {
|
||||
throw baseThrown;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static Throwable suppressOrSet(Throwable prior, Throwable other)
|
||||
{
|
||||
if (prior == null) {
|
||||
prior = new IOException("Error closing resources");
|
||||
}
|
||||
prior.addSuppressed(other);
|
||||
return prior;
|
||||
}
|
||||
|
||||
public <T extends AutoCloseable> T closeLater(T autoCloseable)
|
||||
{
|
||||
autoCloseables.add(autoCloseable);
|
||||
return autoCloseable;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,364 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Runnables;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.Description;
|
||||
import org.junit.runners.model.Statement;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class CloserRuleTest
|
||||
{
|
||||
@Rule
|
||||
public final CloserRule closer = new CloserRule(true);
|
||||
@Test
|
||||
public void testCloses() throws Throwable
|
||||
{
|
||||
final CloserRule closer = new CloserRule(false);
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
closed.set(true);
|
||||
}
|
||||
}
|
||||
);
|
||||
run(closer, Runnables.doNothing());
|
||||
Assert.assertTrue(closed.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCloses() throws Throwable
|
||||
{
|
||||
final CloserRule closer = new CloserRule(false);
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
closer.closeLater(
|
||||
new AutoCloseable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws Exception
|
||||
{
|
||||
closed.set(true);
|
||||
}
|
||||
}
|
||||
);
|
||||
run(closer, Runnables.doNothing());
|
||||
Assert.assertTrue(closed.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreservesException() throws Throwable
|
||||
{
|
||||
final CloserRule closer = new CloserRule(false);
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
closed.set(true);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
final String msg = "You can't divide by zero, you can only take the limit of such!";
|
||||
Exception ex = null;
|
||||
try {
|
||||
run(
|
||||
closer, new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
throw new ArithmeticException(msg);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
ex = e;
|
||||
}
|
||||
Assert.assertTrue(closed.get());
|
||||
Assert.assertNotNull(ex);
|
||||
Assert.assertTrue(ex instanceof ArithmeticException);
|
||||
Assert.assertEquals(msg, ex.getMessage());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAddsSuppressed() throws Throwable
|
||||
{
|
||||
final CloserRule closer = new CloserRule(false);
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
final String ioExceptionMsg = "You can't triple stamp a double stamp!";
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
throw new IOException(ioExceptionMsg);
|
||||
}
|
||||
}
|
||||
);
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
closed.set(true);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
final String msg = "You can't divide by zero, you can only take the limit of such!";
|
||||
Throwable ex = null;
|
||||
try {
|
||||
run(
|
||||
closer, new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
throw new ArithmeticException(msg);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Throwable e) {
|
||||
ex = e;
|
||||
}
|
||||
Assert.assertTrue(closed.get());
|
||||
Assert.assertNotNull(ex);
|
||||
Assert.assertTrue(ex instanceof ArithmeticException);
|
||||
Assert.assertEquals(msg, ex.getMessage());
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(ioExceptionMsg),
|
||||
Lists.transform(
|
||||
Arrays.asList(ex.getSuppressed()),
|
||||
new Function<Throwable, String>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public String apply(@Nullable Throwable input)
|
||||
{
|
||||
if (input == null) {
|
||||
return null;
|
||||
}
|
||||
return input.getSuppressed()[0].getMessage();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThrowsCloseException()
|
||||
{
|
||||
final CloserRule closer = new CloserRule(true);
|
||||
final String ioExceptionMsg = "You can't triple stamp a double stamp!";
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
throw new IOException(ioExceptionMsg);
|
||||
}
|
||||
}
|
||||
);
|
||||
Throwable ex = null;
|
||||
try {
|
||||
run(closer, Runnables.doNothing());
|
||||
}
|
||||
catch (Throwable throwable) {
|
||||
ex = throwable;
|
||||
}
|
||||
Assert.assertNotNull(ex);
|
||||
ex = ex.getSuppressed()[0];
|
||||
Assert.assertNotNull(ex);
|
||||
Assert.assertTrue(ex instanceof IOException);
|
||||
Assert.assertEquals(ioExceptionMsg, ex.getMessage());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testJustLogs() throws Throwable
|
||||
{
|
||||
final CloserRule closer = new CloserRule(false);
|
||||
final String ioExceptionMsg = "You can't triple stamp a double stamp!";
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
throw new IOException(ioExceptionMsg);
|
||||
}
|
||||
}
|
||||
);
|
||||
run(closer, Runnables.doNothing());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJustLogsAnything() throws Throwable
|
||||
{
|
||||
final CloserRule closer = new CloserRule(false);
|
||||
final String ioExceptionMsg = "You can't triple stamp a double stamp!";
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
throw new IOException(ioExceptionMsg);
|
||||
}
|
||||
}
|
||||
);
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
throw new IOException(ioExceptionMsg);
|
||||
}
|
||||
}
|
||||
);
|
||||
closer.closeLater(
|
||||
new AutoCloseable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws Exception
|
||||
{
|
||||
throw new IOException(ioExceptionMsg);
|
||||
}
|
||||
}
|
||||
);
|
||||
run(closer, Runnables.doNothing());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosesEverything()
|
||||
{
|
||||
final AtomicLong counter = new AtomicLong(0L);
|
||||
final CloserRule closer = new CloserRule(true);
|
||||
final String ioExceptionMsg = "You can't triple stamp a double stamp!";
|
||||
final List<IOException> ioExceptions = Arrays.<IOException>asList(
|
||||
new IOException(ioExceptionMsg),
|
||||
null,
|
||||
new IOException(ioExceptionMsg),
|
||||
null,
|
||||
new IOException(ioExceptionMsg),
|
||||
null
|
||||
);
|
||||
for(final IOException throwable : ioExceptions){
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
counter.incrementAndGet();
|
||||
if(throwable != null){
|
||||
throw throwable;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
for(final IOException throwable : ioExceptions){
|
||||
closer.closeLater(
|
||||
new AutoCloseable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws Exception
|
||||
{
|
||||
counter.incrementAndGet();
|
||||
if(throwable != null){
|
||||
throw throwable;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
Throwable ex = null;
|
||||
try {
|
||||
run(closer, Runnables.doNothing());
|
||||
}catch (Throwable throwable) {
|
||||
ex = throwable;
|
||||
}
|
||||
Assert.assertNotNull(ex);
|
||||
Assert.assertEquals(ioExceptions.size() * 2, counter.get());
|
||||
Assert.assertEquals(ioExceptions.size(), ex.getSuppressed().length);
|
||||
}
|
||||
|
||||
@Ignore // This one doesn't quite work right, it will throw the IOException, but JUnit doesn't detect it properly and treats it as suppressed instead
|
||||
@Test(expected = IOException.class)
|
||||
public void testCloserException()
|
||||
{
|
||||
closer.closeLater(
|
||||
new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
throw new IOException("This is a test");
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private void run(CloserRule closer, final Runnable runnable) throws Throwable
|
||||
{
|
||||
closer.apply(
|
||||
new Statement()
|
||||
{
|
||||
@Override
|
||||
public void evaluate() throws Throwable
|
||||
{
|
||||
runnable.run();
|
||||
}
|
||||
}, Description.createTestDescription(
|
||||
CloserRuleTest.class.getCanonicalName(), "baseRunner", UUID.randomUUID()
|
||||
)
|
||||
).evaluate();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,554 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
|
||||
import io.druid.segment.data.BitmapSerdeFactory;
|
||||
import io.druid.segment.data.CompressedObjectStrategy;
|
||||
import io.druid.segment.data.ConciseBitmapSerdeFactory;
|
||||
import io.druid.segment.data.IncrementalIndexTest;
|
||||
import io.druid.segment.data.RoaringBitmapSerdeFactory;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexAdapter;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class IndexMakerParameterizedTest
|
||||
{
|
||||
|
||||
@Rule
|
||||
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
@Rule
|
||||
public final CloserRule closer = new CloserRule(false);
|
||||
|
||||
@Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}")
|
||||
public static Collection<Object[]> data()
|
||||
{
|
||||
return Collections2.transform(
|
||||
Sets.cartesianProduct(
|
||||
ImmutableList.of(
|
||||
ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()),
|
||||
ImmutableSet.of(
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressedObjectStrategy.CompressionStrategy.LZF
|
||||
),
|
||||
ImmutableSet.of(
|
||||
CompressedObjectStrategy.CompressionStrategy.LZ4,
|
||||
CompressedObjectStrategy.CompressionStrategy.LZF
|
||||
)
|
||||
)
|
||||
), new Function<List<?>, Object[]>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Object[] apply(List<?> input)
|
||||
{
|
||||
return input.toArray();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
static IndexSpec makeIndexSpec(
|
||||
BitmapSerdeFactory bitmapSerdeFactory,
|
||||
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
|
||||
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
|
||||
)
|
||||
{
|
||||
if (bitmapSerdeFactory != null || compressionStrategy != null) {
|
||||
return new IndexSpec(
|
||||
bitmapSerdeFactory,
|
||||
compressionStrategy.name().toLowerCase(),
|
||||
dimCompressionStrategy.name().toLowerCase()
|
||||
);
|
||||
} else {
|
||||
return new IndexSpec();
|
||||
}
|
||||
}
|
||||
|
||||
private final IndexSpec indexSpec;
|
||||
|
||||
public IndexMakerParameterizedTest(
|
||||
BitmapSerdeFactory bitmapSerdeFactory,
|
||||
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
|
||||
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
|
||||
)
|
||||
{
|
||||
this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersist() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
|
||||
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist);
|
||||
|
||||
final File tempDir = temporaryFolder.newFolder();
|
||||
QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, indexSpec)));
|
||||
|
||||
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index.getColumnNames().size());
|
||||
|
||||
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersistMerge() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
1000
|
||||
);
|
||||
|
||||
toPersist2.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Arrays.asList("dim1", "dim2"),
|
||||
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2")
|
||||
)
|
||||
);
|
||||
|
||||
toPersist2.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Arrays.asList("dim1", "dim2"),
|
||||
ImmutableMap.<String, Object>of("dim1", "5", "dim2", "6")
|
||||
)
|
||||
);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File tempDir2 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, indexSpec)));
|
||||
|
||||
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index2.getColumnNames().size());
|
||||
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.mergeQueryableIndex(
|
||||
Arrays.asList(index1, index2),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersistEmptyColumn() throws Exception
|
||||
{
|
||||
final IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{},
|
||||
10
|
||||
);
|
||||
final IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{},
|
||||
10
|
||||
);
|
||||
final File tmpDir1 = temporaryFolder.newFolder();
|
||||
final File tmpDir2 = temporaryFolder.newFolder();
|
||||
final File tmpDir3 = temporaryFolder.newFolder();
|
||||
|
||||
toPersist1.add(
|
||||
new MapBasedInputRow(
|
||||
1L,
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableMap.<String, Object>of("dim1", ImmutableList.of(), "dim2", "foo")
|
||||
)
|
||||
);
|
||||
|
||||
toPersist2.add(
|
||||
new MapBasedInputRow(
|
||||
1L,
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
ImmutableMap.<String, Object>of("dim1", ImmutableList.of(), "dim2", "bar")
|
||||
)
|
||||
);
|
||||
|
||||
final QueryableIndex index1 = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.persist(
|
||||
toPersist1,
|
||||
tmpDir1,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
final QueryableIndex index2 = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.persist(
|
||||
toPersist1,
|
||||
tmpDir2,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
final QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions()));
|
||||
|
||||
Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions()));
|
||||
|
||||
Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeRetainsValues() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAppendRetainsValues() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.append(
|
||||
ImmutableList.<IndexableAdapter>of(incrementalAdapter), tempDir1, indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeSpecChange() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
IndexSpec newSpec = new IndexSpec(
|
||||
indexSpec.getBitmapSerdeFactory(),
|
||||
"lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4",
|
||||
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
|
||||
);
|
||||
|
||||
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
newSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, merged.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, mergedDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(merged, newSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testConvertSame() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File convertDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex converted = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMaker.convert(
|
||||
tempDir1,
|
||||
convertDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, converted.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testConvertDifferent() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File convertDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
IndexSpec newSpec = new IndexSpec(
|
||||
indexSpec.getBitmapSerdeFactory(),
|
||||
"lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4",
|
||||
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
|
||||
);
|
||||
|
||||
QueryableIndex converted = closer.closeLater(IndexIO.loadIndex(IndexMaker.convert(tempDir1, convertDir, newSpec)));
|
||||
|
||||
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, converted.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(converted, newSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy)
|
||||
throws Exception
|
||||
{
|
||||
// Java voodoo
|
||||
|
||||
Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding();
|
||||
Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column");
|
||||
field.setAccessible(true);
|
||||
|
||||
Object obj = field.get(encodedColumn);
|
||||
Field compressedSupplierField = obj.getClass().getDeclaredField("this$0");
|
||||
compressedSupplierField.setAccessible(true);
|
||||
|
||||
Object supplier = compressedSupplierField.get(obj);
|
||||
Field compressionField = supplier.getClass().getDeclaredField("compression");
|
||||
compressionField.setAccessible(true);
|
||||
|
||||
Object strategy = compressionField.get(supplier);
|
||||
|
||||
Assert.assertEquals(expectedStrategy, strategy);
|
||||
}
|
||||
}
|
|
@ -40,6 +40,7 @@ import org.joda.time.DateTime;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -58,6 +59,8 @@ import java.util.Map;
|
|||
@RunWith(Parameterized.class)
|
||||
public class IndexMakerTest
|
||||
{
|
||||
@Rule
|
||||
public final CloserRule closer = new CloserRule(false);
|
||||
private static final long TIMESTAMP = DateTime.parse("2014-01-01").getMillis();
|
||||
private static final AggregatorFactory[] DEFAULT_AGG_FACTORIES = new AggregatorFactory[]{
|
||||
new CountAggregatorFactory(
|
||||
|
@ -179,7 +182,7 @@ public class IndexMakerTest
|
|||
@Test
|
||||
public void testSimpleReprocess() throws IOException
|
||||
{
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
|
||||
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||
reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
|
||||
}
|
||||
|
@ -198,7 +201,7 @@ public class IndexMakerTest
|
|||
private File appendAndValidate(File inDir, File tmpDir) throws IOException
|
||||
{
|
||||
final File outDir = IndexMerger.append(
|
||||
ImmutableList.<IndexableAdapter>of(new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir))),
|
||||
ImmutableList.<IndexableAdapter>of(new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(inDir)))),
|
||||
tmpDir,
|
||||
INDEX_SPEC
|
||||
);
|
||||
|
@ -209,18 +212,18 @@ public class IndexMakerTest
|
|||
@Test
|
||||
public void testIdempotentReprocess() throws IOException
|
||||
{
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
|
||||
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||
final File tmpDir1 = new File(tmpDir, "reprocessed1");
|
||||
reprocessAndValidate(persistTmpDir, tmpDir1);
|
||||
|
||||
final File tmpDir2 = new File(tmpDir, "reprocessed2");
|
||||
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1));
|
||||
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir1)));
|
||||
Assert.assertEquals(events.size(), adapter2.getNumRows());
|
||||
reprocessAndValidate(tmpDir1, tmpDir2);
|
||||
|
||||
final File tmpDir3 = new File(tmpDir, "reprocessed3");
|
||||
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2));
|
||||
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir2)));
|
||||
Assert.assertEquals(events.size(), adapter3.getNumRows());
|
||||
reprocessAndValidate(tmpDir2, tmpDir3);
|
||||
}
|
||||
|
@ -228,7 +231,7 @@ public class IndexMakerTest
|
|||
@Test
|
||||
public void testSimpleAppend() throws IOException
|
||||
{
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
|
||||
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||
appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
|
||||
}
|
||||
|
@ -236,18 +239,18 @@ public class IndexMakerTest
|
|||
@Test
|
||||
public void testIdempotentAppend() throws IOException
|
||||
{
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
|
||||
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||
final File tmpDir1 = new File(tmpDir, "reprocessed1");
|
||||
appendAndValidate(persistTmpDir, tmpDir1);
|
||||
|
||||
final File tmpDir2 = new File(tmpDir, "reprocessed2");
|
||||
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1));
|
||||
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir1)));
|
||||
Assert.assertEquals(events.size(), adapter2.getNumRows());
|
||||
appendAndValidate(tmpDir1, tmpDir2);
|
||||
|
||||
final File tmpDir3 = new File(tmpDir, "reprocessed3");
|
||||
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2));
|
||||
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(tmpDir2)));
|
||||
Assert.assertEquals(events.size(), adapter3.getNumRows());
|
||||
appendAndValidate(tmpDir2, tmpDir3);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import io.druid.data.input.MapBasedInputRow;
|
|||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
|
||||
import io.druid.segment.data.BitmapSerdeFactory;
|
||||
|
@ -104,6 +105,8 @@ public class IndexMergerTest
|
|||
}
|
||||
|
||||
private final IndexSpec indexSpec;
|
||||
@Rule
|
||||
public final CloserRule closer = new CloserRule(false);
|
||||
|
||||
public IndexMergerTest(
|
||||
BitmapSerdeFactory bitmapSerdeFactory,
|
||||
|
@ -123,7 +126,7 @@ public class IndexMergerTest
|
|||
IncrementalIndexTest.populateIndex(timestamp, toPersist);
|
||||
|
||||
final File tempDir = temporaryFolder.newFolder();
|
||||
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec));
|
||||
QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec)));
|
||||
|
||||
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
|
||||
|
@ -166,25 +169,27 @@ public class IndexMergerTest
|
|||
final File tempDir2 = temporaryFolder.newFolder();
|
||||
final File mergedDir = temporaryFolder.newFolder();
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec));
|
||||
QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec)));
|
||||
|
||||
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index2.getColumnNames().size());
|
||||
|
||||
QueryableIndex merged = IndexIO.loadIndex(
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
Arrays.asList(index1, index2),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
|
@ -230,10 +235,33 @@ public class IndexMergerTest
|
|||
)
|
||||
);
|
||||
|
||||
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1, indexSpec));
|
||||
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2, indexSpec));
|
||||
final QueryableIndex merged = IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec)
|
||||
final QueryableIndex index1 = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.persist(
|
||||
toPersist1,
|
||||
tmpDir1,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
final QueryableIndex index2 = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.persist(
|
||||
toPersist1,
|
||||
tmpDir2,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
final QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
Arrays.asList(index1, index2),
|
||||
new AggregatorFactory[]{},
|
||||
tmpDir3,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
|
@ -266,7 +294,7 @@ public class IndexMergerTest
|
|||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
@ -278,13 +306,15 @@ public class IndexMergerTest
|
|||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex merged = IndexIO.loadIndex(
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
|
@ -314,10 +344,12 @@ public class IndexMergerTest
|
|||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(
|
||||
QueryableIndex index1 = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.append(
|
||||
ImmutableList.<IndexableAdapter>of(incrementalAdapter), tempDir1, indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
|
@ -328,13 +360,15 @@ public class IndexMergerTest
|
|||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex merged = IndexIO.loadIndex(
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
|
@ -363,7 +397,7 @@ public class IndexMergerTest
|
|||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
@ -375,16 +409,22 @@ public class IndexMergerTest
|
|||
Assert.assertEquals(3, index1.getColumnNames().size());
|
||||
|
||||
|
||||
IndexSpec newSpec = new IndexSpec(indexSpec.getBitmapSerdeFactory(), "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4");
|
||||
IndexSpec newSpec = new IndexSpec(
|
||||
indexSpec.getBitmapSerdeFactory(),
|
||||
"lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4",
|
||||
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
|
||||
);
|
||||
|
||||
|
||||
QueryableIndex merged = IndexIO.loadIndex(
|
||||
QueryableIndex merged = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.mergeQueryableIndex(
|
||||
ImmutableList.of(index1),
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")},
|
||||
mergedDir,
|
||||
newSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
|
@ -397,7 +437,122 @@ public class IndexMergerTest
|
|||
assertDimCompression(merged, newSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) throws Exception
|
||||
|
||||
@Test
|
||||
public void testConvertSame() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(
|
||||
true,
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory(
|
||||
"longSum1",
|
||||
"dim1"
|
||||
),
|
||||
new LongSumAggregatorFactory("longSum2", "dim2")
|
||||
}
|
||||
);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File convertDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(
|
||||
IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))
|
||||
);
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(4, index1.getColumnNames().size());
|
||||
|
||||
|
||||
QueryableIndex converted = closer.closeLater(
|
||||
IndexIO.loadIndex(
|
||||
IndexMerger.convert(
|
||||
tempDir1,
|
||||
convertDir,
|
||||
indexSpec
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
|
||||
Assert.assertEquals(4, converted.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testConvertDifferent() throws Exception
|
||||
{
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(
|
||||
true, new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory(
|
||||
"longSum1",
|
||||
"dim1"
|
||||
),
|
||||
new LongSumAggregatorFactory("longSum2", "dim2")
|
||||
}
|
||||
);
|
||||
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
|
||||
|
||||
final File tempDir1 = temporaryFolder.newFolder();
|
||||
final File convertDir = temporaryFolder.newFolder();
|
||||
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
|
||||
toPersist1.getInterval(),
|
||||
toPersist1,
|
||||
indexSpec.getBitmapSerdeFactory()
|
||||
.getBitmapFactory()
|
||||
);
|
||||
|
||||
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
|
||||
|
||||
|
||||
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(incrementalAdapter, queryableAdapter);
|
||||
|
||||
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
|
||||
Assert.assertEquals(4, index1.getColumnNames().size());
|
||||
|
||||
|
||||
IndexSpec newSpec = new IndexSpec(
|
||||
indexSpec.getBitmapSerdeFactory(),
|
||||
"lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4",
|
||||
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
|
||||
);
|
||||
|
||||
QueryableIndex converted = closer.closeLater(IndexIO.loadIndex(IndexMerger.convert(tempDir1, convertDir, newSpec)));
|
||||
|
||||
Assert.assertEquals(2, converted.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions()));
|
||||
Assert.assertEquals(4, converted.getColumnNames().size());
|
||||
|
||||
IndexIO.DefaultIndexIOHandler.validateTwoSegments(tempDir1, convertDir);
|
||||
|
||||
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
|
||||
assertDimCompression(converted, newSpec.getDimensionCompressionStrategy());
|
||||
}
|
||||
|
||||
private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy)
|
||||
throws Exception
|
||||
{
|
||||
// Java voodoo
|
||||
|
||||
|
|
Loading…
Reference in New Issue