Refactor SemanticCreator (#16700)

Refactors the SemanticCreator annotation.

    Moves the interface to the semantic package.
    Create a SemanticUtils to hold logic for storing semantic maps.
    Add FrameMaker interface.
This commit is contained in:
Adarsh Sanjeev 2024-08-06 21:59:38 +05:30 committed by GitHub
parent 593c3b2150
commit 2b81c18fd7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 640 additions and 126 deletions

View File

@ -304,9 +304,7 @@ public class BaseColumnarLongsBenchmark
}
serializer.open();
for (long val : vals) {
serializer.add(val);
}
serializer.addAll(vals, 0, vals.length);
serializer.writeTo(output, null);
return (int) serializer.getSerializedSize();
}

View File

@ -737,7 +737,7 @@ public class CompactionTaskTest
);
provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of());
}
@Test
public void testCreateIngestionSchema() throws IOException
{
@ -1855,14 +1855,6 @@ public class CompactionTaskTest
}
}
final Metadata metadata = new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
);
queryableIndexMap.put(
entry.getValue(),
new SimpleQueryableIndex(
@ -1871,9 +1863,21 @@ public class CompactionTaskTest
null,
columnMap,
null,
metadata,
false
)
{
@Override
public Metadata getMetadata()
{
return new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
);
}
}
);
}
}
@ -1896,10 +1900,15 @@ public class CompactionTaskTest
index.getBitmapFactoryForDimensions(),
index.getColumns(),
index.getFileMapper(),
null,
false
)
);
{
@Override
public Metadata getMetadata()
{
return null;
}
});
}
}

View File

@ -17,7 +17,9 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols;
package org.apache.druid.common.semantic;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -26,8 +28,8 @@ import java.lang.annotation.Target;
/**
* Annotation used to indicate that the method is used as a creator for a semantic interface.
*
* Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of
* <p>
* Used in conjuction with {@link SemanticUtils#makeAsMap(Class)} to build maps for simplified implementation of
* the {@link RowsAndColumns#as(Class)} method.
*/
@Retention(RetentionPolicy.RUNTIME)

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.semantic;
import org.apache.druid.error.DruidException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
public class SemanticUtils
{
private static final Map<Class<?>, Map<Class<?>, Function<?, ?>>> OVERRIDES = new LinkedHashMap<>();
/**
* Allows the registration of overrides, which allows overriding of already existing mappings.
* This allows extensions to register mappings.
*/
@SuppressWarnings("unused")
public static <C, T> void registerAsOverride(Class<C> clazz, Class<T> asInterface, Function<C, T> fn)
{
final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.computeIfAbsent(
clazz,
theClazz -> new LinkedHashMap<>()
);
final Function<?, ?> oldVal = classOverrides.get(asInterface);
if (oldVal != null) {
throw DruidException.defensive(
"Attempt to side-override the same interface [%s] multiple times for the same class [%s].",
asInterface,
clazz
);
} else {
classOverrides.put(asInterface, fn);
}
}
public static <T> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
{
final Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();
for (Method method : clazz.getMethods()) {
if (method.isAnnotationPresent(SemanticCreator.class)) {
if (method.getParameterCount() != 0) {
throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
}
retVal.put(method.getReturnType(), arg -> {
try {
return method.invoke(arg);
}
catch (InvocationTargetException | IllegalAccessException e) {
throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
}
});
}
}
final Map<Class<?>, Function<?, ?>> classOverrides = OVERRIDES.get(clazz);
if (classOverrides != null) {
for (Map.Entry<Class<?>, Function<?, ?>> overrideEntry : classOverrides.entrySet()) {
//noinspection unchecked
retVal.put(overrideEntry.getKey(), (Function<T, ?>) overrideEntry.getValue());
}
}
return retVal;
}
}

View File

@ -176,6 +176,17 @@ public class DruidException extends RuntimeException
return defensive().build(format, args);
}
/**
* Build a "defensive" exception, this is an exception that should never actually be triggered, but we are
* throwing it inside a defensive check.
*
* @return A builder for a defensive exception.
*/
public static DruidException defensive(Throwable cause, String format, Object... args)
{
return defensive().build(cause, format, args);
}
/**
* Build a "defensive" exception, this is an exception that should never actually be triggered. Throw to
* allow messages to be seen by developers

View File

@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.Arrays;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntComparator;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.operator.ColumnWithDirection;
@ -73,7 +75,7 @@ import java.util.function.Function;
public class ArrayListRowsAndColumns<RowType> implements AppendableRowsAndColumns
{
@SuppressWarnings("rawtypes")
private static final Map<Class<?>, Function<ArrayListRowsAndColumns, ?>> AS_MAP = RowsAndColumns
private static final Map<Class<?>, Function<ArrayListRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(ArrayListRowsAndColumns.class);
private final ArrayList<RowType> rows;

View File

@ -20,6 +20,8 @@
package org.apache.druid.query.rowsandcols;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.key.KeyColumn;
@ -66,7 +68,7 @@ import java.util.function.Function;
public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
{
private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP = RowsAndColumns
private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(LazilyDecoratedRowsAndColumns.class);
private RowsAndColumns base;

View File

@ -19,19 +19,13 @@
package org.apache.druid.query.rowsandcols;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
/**
* An interface representing a chunk of RowsAndColumns. Essentially a RowsAndColumns is just a batch of rows
@ -75,31 +69,6 @@ public interface RowsAndColumns
return retVal;
}
static <T> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
{
Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();
for (Method method : clazz.getMethods()) {
if (method.isAnnotationPresent(SemanticCreator.class)) {
if (method.getParameterCount() != 0) {
throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
}
retVal.put(method.getReturnType(), arg -> {
try {
return method.invoke(arg);
}
catch (InvocationTargetException | IllegalAccessException e) {
throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
}
});
}
}
return retVal;
}
/**
* The set of column names available from the RowsAndColumns
*

View File

@ -19,10 +19,11 @@
package org.apache.druid.query.rowsandcols.concrete;
import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.SemanticCreator;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.QueryableIndex;
@ -41,7 +42,7 @@ import java.util.function.Function;
public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter
{
private static final Map<Class<?>, Function<QueryableIndexRowsAndColumns, ?>> AS_MAP = RowsAndColumns
private static final Map<Class<?>, Function<QueryableIndexRowsAndColumns, ?>> AS_MAP = SemanticUtils
.makeAsMap(QueryableIndexRowsAndColumns.class);
private final QueryableIndex index;

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.RowSignature;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
public class DefaultFrameMaker implements FrameMaker
{
private final RowsAndColumns rac;
public DefaultFrameMaker(RowsAndColumns rac)
{
this.rac = rac;
}
@Override
public RowSignature computeSignature()
{
final RowSignature.Builder signatureBuilder = RowSignature.builder();
for (String column : rac.getColumnNames()) {
final Column racColumn = rac.findColumn(column);
if (racColumn == null) {
continue;
}
signatureBuilder.add(column, racColumn.toAccessor().getType());
}
return signatureBuilder.build();
}
@Override
public Frame toColumnBasedFrame()
{
final AtomicInteger rowId = new AtomicInteger(0);
final int numRows = rac.numRows();
final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
final ColumnSelectorFactory selectorFactory = csfm.make(rowId);
final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20); // 200 MB
final FrameWriter frameWriter = FrameWriters.makeColumnBasedFrameWriterFactory(
memFactory,
computeSignature(),
Collections.emptyList()
).newFrameWriter(selectorFactory);
rowId.set(0);
for (; rowId.get() < numRows; rowId.incrementAndGet()) {
frameWriter.addSelection();
}
return Frame.wrap(frameWriter.toByteArray());
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.segment.column.RowSignature;
public interface FrameMaker
{
static FrameMaker fromRAC(RowsAndColumns rac)
{
FrameMaker retVal = rac.as(FrameMaker.class);
if (retVal == null) {
retVal = new DefaultFrameMaker(rac);
}
return retVal;
}
RowSignature computeSignature();
Frame toColumnBasedFrame();
}

View File

@ -510,9 +510,15 @@ public class IndexIO
new ConciseBitmapFactory(),
columns,
index.getFileMapper(),
null,
lazy
);
)
{
@Override
public Metadata getMetadata()
{
return null;
}
};
}
private Supplier<ColumnHolder> getColumnHolderSupplier(ColumnBuilder builder, boolean lazy)
@ -604,25 +610,6 @@ public class IndexIO
allDims = null;
}
Metadata metadata = null;
ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
if (metadataBB != null) {
try {
metadata = mapper.readValue(
SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()),
Metadata.class
);
}
catch (JsonParseException | JsonMappingException ex) {
// Any jackson deserialization errors are ignored e.g. if metadata contains some aggregator which
// is no longer supported then it is OK to not use the metadata instead of failing segment loading
log.warn(ex, "Failed to load metadata for segment [%s]", inDir);
}
catch (IOException ex) {
throw new IOException("Failed to read metadata", ex);
}
}
Map<String, Supplier<ColumnHolder>> columns = new LinkedHashMap<>();
// Register the time column
@ -663,9 +650,32 @@ public class IndexIO
segmentBitmapSerdeFactory.getBitmapFactory(),
columns,
smooshedFiles,
metadata,
lazy
);
)
{
@Override
public Metadata getMetadata()
{
try {
ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
if (metadataBB != null) {
return mapper.readValue(
SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()),
Metadata.class
);
}
}
catch (JsonParseException | JsonMappingException ex) {
// Any jackson deserialization errors are ignored e.g. if metadata contains some aggregator which
// is no longer supported then it is OK to not use the metadata instead of failing segment loading
log.warn(ex, "Failed to load metadata for segment [%s]", inDir);
}
catch (IOException ex) {
log.warn(ex, "Failed to read metadata for segment [%s]", inDir);
}
return null;
}
};
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);

View File

@ -19,17 +19,24 @@
package org.apache.druid.segment;
import org.apache.druid.common.semantic.SemanticCreator;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.query.rowsandcols.concrete.QueryableIndexRowsAndColumns;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.function.Function;
/**
*/
public class QueryableIndexSegment implements Segment
{
private static final Map<Class<?>, Function<QueryableIndexSegment, ?>> AS_MAP = SemanticUtils
.makeAsMap(QueryableIndexSegment.class);
private final QueryableIndex index;
private final QueryableIndexStorageAdapter storageAdapter;
private final SegmentId segmentId;
@ -77,10 +84,18 @@ public class QueryableIndexSegment implements Segment
@Override
public <T> T as(@Nonnull Class<T> clazz)
{
if (CloseableShapeshifter.class.equals(clazz)) {
return (T) new QueryableIndexRowsAndColumns(index);
final Function<QueryableIndexSegment, ?> fn = AS_MAP.get(clazz);
if (fn != null) {
return (T) fn.apply(this);
}
return Segment.super.as(clazz);
}
@SemanticCreator
@SuppressWarnings("unused")
public CloseableShapeshifter toCloseableShapeshifter()
{
return new QueryableIndexRowsAndColumns(index);
}
}

View File

@ -38,7 +38,7 @@ import java.util.Map;
/**
*
*/
public class SimpleQueryableIndex implements QueryableIndex
public abstract class SimpleQueryableIndex implements QueryableIndex
{
private final Interval dataInterval;
private final List<String> columnNames;
@ -46,8 +46,6 @@ public class SimpleQueryableIndex implements QueryableIndex
private final BitmapFactory bitmapFactory;
private final Map<String, Supplier<ColumnHolder>> columns;
private final SmooshedFileMapper fileMapper;
@Nullable
private final Metadata metadata;
private final Supplier<Map<String, DimensionHandler>> dimensionHandlers;
public SimpleQueryableIndex(
@ -56,7 +54,6 @@ public class SimpleQueryableIndex implements QueryableIndex
BitmapFactory bitmapFactory,
Map<String, Supplier<ColumnHolder>> columns,
SmooshedFileMapper fileMapper,
@Nullable Metadata metadata,
boolean lazy
)
{
@ -73,7 +70,6 @@ public class SimpleQueryableIndex implements QueryableIndex
this.bitmapFactory = bitmapFactory;
this.columns = columns;
this.fileMapper = fileMapper;
this.metadata = metadata;
if (lazy) {
this.dimensionHandlers = Suppliers.memoize(() -> initDimensionHandlers(availableDimensions));
@ -141,10 +137,7 @@ public class SimpleQueryableIndex implements QueryableIndex
}
@Override
public Metadata getMetadata()
{
return metadata;
}
public abstract Metadata getMetadata();
@Override
public Map<String, DimensionHandler> getDimensionHandlers()

View File

@ -26,6 +26,7 @@ import org.apache.druid.segment.vector.ReadableVectorOffset;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.io.Closeable;
public interface BaseColumn extends Closeable
@ -41,4 +42,11 @@ public interface BaseColumn extends Closeable
{
throw new UOE("Cannot make VectorObjectSelector for column with class[%s]", getClass().getName());
}
@SuppressWarnings("unused")
@Nullable
default <T> T as(Class<? extends T> clazz)
{
return null;
}
}

View File

@ -28,6 +28,8 @@ import org.apache.druid.segment.data.ReadableOffset;
import org.apache.druid.segment.vector.ReadableVectorOffset;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
/**
*/
public class LongsColumn implements NumericColumn
@ -75,6 +77,13 @@ public class LongsColumn implements NumericColumn
return column.get(rowNum);
}
@Override
@Nullable
public <T> T as(Class<? extends T> clazz)
{
return column.as(clazz);
}
@Override
public void close()
{

View File

@ -47,7 +47,7 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
CompressionStrategy strategy
)
{
baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy));
this.baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy));
this.totalSize = totalSize;
this.sizePer = sizePer;
this.baseReader = reader;
@ -156,6 +156,12 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
@Override
public void get(final long[] out, final int start, final int length)
{
get(out, 0, start, length);
}
@Override
public void get(long[] out, int offset, int start, int length)
{
// division + remainder is optimized by the compiler so keep those together
int bufferNum = start / sizePer;
@ -169,7 +175,7 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
}
final int limit = Math.min(length - p, sizePer - bufferIndex);
reader.read(out, p, bufferIndex, limit);
reader.read(out, offset + p, bufferIndex, limit);
p += limit;
bufferNum++;
bufferIndex = 0;

View File

@ -28,4 +28,10 @@ import java.io.Closeable;
*/
public interface ColumnarInts extends IndexedInts, Closeable
{
default void get(int[] out, int offset, int start, int length)
{
for (int i = 0; i < length; i++) {
out[offset + i] = get(i + start);
}
}
}

View File

@ -46,9 +46,14 @@ public interface ColumnarLongs extends Closeable
long get(int index);
default void get(long[] out, int start, int length)
{
get(out, 0, start, length);
}
default void get(long[] out, int offset, int start, int length)
{
for (int i = 0; i < length; i++) {
out[i] = get(i + start);
out[offset + i] = get(i + start);
}
}
@ -62,6 +67,12 @@ public interface ColumnarLongs extends Closeable
@Override
void close();
@Nullable
default <T> T as(Class<? extends T> clazz)
{
return null;
}
default ColumnValueSelector<Long> makeColumnValueSelector(ReadableOffset offset, ImmutableBitmap nullValueBitmap)
{
if (nullValueBitmap.isEmpty()) {

View File

@ -29,6 +29,15 @@ import java.io.IOException;
public interface ColumnarLongsSerializer extends Serializer
{
void open() throws IOException;
int size();
void add(long value) throws IOException;
default void addAll(long[] values, int start, int end) throws IOException
{
for (int i = start; i < end; ++i) {
add(values[i]);
}
}
}

View File

@ -452,7 +452,10 @@ public abstract class CompressedNestedDataComplexColumn<TStringDictionary extend
@Override
public int getLength()
{
return -1;
if (compressedRawColumn == null) {
compressedRawColumn = closer.register(compressedRawColumnSupplier.get());
}
return compressedRawColumn.size();
}
@Override
@ -535,9 +538,14 @@ public abstract class CompressedNestedDataComplexColumn<TStringDictionary extend
if (arrayFieldIndex >= 0) {
final int elementNumber = ((NestedPathArrayElement) lastPath).getIndex();
if (elementNumber < 0) {
throw new IAE("Cannot make array element selector for path [%s], negative array index not supported for this selector", path);
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Cannot make array element selector for path [%s], negative array index not supported for this selector", path);
}
DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) getColumnHolder(arrayField, arrayFieldIndex).getColumn();
DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) getColumnHolder(
arrayField,
arrayFieldIndex
).getColumn();
ColumnValueSelector arraySelector = col.makeColumnValueSelector(readableOffset);
return new ColumnValueSelector<Object>()
{
@ -634,9 +642,14 @@ public abstract class CompressedNestedDataComplexColumn<TStringDictionary extend
if (arrayFieldIndex >= 0) {
final int elementNumber = ((NestedPathArrayElement) lastPath).getIndex();
if (elementNumber < 0) {
throw new IAE("Cannot make array element selector for path [%s], negative array index not supported for this selector", path);
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Cannot make array element selector for path [%s], negative array index not supported for this selector", path);
}
DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) getColumnHolder(arrayField, arrayFieldIndex).getColumn();
DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) getColumnHolder(
arrayField,
arrayFieldIndex
).getColumn();
VectorObjectSelector arraySelector = col.makeVectorObjectSelector(readableOffset);
return new VectorObjectSelector()
@ -702,9 +715,14 @@ public abstract class CompressedNestedDataComplexColumn<TStringDictionary extend
if (arrayFieldIndex >= 0) {
final int elementNumber = ((NestedPathArrayElement) lastPath).getIndex();
if (elementNumber < 0) {
throw new IAE("Cannot make array element selector for path [%s], negative array index not supported for this selector", path);
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Cannot make array element selector for path [%s], negative array index not supported for this selector", path);
}
DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) getColumnHolder(arrayField, arrayFieldIndex).getColumn();
DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) getColumnHolder(
arrayField,
arrayFieldIndex
).getColumn();
VectorObjectSelector arraySelector = col.makeVectorObjectSelector(readableOffset);
return new VectorValueSelector()

View File

@ -17,10 +17,9 @@
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
package org.apache.druid.common.semantic;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.rowsandcols.SemanticCreator;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -80,7 +79,7 @@ public class SemanticCreatorUsageTest
/**
* {@link SemanticCreator} must return with an interface.
*
* <p>
* An exact implementation may indicate that some interface methods might be missing.
*/
@Test
@ -95,7 +94,7 @@ public class SemanticCreatorUsageTest
/**
* {@link SemanticCreator} method names must follow the naming pattern toReturnType().
*
* <p>
* For example: a method returning with a type of Ball should be named as "toBall"
*/
@Test

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.semantic;
import org.apache.druid.error.DruidException;
import org.apache.druid.segment.CloseableShapeshifter;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.function.Function;
public class SemanticUtilsTest
{
@Test
public void testInvalidParameters()
{
Assert.assertThrows(
DruidException.class,
() -> SemanticUtils.makeAsMap(InvalidShapeshifter.class)
);
}
@Test
public void testValidParameters()
{
TestShapeshifter testShapeshifter = new TestShapeshifter();
Assert.assertTrue(testShapeshifter.as(A.class) instanceof A);
}
@Test
public void testOverrideForNewMapping()
{
SemanticUtils.registerAsOverride(
TestShapeshifter.class,
OverrideClass.class,
(testShapeshifter) -> new OverrideClass()
);
TestShapeshifter testShapeshifter = new TestShapeshifter();
Assert.assertTrue(testShapeshifter.as(A.class) instanceof A);
Assert.assertTrue(testShapeshifter.as(OverrideClass.class) instanceof OverrideClass);
}
@Test
public void testOverrideForExistingMapping()
{
SemanticUtils.registerAsOverride(
TestShapeshifter.class,
A.class,
(testShapeshifter) -> new OverrideClass()
);
TestShapeshifter testShapeshifter = new TestShapeshifter();
Assert.assertTrue(testShapeshifter.as(A.class) instanceof OverrideClass);
}
static class TestShapeshifter implements CloseableShapeshifter
{
private final Map<Class<?>, Function<TestShapeshifter, ?>> asMap;
public TestShapeshifter()
{
this.asMap = SemanticUtils.makeAsMap(TestShapeshifter.class);
}
@SuppressWarnings("unchecked")
@Override
@Nullable
public <T> T as(@Nonnull Class<T> clazz)
{
//noinspection ReturnOfNull
return (T) asMap.getOrDefault(clazz, arg -> null).apply(this);
}
@Override
public void close()
{
}
@SemanticCreator
public AInterface toAInterface()
{
return new A();
}
}
static class InvalidShapeshifter implements CloseableShapeshifter
{
@Nullable
@Override
public <T> T as(@Nonnull Class<T> clazz)
{
return null;
}
@Override
public void close()
{
}
@SemanticCreator
public AInterface toAInterface(String invalidParameter)
{
return new A();
}
}
interface AInterface
{
}
static class A implements AInterface
{
}
static class OverrideClass extends A
{
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.ArrayListRowsAndColumnsTest;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Assert;
import org.junit.Test;
public class FrameMakerTest
{
public static RowSignature ROW_SIGNATURE = RowSignature.builder()
.add("dim1", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("dim3", ColumnType.STRING)
.add("m1", ColumnType.LONG)
.add("m2", ColumnType.LONG)
.build();
@Test
public void testFrameMaker()
{
final MapOfColumnsRowsAndColumns mapOfColumnsRowsAndColumns = MapOfColumnsRowsAndColumns
.builder()
.add("dim1", ColumnType.STRING, "a", "b", "c")
.add("dim2", ColumnType.STRING, "m", "d", "e")
.add("dim3", ColumnType.STRING, "a")
.add("m1", ColumnType.LONG, 1L, 2L, 3L)
.add("m2", ColumnType.LONG, 52L, 42L)
.build();
final FrameMaker frameMaker = FrameMaker.fromRAC(ArrayListRowsAndColumnsTest.MAKER.apply(mapOfColumnsRowsAndColumns));
Assert.assertEquals(ROW_SIGNATURE, frameMaker.computeSignature());
final Frame frame = frameMaker.toColumnBasedFrame();
ColumnBasedFrameRowsAndColumns columnBasedFrameRowsAndColumns = new ColumnBasedFrameRowsAndColumns(
frame,
frameMaker.computeSignature()
);
for (String columnName : mapOfColumnsRowsAndColumns.getColumnNames()) {
ColumnAccessor expectedColumn = mapOfColumnsRowsAndColumns.findColumn(columnName).toAccessor();
ColumnAccessor actualColumn = columnBasedFrameRowsAndColumns.findColumn(columnName).toAccessor();
for (int i = 0; i < expectedColumn.numRows(); i++) {
Assert.assertEquals(
expectedColumn.getObject(i),
actualColumn.getObject(i)
);
}
}
Assert.assertEquals(3, frame.numRows());
}
}

View File

@ -31,6 +31,7 @@ import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.io.smoosh.Smoosh;
@ -184,20 +185,6 @@ public class IndexIONullColumnsCompatibilityTest extends InitializedNullHandling
segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
}
Metadata metadata = null;
ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
if (metadataBB != null) {
try {
metadata = mapper.readValue(
IndexIO.SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()),
Metadata.class
);
}
catch (IOException ex) {
throw new IOException("Failed to read metadata", ex);
}
}
Map<String, Supplier<ColumnHolder>> columns = new HashMap<>();
for (String columnName : cols) {
@ -251,9 +238,28 @@ public class IndexIONullColumnsCompatibilityTest extends InitializedNullHandling
segmentBitmapSerdeFactory.getBitmapFactory(),
columns,
smooshedFiles,
metadata,
lazy
);
)
{
@Override
public Metadata getMetadata()
{
try {
ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
if (metadataBB != null) {
return mapper.readValue(
IndexIO.SERIALIZER_UTILS.readBytes(metadataBB, metadataBB.remaining()),
Metadata.class
);
} else {
return null;
}
}
catch (IOException ex) {
throw DruidException.defensive(ex, "Failed to read metadata");
}
}
};
return index;
}

View File

@ -167,11 +167,15 @@ public class IndexMergerLongestSharedDimOrderTest
mockBitmapFactory,
ImmutableMap.of(ColumnHolder.TIME_COLUMN_NAME, mockSupplier),
mockSmooshedFileMapper,
null,
true
)
{
@Override
public Metadata getMetadata()
{
return null;
}
}
);
}
}

View File

@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.channels.Channels;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
@ -290,6 +291,11 @@ public class CompressedColumnarIntsSupplierTest extends CompressionStrategyTest
indices[i] = i;
}
int[] offsetValues = new int[columnarInts.size() + 1];
columnarInts.get(offsetValues, 1, 0, columnarInts.size());
Assert.assertEquals(0, offsetValues[0]);
Assert.assertArrayEquals(vals, Arrays.copyOfRange(offsetValues, 1, offsetValues.length));
// random access, limited to 1000 elements for large lists (every element would take too long)
IntArrays.shuffle(indices, ThreadLocalRandom.current());
final int limit = Math.min(columnarInts.size(), 1000);

View File

@ -108,9 +108,7 @@ public class CompressedLongsAutoEncodingSerdeTest
);
serializer.open();
for (long value : values) {
serializer.add(value);
}
serializer.addAll(values, 0, values.length);
Assert.assertEquals(values.length, serializer.size());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();

View File

@ -186,9 +186,7 @@ public class CompressedLongsSerdeTest
);
serializer.open();
for (long value : values) {
serializer.add(value);
}
serializer.addAll(values, 0, values.length);
Assert.assertEquals(values.length, serializer.size());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();