From e1a745717e0b008ca76a0a27e96d3097fc48a8f9 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 21 Jul 2019 15:23:47 +0200 Subject: [PATCH] Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021) --- codestyle/spotbugs-exclude.xml | 1 - .../DoublesSketchBuildAggregator.java | 3 + .../DoublesSketchMergeAggregator.java | 3 + .../datasketches/theta/SketchAggregator.java | 4 + .../datasketches/theta/SketchHolder.java | 6 +- .../ArrayOfDoublesSketchBuildAggregator.java | 4 + ...yOfDoublesSketchBuildBufferAggregator.java | 3 + .../ArrayOfDoublesSketchMergeAggregator.java | 3 + .../query/groupby/GroupByQueryEngine.java | 7 +- .../epinephelinae/BufferArrayGrouper.java | 4 + .../epinephelinae/BufferHashGrouper.java | 41 +- .../epinephelinae/GroupByQueryEngineV2.java | 1 + .../epinephelinae/RowBasedGrouperHelper.java | 591 +++++++----------- .../apache/druid/segment/MetricHolder.java | 4 + .../SingleScanTimeDimensionSelector.java | 4 +- .../druid/segment/StringDimensionIndexer.java | 6 +- .../segment/StringDimensionMergerV9.java | 54 +- .../druid/segment/column/ColumnBuilder.java | 11 +- .../column/ColumnCapabilitiesImpl.java | 4 + .../segment/column/ColumnDescriptor.java | 4 + .../BlockLayoutColumnarDoublesSerializer.java | 3 + .../BlockLayoutColumnarFloatsSerializer.java | 3 + .../BlockLayoutColumnarLongsSerializer.java | 3 + .../druid/segment/data/ByteBufferWriter.java | 4 + .../CompressedColumnarIntsSerializer.java | 6 +- ...CompressedVSizeColumnarIntsSerializer.java | 7 +- .../druid/segment/data/GenericIndexed.java | 1 + .../segment/data/GenericIndexedWriter.java | 3 + .../IntermediateColumnarLongsSerializer.java | 6 +- .../segment/data/LongsLongEncodingWriter.java | 4 + .../data/VSizeColumnarIntsSerializer.java | 6 +- .../VSizeColumnarMultiIntsSerializer.java | 4 + .../druid/segment/data/VSizeLongSerde.java | 9 +- .../segment/incremental/IncrementalIndex.java | 86 ++- .../incremental/IncrementalIndexAdapter.java | 1 + .../IncrementalIndexRowHolder.java | 3 + .../incremental/OffheapIncrementalIndex.java | 5 + .../incremental/OnheapIncrementalIndex.java | 4 +- .../segment/serde/ComplexColumnPartSerde.java | 18 +- .../DictionaryEncodedColumnPartSerde.java | 10 +- .../serde/DoubleNumericColumnPartSerde.java | 35 +- .../serde/DoubleNumericColumnPartSerdeV2.java | 5 +- .../serde/FloatNumericColumnPartSerde.java | 2 + .../serde/FloatNumericColumnPartSerdeV2.java | 5 +- .../serde/LongNumericColumnPartSerde.java | 35 +- .../serde/LongNumericColumnPartSerdeV2.java | 11 +- .../EventReceiverFirehoseFactory.java | 5 +- .../realtime/firehose/PredicateFirehose.java | 1 + .../realtime/firehose/SqlFirehoseFactory.java | 3 + .../BatchDataSegmentAnnouncer.java | 24 +- .../BatchDataSegmentAnnouncerProvider.java | 6 +- .../coordination/ChangeRequestHttpSyncer.java | 5 +- .../coordination/SegmentLoadDropHandler.java | 95 ++- .../server/coordination/ZkCoordinator.java | 57 +- 54 files changed, 609 insertions(+), 624 deletions(-) diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml index e1fdbf71b05..44acdb862c1 100644 --- a/codestyle/spotbugs-exclude.xml +++ b/codestyle/spotbugs-exclude.xml @@ -70,7 +70,6 @@ - diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java index 18f94a9d13f..7e0ff890f46 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildAggregator.java @@ -24,11 +24,14 @@ import com.yahoo.sketches.quantiles.UpdateDoublesSketch; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.ColumnValueSelector; +import javax.annotation.Nullable; + public class DoublesSketchBuildAggregator implements Aggregator { private final ColumnValueSelector valueSelector; + @Nullable private UpdateDoublesSketch sketch; public DoublesSketchBuildAggregator(final ColumnValueSelector valueSelector, final int size) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java index 45980489239..8c19e3389d6 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java @@ -24,10 +24,13 @@ import com.yahoo.sketches.quantiles.DoublesUnion; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.ColumnValueSelector; +import javax.annotation.Nullable; + public class DoublesSketchMergeAggregator implements Aggregator { private final ColumnValueSelector selector; + @Nullable private DoublesUnion union; public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java index 075d8c7f3a4..ee1ddd20f7a 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java @@ -26,12 +26,16 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import javax.annotation.Nullable; + import java.util.List; public class SketchAggregator implements Aggregator { private final BaseObjectColumnValueSelector selector; private final int size; + + @Nullable private Union union; public SketchAggregator(BaseObjectColumnValueSelector selector, int size) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java index 9ac9056dfb8..9ba8cdafb32 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchHolder.java @@ -35,6 +35,8 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Comparator; @@ -49,7 +51,7 @@ public class SketchHolder ); public static final Comparator COMPARATOR = Ordering.from( - new Comparator() + new Comparator() { @Override public int compare(Object o1, Object o2) @@ -108,7 +110,9 @@ public class SketchHolder private final Object obj; + @Nullable private volatile Double cachedEstimate = null; + @Nullable private volatile Sketch cachedSketch = null; private SketchHolder(Object obj) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java index 11a30614834..2781bc58270 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java @@ -26,6 +26,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.IndexedInts; +import javax.annotation.Nullable; + import java.util.List; /** @@ -38,7 +40,9 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator private final DimensionSelector keySelector; private final BaseDoubleColumnValueSelector[] valueSelectors; + @Nullable private double[] values; // not part of the state, but to reuse in aggregate() method + @Nullable private ArrayOfDoublesUpdatableSketch sketch; public ArrayOfDoublesSketchBuildAggregator( diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java index 051d59fca69..1c79df8b627 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java @@ -30,6 +30,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.IndexedInts; +import javax.annotation.Nullable; + import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.List; @@ -50,6 +52,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat private final BaseDoubleColumnValueSelector[] valueSelectors; private final int nominalEntries; private final int maxIntermediateSize; + @Nullable private double[] values; // not part of the state, but to reuse in aggregate() method private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java index 3f99ce958ce..db33fb9c547 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java @@ -25,6 +25,8 @@ import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import javax.annotation.Nullable; + /** * This aggregator merges existing sketches. * The input column contains ArrayOfDoublesSketch. @@ -34,6 +36,7 @@ public class ArrayOfDoublesSketchMergeAggregator implements Aggregator { private final BaseObjectColumnValueSelector selector; + @Nullable private ArrayOfDoublesUnion union; public ArrayOfDoublesSketchMergeAggregator( diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java index 1383b202d35..96a0b76e40c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryEngine.java @@ -298,14 +298,13 @@ public class GroupByQueryEngine private final ByteBuffer metricsBuffer; private final int maxIntermediateRows; - private final List dimensionSpecs; private final List dimensions; private final ArrayList dimNames; - private final List aggregatorSpecs; private final BufferAggregator[] aggregators; private final String[] metricNames; private final int[] sizesRequired; + @Nullable private List unprocessedKeys; private Iterator delegate; @@ -320,7 +319,7 @@ public class GroupByQueryEngine unprocessedKeys = null; delegate = Collections.emptyIterator(); - dimensionSpecs = query.getDimensions(); + List dimensionSpecs = query.getDimensions(); dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); @@ -340,7 +339,7 @@ public class GroupByQueryEngine dimNames.add(dimSpec.getOutputName()); } - aggregatorSpecs = query.getAggregatorSpecs(); + List aggregatorSpecs = query.getAggregatorSpecs(); aggregators = new BufferAggregator[aggregatorSpecs.size()]; metricNames = new String[aggregatorSpecs.size()]; sizesRequired = new int[aggregatorSpecs.size()]; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 7fd34bf2288..a0fc9378441 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -28,6 +28,8 @@ import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -59,7 +61,9 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper private ByteBuffer valBuffer; // Scratch objects used by aggregateVector(). Only set if initVectorized() is called. + @Nullable private int[] vAggregationPositions = null; + @Nullable private int[] vAggregationRows = null; static long requiredBufferCapacity( diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java index 17995798f1a..a695066b499 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -28,10 +28,11 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.aggregation.AggregatorFactory; +import javax.annotation.Nullable; + import java.nio.ByteBuffer; import java.util.AbstractList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.NoSuchElementException; import java.util.function.ToIntFunction; @@ -42,7 +43,6 @@ public class BufferHashGrouper extends AbstractBufferHashGrouper extends AbstractBufferHashGrouper extends AbstractBufferHashGrouper extends AbstractBufferHashGrouper extends AbstractBufferHashGrouper() - { - @Override - public int compare(Integer lhs, Integer rhs) - { - final ByteBuffer tableBuffer = hashTable.getTableBuffer(); - return comparator.compare( - tableBuffer, - tableBuffer, - lhs + HASH_SIZE, - rhs + HASH_SIZE - ); - } + (lhs, rhs) -> { + final ByteBuffer tableBuffer = hashTable.getTableBuffer(); + return comparator.compare( + tableBuffer, + tableBuffer, + lhs + HASH_SIZE, + rhs + HASH_SIZE + ); } ); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index a39441d5745..836f36fc76a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -363,6 +363,7 @@ public class GroupByQueryEngineV2 protected final GroupByColumnSelectorPlus[] dims; protected final DateTime timestamp; + @Nullable protected CloseableGrouperIterator delegate = null; protected final boolean allSingleValueDims; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index de5da6222fb..2c75c354e31 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -256,35 +256,27 @@ public class RowBasedGrouperHelper valueTypes ); - final Accumulator accumulator = new Accumulator() - { - @Override - public AggregateResult accumulate( - final AggregateResult priorResult, - final Row row - ) - { - BaseQuery.checkInterrupted(); + final Accumulator accumulator = (priorResult, row) -> { + BaseQuery.checkInterrupted(); - if (priorResult != null && !priorResult.isOk()) { - // Pass-through error returns without doing more work. - return priorResult; - } - - if (!grouper.isInitialized()) { - grouper.init(); - } - - columnSelectorRow.set(row); - - final Comparable[] key = new Comparable[keySize]; - valueExtractFn.apply(row, key); - - final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key)); - columnSelectorRow.set(null); - - return aggregateResult; + if (priorResult != null && !priorResult.isOk()) { + // Pass-through error returns without doing more work. + return priorResult; } + + if (!grouper.isInitialized()) { + grouper.init(); + } + + columnSelectorRow.set(row); + + final Comparable[] key = new Comparable[keySize]; + valueExtractFn.apply(row, key); + + final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key)); + columnSelectorRow.set(null); + + return aggregateResult; }; return new Pair<>(grouper, accumulator); @@ -302,33 +294,12 @@ public class RowBasedGrouperHelper { if (isInputRaw) { if (query.getGranularity() instanceof AllGranularity) { - return new TimestampExtractFunction() - { - @Override - public long apply(Row row) - { - return query.getIntervals().get(0).getStartMillis(); - } - }; + return row -> query.getIntervals().get(0).getStartMillis(); } else { - return new TimestampExtractFunction() - { - @Override - public long apply(Row row) - { - return query.getGranularity().bucketStart(row.getTimestamp()).getMillis(); - } - }; + return row -> query.getGranularity().bucketStart(row.getTimestamp()).getMillis(); } } else { - return new TimestampExtractFunction() - { - @Override - public long apply(Row row) - { - return row.getTimestampFromEpoch(); - } - }; + return Row::getTimestampFromEpoch; } } @@ -358,60 +329,40 @@ public class RowBasedGrouperHelper ); if (includeTimestamp) { - return new ValueExtractFunction() - { - @Override - public Comparable[] apply(Row row, Comparable[] key) - { - key[0] = timestampExtractFn.apply(row); - for (int i = 1; i < key.length; i++) { - final Comparable val = inputRawSuppliers[i - 1].get(); - key[i] = valueConvertFns[i - 1].apply(val); - } - return key; + return (row, key) -> { + key[0] = timestampExtractFn.apply(row); + for (int i = 1; i < key.length; i++) { + final Comparable val = inputRawSuppliers[i - 1].get(); + key[i] = valueConvertFns[i - 1].apply(val); } + return key; }; } else { - return new ValueExtractFunction() - { - @Override - public Comparable[] apply(Row row, Comparable[] key) - { - for (int i = 0; i < key.length; i++) { - final Comparable val = inputRawSuppliers[i].get(); - key[i] = valueConvertFns[i].apply(val); - } - return key; + return (row, key) -> { + for (int i = 0; i < key.length; i++) { + final Comparable val = inputRawSuppliers[i].get(); + key[i] = valueConvertFns[i].apply(val); } + return key; }; } } else { if (includeTimestamp) { - return new ValueExtractFunction() - { - @Override - public Comparable[] apply(Row row, Comparable[] key) - { - key[0] = timestampExtractFn.apply(row); - for (int i = 1; i < key.length; i++) { - final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName()); - key[i] = valueConvertFns[i - 1].apply(val); - } - return key; + return (row, key) -> { + key[0] = timestampExtractFn.apply(row); + for (int i = 1; i < key.length; i++) { + final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName()); + key[i] = valueConvertFns[i - 1].apply(val); } + return key; }; } else { - return new ValueExtractFunction() - { - @Override - public Comparable[] apply(Row row, Comparable[] key) - { - for (int i = 0; i < key.length; i++) { - final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName()); - key[i] = valueConvertFns[i].apply(val); - } - return key; + return (row, key) -> { + for (int i = 0; i < key.length; i++) { + final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName()); + key[i] = valueConvertFns[i].apply(val); } + return key; }; } } @@ -437,56 +388,51 @@ public class RowBasedGrouperHelper return new CloseableGrouperIterator<>( grouper.iterator(true), - new Function, Row>() - { - @Override - public Row apply(Grouper.Entry entry) - { - Map theMap = Maps.newLinkedHashMap(); + entry -> { + Map theMap = Maps.newLinkedHashMap(); - // Get timestamp, maybe. - final DateTime timestamp; - final int dimStart; + // Get timestamp, maybe. + final DateTime timestamp; + final int dimStart; - if (includeTimestamp) { - timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0]))); - dimStart = 1; - } else { - timestamp = null; - dimStart = 0; - } - - // Add dimensions. - if (dimsToInclude == null) { - for (int i = dimStart; i < entry.getKey().getKey().length; i++) { - Object dimVal = entry.getKey().getKey()[i]; - theMap.put( - query.getDimensions().get(i - dimStart).getOutputName(), - dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal - ); - } - } else { - Map dimensions = new HashMap<>(); - for (int i = dimStart; i < entry.getKey().getKey().length; i++) { - Object dimVal = entry.getKey().getKey()[i]; - dimensions.put( - query.getDimensions().get(i - dimStart).getOutputName(), - dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal - ); - } - - for (String dimToInclude : dimsToInclude) { - theMap.put(dimToInclude, dimensions.get(dimToInclude)); - } - } - - // Add aggregations. - for (int i = 0; i < entry.getValues().length; i++) { - theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); - } - - return new MapBasedRow(timestamp, theMap); + if (includeTimestamp) { + timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0]))); + dimStart = 1; + } else { + timestamp = null; + dimStart = 0; } + + // Add dimensions. + if (dimsToInclude == null) { + for (int i = dimStart; i < entry.getKey().getKey().length; i++) { + Object dimVal = entry.getKey().getKey()[i]; + theMap.put( + query.getDimensions().get(i - dimStart).getOutputName(), + dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal + ); + } + } else { + Map dimensions = new HashMap<>(); + for (int i = dimStart; i < entry.getKey().getKey().length; i++) { + Object dimVal = entry.getKey().getKey()[i]; + dimensions.put( + query.getDimensions().get(i - dimStart).getOutputName(), + dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal + ); + } + + for (String dimToInclude : dimsToInclude) { + theMap.put(dimToInclude, dimensions.get(dimToInclude)); + } + } + + // Add aggregations. + for (int i = 0; i < entry.getValues().length; i++) { + theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); + } + + return new MapBasedRow(timestamp, theMap); }, closeable ); @@ -713,47 +659,30 @@ public class RowBasedGrouperHelper if (includeTimestamp) { if (sortByDimsFirst) { - return new Comparator>() - { - @Override - public int compare(Grouper.Entry entry1, Grouper.Entry entry2) - { - final int cmp = compareDimsInRows(entry1.getKey(), entry2.getKey(), 1); - if (cmp != 0) { - return cmp; - } - - return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]); + return (entry1, entry2) -> { + final int cmp = compareDimsInRows(entry1.getKey(), entry2.getKey(), 1); + if (cmp != 0) { + return cmp; } + + return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]); }; } else { - return new Comparator>() - { - @Override - public int compare(Grouper.Entry entry1, Grouper.Entry entry2) - { - final int timeCompare = Longs.compare( - (long) entry1.getKey().getKey()[0], - (long) entry2.getKey().getKey()[0] - ); + return (entry1, entry2) -> { + final int timeCompare = Longs.compare( + (long) entry1.getKey().getKey()[0], + (long) entry2.getKey().getKey()[0] + ); - if (timeCompare != 0) { - return timeCompare; - } - - return compareDimsInRows(entry1.getKey(), entry2.getKey(), 1); + if (timeCompare != 0) { + return timeCompare; } + + return compareDimsInRows(entry1.getKey(), entry2.getKey(), 1); }; } } else { - return new Comparator>() - { - @Override - public int compare(Grouper.Entry entry1, Grouper.Entry entry2) - { - return compareDimsInRows(entry1.getKey(), entry2.getKey(), 0); - } - }; + return (entry1, entry2) -> compareDimsInRows(entry1.getKey(), entry2.getKey(), 0); } } @@ -804,74 +733,57 @@ public class RowBasedGrouperHelper if (includeTimestamp) { if (sortByDimsFirst) { - return new Comparator>() - { - @Override - public int compare(Grouper.Entry entry1, Grouper.Entry entry2) - { - final int cmp = compareDimsInRowsWithAggs( - entry1, - entry2, - 1, - needsReverses, - aggFlags, - fieldIndices, - isNumericField, - comparators - ); - if (cmp != 0) { - return cmp; - } - - return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]); - } - }; - } else { - return new Comparator>() - { - @Override - public int compare(Grouper.Entry entry1, Grouper.Entry entry2) - { - final int timeCompare = Longs.compare( - (long) entry1.getKey().getKey()[0], - (long) entry2.getKey().getKey()[0] - ); - - if (timeCompare != 0) { - return timeCompare; - } - - return compareDimsInRowsWithAggs( - entry1, - entry2, - 1, - needsReverses, - aggFlags, - fieldIndices, - isNumericField, - comparators - ); - } - }; - } - } else { - return new Comparator>() - { - @Override - public int compare(Grouper.Entry entry1, Grouper.Entry entry2) - { - return compareDimsInRowsWithAggs( + return (entry1, entry2) -> { + final int cmp = compareDimsInRowsWithAggs( entry1, entry2, - 0, + 1, needsReverses, aggFlags, fieldIndices, isNumericField, comparators ); - } - }; + if (cmp != 0) { + return cmp; + } + + return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]); + }; + } else { + return (entry1, entry2) -> { + final int timeCompare = Longs.compare( + (long) entry1.getKey().getKey()[0], + (long) entry2.getKey().getKey()[0] + ); + + if (timeCompare != 0) { + return timeCompare; + } + + return compareDimsInRowsWithAggs( + entry1, + entry2, + 1, + needsReverses, + aggFlags, + fieldIndices, + isNumericField, + comparators + ); + }; + } + } else { + return (entry1, entry2) -> compareDimsInRowsWithAggs( + entry1, + entry2, + 0, + needsReverses, + aggFlags, + fieldIndices, + isNumericField, + comparators + ); } } @@ -981,6 +893,7 @@ public class RowBasedGrouperHelper // dictionary id -> rank of the sorted dictionary // This is initialized in the constructor and bufferComparator() with static dictionary and dynamic dictionary, // respectively. + @Nullable private int[] rankOfDictionaryIds = null; RowBasedKeySerde( @@ -1118,68 +1031,53 @@ public class RowBasedGrouperHelper if (includeTimestamp) { if (sortByDimsFirst) { - return new Grouper.BufferComparator() - { - @Override - public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) - { - final int cmp = compareDimsInBuffersForNullFudgeTimestamp( - serdeHelperComparators, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - if (cmp != 0) { - return cmp; - } - - return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + final int cmp = compareDimsInBuffersForNullFudgeTimestamp( + serdeHelperComparators, + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + if (cmp != 0) { + return cmp; } + + return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); }; } else { - return new Grouper.BufferComparator() - { - @Override - public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) - { - final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - if (timeCompare != 0) { - return timeCompare; - } - - return compareDimsInBuffersForNullFudgeTimestamp( - serdeHelperComparators, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); + if (timeCompare != 0) { + return timeCompare; } + + return compareDimsInBuffersForNullFudgeTimestamp( + serdeHelperComparators, + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); }; } } else { - return new Grouper.BufferComparator() - { - @Override - public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) - { - for (int i = 0; i < dimCount; i++) { - final int cmp = serdeHelperComparators[i].compare( - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + for (int i = 0; i < dimCount; i++) { + final int cmp = serdeHelperComparators[i].compare( + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); - if (cmp != 0) { - return cmp; - } + if (cmp != 0) { + return cmp; } - - return 0; } + + return 0; }; } } @@ -1246,84 +1144,69 @@ public class RowBasedGrouperHelper if (includeTimestamp) { if (sortByDimsFirst) { - return new Grouper.BufferComparator() - { - @Override - public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) - { - final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( - adjustedSerdeHelperComparators, - needsReverses, - fieldCount, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - if (cmp != 0) { - return cmp; - } - - return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - } - }; - } else { - return new Grouper.BufferComparator() - { - @Override - public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) - { - final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); - - if (timeCompare != 0) { - return timeCompare; - } - - int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( - adjustedSerdeHelperComparators, - needsReverses, - fieldCount, - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( + adjustedSerdeHelperComparators, + needsReverses, + fieldCount, + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + if (cmp != 0) { return cmp; } + + return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + }; + } else { + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition)); + + if (timeCompare != 0) { + return timeCompare; + } + + int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown( + adjustedSerdeHelperComparators, + needsReverses, + fieldCount, + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); + + return cmp; }; } } else { - return new Grouper.BufferComparator() - { - @Override - public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition) - { - for (int i = 0; i < fieldCount; i++) { - final int cmp; - if (needsReverses.get(i)) { - cmp = adjustedSerdeHelperComparators[i].compare( - rhsBuffer, - lhsBuffer, - rhsPosition, - lhsPosition - ); - } else { - cmp = adjustedSerdeHelperComparators[i].compare( - lhsBuffer, - rhsBuffer, - lhsPosition, - rhsPosition - ); - } - - if (cmp != 0) { - return cmp; - } + return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> { + for (int i = 0; i < fieldCount; i++) { + final int cmp; + if (needsReverses.get(i)) { + cmp = adjustedSerdeHelperComparators[i].compare( + rhsBuffer, + lhsBuffer, + rhsPosition, + lhsPosition + ); + } else { + cmp = adjustedSerdeHelperComparators[i].compare( + lhsBuffer, + rhsBuffer, + lhsPosition, + rhsPosition + ); } - return 0; + if (cmp != 0) { + return cmp; + } } + + return 0; }; } } diff --git a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java index 770573509e2..8f4ee20ab21 100644 --- a/processing/src/main/java/org/apache/druid/segment/MetricHolder.java +++ b/processing/src/main/java/org/apache/druid/segment/MetricHolder.java @@ -26,6 +26,8 @@ import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.serde.ComplexMetricSerde; import org.apache.druid.segment.serde.ComplexMetrics; +import javax.annotation.Nullable; + import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -96,7 +98,9 @@ public class MetricHolder private final String name; private final String typeName; private final MetricType type; + @Nullable CompressedColumnarFloatsSupplier floatType = null; + @Nullable GenericIndexed complexType = null; private MetricHolder( diff --git a/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java index 8a1be5a9eb6..1cf3efb2458 100644 --- a/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/SingleScanTimeDimensionSelector.java @@ -46,10 +46,12 @@ public class SingleScanTimeDimensionSelector implements DimensionSelector private final List timeValues = new ArrayList<>(); private final SingleIndexedInt row = new SingleIndexedInt(); - private String currentValue = null; private long currentTimestamp = Long.MIN_VALUE; private int index = -1; + @Nullable + private String currentValue = null; + public SingleScanTimeDimensionSelector( BaseLongColumnValueSelector selector, @Nullable ExtractionFn extractionFn, diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java index cf201e4f9b2..332da92ad52 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java @@ -232,9 +232,11 @@ public class StringDimensionIndexer implements DimensionIndexer dimValuesList = (List) dimValues; + List dimValuesList = (List) dimValues; if (dimValuesList.isEmpty()) { dimLookup.add(null); encodedDimensionValues = IntArrays.EMPTY_ARRAY; diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java index 050163bb7f7..63405020498 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java @@ -77,27 +77,36 @@ public class StringDimensionMergerV9 implements DimensionMergerV9 private static final Indexed NULL_STR_DIM_VAL = new ListIndexed<>(Collections.singletonList(null)); private static final Splitter SPLITTER = Splitter.on(","); - private ColumnarIntsSerializer encodedValueSerializer; - - private String dimensionName; - private GenericIndexedWriter dictionaryWriter; - private String firstDictionaryValue; - private int dictionarySize; - private GenericIndexedWriter bitmapWriter; - private ByteBufferWriter spatialWriter; - private ArrayList dimConversions; - private int cardinality = 0; - private boolean hasNull = false; - private MutableBitmap nullRowsBitmap; - private final SegmentWriteOutMedium segmentWriteOutMedium; - private int rowCount = 0; - private ColumnCapabilities capabilities; - private List adapters; - private final IndexSpec indexSpec; - private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator; - + private final String dimensionName; private final ProgressIndicator progress; private final Closer closer; + private final IndexSpec indexSpec; + private final SegmentWriteOutMedium segmentWriteOutMedium; + private final MutableBitmap nullRowsBitmap; + private final ColumnCapabilities capabilities; + + private int dictionarySize; + private int rowCount = 0; + private int cardinality = 0; + private boolean hasNull = false; + + @Nullable + private GenericIndexedWriter bitmapWriter; + @Nullable + private ByteBufferWriter spatialWriter; + @Nullable + private ArrayList dimConversions; + @Nullable + private List adapters; + @Nullable + private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator; + @Nullable + private ColumnarIntsSerializer encodedValueSerializer; + @Nullable + private GenericIndexedWriter dictionaryWriter; + @Nullable + private String firstDictionaryValue; + public StringDimensionMergerV9( String dimensionName, @@ -537,13 +546,12 @@ public class StringDimensionMergerV9 implements DimensionMergerV9 .withBitmapIndex(bitmapWriter) .withSpatialIndex(spatialWriter) .withByteOrder(IndexIO.BYTE_ORDER); - final ColumnDescriptor serdeficator = builder - .addSerde(partBuilder.build()) - .build(); //log.info("Completed dimension column[%s] in %,d millis.", dimensionName, System.currentTimeMillis() - dimStartTime); - return serdeficator; + return builder + .addSerde(partBuilder.build()) + .build(); } protected interface IndexSeeker diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java index ce081dffe9a..86b9b86e361 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java @@ -23,18 +23,25 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import javax.annotation.Nullable; + /** */ public class ColumnBuilder { + @Nullable private ValueType type = null; private boolean hasMultipleValues = false; private boolean filterable = false; - - private Supplier columnSupplier = null; private boolean dictionaryEncoded = false; + + @Nullable + private Supplier columnSupplier = null; + @Nullable private Supplier bitmapIndex = null; + @Nullable private Supplier spatialIndex = null; + @Nullable private SmooshedFileMapper fileMapper = null; public ColumnBuilder setFileMapper(SmooshedFileMapper fileMapper) diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java index 620a0f6e45c..94cdbe3d850 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilitiesImpl.java @@ -23,12 +23,16 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.ISE; +import javax.annotation.Nullable; + /** * */ public class ColumnCapabilitiesImpl implements ColumnCapabilities { + @Nullable private ValueType type = null; + private boolean dictionaryEncoded = false; private boolean runLengthEncoded = false; private boolean hasInvertedIndexes = false; diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java index 1307640fcf9..fbb5b1d1bbc 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java @@ -28,6 +28,8 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.segment.serde.ColumnPartSerde; import org.apache.druid.segment.serde.Serializer; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; @@ -111,7 +113,9 @@ public class ColumnDescriptor implements Serializer public static class Builder { + @Nullable private ValueType valueType = null; + @Nullable private Boolean hasMultipleValues = null; private final List parts = new ArrayList<>(); diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java index 8c2db09fefa..3558473a931 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java @@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -45,6 +47,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri private final CompressionStrategy compression; private int numInserted = 0; + @Nullable private ByteBuffer endBuffer; BlockLayoutColumnarDoublesSerializer( diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java index 84d2e26bbb6..aa225b80efe 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java @@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -45,6 +47,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial private final CompressionStrategy compression; private int numInserted = 0; + @Nullable private ByteBuffer endBuffer; BlockLayoutColumnarFloatsSerializer( diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java index c1851c73d38..778a06140de 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java @@ -24,6 +24,8 @@ import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -47,6 +49,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ private int numInserted = 0; private int numInsertedForNextFlush; + @Nullable private ByteBuffer endBuffer; BlockLayoutColumnarLongsSerializer( diff --git a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java index 71529440113..ee1e7813dca 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java @@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.Serializer; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.WriteOutBytes; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.channels.WritableByteChannel; @@ -36,7 +38,9 @@ public class ByteBufferWriter implements Serializer private final SegmentWriteOutMedium segmentWriteOutMedium; private final ObjectStrategy strategy; + @Nullable private WriteOutBytes headerOut = null; + @Nullable private WriteOutBytes valueOut = null; public ByteBufferWriter(SegmentWriteOutMedium segmentWriteOutMedium, ObjectStrategy strategy) diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java index dacf62f27b4..a5db9e276e2 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -45,9 +47,11 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer private final int chunkFactor; private final CompressionStrategy compression; private final GenericIndexedWriter flattener; - private ByteBuffer endBuffer; private int numInserted; + @Nullable + private ByteBuffer endBuffer; + CompressedColumnarIntsSerializer( final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java index ffc1ee84a6c..72f9fc988b3 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java @@ -25,6 +25,8 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -67,10 +69,11 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn private final CompressionStrategy compression; private final GenericIndexedWriter flattener; private final ByteBuffer intBuffer; - - private ByteBuffer endBuffer; private int numInserted; + @Nullable + private ByteBuffer endBuffer; + CompressedVSizeColumnarIntsSerializer( final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java index e44a200e1bc..8a2bc5773e2 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java @@ -198,6 +198,7 @@ public class GenericIndexed implements CloseableIndexed, Serializer private int logBaseTwoOfElementsPerValueFile; private int relativeIndexMask; + @Nullable private final ByteBuffer theBuffer; /** diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index 77bcd916f81..120b36bd903 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -140,10 +140,13 @@ public class GenericIndexedWriter implements Serializer private boolean objectsSorted = true; @Nullable private T prevObject = null; + @Nullable private WriteOutBytes headerOut = null; + @Nullable private WriteOutBytes valuesOut = null; private int numWritten = 0; private boolean requireMultipleFiles = false; + @Nullable private LongList headerOutLong; private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES); diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java index e57e81df00a..e08e463dd44 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java @@ -27,6 +27,8 @@ import it.unimi.dsi.fastutil.longs.LongList; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; @@ -42,7 +44,6 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali private final String filenameBase; private final ByteOrder order; private final CompressionStrategy compression; - private LongList tempOut = null; private int numInserted = 0; @@ -52,6 +53,9 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali private long maxVal = Long.MIN_VALUE; private long minVal = Long.MAX_VALUE; + @Nullable + private LongList tempOut = null; + @Nullable private ColumnarLongsSerializer delegate; IntermediateColumnarLongsSerializer( diff --git a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java index b421105f91f..2aeb194d9a8 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingWriter.java @@ -21,6 +21,8 @@ package org.apache.druid.segment.data; import org.apache.druid.segment.writeout.WriteOutBytes; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -31,7 +33,9 @@ public class LongsLongEncodingWriter implements CompressionFactory.LongEncodingW private final ByteBuffer orderBuffer; private final ByteOrder order; + @Nullable private ByteBuffer outBuffer = null; + @Nullable private OutputStream outStream = null; public LongsLongEncodingWriter(ByteOrder order) diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java index 90d4c2033cc..ff646fc98d0 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarIntsSerializer.java @@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.WriteOutBytes; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; @@ -46,9 +48,11 @@ public class VSizeColumnarIntsSerializer extends SingleValueColumnarIntsSerializ private final int numBytes; private final ByteBuffer helperBuffer = ByteBuffer.allocate(Integer.BYTES); - private WriteOutBytes valuesOut = null; private boolean bufPaddingWritten = false; + @Nullable + private WriteOutBytes valuesOut = null; + public VSizeColumnarIntsSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final int maxValue) { this.segmentWriteOutMedium = segmentWriteOutMedium; diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java index d183d357437..6a85a46d56d 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java @@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.WriteOutBytes; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.channels.WritableByteChannel; @@ -83,7 +85,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize private final WriteInt writeInt; private final SegmentWriteOutMedium segmentWriteOutMedium; + @Nullable private WriteOutBytes headerOut = null; + @Nullable private WriteOutBytes valuesOut = null; private int numWritten = 0; private boolean numBytesForMaxWritten = false; diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java index 3bfa68ac281..6741be5de9e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java @@ -21,6 +21,8 @@ package org.apache.druid.segment.data; import org.apache.druid.java.util.common.IAE; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; @@ -189,6 +191,7 @@ public class VSizeLongSerde private static final class Size1Ser implements LongSerializer { + @Nullable OutputStream output = null; ByteBuffer buffer; byte curByte = 0; @@ -242,6 +245,7 @@ public class VSizeLongSerde private static final class Size2Ser implements LongSerializer { + @Nullable OutputStream output = null; ByteBuffer buffer; byte curByte = 0; @@ -295,8 +299,8 @@ public class VSizeLongSerde private static final class Mult4Ser implements LongSerializer { - - OutputStream output = null; + @Nullable + OutputStream output; ByteBuffer buffer; int numBytes; byte curByte = 0; @@ -361,6 +365,7 @@ public class VSizeLongSerde private static final class Mult8Ser implements LongSerializer { + @Nullable OutputStream output; ByteBuffer buffer; int numBytes; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 515c47571f4..8e21296b90d 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -330,6 +330,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp public static class Builder { + @Nullable private IncrementalIndexSchema incrementalIndexSchema; private boolean deserializeComplexMetrics; private boolean reportParseExceptions; @@ -505,8 +506,8 @@ public abstract class IncrementalIndex extends AbstractIndex imp static class IncrementalIndexRowResult { - private IncrementalIndexRow incrementalIndexRow; - private List parseExceptionMessages; + private final IncrementalIndexRow incrementalIndexRow; + private final List parseExceptionMessages; IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List parseExceptionMessages) { @@ -527,9 +528,9 @@ public abstract class IncrementalIndex extends AbstractIndex imp static class AddToFactsResult { - private int rowCount; + private final int rowCount; private final long bytesInMemory; - private List parseExceptionMessages; + private final List parseExceptionMessages; public AddToFactsResult( int rowCount, @@ -997,53 +998,48 @@ public abstract class IncrementalIndex extends AbstractIndex imp public Iterable iterableWithPostAggregations(final List postAggs, final boolean descending) { - return new Iterable() - { - @Override - public Iterator iterator() - { - final List dimensions = getDimensions(); + return () -> { + final List dimensions = getDimensions(); - return Iterators.transform( - getFacts().iterator(descending), - incrementalIndexRow -> { - final int rowOffset = incrementalIndexRow.getRowIndex(); + return Iterators.transform( + getFacts().iterator(descending), + incrementalIndexRow -> { + final int rowOffset = incrementalIndexRow.getRowIndex(); - Object[] theDims = incrementalIndexRow.getDims(); + Object[] theDims = incrementalIndexRow.getDims(); - Map theVals = Maps.newLinkedHashMap(); - for (int i = 0; i < theDims.length; ++i) { - Object dim = theDims[i]; - DimensionDesc dimensionDesc = dimensions.get(i); - if (dimensionDesc == null) { - continue; - } - String dimensionName = dimensionDesc.getName(); - DimensionHandler handler = dimensionDesc.getHandler(); - if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) { - theVals.put(dimensionName, null); - continue; - } - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim); - theVals.put(dimensionName, rowVals); + Map theVals = Maps.newLinkedHashMap(); + for (int i = 0; i < theDims.length; ++i) { + Object dim = theDims[i]; + DimensionDesc dimensionDesc = dimensions.get(i); + if (dimensionDesc == null) { + continue; } - - AggregatorType[] aggs = getAggsForRow(rowOffset); - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i)); + String dimensionName = dimensionDesc.getName(); + DimensionHandler handler = dimensionDesc.getHandler(); + if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) { + theVals.put(dimensionName, null); + continue; } - - if (postAggs != null) { - for (PostAggregator postAgg : postAggs) { - theVals.put(postAgg.getName(), postAgg.compute(theVals)); - } - } - - return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals); + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim); + theVals.put(dimensionName, rowVals); } - ); - } + + AggregatorType[] aggs = getAggsForRow(rowOffset); + for (int i = 0; i < aggs.length; ++i) { + theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i)); + } + + if (postAggs != null) { + for (PostAggregator postAgg : postAggs) { + theVals.put(postAgg.getName(), postAgg.compute(theVals)); + } + } + + return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals); + } + ); }; } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java index 2493bdd391e..b896c9ec927 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java @@ -48,6 +48,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter private static class DimensionAccessor { private final IncrementalIndex.DimensionDesc dimensionDesc; + @Nullable private final MutableBitmap[] invertedIndexes; private final DimensionIndexer indexer; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java index b9180ddd5f7..c46ef559b00 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRowHolder.java @@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.LongColumnSelector; +import javax.annotation.Nullable; + /** * IncrementalIndexRowHolder is a simple {@link #get}/{@link #set} holder of {@link IncrementalIndexRow}. It is used * to implement various machinery around {@link IncrementalIndex}, e. g. {@link @@ -33,6 +35,7 @@ import org.apache.druid.segment.LongColumnSelector; */ public class IncrementalIndexRowHolder implements LongColumnSelector { + @Nullable private IncrementalIndexRow currEntry = null; public IncrementalIndexRow get() diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java index 95c88fc9606..ac48901d09e 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java @@ -33,6 +33,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.segment.ColumnSelectorFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -59,14 +61,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex protected final int maxRowCount; + @Nullable private volatile Map selectors; //given a ByteBuffer and an offset where all aggregates for a row are stored //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate //is stored + @Nullable private volatile int[] aggOffsetInBuffer; private volatile int aggsTotalSize; + @Nullable private String outOfRowsReason = null; OffheapIncrementalIndex( diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 80e21a08c49..8e5c55b046a 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -60,8 +60,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex private final long maxBytesPerRowForAggregators; protected final int maxRowCount; protected final long maxBytesInMemory; - private volatile Map selectors; + @Nullable + private volatile Map selectors; + @Nullable private String outOfRowsReason = null; OnheapIncrementalIndex( diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java index c3623515681..f10f53baf79 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java @@ -22,16 +22,15 @@ package org.apache.druid.segment.serde; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.segment.GenericColumnSerializer; -import org.apache.druid.segment.column.ColumnBuilder; -import org.apache.druid.segment.column.ColumnConfig; -import java.nio.ByteBuffer; +import javax.annotation.Nullable; /** */ public class ComplexColumnPartSerde implements ColumnPartSerde { private final String typeName; + @Nullable private final ComplexMetricSerde serde; private final Serializer serializer; @@ -70,21 +69,18 @@ public class ComplexColumnPartSerde implements ColumnPartSerde @Override public Deserializer getDeserializer() { - return new Deserializer() - { - @Override - public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig) - { - if (serde != null) { - serde.deserializeColumn(buffer, builder); - } + return (buffer, builder, columnConfig) -> { + if (serde != null) { + serde.deserializeColumn(buffer, builder); } }; } public static class SerializerBuilder { + @Nullable private String typeName = null; + @Nullable private GenericColumnSerializer delegate = null; public SerializerBuilder withTypeName(final String typeName) diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java index 136eca692fd..7f0bc3da3be 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java @@ -143,13 +143,21 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde public static class SerializerBuilder { - private VERSION version = null; private int flags = STARTING_FLAGS; + + @Nullable + private VERSION version = null; + @Nullable private GenericIndexedWriter dictionaryWriter = null; + @Nullable private ColumnarIntsSerializer valueWriter = null; + @Nullable private BitmapSerdeFactory bitmapSerdeFactory = null; + @Nullable private GenericIndexedWriter bitmapIndexWriter = null; + @Nullable private ByteBufferWriter spatialIndexWriter = null; + @Nullable private ByteOrder byteOrder = null; public SerializerBuilder withDictionary(GenericIndexedWriter dictionaryWriter) diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java index 82ebfc2d683..5a08e91e22c 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerde.java @@ -23,14 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import org.apache.druid.segment.IndexIO; -import org.apache.druid.segment.column.ColumnBuilder; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.ColumnarDoubles; import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers; import javax.annotation.Nullable; -import java.nio.ByteBuffer; + import java.nio.ByteOrder; public class DoubleNumericColumnPartSerde implements ColumnPartSerde @@ -66,7 +64,9 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde public static class SerializerBuilder { + @Nullable private ByteOrder byteOrder = null; + @Nullable private Serializer delegate = null; public SerializerBuilder withByteOrder(final ByteOrder byteOrder) @@ -97,24 +97,19 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde @Override public Deserializer getDeserializer() { - return new Deserializer() - { - @Override - public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig) - { - final Supplier column = CompressedColumnarDoublesSuppliers.fromByteBuffer( - buffer, - byteOrder - ); - DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier( - column, - IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ); - builder.setType(ValueType.DOUBLE) - .setHasMultipleValues(false) - .setNumericColumnSupplier(columnSupplier); + return (buffer, builder, columnConfig) -> { + final Supplier column = CompressedColumnarDoublesSuppliers.fromByteBuffer( + buffer, + byteOrder + ); + DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier( + column, + IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() + ); + builder.setType(ValueType.DOUBLE) + .setHasMultipleValues(false) + .setNumericColumnSupplier(columnSupplier); - } }; } } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java index b822dbf0dcc..2262184d9f5 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/DoubleNumericColumnPartSerdeV2.java @@ -54,7 +54,7 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde private final ByteOrder byteOrder; @Nullable - private Serializer serializer; + private final Serializer serializer; private final BitmapSerdeFactory bitmapSerdeFactory; public DoubleNumericColumnPartSerdeV2( @@ -87,8 +87,11 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde public static class SerializerBuilder { + @Nullable private ByteOrder byteOrder = null; + @Nullable private Serializer delegate = null; + @Nullable private BitmapSerdeFactory bitmapSerdeFactory = null; public SerializerBuilder withByteOrder(final ByteOrder byteOrder) diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java index 46cc40b62e9..02ac221be32 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerde.java @@ -66,7 +66,9 @@ public class FloatNumericColumnPartSerde implements ColumnPartSerde public static class SerializerBuilder { + @Nullable private ByteOrder byteOrder = null; + @Nullable private Serializer delegate = null; public SerializerBuilder withByteOrder(final ByteOrder byteOrder) diff --git a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java index ccc748a210a..b3c71db94a5 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/FloatNumericColumnPartSerdeV2.java @@ -52,7 +52,7 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde private final ByteOrder byteOrder; @Nullable - private Serializer serializer; + private final Serializer serializer; private final BitmapSerdeFactory bitmapSerdeFactory; private FloatNumericColumnPartSerdeV2( @@ -85,8 +85,11 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde public static class SerializerBuilder { + @Nullable private ByteOrder byteOrder = null; + @Nullable private Serializer delegate = null; + @Nullable private BitmapSerdeFactory bitmapSerdeFactory = null; public SerializerBuilder withByteOrder(final ByteOrder byteOrder) diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java index 2dda2eb3c00..3884875cf9c 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerde.java @@ -22,13 +22,11 @@ package org.apache.druid.segment.serde; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.segment.IndexIO; -import org.apache.druid.segment.column.ColumnBuilder; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.CompressedColumnarLongsSupplier; import javax.annotation.Nullable; -import java.nio.ByteBuffer; + import java.nio.ByteOrder; /** @@ -66,7 +64,9 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde public static class SerializerBuilder { + @Nullable private ByteOrder byteOrder = null; + @Nullable private Serializer delegate = null; public SerializerBuilder withByteOrder(final ByteOrder byteOrder) @@ -97,23 +97,18 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde @Override public Deserializer getDeserializer() { - return new Deserializer() - { - @Override - public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig) - { - final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer( - buffer, - byteOrder - ); - LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier( - column, - IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ); - builder.setType(ValueType.LONG) - .setHasMultipleValues(false) - .setNumericColumnSupplier(columnSupplier); - } + return (buffer, builder, columnConfig) -> { + final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer( + buffer, + byteOrder + ); + LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier( + column, + IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() + ); + builder.setType(ValueType.LONG) + .setHasMultipleValues(false) + .setNumericColumnSupplier(columnSupplier); }; } } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java index ad27d81260f..7a46c51eb39 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/LongNumericColumnPartSerdeV2.java @@ -50,14 +50,16 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde ); } + @Nullable + private final Serializer serializer; + @Nullable private final ByteOrder byteOrder; @Nullable - private Serializer serializer; private final BitmapSerdeFactory bitmapSerdeFactory; private LongNumericColumnPartSerdeV2( - ByteOrder byteOrder, - BitmapSerdeFactory bitmapSerdeFactory, + @Nullable ByteOrder byteOrder, + @Nullable BitmapSerdeFactory bitmapSerdeFactory, @Nullable Serializer serializer ) { @@ -85,8 +87,11 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde public static class SerializerBuilder { + @Nullable private ByteOrder byteOrder = null; + @Nullable private Serializer delegate = null; + @Nullable private BitmapSerdeFactory bitmapSerdeFactory = null; public SerializerBuilder withByteOrder(final ByteOrder byteOrder) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index d8c1ef9f02f..6842f8a26bc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -104,7 +104,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory> parser) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java index b8c6b85581b..446efa3ed1b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java @@ -40,6 +40,7 @@ public class PredicateFirehose implements Firehose private final Firehose firehose; private final Predicate predicate; + @Nullable private InputRow savedInputRow = null; public PredicateFirehose(Firehose firehose, Predicate predicate) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java index 43738f8f684..866be521fe8 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseFactory.java @@ -35,6 +35,8 @@ import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.ResultSetException; import org.skife.jdbi.v2.exceptions.StatementException; +import javax.annotation.Nullable; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -51,6 +53,7 @@ public class SqlFirehoseFactory extends PrefetchSqlFirehoseFactory { @JsonProperty private final List sqls; + @Nullable @JsonProperty private final MetadataStorageConnectorConfig connectorConfig; private final ObjectMapper objectMapper; diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java index fd53a1c9898..5c521bb1f1d 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -69,7 +69,8 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer private final ConcurrentMap segmentLookup = new ConcurrentHashMap<>(); private final Function segmentTransformer; - private final ChangeRequestHistory changes = new ChangeRequestHistory(); + private final ChangeRequestHistory changes = new ChangeRequestHistory<>(); + @Nullable private final SegmentZNode dummyZnode; @Inject @@ -87,20 +88,15 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer this.server = server; this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName()); - segmentTransformer = new Function() - { - @Override - public DataSegment apply(DataSegment input) - { - DataSegment rv = input; - if (config.isSkipDimensionsAndMetrics()) { - rv = rv.withDimensions(null).withMetrics(null); - } - if (config.isSkipLoadSpec()) { - rv = rv.withLoadSpec(null); - } - return rv; + segmentTransformer = input -> { + DataSegment rv = input; + if (config.isSkipDimensionsAndMetrics()) { + rv = rv.withDimensions(null).withMetrics(null); } + if (config.isSkipLoadSpec()) { + rv = rv.withLoadSpec(null); + } + return rv; }; if (this.config.isSkipSegmentAnnouncementOnZk()) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java index b4acbf71a4e..b521594d12d 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncerProvider.java @@ -21,15 +21,15 @@ package org.apache.druid.server.coordination; import com.fasterxml.jackson.annotation.JacksonInject; -import javax.validation.constraints.NotNull; +import javax.annotation.Nullable; /** */ public class BatchDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider { @JacksonInject - @NotNull - private BatchDataSegmentAnnouncer batchAnnouncer = null; + @Nullable + private final BatchDataSegmentAnnouncer batchAnnouncer = null; @Override public DataSegmentAnnouncer get() diff --git a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java index 8f38f9fa1bf..fc466d11491 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java @@ -39,6 +39,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Duration; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletResponse; import java.io.InputStream; import java.net.URL; @@ -84,12 +85,14 @@ public class ChangeRequestHttpSyncer private final LifecycleLock startStopLock = new LifecycleLock(); private final String logIdentity; - private ChangeRequestHistory.Counter counter = null; private long unstableStartTime = -1; private int consecutiveFailedAttemptCount = 0; private long lastSuccessfulSyncTime = 0; private long lastSyncTime = 0; + @Nullable + private ChangeRequestHistory.Counter counter = null; + public ChangeRequestHttpSyncer( ObjectMapper smileMapper, HttpClient httpClient, diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index c2a58bafd4d..cb08182e37f 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -44,6 +44,8 @@ import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -240,14 +242,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler addSegments( cachedSegments, - new DataSegmentChangeCallback() - { - @Override - public void execute() - { - log.info("Cache load took %,d ms", System.currentTimeMillis() - start); - } - } + () -> log.info("Cache load took %,d ms", System.currentTimeMillis() - start) ); } @@ -348,35 +343,30 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>(); for (final DataSegment segment : segments) { loadingExecutor.submit( - new Runnable() - { - @Override - public void run() - { + () -> { + try { + log.info( + "Loading segment[%d/%d][%s]", + counter.incrementAndGet(), + numSegments, + segment.getId() + ); + loadSegment(segment, callback); try { - log.info( - "Loading segment[%d/%d][%s]", - counter.incrementAndGet(), - numSegments, - segment.getId() - ); - loadSegment(segment, callback); - try { - backgroundSegmentAnnouncer.announceSegment(segment); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } + backgroundSegmentAnnouncer.announceSegment(segment); } - catch (SegmentLoadingException e) { - log.error(e, "[%s] failed to load", segment.getId()); - failedSegments.add(segment); - } - finally { - latch.countDown(); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); } } + catch (SegmentLoadingException e) { + log.error(e, "[%s] failed to load", segment.getId()); + failedSegments.add(segment); + } + finally { + latch.countDown(); + } } ); } @@ -427,28 +417,23 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler announcer.unannounceSegment(segment); segmentsToDelete.add(segment); - Runnable runnable = new Runnable() - { - @Override - public void run() - { - try { - synchronized (segmentDeleteLock) { - if (segmentsToDelete.remove(segment)) { - segmentManager.dropSegment(segment); + Runnable runnable = () -> { + try { + synchronized (segmentDeleteLock) { + if (segmentsToDelete.remove(segment)) { + segmentManager.dropSegment(segment); - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); - } + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); } } } - catch (Exception e) { - log.makeAlert(e, "Failed to remove segment! Possible resource leak!") - .addData("segment", segment) - .emit(); - } + } + catch (Exception e) { + log.makeAlert(e, "Failed to remove segment! Possible resource leak!") + .addData("segment", segment) + .emit(); } }; @@ -543,7 +528,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler ); } }, - () -> resolveWaitingFutures() + this::resolveWaitingFutures ); } return requestStatuses.getIfPresent(changeRequest); @@ -588,7 +573,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private final Object lock = new Object(); private volatile boolean finished = false; + @Nullable private volatile ScheduledFuture startedAnnouncing = null; + @Nullable private volatile ScheduledFuture nextAnnoucement = null; public BackgroundSegmentAnnouncer( @@ -755,6 +742,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler } private final STATE state; + @Nullable private final String failureCause; public static final Status SUCCESS = new Status(STATE.SUCCESS, null); @@ -763,7 +751,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler @JsonCreator Status( @JsonProperty("state") STATE state, - @JsonProperty("failureCause") String failureCause + @JsonProperty("failureCause") @Nullable String failureCause ) { Preconditions.checkNotNull(state, "state must be non-null"); @@ -782,6 +770,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler return state; } + @Nullable @JsonProperty public String getFailureCause() { diff --git a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java index ca56b10024e..734de202ce3 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java @@ -25,8 +25,6 @@ import com.google.inject.Inject; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -35,6 +33,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.server.initialization.ZkPathsConfig; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -54,6 +54,7 @@ public class ZkCoordinator private final DruidServerMetadata me; private final CuratorFramework curator; + @Nullable private volatile PathChildrenCache loadQueueCache; private volatile boolean started = false; private final ExecutorService segmentLoadUnloadService; @@ -107,22 +108,17 @@ public class ZkCoordinator curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); loadQueueCache.getListenable().addListener( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) - { - final ChildData child = event.getData(); - switch (event.getType()) { - case CHILD_ADDED: - childAdded(child); - break; - case CHILD_REMOVED: - log.info("zNode[%s] was removed", event.getData().getPath()); - break; - default: - log.info("Ignoring event[%s]", event); - } + (client, event) -> { + final ChildData child = event.getData(); + switch (event.getType()) { + case CHILD_ADDED: + childAdded(child); + break; + case CHILD_REMOVED: + log.info("zNode[%s] was removed", event.getData().getPath()); + break; + default: + log.info("Ignoring event[%s]", event); } } @@ -151,25 +147,20 @@ public class ZkCoordinator finalRequest.go( dataSegmentChangeHandler, - new DataSegmentChangeCallback() - { - @Override - public void execute() - { + () -> { + try { + curator.delete().guaranteed().forPath(path); + log.info("Completed request [%s]", finalRequest.asString()); + } + catch (Exception e) { try { curator.delete().guaranteed().forPath(path); - log.info("Completed request [%s]", finalRequest.asString()); } - catch (Exception e) { - try { - curator.delete().guaranteed().forPath(path); - } - catch (Exception e1) { - log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); - } - log.error(e, "Exception while removing zNode[%s]", path); - throw new RuntimeException(e); + catch (Exception e1) { + log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); } + log.error(e, "Exception while removing zNode[%s]", path); + throw new RuntimeException(e); } } );