Merge pull request #88 from metamx/8to9-empty-column

Additional fix for columns with cardinality 0
This commit is contained in:
cheddar 2013-02-15 10:15:05 -08:00
commit e1533b871e
2 changed files with 49 additions and 18 deletions

View File

@ -21,6 +21,7 @@ package com.metamx.druid.index.v1;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -62,7 +63,6 @@ import com.metamx.druid.kv.VSizeIndexedInts;
import com.metamx.druid.utils.SerializerUtils; import com.metamx.druid.utils.SerializerUtils;
import it.uniroma3.mat.extendedset.intset.ConciseSet; import it.uniroma3.mat.extendedset.intset.ConciseSet;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -369,8 +369,8 @@ public class IndexIO
); );
} }
LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet(); final LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet();
Set<String> skippedDimensions = Sets.newLinkedHashSet(); final Set<String> skippedDimensions = Sets.newLinkedHashSet();
for (String filename : v8SmooshedFiles.getInternalFilenames()) { for (String filename : v8SmooshedFiles.getInternalFilenames()) {
log.info("Processing file[%s]", filename); log.info("Processing file[%s]", filename);
if (filename.startsWith("dim_")) { if (filename.startsWith("dim_")) {
@ -570,25 +570,37 @@ public class IndexIO
final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd"); final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd");
indexBuffer.get(); // Skip the version byte indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> dims = GenericIndexed.read( final GenericIndexed<String> dims8 = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy indexBuffer, GenericIndexed.stringStrategy
); );
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
Iterables.filter(
dims8, new Predicate<String>()
{
@Override
public boolean apply(String s)
{
return !skippedDimensions.contains(s);
}
}
),
GenericIndexed.stringStrategy
);
final GenericIndexed<String> availableMetrics = GenericIndexed.read( final GenericIndexed<String> availableMetrics = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy indexBuffer, GenericIndexed.stringStrategy
); );
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
Set<String> columns = Sets.newTreeSet(); Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims)); columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics)); columns.addAll(Lists.newArrayList(availableMetrics));
columns.removeAll(skippedDimensions);
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy); GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy);
final int numBytes = cols.getSerializedSize() + dims.getSerializedSize() + 16; final int numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16;
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer); cols.writeToChannel(writer);
dims.writeToChannel(writer); dims9.writeToChannel(writer);
serializerUtils.writeLong(writer, dataInterval.getStartMillis()); serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis()); serializerUtils.writeLong(writer, dataInterval.getEndMillis());
writer.close(); writer.close();

View File

@ -33,6 +33,7 @@ import org.junit.Test;
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
/** /**
*/ */
@ -116,11 +117,14 @@ public class IndexMergerTest
@Test @Test
public void testPersistEmptyColumn() throws Exception public void testPersistEmptyColumn() throws Exception
{ {
final IncrementalIndex toPersist = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); final IncrementalIndex toPersist1 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
final File tmpDir = Files.createTempDir(); final IncrementalIndex toPersist2 = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
final File tmpDir1 = Files.createTempDir();
final File tmpDir2 = Files.createTempDir();
final File tmpDir3 = Files.createTempDir();
try { try {
toPersist.add( toPersist1.add(
new MapBasedInputRow( new MapBasedInputRow(
1L, 1L,
ImmutableList.of("dim1", "dim2"), ImmutableList.of("dim1", "dim2"),
@ -128,17 +132,32 @@ public class IndexMergerTest
) )
); );
final QueryableIndex merged = IndexIO.loadIndex( toPersist2.add(
IndexMerger.persist(toPersist, tmpDir) new MapBasedInputRow(
1L,
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", ImmutableList.of(), "dim2", "bar")
)
); );
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2));
final QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
);
Assert.assertEquals(1, index1.getTimeColumn().getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions()));
Assert.assertEquals(1, index2.getTimeColumn().getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions()));
Assert.assertEquals(1, merged.getTimeColumn().getLength()); Assert.assertEquals(1, merged.getTimeColumn().getLength());
Assert.assertEquals(ImmutableList.of("dim1", "dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
Assert.assertEquals(null, merged.getColumn("dim1"));
} finally { } finally {
FileUtils.deleteQuietly(tmpDir); FileUtils.deleteQuietly(tmpDir1);
FileUtils.deleteQuietly(tmpDir2);
FileUtils.deleteQuietly(tmpDir3);
} }
} }
} }