StorageAdapter: Add getRowSignature method. (#11953)

Simplifies logic for callers that only want a list of all the column names,
or the column names and types. Updated callers: SegmentAnalyzer,
HashJoinSegmentStorageAdapter, and DruidSegmentReader.
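
As a rough sketch of the pattern this enables (illustrative only, not taken from the diff below; adapter stands for any StorageAdapter and imports are elided), callers that previously stitched together the time column, dimensions, and metrics can now read everything off a single RowSignature:

// Before: assemble the column list by hand, remembering to include __time.
final Set<String> columns = new LinkedHashSet<>();
columns.add(ColumnHolder.TIME_COLUMN_NAME);
Iterables.addAll(columns, adapter.getAvailableDimensions());
Iterables.addAll(columns, adapter.getAvailableMetrics());

// After: one call returns the column names (and, where known, their types), __time included.
final RowSignature signature = adapter.getRowSignature();
final List<String> columnNames = signature.getColumnNames();
final Optional<ColumnType> timeType = signature.getColumnType(ColumnHolder.TIME_COLUMN_NAME);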
Author: Gian Merlino, 2021-11-24 13:14:25 -08:00 (committed by GitHub)
parent 0354407655
commit 5e168b861a
6 changed files with 52 additions and 43 deletions

DruidSegmentReader.java

@@ -52,7 +52,6 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
@@ -127,11 +126,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
// schemaless mode.
final Set<String> columnsToRead = Sets.newLinkedHashSet(
Iterables.filter(
Iterables.concat(
Collections.singleton(ColumnHolder.TIME_COLUMN_NAME),
storageAdapter.getAdapter().getAvailableDimensions(),
storageAdapter.getAdapter().getAvailableMetrics()
),
storageAdapter.getAdapter().getRowSignature().getColumnNames(),
columnsFilter::apply
)
);
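
Why the explicit Collections.singleton(ColumnHolder.TIME_COLUMN_NAME) prefix could go away: the default getRowSignature() added below in StorageAdapter.java starts from RowSignature.Builder.addTimeColumn(), so __time is already the first name returned, with the available dimensions and metrics following in their original order. A minimal sketch of that assumption, with illustrative comments:

final List<String> names = storageAdapter.getAdapter().getRowSignature().getColumnNames();
// names.get(0) is expected to be ColumnHolder.TIME_COLUMN_NAME ("__time"),
// followed by the available dimensions and then the available metrics.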

SegmentAnalyzer.java

@@ -21,7 +21,6 @@ package org.apache.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -40,12 +39,12 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ColumnTypeFactory;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
@@ -58,9 +57,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
public class SegmentAnalyzer
@@ -99,32 +96,28 @@ public class SegmentAnalyzer
// get length and column names from storageAdapter
final int length = storageAdapter.getNumRows();
final Set<String> columnNames = new HashSet<>();
Iterables.addAll(columnNames, storageAdapter.getAvailableDimensions());
Iterables.addAll(columnNames, storageAdapter.getAvailableMetrics());
Map<String, ColumnAnalysis> columns = new TreeMap<>();
Function<String, ColumnCapabilities> adapterCapabilitesFn =
storageAdapter instanceof IncrementalIndexStorageAdapter
? ((IncrementalIndexStorageAdapter) storageAdapter)::getSnapshotColumnCapabilities
: storageAdapter::getColumnCapabilities;
for (String columnName : columnNames) {
final ColumnHolder columnHolder = index == null ? null : index.getColumnHolder(columnName);
final RowSignature rowSignature = storageAdapter.getRowSignature();
for (String columnName : rowSignature.getColumnNames()) {
final ColumnCapabilities capabilities;
if (columnHolder != null) {
capabilities = columnHolder.getCapabilities();
if (storageAdapter instanceof IncrementalIndexStorageAdapter) {
// See javadocs for getSnapshotColumnCapabilities for a discussion of why we need to do this.
capabilities = ((IncrementalIndexStorageAdapter) storageAdapter).getSnapshotColumnCapabilities(columnName);
} else {
// this can be removed if we get to the point where IncrementalIndexStorageAdapter.getColumnCapabilities
// accurately reports the capabilities
capabilities = adapterCapabilitesFn.apply(columnName);
capabilities = storageAdapter.getColumnCapabilities(columnName);
}
final ColumnAnalysis analysis;
switch (capabilities.getType()) {
case LONG:
analysis = analyzeNumericColumn(capabilities, length, Long.BYTES);
final int bytesPerRow =
ColumnHolder.TIME_COLUMN_NAME.equals(columnName) ? NUM_BYTES_IN_TIMESTAMP : Long.BYTES;
analysis = analyzeNumericColumn(capabilities, length, bytesPerRow);
break;
case FLOAT:
analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT);
@@ -134,12 +127,13 @@ public class SegmentAnalyzer
break;
case STRING:
if (index != null) {
analysis = analyzeStringColumn(capabilities, columnHolder);
analysis = analyzeStringColumn(capabilities, index.getColumnHolder(columnName));
} else {
analysis = analyzeStringColumn(capabilities, storageAdapter, columnName);
}
break;
case COMPLEX:
final ColumnHolder columnHolder = index != null ? index.getColumnHolder(columnName) : null;
analysis = analyzeComplexColumn(capabilities, columnHolder);
break;
default:
@@ -150,16 +144,6 @@ public class SegmentAnalyzer
columns.put(columnName, analysis);
}
// Add time column too
ColumnCapabilities timeCapabilities = adapterCapabilitesFn.apply(ColumnHolder.TIME_COLUMN_NAME);
if (timeCapabilities == null) {
timeCapabilities = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG);
}
columns.put(
ColumnHolder.TIME_COLUMN_NAME,
analyzeNumericColumn(timeCapabilities, length, NUM_BYTES_IN_TIMESTAMP)
);
return columns;
}
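
The trailing "Add time column too" block could be dropped because __time now flows through the same loop: the signature returned by getRowSignature() always contains the time column, typed LONG, so it hits the LONG case above, where bytesPerRow switches to NUM_BYTES_IN_TIMESTAMP. A short sketch of that assumption (the comments show the expected results):

final RowSignature signature = storageAdapter.getRowSignature();
// signature.contains(ColumnHolder.TIME_COLUMN_NAME)      -> expected true
// signature.getColumnType(ColumnHolder.TIME_COLUMN_NAME) -> expected Optional.of(ColumnType.LONG)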

RowBasedStorageAdapter.java

@@ -82,6 +82,12 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
return Collections.emptyList();
}
@Override
public RowSignature getRowSignature()
{
return rowSignature;
}
@Override
public int getDimensionCardinality(String column)
{

StorageAdapter.java

@@ -19,13 +19,16 @@
package org.apache.druid.segment;
import com.google.common.collect.Iterables;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.Indexed;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Optional;
/**
*/
@@ -36,6 +39,25 @@ public interface StorageAdapter extends CursorFactory, ColumnInspector
Indexed<String> getAvailableDimensions();
Iterable<String> getAvailableMetrics();
/**
* Returns the row signature of the data available from this adapter. For mutable adapters, even though the signature
* may evolve over time, any particular object returned by this method is an immutable snapshot.
*/
default RowSignature getRowSignature()
{
final RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();
for (final String column : Iterables.concat(getAvailableDimensions(), getAvailableMetrics())) {
builder.add(
column,
Optional.ofNullable(getColumnCapabilities(column)).map(ColumnCapabilities::toColumnType).orElse(null)
);
}
return builder.build();
}
/**
* Returns the number of distinct values for the given column if known, or {@link Integer#MAX_VALUE} if unknown,
* e. g. the column is numeric. If the column doesn't exist, returns 0.
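
A hedged usage sketch of the new default getRowSignature() shown above (the adapter variable and the name-to-type map are illustrative, not part of this change): iterate the signature by position to list every column together with its type, when the type is known:

final RowSignature signature = adapter.getRowSignature();
final Map<String, ColumnType> types = new LinkedHashMap<>();
for (int i = 0; i < signature.size(); i++) {
  final String name = signature.getColumnName(i);
  // The type is absent when the adapter cannot report capabilities for this column.
  signature.getColumnType(i).ifPresent(type -> types.put(name, type));
}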

HashJoinSegmentStorageAdapter.java

@@ -19,7 +19,6 @@
package org.apache.druid.segment.join;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -34,7 +33,6 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.filter.Filters;
@@ -370,10 +368,7 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
@Nullable List<VirtualColumn> postJoinVirtualColumns
)
{
final Set<String> baseColumns = new HashSet<>();
baseColumns.add(ColumnHolder.TIME_COLUMN_NAME);
Iterables.addAll(baseColumns, baseAdapter.getAvailableDimensions());
Iterables.addAll(baseColumns, baseAdapter.getAvailableMetrics());
final Set<String> baseColumns = new HashSet<>(baseAdapter.getRowSignature().getColumnNames());
for (VirtualColumn virtualColumn : virtualColumns.getVirtualColumns()) {
// Virtual columns cannot depend on each other, so we don't need to check transitive dependencies.
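
A brief sketch of the equivalence relied on here (columnName is an illustrative variable): the base adapter's signature already covers __time plus every dimension and metric, so a plain membership check identifies columns that can be resolved before the join:

// Expected to be true when the column is a physical column of the base (pre-join) adapter.
final boolean isBaseColumn = baseAdapter.getRowSignature().contains(columnName);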

RowBasedStorageAdapterTest.java

@@ -253,6 +253,13 @@ public class RowBasedStorageAdapterTest
);
}
@Test
public void test_getRowSignature()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter();
Assert.assertEquals(ROW_SIGNATURE, adapter.getRowSignature());
}
@Test
public void test_getDimensionCardinality_knownColumns()
{
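
For context, a hypothetical version of what an expected-signature assertion like test_getRowSignature can look like; the column names and types below are invented for illustration and are not the actual ROW_SIGNATURE fixture used by this test:

final RowSignature expected =
    RowSignature.builder()
                .addTimeColumn()               // __time, typed LONG
                .add("str", ColumnType.STRING) // hypothetical string column
                .add("dbl", ColumnType.DOUBLE) // hypothetical double column
                .build();
Assert.assertEquals(expected, adapter.getRowSignature());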