From 367c50d4ba5a2b377fdbc98651c3f1cc05303195 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 15 Sep 2015 18:20:21 -0700 Subject: [PATCH] Allow SegmentAnalyzer to read columns from StorageAdapter, allow SegmentMetadataQuery to query IncrementalIndexSegments on realtime node --- .../druid/query/metadata/SegmentAnalyzer.java | 80 ++++++++++++++++++- .../SegmentMetadataQueryQueryToolChest.java | 4 - .../SegmentMetadataQueryRunnerFactory.java | 14 ++-- .../segment/QueryableIndexStorageAdapter.java | 28 +++++-- .../java/io/druid/segment/StorageAdapter.java | 3 + .../IncrementalIndexStorageAdapter.java | 26 +++++- .../query/metadata/SegmentAnalyzerTest.java | 36 ++++++++- 7 files changed, 166 insertions(+), 25 deletions(-) diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java index e9f70d3b7e6..af6d254effd 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java @@ -19,20 +19,26 @@ package io.druid.query.metadata; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Longs; import com.metamx.common.logger.Logger; import com.metamx.common.StringUtils; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.segment.QueryableIndex; +import io.druid.segment.StorageAdapter; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ComplexColumn; import io.druid.segment.column.ValueType; +import io.druid.segment.data.Indexed; import io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetrics; +import java.util.Collections; +import java.util.List; import java.util.Map; public class SegmentAnalyzer @@ -61,7 +67,7 @@ public class SegmentAnalyzer final ColumnAnalysis analysis; final ValueType type = capabilities.getType(); - switch(type) { + switch (type) { case LONG: analysis = analyzeLongColumn(column); break; @@ -82,7 +88,55 @@ public class SegmentAnalyzer columns.put(columnName, analysis); } - columns.put(Column.TIME_COLUMN_NAME, lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP)); + columns.put( + Column.TIME_COLUMN_NAME, + lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP) + ); + + return columns; + } + + public Map analyze(StorageAdapter adapter) + { + Preconditions.checkNotNull(adapter, "Adapter cannot be null"); + Map columns = Maps.newTreeMap(); + List columnNames = getStorageAdapterColumnNames(adapter); + + int numRows = adapter.getNumRows(); + for (String columnName : columnNames) { + final ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnName); + final ColumnAnalysis analysis; + + /** + * StorageAdapter doesn't provide a way to get column values, so size is + * not calculated for STRING and COMPLEX columns. + */ + ValueType capType = capabilities.getType(); + switch (capType) { + case LONG: + analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, Longs.BYTES); + break; + case FLOAT: + analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT); + break; + case STRING: + analysis = new ColumnAnalysis(capType.name(), 0, adapter.getDimensionCardinality(columnName), null); + break; + case COMPLEX: + analysis = new ColumnAnalysis(capType.name(), 0, null, null); + break; + default: + log.warn("Unknown column type[%s].", capType); + analysis = ColumnAnalysis.error(String.format("unknown_type_%s", capType)); + } + + columns.put(columnName, analysis); + } + + columns.put( + Column.TIME_COLUMN_NAME, + lengthBasedAnalysisForAdapter(ValueType.LONG.name(), null, numRows, NUM_BYTES_IN_TIMESTAMP) + ); return columns; } @@ -154,4 +208,26 @@ public class SegmentAnalyzer return new ColumnAnalysis(typeName, size, null, null); } + + private List getStorageAdapterColumnNames(StorageAdapter adapter) + { + Indexed dims = adapter.getAvailableDimensions(); + Iterable metrics = adapter.getAvailableMetrics(); + Iterable columnNames = Iterables.concat(dims, metrics); + List sortedColumnNames = Lists.newArrayList(columnNames); + Collections.sort(sortedColumnNames); + return sortedColumnNames; + } + + private ColumnAnalysis lengthBasedAnalysisForAdapter( + String type, ColumnCapabilities capabilities, + int numRows, final int numBytes + ) + { + if (capabilities != null && capabilities.hasMultipleValues()) { + return ColumnAnalysis.error("multi_value"); + } + return new ColumnAnalysis(type, numRows * numBytes, null, null); + } + } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index ab5e82e2820..90ee42297ca 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -114,10 +114,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest newIntervals = JodaUtils.condenseIntervals( Iterables.concat(arg1.getIntervals(), arg2.getIntervals()) ); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 5c1389a5a05..742b08d26c5 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -82,15 +82,17 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory analyzedColumns; + long totalSize = 0; if (index == null) { - return Sequences.empty(); + // IncrementalIndexSegments (used by in-memory hydrants in the realtime service) do not have a QueryableIndex + analyzedColumns = analyzer.analyze(segment.asStorageAdapter()); + } else { + analyzedColumns = analyzer.analyze(index); + // Initialize with the size of the whitespace, 1 byte per + totalSize = analyzedColumns.size() * index.getNumRows(); } - final Map analyzedColumns = analyzer.analyze(index); - - // Initialize with the size of the whitespace, 1 byte per - long totalSize = analyzedColumns.size() * index.getNumRows(); - Map columns = Maps.newTreeMap(); ColumnIncluderator includerator = query.getToInclude(); for (Map.Entry entry : analyzedColumns.entrySet()) { diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index 28d7031a77d..fecce53ead2 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -104,6 +104,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return column.getDictionaryEncoding().getCardinality(); } + @Override + public int getNumRows() + { + return index.getNumRows(); + } + @Override public DateTime getMinTime() { @@ -136,6 +142,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return Capabilities.builder().dimensionValuesSorted(true).build(); } + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + return index.getColumn(column).getCapabilities(); + } + @Override public DateTime getMaxIngestedEventTime() { @@ -275,7 +287,10 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } @Override - public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn) + public DimensionSelector makeDimensionSelector( + final String dimension, + @Nullable final ExtractionFn extractionFn + ) { final Column columnDesc = index.getColumn(dimension); if (columnDesc == null) { @@ -296,8 +311,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter if (column == null) { return NULL_DIMENSION_SELECTOR; - } - else if (columnDesc.getCapabilities().hasMultipleValues()) { + } else if (columnDesc.getCapabilities().hasMultipleValues()) { return new DimensionSelector() { @Override @@ -325,7 +339,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter public int lookupId(String name) { if (extractionFn != null) { - throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function"); + throw new UnsupportedOperationException( + "cannot perform lookup when applying an extraction function" + ); } return column.lookupId(name); } @@ -388,7 +404,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter public int lookupId(String name) { if (extractionFn != null) { - throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function"); + throw new UnsupportedOperationException( + "cannot perform lookup when applying an extraction function" + ); } return column.lookupId(name); } diff --git a/processing/src/main/java/io/druid/segment/StorageAdapter.java b/processing/src/main/java/io/druid/segment/StorageAdapter.java index 13a4fec8771..96d73853adf 100644 --- a/processing/src/main/java/io/druid/segment/StorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/StorageAdapter.java @@ -17,6 +17,7 @@ package io.druid.segment; +import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.data.Indexed; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -42,5 +43,7 @@ public interface StorageAdapter extends CursorFactory public DateTime getMinTime(); public DateTime getMaxTime(); public Capabilities getCapabilities(); + public ColumnCapabilities getColumnCapabilities(String column); + public int getNumRows(); public DateTime getMaxIngestedEventTime(); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 629ed07235f..1d22674791c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -42,6 +42,7 @@ import io.druid.segment.ObjectColumnSelector; import io.druid.segment.SingleScanTimeDimSelector; import io.druid.segment.StorageAdapter; import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.ListIndexed; @@ -102,7 +103,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public int getDimensionCardinality(String dimension) { - if(dimension.equals(Column.TIME_COLUMN_NAME)) { + if (dimension.equals(Column.TIME_COLUMN_NAME)) { return Integer.MAX_VALUE; } IncrementalIndex.DimDim dimDim = index.getDimension(dimension); @@ -112,6 +113,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return dimDim.size(); } + @Override + public int getNumRows() + { + return index.size(); + } + @Override public DateTime getMinTime() { @@ -130,6 +137,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return Capabilities.builder().dimensionValuesSorted(false).build(); } + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + return index.getCapabilities(column); + } + @Override public DateTime getMaxIngestedEventTime() { @@ -278,7 +291,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } @Override - public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn) + public DimensionSelector makeDimensionSelector( + final String dimension, + @Nullable final ExtractionFn extractionFn + ) { if (dimension.equals(Column.TIME_COLUMN_NAME)) { return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn); @@ -310,7 +326,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } } // check for null entry - if(vals.isEmpty() && dimValLookup.contains(null)){ + if (vals.isEmpty() && dimValLookup.contains(null)) { int id = dimValLookup.getId(null); if (id < maxId) { vals.add(id); @@ -369,7 +385,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter public int lookupId(String name) { if (extractionFn != null) { - throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function"); + throw new UnsupportedOperationException( + "cannot perform lookup when applying an extraction function" + ); } return dimValLookup.getId(name); } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java index 5ca7e4b1c59..4722be8d77e 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java @@ -44,13 +44,38 @@ import java.util.Map; public class SegmentAnalyzerTest { @Test - public void testIncrementalDoesNotWork() throws Exception + public void testIncrementalWorks() throws Exception { final List results = getSegmentAnalysises( new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null) ); - Assert.assertEquals(0, results.size()); + Assert.assertEquals(1, results.size()); + + final SegmentAnalysis analysis = results.get(0); + Assert.assertEquals(null, analysis.getId()); + + final Map columns = analysis.getColumns(); + + Assert.assertEquals( + TestIndex.COLUMNS.length, + columns.size() + ); // All columns including time and empty/null column + + for (String dimension : TestIndex.DIMENSIONS) { + final ColumnAnalysis columnAnalysis = columns.get(dimension); + + Assert.assertEquals(dimension, ValueType.STRING.name(), columnAnalysis.getType()); + Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0); + } + + for (String metric : TestIndex.METRICS) { + final ColumnAnalysis columnAnalysis = columns.get(metric); + + Assert.assertEquals(metric, ValueType.FLOAT.name(), columnAnalysis.getType()); + Assert.assertTrue(metric, columnAnalysis.getSize() > 0); + Assert.assertNull(metric, columnAnalysis.getCardinality()); + } } @Test @@ -66,7 +91,10 @@ public class SegmentAnalyzerTest Assert.assertEquals("test_1", analysis.getId()); final Map columns = analysis.getColumns(); - Assert.assertEquals(TestIndex.COLUMNS.length -1, columns.size()); // All columns including time and excluding empty/null column + Assert.assertEquals( + TestIndex.COLUMNS.length - 1, + columns.size() + ); // All columns including time and excluding empty/null column for (String dimension : TestIndex.DIMENSIONS) { final ColumnAnalysis columnAnalysis = columns.get(dimension); @@ -107,7 +135,7 @@ public class SegmentAnalyzerTest final SegmentMetadataQuery query = new SegmentMetadataQuery( new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, false ); - HashMap context = new HashMap(); + HashMap context = new HashMap(); return Sequences.toList(query.run(runner, context), Lists.newArrayList()); } }