Fix IncrementalIndexAdapter getRows() Iterable

This commit is contained in:
jon-wei 2015-11-12 13:09:57 -08:00
parent ef74cd394c
commit cdceaf2d26
2 changed files with 90 additions and 26 deletions

View File

@ -20,12 +20,11 @@
package io.druid.segment.incremental; package io.druid.segment.incremental;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.collections.bitmap.BitmapFactory; import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.MutableBitmap; import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.segment.IndexableAdapter; import io.druid.segment.IndexableAdapter;
@ -175,9 +174,18 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override @Override
public Iterable<Rowboat> getRows() public Iterable<Rowboat> getRows()
{ {
return FunctionalIterable return new Iterable<Rowboat>()
.create(index.getFacts().entrySet()) {
.transform( @Override
public Iterator<Rowboat> iterator()
{
/*
* Note that the transform function increments a counter to determine the rowNum of
* the iterated Rowboats. We need to return a new iterator on each
* iterator() call to ensure the counter starts at 0.
*/
return (Iterators.transform(
index.getFacts().entrySet().iterator(),
new Function<Map.Entry<IncrementalIndex.TimeAndDims, Integer>, Rowboat>() new Function<Map.Entry<IncrementalIndex.TimeAndDims, Integer>, Rowboat>()
{ {
int count = 0; int count = 0;
@ -225,7 +233,9 @@ public class IncrementalIndexAdapter implements IndexableAdapter
); );
} }
} }
); ));
}
};
} }
@Override @Override

View File

@ -1,37 +1,43 @@
/* /*
* Druid - a distributed column store. * Licensed to Metamarkets Group Inc. (Metamarkets) under one
* Copyright 2012 - 2015 Metamarkets Group Inc. * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* Licensed under the Apache License, Version 2.0 (the "License"); * regarding copyright ownership. Metamarkets licenses this file
* you may not use this file except in compliance with the License. * to you under the Apache License, Version 2.0 (the
* You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0 *
* * http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, * Unless required by applicable law or agreed to in writing,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * software distributed under the License is distributed on an
* See the License for the specific language governing permissions and * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* limitations under the License. * KIND, either express or implied. See the License for the
*/ * specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental; package io.druid.segment.incremental;
import io.druid.segment.IndexSpec; import io.druid.segment.IndexSpec;
import io.druid.segment.IndexableAdapter; import io.druid.segment.IndexableAdapter;
import io.druid.segment.Rowboat;
import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory; import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.metamx.common.ISE; import com.metamx.common.ISE;
public class IncrementalIndexAdapterTest { import java.util.ArrayList;
import java.util.List;
public class IncrementalIndexAdapterTest
{
private static final IndexSpec INDEX_SPEC = new IndexSpec( private static final IndexSpec INDEX_SPEC = new IndexSpec(
new ConciseBitmapSerdeFactory(), new ConciseBitmapSerdeFactory(),
CompressedObjectStrategy.CompressionStrategy.LZ4.name().toLowerCase(), CompressedObjectStrategy.CompressionStrategy.LZ4.name().toLowerCase(),
@ -45,7 +51,7 @@ public class IncrementalIndexAdapterTest {
IncrementalIndex incrementalIndex = IncrementalIndexTest.createIndex(null); IncrementalIndex incrementalIndex = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, incrementalIndex); IncrementalIndexTest.populateIndex(timestamp, incrementalIndex);
IndexableAdapter adapter = new IncrementalIndexAdapter( IndexableAdapter adapter = new IncrementalIndexAdapter(
incrementalIndex.getInterval(), incrementalIndex.getInterval(),
incrementalIndex, incrementalIndex,
INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory() INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
); );
@ -57,7 +63,8 @@ public class IncrementalIndexAdapterTest {
try { try {
bitmapIndexSeeker.seek("01"); bitmapIndexSeeker.seek("01");
Assert.assertFalse("Only support access in order", true); Assert.assertFalse("Only support access in order", true);
} catch(ISE ise) { }
catch (ISE ise) {
Assert.assertTrue("Only support access in order", true); Assert.assertTrue("Only support access in order", true);
} }
IndexedInts indexedInts2 = bitmapIndexSeeker.seek("2"); IndexedInts indexedInts2 = bitmapIndexSeeker.seek("2");
@ -67,4 +74,51 @@ public class IncrementalIndexAdapterTest {
IndexedInts indexedInts4 = bitmapIndexSeeker.seek("4"); IndexedInts indexedInts4 = bitmapIndexSeeker.seek("4");
Assert.assertEquals(0, indexedInts4.size()); Assert.assertEquals(0, indexedInts4.size());
} }
@Test
public void testGetRowsIterable() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
toPersist1.getInterval(),
toPersist1,
INDEX_SPEC.getBitmapSerdeFactory()
.getBitmapFactory()
);
Iterable<Rowboat> boats = incrementalAdapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
Assert.assertEquals(2, boatList.size());
Assert.assertEquals(0, boatList.get(0).getRowNum());
Assert.assertEquals(1, boatList.get(1).getRowNum());
/* Iterate through the Iterable a few times, check that boat row numbers are correct afterwards */
boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
Assert.assertEquals(2, boatList.size());
Assert.assertEquals(0, boatList.get(0).getRowNum());
Assert.assertEquals(1, boatList.get(1).getRowNum());
}
} }