General Druid refactors (#16708)

Some general refactors across Druid.

    Switch to DruidExceptions
    Add javadocs
    Fix a bug in IntArrayColumns
    Add a class for LongArrayColumns
    Remove wireTransferable since it would never be called
    Refactor DictionaryWriter so that write() returns the index of the value just written.
Adarsh Sanjeev 2024-08-06 22:17:08 +05:30 committed by GitHub
parent 2b81c18fd7
commit 739068469c
35 changed files with 638 additions and 87 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
@ -130,6 +131,8 @@ import java.util.Map;
@NotThreadSafe
public class DruidException extends RuntimeException
{
public static final String CLASS_NAME_STR = DruidException.class.getName();
/**
* Starts building a "general" DruidException targeting the specified persona.
*
@ -478,7 +481,7 @@ public class DruidException extends RuntimeException
public DruidException build(Throwable cause, String formatMe, Object... vals)
{
return new DruidException(
final DruidException retVal = new DruidException(
cause,
errorCode,
targetPersona,
@ -486,6 +489,19 @@ public class DruidException extends RuntimeException
StringUtils.nonStrictFormat(formatMe, vals),
deserialized
);
StackTraceElement[] stackTrace = retVal.getStackTrace();
int firstNonDruidExceptionIndex = 0;
while (
firstNonDruidExceptionIndex < stackTrace.length
&& stackTrace[firstNonDruidExceptionIndex].getClassName().startsWith(CLASS_NAME_STR)) {
++firstNonDruidExceptionIndex;
}
if (firstNonDruidExceptionIndex > 0) {
retVal.setStackTrace(Arrays.copyOfRange(stackTrace, firstNonDruidExceptionIndex, stackTrace.length));
}
return retVal;
}
}
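
The effect of the stack-trace trimming above (exercised by ExceptionTest further down) is that the top frame of a built exception points at the code that constructed it, not at DruidException's builder internals. A minimal sketch:

    DruidException e = DruidException.defensive().build("something defensive");
    // top frame is the caller, not org.apache.druid.error.DruidException internals
    assert !e.getStackTrace()[0].getClassName().startsWith(DruidException.CLASS_NAME_STR);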

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.error;
/**
* A failure class that is used to indicate that something is just not implemented yet. This is useful when a
* developer builds something and they intentionally do not implement a specific branch of code or type of object.
* <p>
* The lack of implementation is not necessarily a statement that it SHOULDN'T be implemented, it's just an indication
* that it has not YET been implemented. When one of these exceptions is seen, it is usually an indication that it is
* now time to actually implement the path that was previously elided.
* <p>
* Oftentimes, the code path wasn't implemented because the developer thought that it wasn't actually possible to
* see it executed. So, collecting and providing information about why the particular path got executed is often
* extremely helpful in understanding why it happened and accelerating the implementation of what the correct behavior
* should be.
*/
public class NotYetImplemented extends DruidException.Failure
{
public static DruidException ex(Throwable t, String msg, Object... args)
{
return DruidException.fromFailure(new NotYetImplemented(t, msg, args));
}
private final Throwable t;
private final String msg;
private final Object[] args;
public NotYetImplemented(Throwable t, String msg, Object[] args)
{
super("notYetImplemented");
this.t = t;
this.msg = msg;
this.args = args;
}
@Override
protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
{
bob = bob.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE);
if (t == null) {
return bob.build(msg, args);
} else {
return bob.build(t, msg, args);
}
}
}
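
Typical usage is from a branch that a developer deliberately left unimplemented; for illustration (the surrounding switch and helper are hypothetical):

    switch (columnType) {                 // illustrative context only
      case LONG:
        return makeLongColumn();          // hypothetical helper
      default:
        throw NotYetImplemented.ex(null, "type [%s] is not supported yet", columnType);
    }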

View File

@ -20,6 +20,7 @@
package org.apache.druid.frame.read;
import com.google.common.base.Preconditions;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.field.FieldReader;
import org.apache.druid.frame.field.FieldReaders;
@ -31,7 +32,6 @@ import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.row.FrameCursorFactory;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
@ -44,7 +44,7 @@ import java.util.Set;
/**
* Embeds the logic to read frames with a given {@link RowSignature}.
*
* <p>
* Stateless and immutable.
*/
public class FrameReader
@ -146,7 +146,7 @@ public class FrameReader
case ROW_BASED:
return new FrameCursorFactory(frame, this, fieldReaders);
default:
throw new ISE("Unrecognized frame type [%s]", frame.type());
throw DruidException.defensive("Unrecognized frame type [%s]", frame.type());
}
}

View File

@ -236,7 +236,7 @@ public class JsonConfigurator
// to configure ParametrizedUriEmitterConfig object. So skipping xxx=yyy key-value pair when configuring Emitter
// doesn't make any difference. That is why we just log this situation, instead of throwing an exception.
log.info(
"Skipping %s property: one of it's prefixes is also used as a property key. Prefix: %s",
"Skipping property [%s]: one of it's prefixes [%s] is also used as a property key.",
originalProperty,
propertyPrefix
);

View File

@ -25,6 +25,7 @@ import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.LongArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
@ -170,6 +171,12 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
return add(name, new IntArrayColumn(vals));
}
@SuppressWarnings("unused")
public Builder add(String name, long[] vals)
{
return add(name, new LongArrayColumn(vals));
}
public Builder add(String name, double[] vals)
{
return add(name, new DoubleArrayColumn(vals));
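
The new long[] overload mirrors the existing int[] and double[] builders; the same column type can also be supplied through fromMap, as the updated AppendableRowsAndColumnsTest below does:

    RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(
        ImmutableMap.of(
            "colA", new IntArrayColumn(new int[]{1, 2, 3}),
            "colB", new LongArrayColumn(new long[]{4, -4, 82})
        )
    );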

View File

@ -22,6 +22,10 @@ package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.util.FindResult;
/**
 * Implementations of this interface do not validate that the column is sorted before binary searching; they
 * assume that it is. As such, behavior is undefined if the column is not actually sorted.
*/
public interface BinarySearchableAccessor extends ColumnAccessor
{
static BinarySearchableAccessor fromColumn(Column col)
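
A minimal sketch of the contract, grounded in LongArrayColumnTest further down (the input column must already be sorted):

    Column column = new LongArrayColumn(new long[]{1, 1, 3, 5, 9});   // already sorted
    BinarySearchableAccessor accessor = BinarySearchableAccessor.fromColumn(column);
    FindResult result = accessor.findLong(0, accessor.numRows(), 1);
    // result.wasFound() == true; rows [getStartRow(), getEndRow()) hold the matching run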

View File

@ -19,7 +19,7 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
@ -55,7 +55,13 @@ public class ConstantObjectColumn implements Column
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - numRows < intoStart) {
throw new ISE("too many rows!!! intoStart[%,d], numRows[%,d] combine to exceed max_int", intoStart, numRows);
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
numRows
);
}
Arrays.fill(into, intoStart, intoStart + numRows, obj);
};

View File

@ -19,8 +19,8 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
@ -54,11 +54,13 @@ public class DoubleArrayColumn implements Column
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - vals.length < intoStart) {
throw new ISE(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
vals.length
);
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
vals.length
);
}
for (int i = 0; i < vals.length; ++i) {
into[intoStart + i] = vals[i];
@ -183,13 +185,13 @@ public class DoubleArrayColumn implements Column
@Override
public FindResult findString(int startIndex, int endIndex, String val)
{
return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
throw NotYetImplemented.ex(null, "findString is not currently supported for DoubleArrayColumns");
}
@Override
public FindResult findComplex(int startIndex, int endIndex, Object val)
{
return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
throw NotYetImplemented.ex(null, "findComplex is not currently supported for DoubleArrayColumns");
}
}
}

View File

@ -19,8 +19,8 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
@ -54,11 +54,13 @@ public class IntArrayColumn implements Column
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - vals.length < intoStart) {
throw new ISE(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
vals.length
);
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
vals.length
);
}
for (int i = 0; i < vals.length; ++i) {
into[intoStart + i] = vals[i];
@ -189,13 +191,13 @@ public class IntArrayColumn implements Column
@Override
public FindResult findString(int startIndex, int endIndex, String val)
{
return findInt(startIndex, endIndex, (int) Numbers.tryParseLong(val, 0));
throw NotYetImplemented.ex(null, "findString is not currently supported for IntArrayColumns");
}
@Override
public FindResult findComplex(int startIndex, int endIndex, Object val)
{
return findDouble(startIndex, endIndex, (int) Numbers.tryParseLong(val, 0));
throw NotYetImplemented.ex(null, "findComplex is not currently supported for IntArrayColumns");
}
}
}

View File

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
public class LongArrayColumn implements Column
{
private final long[] vals;
public LongArrayColumn(
long[] vals
)
{
this.vals = vals;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new MyColumnAccessor();
}
@Nullable
@SuppressWarnings("unchecked")
@Override
public <T> T as(Class<? extends T> clazz)
{
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - vals.length < intoStart) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
vals.length
);
}
for (int i = 0; i < vals.length; ++i) {
into[intoStart + i] = vals[i];
}
};
}
if (ColumnValueSwapper.class.equals(clazz)) {
return (T) (ColumnValueSwapper) (lhs, rhs) -> {
long tmp = vals[lhs];
vals[lhs] = vals[rhs];
vals[rhs] = tmp;
};
}
return null;
}
private class MyColumnAccessor implements BinarySearchableAccessor
{
@Override
public ColumnType getType()
{
return ColumnType.LONG;
}
@Override
public int numRows()
{
return vals.length;
}
@Override
public boolean isNull(int rowNum)
{
return false;
}
@Override
public Object getObject(int rowNum)
{
return vals[rowNum];
}
@Override
public double getDouble(int rowNum)
{
return vals[rowNum];
}
@Override
public float getFloat(int rowNum)
{
return vals[rowNum];
}
@Override
public long getLong(int rowNum)
{
return vals[rowNum];
}
@Override
public int getInt(int rowNum)
{
return (int) vals[rowNum];
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return Long.compare(vals[lhsRowNum], vals[rhsRowNum]);
}
@Override
public FindResult findNull(int startIndex, int endIndex)
{
return FindResult.notFound(endIndex);
}
@Override
public FindResult findDouble(int startIndex, int endIndex, double val)
{
return findLong(startIndex, endIndex, (long) val);
}
@Override
public FindResult findFloat(int startIndex, int endIndex, float val)
{
return findLong(startIndex, endIndex, (long) val);
}
@Override
public FindResult findLong(int startIndex, int endIndex, long val)
{
if (vals[startIndex] == val) {
int end = startIndex + 1;
while (end < endIndex && vals[end] == val) {
++end;
}
return FindResult.found(startIndex, end);
}
int i = Arrays.binarySearch(vals, startIndex, endIndex, val);
if (i > 0) {
int foundStart = i;
int foundEnd = i + 1;
while (foundStart - 1 >= startIndex && vals[foundStart - 1] == val) {
--foundStart;
}
while (foundEnd < endIndex && vals[foundEnd] == val) {
++foundEnd;
}
return FindResult.found(foundStart, foundEnd);
} else {
return FindResult.notFound(-(i + 1));
}
}
@SuppressWarnings("unused")
public FindResult findInt(int startIndex, int endIndex, int val)
{
return findLong(startIndex, endIndex, val);
}
@Override
public FindResult findString(int startIndex, int endIndex, String val)
{
throw NotYetImplemented.ex(null, "findString is not currently supported for LongArrayColumns");
}
@Override
public FindResult findComplex(int startIndex, int endIndex, Object val)
{
throw NotYetImplemented.ex(null, "findComplex is not currently supported for LongArrayColumns");
}
}
}

View File

@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnType;
@ -80,7 +79,6 @@ public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoClose
}
}
return colCache.get(name);
}
@SuppressWarnings("unchecked")
@ -91,9 +89,6 @@ public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoClose
if (StorageAdapter.class.equals(clazz)) {
return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY);
}
if (WireTransferable.class.equals(clazz)) {
return (T) this;
}
return null;
}

View File

@ -91,7 +91,7 @@ public class ColumnHolderRACColumn implements Column, Closeable
public boolean isNull(int rowNum)
{
offset.set(rowNum);
return valueSelector.isNull();
return valueSelector.getObject() == null;
}
@Nullable

View File

@ -22,7 +22,7 @@ package org.apache.druid.segment;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnDescriptor;
@ -212,7 +212,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
);
break;
default:
throw new ISE(
throw DruidException.defensive(
"How did we get here? Column [%s] with type [%s] does not have specialized serializer",
name,
logicalType
@ -349,7 +349,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
@Override
public ColumnDescriptor makeColumnDescriptor()
{
ColumnDescriptor.Builder descriptorBuilder = new ColumnDescriptor.Builder();
ColumnDescriptor.Builder descriptorBuilder = ColumnDescriptor.builder();
final NestedCommonFormatColumnPartSerde partSerde =
NestedCommonFormatColumnPartSerde.serializerBuilder()

View File

@ -528,11 +528,11 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
/**
* Base type for a {@link SingleValueDimensionVectorSelector} for a dictionary encoded {@link ColumnType#STRING}
* built around a {@link ColumnarInts}. Dictionary not included - BYO dictionary lookup methods.
*
* <p>
* Assumes that all implementations return true for {@link #supportsLookupNameUtf8()}.
*/
public abstract static class StringSingleValueDimensionVectorSelector
implements SingleValueDimensionVectorSelector, IdLookup
implements SingleValueDimensionVectorSelector, IdLookup
{
private final ColumnarInts column;
private final ReadableVectorOffset offset;
@ -540,8 +540,8 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
private int id = ReadableVectorInspector.NULL_ID;
public StringSingleValueDimensionVectorSelector(
ColumnarInts column,
ReadableVectorOffset offset
ColumnarInts column,
ReadableVectorOffset offset
)
{
this.column = column;
@ -601,11 +601,11 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
/**
* Base type for a {@link MultiValueDimensionVectorSelector} for a dictionary encoded {@link ColumnType#STRING}
* built around a {@link ColumnarMultiInts}. Dictionary not included - BYO dictionary lookup methods.
*
* <p>
* Assumes that all implementations return true for {@link #supportsLookupNameUtf8()}.
*/
public abstract static class StringMultiValueDimensionVectorSelector
implements MultiValueDimensionVectorSelector, IdLookup
implements MultiValueDimensionVectorSelector, IdLookup
{
private final ColumnarMultiInts multiValueColumn;
private final ReadableVectorOffset offset;
@ -614,8 +614,8 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
private int id = ReadableVectorInspector.NULL_ID;
public StringMultiValueDimensionVectorSelector(
ColumnarMultiInts multiValueColumn,
ReadableVectorOffset offset
ColumnarMultiInts multiValueColumn,
ReadableVectorOffset offset
)
{
this.multiValueColumn = multiValueColumn;
@ -670,6 +670,7 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
{
return this;
}
@Override
public int getCurrentVectorSize()
{
@ -697,8 +698,8 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
private int id = ReadableVectorInspector.NULL_ID;
public StringVectorObjectSelector(
ColumnarInts column,
ReadableVectorOffset offset
ColumnarInts column,
ReadableVectorOffset offset
)
{
this.column = column;
@ -757,8 +758,8 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
private int id = ReadableVectorInspector.NULL_ID;
public MultiValueStringVectorObjectSelector(
ColumnarMultiInts multiValueColumn,
ReadableVectorOffset offset
ColumnarMultiInts multiValueColumn,
ReadableVectorOffset offset
)
{
this.multiValueColumn = multiValueColumn;

View File

@ -20,7 +20,10 @@
package org.apache.druid.segment.data;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ConciseBitmapFactory;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.error.DruidException;
public class BitmapSerde
{
@ -48,4 +51,14 @@ public class BitmapSerde
{
return new LegacyBitmapSerdeFactory();
}
public static BitmapSerdeFactory forBitmapFactory(BitmapFactory factory)
{
if (factory instanceof RoaringBitmapFactory) {
return new DefaultBitmapSerdeFactory();
} else if (factory instanceof ConciseBitmapFactory) {
return new ConciseBitmapSerdeFactory();
}
throw DruidException.defensive("Unknown type of bitmapFactory [%s]", factory.getClass());
}
}

View File

@ -233,6 +233,14 @@ public class CompressionFactory
void write(long value) throws IOException;
@SuppressWarnings("unused")
default void write(long[] values, int offset, int length) throws IOException
{
for (int i = offset; i < offset + length; ++i) {
write(values[i]);
}
}
/**
* Flush the unwritten content to the current output.
*/
@ -294,6 +302,9 @@ public class CompressionFactory
* various duplicates.
*/
LongEncodingReader duplicate();
@SuppressWarnings("unused")
LongEncodingStrategy getStrategy();
}
public static Supplier<ColumnarLongs> getLongSupplier(

View File

@ -82,4 +82,10 @@ public class DeltaLongEncodingReader implements CompressionFactory.LongEncodingR
{
return new DeltaLongEncodingReader(buffer.duplicate(), base, bitsPerValue);
}
@Override
public CompressionFactory.LongEncodingStrategy getStrategy()
{
return CompressionFactory.LongEncodingStrategy.AUTO;
}
}

View File

@ -28,12 +28,41 @@ public interface DictionaryWriter<T> extends Serializer
{
boolean isSorted();
/**
* Prepares the writer for writing
*
* @throws IOException if there is a problem with IO
*/
void open() throws IOException;
void write(@Nullable T objectToWrite) throws IOException;
/**
* Writes an object to the dictionary.
* <p>
* Returns the index of the value that was just written. This is defined as the `int` value that can be passed
* into {@link #get} such that it will return the same value back.
*
* @param objectToWrite object to be written to the dictionary
* @return index of the value that was just written
* @throws IOException if there is a problem with IO
*/
int write(@Nullable T objectToWrite) throws IOException;
/**
* Returns an object that has already been written via the {@link #write} method.
*
* @param dictId index of the object to return
* @return the object identified by the given index
* @throws IOException if there is a problem with IO
*/
@Nullable
T get(int dictId) throws IOException;
/**
* Returns the number of items that have been written so far in this dictionary. Any number lower than this
* cardinality can be passed into {@link #get} and a value will be returned. If a value greater than or equal to
* the cardinality is passed into {@link #get} all sorts of things could happen, but likely none of them are good.
*
* @return the number of items that have been written so far
*/
int getCardinality();
}
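
A sketch of the round-trip contract the javadocs above describe, assuming writer is any DictionaryWriter&lt;T&gt; that has been open()ed:

    int id = writer.write(value);            // index of the value just written
    assert java.util.Objects.equals(writer.get(id), value);
    assert id < writer.getCardinality();     // all indices below the cardinality are valid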

View File

@ -58,9 +58,9 @@ public class EncodedStringDictionaryWriter implements DictionaryWriter<String>
}
@Override
public void write(@Nullable String objectToWrite) throws IOException
public int write(@Nullable String objectToWrite) throws IOException
{
delegate.write(StringUtils.toUtf8Nullable(NullHandling.emptyToNullIfNeeded(objectToWrite)));
return delegate.write(StringUtils.toUtf8Nullable(NullHandling.emptyToNullIfNeeded(objectToWrite)));
}
@Nullable

View File

@ -20,8 +20,8 @@
package org.apache.druid.segment.data;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@ -46,14 +46,16 @@ public class FixedIndexedWriter<T> implements DictionaryWriter<T>
private final Comparator<T> comparator;
private final ByteBuffer scratch;
private final ByteBuffer readBuffer;
private int numWritten;
private final boolean isSorted;
private final int width;
private int cardinality = 0;
@Nullable
private WriteOutBytes valuesOut = null;
private boolean hasNulls = false;
private boolean isSorted;
@Nullable
private T prevObject = null;
private final int width;
public FixedIndexedWriter(
SegmentWriteOutMedium segmentWriteOutMedium,
@ -87,7 +89,7 @@ public class FixedIndexedWriter<T> implements DictionaryWriter<T>
@Override
public int getCardinality()
{
return hasNulls ? numWritten + 1 : numWritten;
return cardinality;
}
@Override
@ -97,28 +99,31 @@ public class FixedIndexedWriter<T> implements DictionaryWriter<T>
}
@Override
public void write(@Nullable T objectToWrite) throws IOException
public int write(@Nullable T objectToWrite) throws IOException
{
if (prevObject != null && isSorted && comparator.compare(prevObject, objectToWrite) >= 0) {
throw new ISE(
throw DruidException.defensive(
"Values must be sorted and unique. Element [%s] with value [%s] is before or equivalent to [%s]",
numWritten,
cardinality,
objectToWrite,
prevObject
);
}
if (objectToWrite == null) {
if (cardinality != 0) {
throw DruidException.defensive("Null must come first, got it at cardinality[%,d]!=0", cardinality);
}
hasNulls = true;
return;
return cardinality++;
}
scratch.clear();
typeStrategy.write(scratch, objectToWrite, width);
scratch.flip();
Channels.writeFully(valuesOut, scratch);
numWritten++;
prevObject = objectToWrite;
return cardinality++;
}
@Override
@ -141,7 +146,7 @@ public class FixedIndexedWriter<T> implements DictionaryWriter<T>
scratch.flip();
Channels.writeFully(channel, scratch);
scratch.clear();
scratch.putInt(numWritten);
scratch.putInt(hasNulls ? cardinality - 1 : cardinality); // we don't actually write the null entry, so subtract 1
scratch.flip();
Channels.writeFully(channel, scratch);
valuesOut.writeTo(channel);
@ -166,7 +171,7 @@ public class FixedIndexedWriter<T> implements DictionaryWriter<T>
public Iterator<T> getIterator()
{
final ByteBuffer iteratorBuffer = ByteBuffer.allocate(width * PAGE_SIZE).order(readBuffer.order());
final int totalCount = hasNulls ? 1 + numWritten : numWritten;
final int totalCount = cardinality;
final int startPos = hasNulls ? 1 : 0;
return new Iterator<T>()
@ -197,13 +202,8 @@ public class FixedIndexedWriter<T> implements DictionaryWriter<T>
{
iteratorBuffer.clear();
try {
if (numWritten - (pos - startPos) < PAGE_SIZE) {
int size = (numWritten - (pos - startPos)) * width;
iteratorBuffer.limit(size);
valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer);
} else {
valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer);
}
iteratorBuffer.limit(Math.min(PAGE_SIZE, cardinality - pos) * width);
valuesOut.readFully((long) (pos - startPos) * width, iteratorBuffer);
iteratorBuffer.clear();
}
catch (IOException e) {

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
import com.google.common.primitives.Ints;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -102,7 +103,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
}
@Override
public void write(@Nullable byte[] value) throws IOException
public int write(@Nullable byte[] value) throws IOException
{
if (prevObject != null && compareNullableUtf8UsingJavaStringOrdering(prevObject, value) >= 0) {
throw new ISE(
@ -114,8 +115,11 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
}
if (value == null) {
if (numWritten != 0) {
throw DruidException.defensive("Null must come first, got it at cardinality[%,d]!=0", numWritten);
}
hasNulls = true;
return;
return 0;
}
// if the bucket buffer is full, write the bucket
@ -143,8 +147,9 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
bucketBuffer[numWritten % bucketSize] = value;
++numWritten;
int retVal = numWritten++;
prevObject = value;
return retVal + (hasNulls ? 1 : 0);
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
import com.google.common.primitives.Ints;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -81,6 +82,10 @@ public class FrontCodedIntArrayIndexedWriter implements DictionaryWriter<int[]>
private boolean isClosed = false;
private boolean hasNulls = false;
private int readCachedBucket = -1;
@Nullable
private ByteBuffer readBufferCache = null;
public FrontCodedIntArrayIndexedWriter(
SegmentWriteOutMedium segmentWriteOutMedium,
ByteOrder byteOrder,
@ -107,7 +112,7 @@ public class FrontCodedIntArrayIndexedWriter implements DictionaryWriter<int[]>
}
@Override
public void write(@Nullable int[] value) throws IOException
public int write(@Nullable int[] value) throws IOException
{
if (prevObject != null && ARRAY_COMPARATOR.compare(prevObject, value) >= 0) {
@ -120,8 +125,11 @@ public class FrontCodedIntArrayIndexedWriter implements DictionaryWriter<int[]>
}
if (value == null) {
if (numWritten != 0) {
throw DruidException.defensive("Null must come first, got it at numWritten[%,d]!=0", numWritten);
}
hasNulls = true;
return;
return 0;
}
// if the bucket buffer is full, write the bucket
@ -147,8 +155,9 @@ public class FrontCodedIntArrayIndexedWriter implements DictionaryWriter<int[]>
bucketBuffer[numWritten % bucketSize] = value;
++numWritten;
int retVal = numWritten++;
prevObject = value;
return retVal + (hasNulls ? 1 : 0);
}
@ -206,6 +215,11 @@ public class FrontCodedIntArrayIndexedWriter implements DictionaryWriter<int[]>
return bucketBuffer[relativeIndex];
} else {
final int bucket = adjustedIndex >> div;
if (readCachedBucket == bucket) {
readBufferCache.position(0);
return getFromBucket(readBufferCache, relativeIndex);
}
long startOffset;
if (bucket == 0) {
startOffset = 0;
@ -217,10 +231,17 @@ public class FrontCodedIntArrayIndexedWriter implements DictionaryWriter<int[]>
if (currentBucketSize == 0) {
return null;
}
final ByteBuffer bucketBuffer = ByteBuffer.allocate(currentBucketSize).order(byteOrder);
valuesOut.readFully(startOffset, bucketBuffer);
bucketBuffer.clear();
return getFromBucket(bucketBuffer, relativeIndex);
if (readBufferCache == null || readBufferCache.capacity() < currentBucketSize) {
readBufferCache = ByteBuffer.allocate(currentBucketSize).order(byteOrder);
}
readBufferCache.clear();
readBufferCache.limit(currentBucketSize);
valuesOut.readFully(startOffset, readBufferCache);
readCachedBucket = bucket;
readBufferCache.position(0);
return getFromBucket(readBufferCache, relativeIndex);
}
}

View File

@ -242,7 +242,7 @@ public class GenericIndexedWriter<T> implements DictionaryWriter<T>
}
@Override
public void write(@Nullable T objectToWrite) throws IOException
public int write(@Nullable T objectToWrite) throws IOException
{
if (objectsSorted && prevObject != null && strategy.compare(prevObject, objectToWrite) >= 0) {
objectsSorted = false;
@ -263,7 +263,7 @@ public class GenericIndexedWriter<T> implements DictionaryWriter<T>
// Increment number of values written. Important to do this after the check above, since numWritten is
// accessed during "initializeHeaderOutLong" to determine the length of the header.
++numWritten;
int retVal = numWritten++;
if (!requireMultipleFiles) {
headerOut.writeInt(checkedCastNonnegativeLongToInt(valuesOut.size()));
@ -280,6 +280,7 @@ public class GenericIndexedWriter<T> implements DictionaryWriter<T>
if (objectsSorted) {
prevObject = objectToWrite;
}
return retVal;
}
@Nullable

View File

@ -71,4 +71,10 @@ public class LongsLongEncodingReader implements CompressionFactory.LongEncodingR
{
return new LongsLongEncodingReader(buffer.getByteBuffer(), buffer.getTypeByteOrder());
}
@Override
public CompressionFactory.LongEncodingStrategy getStrategy()
{
return CompressionFactory.LongEncodingStrategy.LONGS;
}
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.segment.data;
import org.apache.druid.segment.writeout.WriteOutBytes;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

View File

@ -84,7 +84,7 @@ public class RoaringBitmapSerdeFactory implements BitmapSerdeFactory
@Override
public byte[] toBytes(@Nullable ImmutableBitmap val)
{
if (val == null || val.size() == 0) {
if (val == null || val.isEmpty()) {
return new byte[]{};
}
return val.toBytes();

View File

@ -88,4 +88,10 @@ public class TableLongEncodingReader implements CompressionFactory.LongEncodingR
{
return new TableLongEncodingReader(buffer.duplicate(), table, bitsPerValue);
}
@Override
public CompressionFactory.LongEncodingStrategy getStrategy()
{
return CompressionFactory.LongEncodingStrategy.AUTO;
}
}

View File

@ -51,7 +51,7 @@ import java.util.EnumSet;
/**
* Value to dictionary id lookup, backed with memory mapped dictionaries populated lazily by the supplied
* @link DictionaryWriter}.
* {@link DictionaryWriter}.
*/
public final class DictionaryIdLookup implements Closeable
{

View File

@ -112,7 +112,11 @@ public final class MetaSerdeHelper<T>
public int size(T x)
{
return fieldWriters.stream().mapToInt(w -> w.size(x)).sum();
int retVal = 0;
for (FieldWriter<T> fieldWriter : fieldWriters) {
retVal += fieldWriter.size(x);
}
return retVal;
}
public interface FieldWriter<T>

View File

@ -22,6 +22,12 @@ package org.apache.druid.segment.serde.cell;
import java.io.Closeable;
import java.io.IOException;
/**
* An Iterator-like interface that is intentionally not extending Iterator. This is because it is Closeable
* and we never want to lose track of the fact that the object needs to be closed.
*
 * @param <T> the type of element returned by this iterator
*/
public interface IOIterator<T> extends Closeable
{
boolean hasNext() throws IOException;
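
The hunk above cuts off after hasNext(); assuming the usual next() counterpart, typical usage closes the iterator with try-with-resources (Row, makeIterator() and process() are hypothetical placeholders):

    try (IOIterator<Row> iter = makeIterator()) {   // hypothetical factory
      while (iter.hasNext()) {
        process(iter.next());                       // hypothetical consumer
      }
    }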

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.error;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class ExceptionTest
{
@Test
public void testNoCause()
{
DruidException exception = DruidException.defensive().build("defensive");
StackTraceElement[] stackTrace = exception.getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace) {
Assert.assertFalse(stackTraceElement.getClassName().startsWith(DruidException.CLASS_NAME_STR));
}
}
@Test
public void testNoStacktrace()
{
ErrorResponse errorResponse = new ErrorResponse(Forbidden.exception());
final Map<String, Object> asMap = errorResponse.getAsMap();
DruidException exception = ErrorResponse.fromMap(asMap).getUnderlyingException();
Assert.assertTrue(exception.getCause() instanceof DruidException);
Assert.assertEquals(0, exception.getCause().getStackTrace().length);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.query.rowsandcols.util.FindResult;
import org.junit.Assert;
import org.junit.Test;
public class LongArrayColumnTest
{
@Test
public void testLongArrayColumnWithLongValues()
{
Column column = new LongArrayColumn(new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
ColumnAccessor accessor = column.toAccessor();
for (int i = 0; i < 10; i++) {
Assert.assertFalse(accessor.isNull(i));
Assert.assertEquals(i, accessor.getLong(i));
Assert.assertEquals((long) i, accessor.getObject(i));
Assert.assertEquals(i, accessor.getDouble(i), 0);
Assert.assertEquals(i, accessor.getInt(i));
}
}
@Test
public void testFindLong()
{
Column column = new LongArrayColumn(new long[] {1, 1, 1, 3, 5, 5, 6, 7, 8, 9});
BinarySearchableAccessor accessor = (BinarySearchableAccessor) column.toAccessor();
FindResult findResult = accessor.findLong(0, accessor.numRows(), 1);
Assert.assertTrue(findResult.wasFound());
Assert.assertEquals(0, findResult.getStartRow());
Assert.assertEquals(3, findResult.getEndRow());
findResult = accessor.findLong(0, accessor.numRows(), 6);
Assert.assertTrue(findResult.wasFound());
Assert.assertEquals(6, findResult.getStartRow());
Assert.assertEquals(7, findResult.getEndRow());
Assert.assertFalse(accessor.findLong(0, accessor.numRows(), 2).wasFound());
Assert.assertFalse(accessor.findLong(0, 3, 9).wasFound());
}
@Test
public void testOtherTypeFinds()
{
Column column = new LongArrayColumn(new long[] {0, 1, 2, 3, 4, 5, Long.MAX_VALUE});
BinarySearchableAccessor accessor = (BinarySearchableAccessor) column.toAccessor();
FindResult findResult = accessor.findNull(0, accessor.numRows());
Assert.assertFalse(findResult.wasFound()); // Always false for long array columns
findResult = accessor.findDouble(0, accessor.numRows(), 3.0);
Assert.assertTrue(findResult.wasFound());
Assert.assertEquals(3, findResult.getStartRow());
Assert.assertEquals(4, findResult.getEndRow());
findResult = accessor.findFloat(0, accessor.numRows(), 1.0f);
Assert.assertTrue(findResult.wasFound());
Assert.assertEquals(1, findResult.getStartRow());
Assert.assertEquals(2, findResult.getEndRow());
}
}

View File

@ -24,6 +24,7 @@ import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.LongArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
@ -48,7 +49,7 @@ public class AppendableRowsAndColumnsTest extends SemanticTestBase
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
"colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
"colB", new LongArrayColumn(new long[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
)
));
@ -58,7 +59,7 @@ public class AppendableRowsAndColumnsTest extends SemanticTestBase
new RowsAndColumnsHelper()
.expectColumn("colA", new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.expectColumn("colB", new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
.expectColumn("colB", new long[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
.expectColumn("newCol", new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.allColumnsRegistered()
.validate(appender);

View File

@ -20,6 +20,8 @@
package org.apache.druid.segment.data;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.collections.bitmap.ConciseBitmapFactory;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
@ -46,4 +48,11 @@ public class BitmapSerdeFactoryTest
Assert.assertTrue(mapper.readValue("{\"type\":\"concise\"}", BitmapSerdeFactory.class) instanceof ConciseBitmapSerdeFactory);
Assert.assertTrue(mapper.readValue("{\"type\":\"BitmapSerde$SomeRandomClass\"}", BitmapSerdeFactory.class) instanceof RoaringBitmapSerdeFactory);
}
@Test
public void testForBitmapFactory()
{
Assert.assertTrue(BitmapSerde.forBitmapFactory(new RoaringBitmapFactory()) instanceof BitmapSerde.DefaultBitmapSerdeFactory);
Assert.assertTrue(BitmapSerde.forBitmapFactory(new ConciseBitmapFactory()) instanceof ConciseBitmapSerdeFactory);
}
}

View File

@ -436,7 +436,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
while (sortedStrings.hasNext()) {
final String next = sortedStrings.next();
final byte[] nextBytes = StringUtils.toUtf8Nullable(next);
writer.write(nextBytes);
Assert.assertEquals(index, writer.write(nextBytes));
if (nextBytes == null) {
Assert.assertNull(writer.get(index));
} else {