Spotbugs: NP_STORE_INTO_NONNULL_FIELD (#8021)

Fokko Driesprong 2019-07-21 15:23:47 +02:00 committed by Benedict Jin
parent f24e2f16af
commit e1a745717e
54 changed files with 609 additions and 624 deletions
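The SpotBugs pattern removed from the exclude list here, NP_STORE_INTO_NONNULL_FIELD, reports a store of null (or a possibly-null value) into a field that SpotBugs treats as non-null. Rather than keeping the pattern suppressed globally, the affected fields are annotated with javax.annotation.Nullable so that null assignments are explicitly allowed. A minimal sketch of the idea; the class and field names below are illustrative only and are not taken from this diff:

import javax.annotation.Nullable;

public class ExampleAggregator
{
  // Marking the field @Nullable documents that it may legitimately hold null,
  // e.g. before initialization or after resources are released in close().
  @Nullable
  private Object sketch;

  public void init()
  {
    sketch = new Object();
  }

  public void close()
  {
    // Without the @Nullable annotation above, SpotBugs could report this store
    // as NP_STORE_INTO_NONNULL_FIELD once the exclusion is removed.
    sketch = null;
  }
}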

View File

@@ -70,7 +70,6 @@
 <Bug pattern="NP_NULL_PARAM_DEREF"/>
 <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL"/>
 <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
-<Bug pattern="NP_STORE_INTO_NONNULL_FIELD"/>
 <Bug pattern="NS_DANGEROUS_NON_SHORT_CIRCUIT"/>
 <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
 <Bug pattern="OS_OPEN_STREAM"/>

View File

@@ -24,11 +24,14 @@ import com.yahoo.sketches.quantiles.UpdateDoublesSketch;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.ColumnValueSelector;
+import javax.annotation.Nullable;
 public class DoublesSketchBuildAggregator implements Aggregator
 {
 private final ColumnValueSelector<Double> valueSelector;
+@Nullable
 private UpdateDoublesSketch sketch;
 public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size)

View File

@@ -24,10 +24,13 @@ import com.yahoo.sketches.quantiles.DoublesUnion;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.ColumnValueSelector;
+import javax.annotation.Nullable;
 public class DoublesSketchMergeAggregator implements Aggregator
 {
 private final ColumnValueSelector selector;
+@Nullable
 private DoublesUnion union;
 public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k)

View File

@@ -26,12 +26,16 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import javax.annotation.Nullable;
 import java.util.List;
 public class SketchAggregator implements Aggregator
 {
 private final BaseObjectColumnValueSelector selector;
 private final int size;
+@Nullable
 private Union union;
 public SketchAggregator(BaseObjectColumnValueSelector selector, int size)

View File

@@ -35,6 +35,8 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -49,7 +51,7 @@ public class SketchHolder
 );
 public static final Comparator<Object> COMPARATOR = Ordering.from(
-new Comparator()
+new Comparator<Object>()
 {
 @Override
 public int compare(Object o1, Object o2)
@@ -108,7 +110,9 @@ public class SketchHolder
 private final Object obj;
+@Nullable
 private volatile Double cachedEstimate = null;
+@Nullable
 private volatile Sketch cachedSketch = null;
 private SketchHolder(Object obj)

View File

@@ -26,6 +26,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.data.IndexedInts;
+import javax.annotation.Nullable;
 import java.util.List;
 /**
@@ -38,7 +40,9 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator
 private final DimensionSelector keySelector;
 private final BaseDoubleColumnValueSelector[] valueSelectors;
+@Nullable
 private double[] values; // not part of the state, but to reuse in aggregate() method
+@Nullable
 private ArrayOfDoublesUpdatableSketch sketch;
 public ArrayOfDoublesSketchBuildAggregator(

View File

@@ -30,6 +30,8 @@ import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.data.IndexedInts;
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.List;
@@ -50,6 +52,7 @@ public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregat
 private final BaseDoubleColumnValueSelector[] valueSelectors;
 private final int nominalEntries;
 private final int maxIntermediateSize;
+@Nullable
 private double[] values; // not part of the state, but to reuse in aggregate() method
 private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);

View File

@@ -25,6 +25,8 @@ import com.yahoo.sketches.tuple.ArrayOfDoublesUnion;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import javax.annotation.Nullable;
 /**
  * This aggregator merges existing sketches.
  * The input column contains ArrayOfDoublesSketch.
@@ -34,6 +36,7 @@ public class ArrayOfDoublesSketchMergeAggregator implements Aggregator
 {
 private final BaseObjectColumnValueSelector<ArrayOfDoublesSketch> selector;
+@Nullable
 private ArrayOfDoublesUnion union;
 public ArrayOfDoublesSketchMergeAggregator(

View File

@@ -298,14 +298,13 @@ public class GroupByQueryEngine
 private final ByteBuffer metricsBuffer;
 private final int maxIntermediateRows;
-private final List<DimensionSpec> dimensionSpecs;
 private final List<DimensionSelector> dimensions;
 private final ArrayList<String> dimNames;
-private final List<AggregatorFactory> aggregatorSpecs;
 private final BufferAggregator[] aggregators;
 private final String[] metricNames;
 private final int[] sizesRequired;
+@Nullable
 private List<ByteBuffer> unprocessedKeys;
 private Iterator<Row> delegate;
@@ -320,7 +319,7 @@ public class GroupByQueryEngine
 unprocessedKeys = null;
 delegate = Collections.emptyIterator();
-dimensionSpecs = query.getDimensions();
+List<DimensionSpec> dimensionSpecs = query.getDimensions();
 dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
 dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
@@ -340,7 +339,7 @@ public class GroupByQueryEngine
 dimNames.add(dimSpec.getOutputName());
 }
-aggregatorSpecs = query.getAggregatorSpecs();
+List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
 aggregators = new BufferAggregator[aggregatorSpecs.size()];
 metricNames = new String[aggregatorSpecs.size()];
 sizesRequired = new int[aggregatorSpecs.size()];

View File

@@ -28,6 +28,8 @@ import org.apache.druid.query.aggregation.AggregatorAdapters;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -59,7 +61,9 @@ public class BufferArrayGrouper implements VectorGrouper, IntGrouper
 private ByteBuffer valBuffer;
 // Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
+@Nullable
 private int[] vAggregationPositions = null;
+@Nullable
 private int[] vAggregationRows = null;
 static long requiredBufferCapacity(

View File

@@ -28,10 +28,11 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.aggregation.AggregatorAdapters;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.AbstractList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.function.ToIntFunction;
@@ -42,7 +43,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
 private static final int DEFAULT_INITIAL_BUCKETS = 1024;
 private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
-private ByteBuffer buffer;
 private boolean initialized = false;
 // The BufferHashGrouper normally sorts by all fields of the grouping key with lexicographic ascending order.
@@ -54,16 +54,17 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
 // to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
 private final boolean useDefaultSorting;
-// Track the offsets of used buckets using this list.
-// When a new bucket is initialized by initializeNewBucketKey(), an offset is added to this list.
-// When expanding the table, the list is reset() and filled with the new offsets of the copied buckets.
-private ByteBuffer offsetListBuffer;
+@Nullable
 private ByteBufferIntList offsetList;
 // Scratch objects used by aggregateVector(). Only set if initVectorized() is called.
+@Nullable
 private ByteBuffer vKeyBuffer = null;
+@Nullable
 private int[] vKeyHashCodes = null;
+@Nullable
 private int[] vAggregationPositions = null;
+@Nullable
 private int[] vAggregationRows = null;
 public BufferHashGrouper(
@@ -93,7 +94,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
 public void init()
 {
 if (!initialized) {
-this.buffer = bufferSupplier.get();
+ByteBuffer buffer = bufferSupplier.get();
 int hashTableSize = ByteBufferHashTable.calculateTableArenaSizeWithPerBucketAdditionalSize(
 buffer.capacity(),
@@ -106,7 +107,10 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
 hashTableBuffer.limit(hashTableSize);
 hashTableBuffer = hashTableBuffer.slice();
-offsetListBuffer = buffer.duplicate();
+// Track the offsets of used buckets using this list.
+// When a new bucket is initialized by initializeNewBucketKey(), an offset is added to this list.
+// When expanding the table, the list is reset() and filled with the new offsets of the copied buckets.
+ByteBuffer offsetListBuffer = buffer.duplicate();
 offsetListBuffer.position(hashTableSize);
 offsetListBuffer.limit(buffer.capacity());
 offsetListBuffer = offsetListBuffer.slice();
@@ -315,19 +319,14 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
 // Sort offsets in-place.
 Collections.sort(
 wrappedOffsets,
-new Comparator<Integer>()
-{
-@Override
-public int compare(Integer lhs, Integer rhs)
-{
+(lhs, rhs) -> {
 final ByteBuffer tableBuffer = hashTable.getTableBuffer();
 return comparator.compare(
 tableBuffer,
 tableBuffer,
 lhs + HASH_SIZE,
 rhs + HASH_SIZE
 );
-}
 }
 );

View File

@@ -363,6 +363,7 @@ public class GroupByQueryEngineV2
 protected final GroupByColumnSelectorPlus[] dims;
 protected final DateTime timestamp;
+@Nullable
 protected CloseableGrouperIterator<KeyType, Row> delegate = null;
 protected final boolean allSingleValueDims;

View File

@@ -256,35 +256,27 @@ public class RowBasedGrouperHelper
 valueTypes
 );
-final Accumulator<AggregateResult, Row> accumulator = new Accumulator<AggregateResult, Row>()
-{
-@Override
-public AggregateResult accumulate(
-final AggregateResult priorResult,
-final Row row
-)
-{
+final Accumulator<AggregateResult, Row> accumulator = (priorResult, row) -> {
 BaseQuery.checkInterrupted();
 if (priorResult != null && !priorResult.isOk()) {
 // Pass-through error returns without doing more work.
 return priorResult;
 }
 if (!grouper.isInitialized()) {
 grouper.init();
 }
 columnSelectorRow.set(row);
 final Comparable[] key = new Comparable[keySize];
 valueExtractFn.apply(row, key);
 final AggregateResult aggregateResult = grouper.aggregate(new RowBasedKey(key));
 columnSelectorRow.set(null);
 return aggregateResult;
-}
 };
 return new Pair<>(grouper, accumulator);
@@ -302,33 +294,12 @@ public class RowBasedGrouperHelper
 {
 if (isInputRaw) {
 if (query.getGranularity() instanceof AllGranularity) {
-return new TimestampExtractFunction()
-{
-@Override
-public long apply(Row row)
-{
-return query.getIntervals().get(0).getStartMillis();
-}
-};
+return row -> query.getIntervals().get(0).getStartMillis();
 } else {
-return new TimestampExtractFunction()
-{
-@Override
-public long apply(Row row)
-{
-return query.getGranularity().bucketStart(row.getTimestamp()).getMillis();
-}
-};
+return row -> query.getGranularity().bucketStart(row.getTimestamp()).getMillis();
 }
 } else {
-return new TimestampExtractFunction()
-{
-@Override
-public long apply(Row row)
-{
-return row.getTimestampFromEpoch();
-}
-};
+return Row::getTimestampFromEpoch;
 }
 }
@@ -358,60 +329,40 @@ public class RowBasedGrouperHelper
 );
 if (includeTimestamp) {
-return new ValueExtractFunction()
-{
-@Override
-public Comparable[] apply(Row row, Comparable[] key)
-{
+return (row, key) -> {
 key[0] = timestampExtractFn.apply(row);
 for (int i = 1; i < key.length; i++) {
 final Comparable val = inputRawSuppliers[i - 1].get();
 key[i] = valueConvertFns[i - 1].apply(val);
 }
 return key;
-}
 };
 } else {
-return new ValueExtractFunction()
-{
-@Override
-public Comparable[] apply(Row row, Comparable[] key)
-{
+return (row, key) -> {
 for (int i = 0; i < key.length; i++) {
 final Comparable val = inputRawSuppliers[i].get();
 key[i] = valueConvertFns[i].apply(val);
 }
 return key;
-}
 };
 }
 } else {
 if (includeTimestamp) {
-return new ValueExtractFunction()
-{
-@Override
-public Comparable[] apply(Row row, Comparable[] key)
-{
+return (row, key) -> {
 key[0] = timestampExtractFn.apply(row);
 for (int i = 1; i < key.length; i++) {
 final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName());
 key[i] = valueConvertFns[i - 1].apply(val);
 }
 return key;
-}
 };
 } else {
-return new ValueExtractFunction()
-{
-@Override
-public Comparable[] apply(Row row, Comparable[] key)
-{
+return (row, key) -> {
 for (int i = 0; i < key.length; i++) {
 final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName());
 key[i] = valueConvertFns[i].apply(val);
 }
 return key;
-}
 };
 }
 }
@@ -437,56 +388,51 @@ public class RowBasedGrouperHelper
 return new CloseableGrouperIterator<>(
 grouper.iterator(true),
-new Function<Grouper.Entry<RowBasedKey>, Row>()
-{
-@Override
-public Row apply(Grouper.Entry<RowBasedKey> entry)
-{
+entry -> {
 Map<String, Object> theMap = Maps.newLinkedHashMap();
 // Get timestamp, maybe.
 final DateTime timestamp;
 final int dimStart;
 if (includeTimestamp) {
 timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0])));
 dimStart = 1;
 } else {
 timestamp = null;
 dimStart = 0;
 }
 // Add dimensions.
 if (dimsToInclude == null) {
 for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
 Object dimVal = entry.getKey().getKey()[i];
 theMap.put(
 query.getDimensions().get(i - dimStart).getOutputName(),
 dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
 );
 }
 } else {
 Map<String, Object> dimensions = new HashMap<>();
 for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
 Object dimVal = entry.getKey().getKey()[i];
 dimensions.put(
 query.getDimensions().get(i - dimStart).getOutputName(),
 dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
 );
 }
 for (String dimToInclude : dimsToInclude) {
 theMap.put(dimToInclude, dimensions.get(dimToInclude));
 }
 }
 // Add aggregations.
 for (int i = 0; i < entry.getValues().length; i++) {
 theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
 }
 return new MapBasedRow(timestamp, theMap);
-}
 },
 closeable
 );
@@ -713,47 +659,30 @@ public class RowBasedGrouperHelper
 if (includeTimestamp) {
 if (sortByDimsFirst) {
-return new Comparator<Grouper.Entry<RowBasedKey>>()
-{
-@Override
-public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-{
+return (entry1, entry2) -> {
 final int cmp = compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
 if (cmp != 0) {
 return cmp;
 }
 return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
-}
 };
 } else {
-return new Comparator<Grouper.Entry<RowBasedKey>>()
-{
-@Override
-public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-{
+return (entry1, entry2) -> {
 final int timeCompare = Longs.compare(
 (long) entry1.getKey().getKey()[0],
 (long) entry2.getKey().getKey()[0]
 );
 if (timeCompare != 0) {
 return timeCompare;
 }
 return compareDimsInRows(entry1.getKey(), entry2.getKey(), 1);
-}
 };
 }
 } else {
-return new Comparator<Grouper.Entry<RowBasedKey>>()
-{
-@Override
-public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-{
-return compareDimsInRows(entry1.getKey(), entry2.getKey(), 0);
-}
-};
+return (entry1, entry2) -> compareDimsInRows(entry1.getKey(), entry2.getKey(), 0);
 }
 }
@@ -804,74 +733,57 @@ public class RowBasedGrouperHelper
 if (includeTimestamp) {
 if (sortByDimsFirst) {
-return new Comparator<Grouper.Entry<RowBasedKey>>()
-{
-@Override
-public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-{
+return (entry1, entry2) -> {
 final int cmp = compareDimsInRowsWithAggs(
 entry1,
 entry2,
 1,
 needsReverses,
 aggFlags,
 fieldIndices,
 isNumericField,
 comparators
 );
 if (cmp != 0) {
 return cmp;
 }
 return Longs.compare((long) entry1.getKey().getKey()[0], (long) entry2.getKey().getKey()[0]);
-}
 };
 } else {
-return new Comparator<Grouper.Entry<RowBasedKey>>()
-{
-@Override
-public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-{
+return (entry1, entry2) -> {
 final int timeCompare = Longs.compare(
 (long) entry1.getKey().getKey()[0],
 (long) entry2.getKey().getKey()[0]
 );
 if (timeCompare != 0) {
 return timeCompare;
 }
 return compareDimsInRowsWithAggs(
 entry1,
 entry2,
 1,
 needsReverses,
 aggFlags,
 fieldIndices,
 isNumericField,
 comparators
 );
-}
 };
 }
 } else {
-return new Comparator<Grouper.Entry<RowBasedKey>>()
-{
-@Override
-public int compare(Grouper.Entry<RowBasedKey> entry1, Grouper.Entry<RowBasedKey> entry2)
-{
-return compareDimsInRowsWithAggs(
+return (entry1, entry2) -> compareDimsInRowsWithAggs(
 entry1,
 entry2,
 0,
 needsReverses,
 aggFlags,
 fieldIndices,
 isNumericField,
 comparators
 );
-}
-};
 }
 }
@@ -981,6 +893,7 @@ public class RowBasedGrouperHelper
 // dictionary id -> rank of the sorted dictionary
 // This is initialized in the constructor and bufferComparator() with static dictionary and dynamic dictionary,
 // respectively.
+@Nullable
 private int[] rankOfDictionaryIds = null;
 RowBasedKeySerde(
@@ -1118,68 +1031,53 @@ public class RowBasedGrouperHelper
 if (includeTimestamp) {
 if (sortByDimsFirst) {
-return new Grouper.BufferComparator()
-{
-@Override
-public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-{
+return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
 final int cmp = compareDimsInBuffersForNullFudgeTimestamp(
 serdeHelperComparators,
 lhsBuffer,
 rhsBuffer,
 lhsPosition,
 rhsPosition
 );
 if (cmp != 0) {
 return cmp;
 }
 return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-}
 };
 } else {
-return new Grouper.BufferComparator()
-{
-@Override
-public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-{
+return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
 final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
 if (timeCompare != 0) {
 return timeCompare;
 }
 return compareDimsInBuffersForNullFudgeTimestamp(
 serdeHelperComparators,
 lhsBuffer,
 rhsBuffer,
 lhsPosition,
 rhsPosition
 );
-}
 };
 }
 } else {
-return new Grouper.BufferComparator()
-{
-@Override
-public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-{
+return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
 for (int i = 0; i < dimCount; i++) {
 final int cmp = serdeHelperComparators[i].compare(
 lhsBuffer,
 rhsBuffer,
 lhsPosition,
 rhsPosition
 );
 if (cmp != 0) {
 return cmp;
 }
 }
 return 0;
-}
 };
 }
 }
@@ -1246,84 +1144,69 @@ public class RowBasedGrouperHelper
 if (includeTimestamp) {
 if (sortByDimsFirst) {
-return new Grouper.BufferComparator()
-{
-@Override
-public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-{
+return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
 final int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
 adjustedSerdeHelperComparators,
 needsReverses,
 fieldCount,
 lhsBuffer,
 rhsBuffer,
 lhsPosition,
 rhsPosition
 );
 if (cmp != 0) {
 return cmp;
 }
 return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-}
 };
 } else {
-return new Grouper.BufferComparator()
-{
-@Override
-public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-{
+return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
 final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
 if (timeCompare != 0) {
 return timeCompare;
 }
 int cmp = compareDimsInBuffersForNullFudgeTimestampForPushDown(
 adjustedSerdeHelperComparators,
 needsReverses,
 fieldCount,
 lhsBuffer,
 rhsBuffer,
 lhsPosition,
 rhsPosition
 );
 return cmp;
-}
 };
 }
 } else {
-return new Grouper.BufferComparator()
-{
-@Override
-public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-{
+return (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
 for (int i = 0; i < fieldCount; i++) {
 final int cmp;
 if (needsReverses.get(i)) {
 cmp = adjustedSerdeHelperComparators[i].compare(
 rhsBuffer,
 lhsBuffer,
 rhsPosition,
 lhsPosition
 );
 } else {
 cmp = adjustedSerdeHelperComparators[i].compare(
 lhsBuffer,
 rhsBuffer,
 lhsPosition,
 rhsPosition
 );
 }
 if (cmp != 0) {
 return cmp;
 }
 }
 return 0;
-}
 };
 }
 }

View File

@@ -26,6 +26,8 @@ import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.serde.ComplexMetricSerde;
 import org.apache.druid.segment.serde.ComplexMetrics;
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -96,7 +98,9 @@ public class MetricHolder
 private final String name;
 private final String typeName;
 private final MetricType type;
+@Nullable
 CompressedColumnarFloatsSupplier floatType = null;
+@Nullable
 GenericIndexed<?> complexType = null;
 private MetricHolder(
private MetricHolder( private MetricHolder(

View File

@@ -46,10 +46,12 @@ public class SingleScanTimeDimensionSelector implements DimensionSelector
 private final List<String> timeValues = new ArrayList<>();
 private final SingleIndexedInt row = new SingleIndexedInt();
-private String currentValue = null;
 private long currentTimestamp = Long.MIN_VALUE;
 private int index = -1;
+@Nullable
+private String currentValue = null;
 public SingleScanTimeDimensionSelector(
 BaseLongColumnValueSelector selector,
 @Nullable ExtractionFn extractionFn,

View File

@@ -232,9 +232,11 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
 private final DimensionDictionary dimLookup;
 private final MultiValueHandling multiValueHandling;
 private final boolean hasBitmapIndexes;
-private SortedDimensionDictionary sortedLookup;
 private boolean hasMultipleValues = false;
+@Nullable
+private SortedDimensionDictionary sortedLookup;
 public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes)
 {
 this.dimLookup = new DimensionDictionary();
@@ -252,7 +254,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
 final int nullId = dimLookup.getId(null);
 encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new int[]{dimLookup.add(null)} : new int[]{nullId};
 } else if (dimValues instanceof List) {
-List<Object> dimValuesList = (List) dimValues;
+List<Object> dimValuesList = (List<Object>) dimValues;
 if (dimValuesList.isEmpty()) {
 dimLookup.add(null);
 encodedDimensionValues = IntArrays.EMPTY_ARRAY;

View File

@@ -77,27 +77,36 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
 private static final Indexed<String> NULL_STR_DIM_VAL = new ListIndexed<>(Collections.singletonList(null));
 private static final Splitter SPLITTER = Splitter.on(",");
-private ColumnarIntsSerializer encodedValueSerializer;
-private String dimensionName;
-private GenericIndexedWriter<String> dictionaryWriter;
-private String firstDictionaryValue;
-private int dictionarySize;
-private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
-private ByteBufferWriter<ImmutableRTree> spatialWriter;
-private ArrayList<IntBuffer> dimConversions;
-private int cardinality = 0;
-private boolean hasNull = false;
-private MutableBitmap nullRowsBitmap;
-private final SegmentWriteOutMedium segmentWriteOutMedium;
-private int rowCount = 0;
-private ColumnCapabilities capabilities;
-private List<IndexableAdapter> adapters;
-private final IndexSpec indexSpec;
-private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
+private final String dimensionName;
 private final ProgressIndicator progress;
 private final Closer closer;
+private final IndexSpec indexSpec;
+private final SegmentWriteOutMedium segmentWriteOutMedium;
+private final MutableBitmap nullRowsBitmap;
+private final ColumnCapabilities capabilities;
+private int dictionarySize;
+private int rowCount = 0;
+private int cardinality = 0;
+private boolean hasNull = false;
+@Nullable
+private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
+@Nullable
+private ByteBufferWriter<ImmutableRTree> spatialWriter;
+@Nullable
+private ArrayList<IntBuffer> dimConversions;
+@Nullable
+private List<IndexableAdapter> adapters;
+@Nullable
+private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
+@Nullable
+private ColumnarIntsSerializer encodedValueSerializer;
+@Nullable
+private GenericIndexedWriter<String> dictionaryWriter;
+@Nullable
+private String firstDictionaryValue;
 public StringDimensionMergerV9(
 String dimensionName,
@@ -537,13 +546,12 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
 .withBitmapIndex(bitmapWriter)
 .withSpatialIndex(spatialWriter)
 .withByteOrder(IndexIO.BYTE_ORDER);
-final ColumnDescriptor serdeficator = builder
-.addSerde(partBuilder.build())
-.build();
 //log.info("Completed dimension column[%s] in %,d millis.", dimensionName, System.currentTimeMillis() - dimStartTime);
-return serdeficator;
+return builder
+.addSerde(partBuilder.build())
+.build();
 }
 protected interface IndexSeeker

View File

@@ -23,18 +23,25 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import javax.annotation.Nullable;
 /**
  */
 public class ColumnBuilder
 {
+@Nullable
 private ValueType type = null;
 private boolean hasMultipleValues = false;
 private boolean filterable = false;
-private Supplier<? extends BaseColumn> columnSupplier = null;
 private boolean dictionaryEncoded = false;
+@Nullable
+private Supplier<? extends BaseColumn> columnSupplier = null;
+@Nullable
 private Supplier<BitmapIndex> bitmapIndex = null;
+@Nullable
 private Supplier<SpatialIndex> spatialIndex = null;
+@Nullable
 private SmooshedFileMapper fileMapper = null;
 public ColumnBuilder setFileMapper(SmooshedFileMapper fileMapper)

View File

@@ -23,12 +23,16 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.ISE;
+import javax.annotation.Nullable;
 /**
  *
  */
 public class ColumnCapabilitiesImpl implements ColumnCapabilities
 {
+@Nullable
 private ValueType type = null;
 private boolean dictionaryEncoded = false;
 private boolean runLengthEncoded = false;
 private boolean hasInvertedIndexes = false;

View File

@@ -28,6 +28,8 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.segment.serde.ColumnPartSerde;
 import org.apache.druid.segment.serde.Serializer;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
@@ -111,7 +113,9 @@ public class ColumnDescriptor implements Serializer
 public static class Builder
 {
+@Nullable
 private ValueType valueType = null;
+@Nullable
 private Boolean hasMultipleValues = null;
 private final List<ColumnPartSerde> parts = new ArrayList<>();

View File

@@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -45,6 +47,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
 private final CompressionStrategy compression;
 private int numInserted = 0;
+@Nullable
 private ByteBuffer endBuffer;
 BlockLayoutColumnarDoublesSerializer(

View File

@@ -25,6 +25,8 @@ import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -45,6 +47,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
 private final CompressionStrategy compression;
 private int numInserted = 0;
+@Nullable
 private ByteBuffer endBuffer;
 BlockLayoutColumnarFloatsSerializer(

View File

@@ -24,6 +24,8 @@ import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -47,6 +49,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
 private int numInserted = 0;
 private int numInsertedForNextFlush;
+@Nullable
 private ByteBuffer endBuffer;
 BlockLayoutColumnarLongsSerializer(

View File

@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
@@ -36,7 +38,9 @@ public class ByteBufferWriter<T> implements Serializer
 private final SegmentWriteOutMedium segmentWriteOutMedium;
 private final ObjectStrategy<T> strategy;
+@Nullable
 private WriteOutBytes headerOut = null;
+@Nullable
 private WriteOutBytes valueOut = null;
 public ByteBufferWriter(SegmentWriteOutMedium segmentWriteOutMedium, ObjectStrategy<T> strategy)

View File

@@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -45,9 +47,11 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
 private final int chunkFactor;
 private final CompressionStrategy compression;
 private final GenericIndexedWriter<ByteBuffer> flattener;
-private ByteBuffer endBuffer;
 private int numInserted;
+@Nullable
+private ByteBuffer endBuffer;
 CompressedColumnarIntsSerializer(
 final SegmentWriteOutMedium segmentWriteOutMedium,
 final String filenameBase,

View File

@@ -25,6 +25,8 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -67,10 +69,11 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
 private final CompressionStrategy compression;
 private final GenericIndexedWriter<ByteBuffer> flattener;
 private final ByteBuffer intBuffer;
-private ByteBuffer endBuffer;
 private int numInserted;
+@Nullable
+private ByteBuffer endBuffer;
 CompressedVSizeColumnarIntsSerializer(
 final SegmentWriteOutMedium segmentWriteOutMedium,
 final String filenameBase,

View File

@@ -198,6 +198,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
 private int logBaseTwoOfElementsPerValueFile;
 private int relativeIndexMask;
+@Nullable
 private final ByteBuffer theBuffer;
 /**

View File

@@ -140,10 +140,13 @@ public class GenericIndexedWriter<T> implements Serializer
 private boolean objectsSorted = true;
 @Nullable
 private T prevObject = null;
+@Nullable
 private WriteOutBytes headerOut = null;
+@Nullable
 private WriteOutBytes valuesOut = null;
 private int numWritten = 0;
 private boolean requireMultipleFiles = false;
+@Nullable
 private LongList headerOutLong;
 private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES);

View File

@@ -27,6 +27,8 @@ import it.unimi.dsi.fastutil.longs.LongList;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
@@ -42,7 +44,6 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
 private final String filenameBase;
 private final ByteOrder order;
 private final CompressionStrategy compression;
-private LongList tempOut = null;
 private int numInserted = 0;
@@ -52,6 +53,9 @@
 private long maxVal = Long.MIN_VALUE;
 private long minVal = Long.MAX_VALUE;
+@Nullable
+private LongList tempOut = null;
+@Nullable
 private ColumnarLongsSerializer delegate;
 IntermediateColumnarLongsSerializer(

View File

@@ -21,6 +21,8 @@ package org.apache.druid.segment.data;
 import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -31,7 +33,9 @@ public class LongsLongEncodingWriter implements CompressionFactory.LongEncodingW
 private final ByteBuffer orderBuffer;
 private final ByteOrder order;
+@Nullable
 private ByteBuffer outBuffer = null;
+@Nullable
 private OutputStream outStream = null;
 public LongsLongEncodingWriter(ByteOrder order)

View File

@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
@@ -46,9 +48,11 @@ public class VSizeColumnarIntsSerializer extends SingleValueColumnarIntsSerializ
 private final int numBytes;
 private final ByteBuffer helperBuffer = ByteBuffer.allocate(Integer.BYTES);
-private WriteOutBytes valuesOut = null;
 private boolean bufPaddingWritten = false;
+@Nullable
+private WriteOutBytes valuesOut = null;
 public VSizeColumnarIntsSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final int maxValue)
 {
 this.segmentWriteOutMedium = segmentWriteOutMedium;

View File

@@ -26,6 +26,8 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
@@ -83,7 +85,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
 private final WriteInt writeInt;
 private final SegmentWriteOutMedium segmentWriteOutMedium;
+@Nullable
 private WriteOutBytes headerOut = null;
+@Nullable
 private WriteOutBytes valuesOut = null;
 private int numWritten = 0;
 private boolean numBytesForMaxWritten = false;

View File

@@ -21,6 +21,8 @@ package org.apache.druid.segment.data;
 import org.apache.druid.java.util.common.IAE;
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -189,6 +191,7 @@ public class VSizeLongSerde
 private static final class Size1Ser implements LongSerializer
 {
+@Nullable
 OutputStream output = null;
 ByteBuffer buffer;
 byte curByte = 0;
@@ -242,6 +245,7 @@ public class VSizeLongSerde
 private static final class Size2Ser implements LongSerializer
 {
+@Nullable
 OutputStream output = null;
 ByteBuffer buffer;
 byte curByte = 0;
@@ -295,8 +299,8 @@ public class VSizeLongSerde
 private static final class Mult4Ser implements LongSerializer
 {
-OutputStream output = null;
+@Nullable
+OutputStream output;
 ByteBuffer buffer;
 int numBytes;
 byte curByte = 0;
@@ -361,6 +365,7 @@ public class VSizeLongSerde
 private static final class Mult8Ser implements LongSerializer
 {
+@Nullable
 OutputStream output;
 ByteBuffer buffer;
 int numBytes;

View File

@@ -330,6 +330,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 public static class Builder
 {
+@Nullable
 private IncrementalIndexSchema incrementalIndexSchema;
 private boolean deserializeComplexMetrics;
 private boolean reportParseExceptions;
@@ -505,8 +506,8 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 static class IncrementalIndexRowResult
 {
-private IncrementalIndexRow incrementalIndexRow;
-private List<String> parseExceptionMessages;
+private final IncrementalIndexRow incrementalIndexRow;
+private final List<String> parseExceptionMessages;
 IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List<String> parseExceptionMessages)
 {
@@ -527,9 +528,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
 static class AddToFactsResult
 {
-private int rowCount;
+private final int rowCount;
 private final long bytesInMemory;
-private List<String> parseExceptionMessages;
+private final List<String> parseExceptionMessages;
 public AddToFactsResult(
 int rowCount,
@ -997,53 +998,48 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending) public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending)
{ {
(the anonymous Iterable&lt;Row&gt; and its iterator() override are replaced by an equivalent lambda; the per-row mapping logic is unchanged:)
return () -> {
  final List<DimensionDesc> dimensions = getDimensions();

  return Iterators.transform(
      getFacts().iterator(descending),
      incrementalIndexRow -> {
        final int rowOffset = incrementalIndexRow.getRowIndex();

        Object[] theDims = incrementalIndexRow.getDims();

        Map<String, Object> theVals = Maps.newLinkedHashMap();
        for (int i = 0; i < theDims.length; ++i) {
          Object dim = theDims[i];
          DimensionDesc dimensionDesc = dimensions.get(i);
          if (dimensionDesc == null) {
            continue;
          }
          String dimensionName = dimensionDesc.getName();
          DimensionHandler handler = dimensionDesc.getHandler();
          if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
            theVals.put(dimensionName, null);
            continue;
          }
          final DimensionIndexer indexer = dimensionDesc.getIndexer();
          Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim);
          theVals.put(dimensionName, rowVals);
        }

        AggregatorType[] aggs = getAggsForRow(rowOffset);
        for (int i = 0; i < aggs.length; ++i) {
          theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
        }

        if (postAggs != null) {
          for (PostAggregator postAgg : postAggs) {
            theVals.put(postAgg.getName(), postAgg.compute(theVals));
          }
        }

        return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals);
      }
  );
}; };
} }
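The iterableWithPostAggregations() cleanup works because Iterable&lt;T&gt; is a functional interface: its only abstract method is iterator(), so an anonymous Iterable can be written as a lambda that returns an iterator. A small illustrative sketch, not Druid code:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class IterableAsLambda
    {
      public static Iterable<String> upperCased(List<String> values)
      {
        // Iterable's single abstract method is iterator(), so a lambda fits here.
        return () -> values.stream().map(String::toUpperCase).iterator();
      }

      public static void main(String[] args)
      {
        // Each for-each loop calls iterator() again, so the Iterable stays reusable.
        for (String s : upperCased(Arrays.asList("a", "b"))) {
          System.out.println(s);
        }
      }
    }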

View File

@ -48,6 +48,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
private static class DimensionAccessor private static class DimensionAccessor
{ {
private final IncrementalIndex.DimensionDesc dimensionDesc; private final IncrementalIndex.DimensionDesc dimensionDesc;
@Nullable
private final MutableBitmap[] invertedIndexes; private final MutableBitmap[] invertedIndexes;
private final DimensionIndexer indexer; private final DimensionIndexer indexer;

View File

@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.LongColumnSelector; import org.apache.druid.segment.LongColumnSelector;
import javax.annotation.Nullable;
/** /**
* IncrementalIndexRowHolder is a simple {@link #get}/{@link #set} holder of {@link IncrementalIndexRow}. It is used * IncrementalIndexRowHolder is a simple {@link #get}/{@link #set} holder of {@link IncrementalIndexRow}. It is used
* to implement various machinery around {@link IncrementalIndex}, e. g. {@link * to implement various machinery around {@link IncrementalIndex}, e. g. {@link
@ -33,6 +35,7 @@ import org.apache.druid.segment.LongColumnSelector;
*/ */
public class IncrementalIndexRowHolder implements LongColumnSelector public class IncrementalIndexRowHolder implements LongColumnSelector
{ {
@Nullable
private IncrementalIndexRow currEntry = null; private IncrementalIndexRow currEntry = null;
public IncrementalIndexRow get() public IncrementalIndexRow get()

View File

@ -33,6 +33,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -59,14 +61,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
protected final int maxRowCount; protected final int maxRowCount;
@Nullable
private volatile Map<String, ColumnSelectorFactory> selectors; private volatile Map<String, ColumnSelectorFactory> selectors;
//given a ByteBuffer and an offset where all aggregates for a row are stored //given a ByteBuffer and an offset where all aggregates for a row are stored
//offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate
//is stored //is stored
@Nullable
private volatile int[] aggOffsetInBuffer; private volatile int[] aggOffsetInBuffer;
private volatile int aggsTotalSize; private volatile int aggsTotalSize;
@Nullable
private String outOfRowsReason = null; private String outOfRowsReason = null;
OffheapIncrementalIndex( OffheapIncrementalIndex(
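The OffheapIncrementalIndex fields above (selectors, aggOffsetInBuffer, outOfRowsReason) are volatile and only populated after construction, so they get the same @Nullable treatment. A hedged sketch of that shape, with hypothetical names rather than the real initialization path:

    import javax.annotation.Nullable;

    public class LazyInitHolder
    {
      // Populated by a later init step; @Nullable marks null as a legal state.
      @Nullable
      private volatile int[] offsets = null;

      // May simply never be set; callers must expect null from the getter.
      @Nullable
      private volatile String outOfCapacityReason = null;

      public void init(int count)
      {
        offsets = new int[count];
      }

      public int offsetOf(int i)
      {
        final int[] local = offsets;  // read the volatile field once
        if (local == null) {
          throw new IllegalStateException("init() has not been called");
        }
        return local[i];
      }

      @Nullable
      public String getOutOfCapacityReason()
      {
        return outOfCapacityReason;
      }
    }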

View File

@ -60,8 +60,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private final long maxBytesPerRowForAggregators; private final long maxBytesPerRowForAggregators;
protected final int maxRowCount; protected final int maxRowCount;
protected final long maxBytesInMemory; protected final long maxBytesInMemory;
private volatile Map<String, ColumnSelectorFactory> selectors;
@Nullable
private volatile Map<String, ColumnSelectorFactory> selectors;
@Nullable
private String outOfRowsReason = null; private String outOfRowsReason = null;
OnheapIncrementalIndex( OnheapIncrementalIndex(

View File

@ -22,16 +22,15 @@ package org.apache.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnConfig;
import java.nio.ByteBuffer; import javax.annotation.Nullable;
/** /**
*/ */
public class ComplexColumnPartSerde implements ColumnPartSerde public class ComplexColumnPartSerde implements ColumnPartSerde
{ {
private final String typeName; private final String typeName;
@Nullable
private final ComplexMetricSerde serde; private final ComplexMetricSerde serde;
private final Serializer serializer; private final Serializer serializer;
@ -70,21 +69,18 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
@Override @Override
public Deserializer getDeserializer() public Deserializer getDeserializer()
{ {
(the anonymous Deserializer and its read() override are replaced by an equivalent lambda:)
return (buffer, builder, columnConfig) -> {
  if (serde != null) {
    serde.deserializeColumn(buffer, builder);
  }
}; };
} }
public static class SerializerBuilder public static class SerializerBuilder
{ {
@Nullable
private String typeName = null; private String typeName = null;
@Nullable
private GenericColumnSerializer delegate = null; private GenericColumnSerializer delegate = null;
public SerializerBuilder withTypeName(final String typeName) public SerializerBuilder withTypeName(final String typeName)

View File

@ -143,13 +143,21 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder public static class SerializerBuilder
{ {
private VERSION version = null;
private int flags = STARTING_FLAGS; private int flags = STARTING_FLAGS;
@Nullable
private VERSION version = null;
@Nullable
private GenericIndexedWriter<String> dictionaryWriter = null; private GenericIndexedWriter<String> dictionaryWriter = null;
@Nullable
private ColumnarIntsSerializer valueWriter = null; private ColumnarIntsSerializer valueWriter = null;
@Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null; private BitmapSerdeFactory bitmapSerdeFactory = null;
@Nullable
private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = null; private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = null;
@Nullable
private ByteBufferWriter<ImmutableRTree> spatialIndexWriter = null; private ByteBufferWriter<ImmutableRTree> spatialIndexWriter = null;
@Nullable
private ByteOrder byteOrder = null; private ByteOrder byteOrder = null;
public SerializerBuilder withDictionary(GenericIndexedWriter<String> dictionaryWriter) public SerializerBuilder withDictionary(GenericIndexedWriter<String> dictionaryWriter)
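The SerializerBuilder fields above are staging slots that stay null until the matching with*() call, which is why each one is annotated @Nullable. A minimal sketch of the pattern, with hypothetical names; whether the real builders enforce non-null in their build/serde step is not shown in this excerpt and is assumed here:

    import com.google.common.base.Preconditions;
    import javax.annotation.Nullable;
    import java.nio.ByteOrder;

    public class ExampleSerdeBuilder
    {
      // Unset until the corresponding with*() call, hence @Nullable.
      @Nullable
      private ByteOrder byteOrder = null;
      @Nullable
      private String typeName = null;

      public ExampleSerdeBuilder withByteOrder(ByteOrder byteOrder)
      {
        this.byteOrder = byteOrder;
        return this;
      }

      public ExampleSerdeBuilder withTypeName(String typeName)
      {
        this.typeName = typeName;
        return this;
      }

      public String build()
      {
        // Null checks are deferred to build(), where every required piece must be present.
        Preconditions.checkNotNull(byteOrder, "byteOrder");
        Preconditions.checkNotNull(typeName, "typeName");
        return typeName + "/" + byteOrder;
      }
    }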

View File

@ -23,14 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ColumnarDoubles; import org.apache.druid.segment.data.ColumnarDoubles;
import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers; import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
public class DoubleNumericColumnPartSerde implements ColumnPartSerde public class DoubleNumericColumnPartSerde implements ColumnPartSerde
@ -66,7 +64,9 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder public static class SerializerBuilder
{ {
@Nullable
private ByteOrder byteOrder = null; private ByteOrder byteOrder = null;
@Nullable
private Serializer delegate = null; private Serializer delegate = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder) public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
@ -97,24 +97,19 @@ public class DoubleNumericColumnPartSerde implements ColumnPartSerde
@Override @Override
public Deserializer getDeserializer() public Deserializer getDeserializer()
{ {
(the anonymous Deserializer and its read() override are replaced by an equivalent lambda:)
return (buffer, builder, columnConfig) -> {
  final Supplier<ColumnarDoubles> column = CompressedColumnarDoublesSuppliers.fromByteBuffer(
      buffer,
      byteOrder
  );
  DoubleNumericColumnSupplier columnSupplier = new DoubleNumericColumnSupplier(
      column,
      IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
  );
  builder.setType(ValueType.DOUBLE)
         .setHasMultipleValues(false)
         .setNumericColumnSupplier(columnSupplier);
}; };
} }
} }

View File

@ -54,7 +54,7 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde
private final ByteOrder byteOrder; private final ByteOrder byteOrder;
@Nullable @Nullable
private Serializer serializer; private final Serializer serializer;
private final BitmapSerdeFactory bitmapSerdeFactory; private final BitmapSerdeFactory bitmapSerdeFactory;
public DoubleNumericColumnPartSerdeV2( public DoubleNumericColumnPartSerdeV2(
@ -87,8 +87,11 @@ public class DoubleNumericColumnPartSerdeV2 implements ColumnPartSerde
public static class SerializerBuilder public static class SerializerBuilder
{ {
@Nullable
private ByteOrder byteOrder = null; private ByteOrder byteOrder = null;
@Nullable
private Serializer delegate = null; private Serializer delegate = null;
@Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null; private BitmapSerdeFactory bitmapSerdeFactory = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder) public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
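Alongside the builder annotations, the V2 serdes also make the serializer field final. @Nullable and final are orthogonal: final says the field is assigned exactly once, @Nullable says that the one assigned value may be null. A brief sketch of that combination, using Runnable as a stand-in for the real Serializer type:

    import javax.annotation.Nullable;

    public class PartSerdeExample
    {
      // Assigned exactly once, but the assigned value may legitimately be null,
      // e.g. on a read-only path where no serializer is needed.
      @Nullable
      private final Runnable serializer;

      public PartSerdeExample(@Nullable Runnable serializer)
      {
        this.serializer = serializer;
      }

      public void serializeIfPresent()
      {
        if (serializer != null) {
          serializer.run();
        }
      }
    }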

View File

@ -66,7 +66,9 @@ public class FloatNumericColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder public static class SerializerBuilder
{ {
@Nullable
private ByteOrder byteOrder = null; private ByteOrder byteOrder = null;
@Nullable
private Serializer delegate = null; private Serializer delegate = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder) public SerializerBuilder withByteOrder(final ByteOrder byteOrder)

View File

@ -52,7 +52,7 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde
private final ByteOrder byteOrder; private final ByteOrder byteOrder;
@Nullable @Nullable
private Serializer serializer; private final Serializer serializer;
private final BitmapSerdeFactory bitmapSerdeFactory; private final BitmapSerdeFactory bitmapSerdeFactory;
private FloatNumericColumnPartSerdeV2( private FloatNumericColumnPartSerdeV2(
@ -85,8 +85,11 @@ public class FloatNumericColumnPartSerdeV2 implements ColumnPartSerde
public static class SerializerBuilder public static class SerializerBuilder
{ {
@Nullable
private ByteOrder byteOrder = null; private ByteOrder byteOrder = null;
@Nullable
private Serializer delegate = null; private Serializer delegate = null;
@Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null; private BitmapSerdeFactory bitmapSerdeFactory = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder) public SerializerBuilder withByteOrder(final ByteOrder byteOrder)

View File

@ -22,13 +22,11 @@ package org.apache.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.CompressedColumnarLongsSupplier; import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
/** /**
@ -66,7 +64,9 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde
public static class SerializerBuilder public static class SerializerBuilder
{ {
@Nullable
private ByteOrder byteOrder = null; private ByteOrder byteOrder = null;
@Nullable
private Serializer delegate = null; private Serializer delegate = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder) public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
@ -97,23 +97,18 @@ public class LongNumericColumnPartSerde implements ColumnPartSerde
@Override @Override
public Deserializer getDeserializer() public Deserializer getDeserializer()
{ {
(the anonymous Deserializer and its read() override are replaced by an equivalent lambda:)
return (buffer, builder, columnConfig) -> {
  final CompressedColumnarLongsSupplier column = CompressedColumnarLongsSupplier.fromByteBuffer(
      buffer,
      byteOrder
  );
  LongNumericColumnSupplier columnSupplier = new LongNumericColumnSupplier(
      column,
      IndexIO.LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
  );
  builder.setType(ValueType.LONG)
         .setHasMultipleValues(false)
         .setNumericColumnSupplier(columnSupplier);
}; };
} }
} }

View File

@ -50,14 +50,16 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde
); );
} }
@Nullable
private final Serializer serializer;
@Nullable
private final ByteOrder byteOrder; private final ByteOrder byteOrder;
@Nullable @Nullable
private Serializer serializer;
private final BitmapSerdeFactory bitmapSerdeFactory; private final BitmapSerdeFactory bitmapSerdeFactory;
private LongNumericColumnPartSerdeV2( private LongNumericColumnPartSerdeV2(
ByteOrder byteOrder, @Nullable ByteOrder byteOrder,
BitmapSerdeFactory bitmapSerdeFactory, @Nullable BitmapSerdeFactory bitmapSerdeFactory,
@Nullable Serializer serializer @Nullable Serializer serializer
) )
{ {
@ -85,8 +87,11 @@ public class LongNumericColumnPartSerdeV2 implements ColumnPartSerde
public static class SerializerBuilder public static class SerializerBuilder
{ {
@Nullable
private ByteOrder byteOrder = null; private ByteOrder byteOrder = null;
@Nullable
private Serializer delegate = null; private Serializer delegate = null;
@Nullable
private BitmapSerdeFactory bitmapSerdeFactory = null; private BitmapSerdeFactory bitmapSerdeFactory = null;
public SerializerBuilder withByteOrder(final ByteOrder byteOrder) public SerializerBuilder withByteOrder(final ByteOrder byteOrder)

View File

@ -104,7 +104,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
* {@link EventReceiverFirehose} that may change in the future. * {@link EventReceiverFirehose} that may change in the future.
*/ */
private final long maxIdleTimeMillis; private final long maxIdleTimeMillis;
private final @Nullable ChatHandlerProvider chatHandlerProvider; private final ChatHandlerProvider chatHandlerProvider;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister; private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
@ -225,6 +225,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
* This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and * This field and {@link #rowsRunOut} are not volatile because they are accessed only from {@link #hasMore()} and
* {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec. * {@link #nextRow()} methods that are called from a single thread according to {@link Firehose} spec.
*/ */
@Nullable
private InputRow nextRow = null; private InputRow nextRow = null;
private boolean rowsRunOut = false; private boolean rowsRunOut = false;
@ -241,7 +242,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
* If they were not volatile, NPE would be possible in {@link #delayedCloseExecutor}. See * If they were not volatile, NPE would be possible in {@link #delayedCloseExecutor}. See
* https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-hb-actual for explanations. * https://shipilev.net/blog/2016/close-encounters-of-jmm-kind/#wishful-hb-actual for explanations.
*/ */
@Nullable
private volatile Long idleCloseTimeNs = null; private volatile Long idleCloseTimeNs = null;
@Nullable
private volatile Long requestedShutdownTimeNs = null; private volatile Long requestedShutdownTimeNs = null;
EventReceiverFirehose(InputRowParser<Map<String, Object>> parser) EventReceiverFirehose(InputRowParser<Map<String, Object>> parser)
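The EventReceiverFirehoseFactory hunk shows the annotation change going both ways: truly optional state such as nextRow, idleCloseTimeNs, and requestedShutdownTimeNs gains @Nullable, while the @Nullable on chatHandlerProvider is dropped, presumably because that value is in fact always supplied. A hedged sketch of the latter case (hypothetical names; whether Druid checks this particular field in the constructor is not visible in the excerpt):

    import com.google.common.base.Preconditions;

    public class FirehoseFactoryExample
    {
      // No @Nullable: the constructor rejects null, so the field can never hold one
      // and callers are not forced into needless null handling.
      private final Runnable chatHandlerRegistrar;

      public FirehoseFactoryExample(Runnable chatHandlerRegistrar)
      {
        this.chatHandlerRegistrar = Preconditions.checkNotNull(chatHandlerRegistrar, "chatHandlerRegistrar");
      }

      public void connect()
      {
        chatHandlerRegistrar.run();
      }
    }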

View File

@ -40,6 +40,7 @@ public class PredicateFirehose implements Firehose
private final Firehose firehose; private final Firehose firehose;
private final Predicate<InputRow> predicate; private final Predicate<InputRow> predicate;
@Nullable
private InputRow savedInputRow = null; private InputRow savedInputRow = null;
public PredicateFirehose(Firehose firehose, Predicate<InputRow> predicate) public PredicateFirehose(Firehose firehose, Predicate<InputRow> predicate)

View File

@ -35,6 +35,8 @@ import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.ResultSetException; import org.skife.jdbi.v2.exceptions.ResultSetException;
import org.skife.jdbi.v2.exceptions.StatementException; import org.skife.jdbi.v2.exceptions.StatementException;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -51,6 +53,7 @@ public class SqlFirehoseFactory extends PrefetchSqlFirehoseFactory<String>
{ {
@JsonProperty @JsonProperty
private final List<String> sqls; private final List<String> sqls;
@Nullable
@JsonProperty @JsonProperty
private final MetadataStorageConnectorConfig connectorConfig; private final MetadataStorageConnectorConfig connectorConfig;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;

View File

@ -69,7 +69,8 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
private final ConcurrentMap<DataSegment, SegmentZNode> segmentLookup = new ConcurrentHashMap<>(); private final ConcurrentMap<DataSegment, SegmentZNode> segmentLookup = new ConcurrentHashMap<>();
private final Function<DataSegment, DataSegment> segmentTransformer; private final Function<DataSegment, DataSegment> segmentTransformer;
private final ChangeRequestHistory<DataSegmentChangeRequest> changes = new ChangeRequestHistory(); private final ChangeRequestHistory<DataSegmentChangeRequest> changes = new ChangeRequestHistory<>();
@Nullable
private final SegmentZNode dummyZnode; private final SegmentZNode dummyZnode;
@Inject @Inject
@ -87,20 +88,15 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
this.server = server; this.server = server;
this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName()); this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
(the anonymous Function&lt;DataSegment, DataSegment&gt; and its apply() override are replaced by an equivalent lambda:)
segmentTransformer = input -> {
  DataSegment rv = input;
  if (config.isSkipDimensionsAndMetrics()) {
    rv = rv.withDimensions(null).withMetrics(null);
  }
  if (config.isSkipLoadSpec()) {
    rv = rv.withLoadSpec(null);
  }
  return rv;
}; };
if (this.config.isSkipSegmentAnnouncementOnZk()) { if (this.config.isSkipSegmentAnnouncementOnZk()) {

View File

@ -21,15 +21,15 @@ package org.apache.druid.server.coordination;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import javax.validation.constraints.NotNull; import javax.annotation.Nullable;
/** /**
*/ */
public class BatchDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider public class BatchDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider
{ {
@JacksonInject @JacksonInject
@NotNull @Nullable
private BatchDataSegmentAnnouncer batchAnnouncer = null; private final BatchDataSegmentAnnouncer batchAnnouncer = null;
@Override @Override
public DataSegmentAnnouncer get() public DataSegmentAnnouncer get()
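BatchDataSegmentAnnouncerProvider swaps javax.validation's @NotNull for @Nullable on the injected field. A simplified sketch of the general shape of a provider whose dependency is absent until injection and is validated when used; the Runnable stand-in, the missing Jackson annotations, and the fail-fast check are all assumptions, not the actual Druid behavior:

    import com.google.common.base.Preconditions;
    import javax.annotation.Nullable;

    public class AnnouncerProviderExample
    {
      // Filled in by the injection framework after construction; null until then.
      @Nullable
      private Runnable announcer = null;

      public Runnable get()
      {
        // Fail fast with a clear message if injection never happened.
        return Preconditions.checkNotNull(announcer, "announcer was not injected");
      }
    }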

View File

@ -39,6 +39,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration; import org.joda.time.Duration;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import java.io.InputStream; import java.io.InputStream;
import java.net.URL; import java.net.URL;
@ -84,12 +85,14 @@ public class ChangeRequestHttpSyncer<T>
private final LifecycleLock startStopLock = new LifecycleLock(); private final LifecycleLock startStopLock = new LifecycleLock();
private final String logIdentity; private final String logIdentity;
private ChangeRequestHistory.Counter counter = null;
private long unstableStartTime = -1; private long unstableStartTime = -1;
private int consecutiveFailedAttemptCount = 0; private int consecutiveFailedAttemptCount = 0;
private long lastSuccessfulSyncTime = 0; private long lastSuccessfulSyncTime = 0;
private long lastSyncTime = 0; private long lastSyncTime = 0;
@Nullable
private ChangeRequestHistory.Counter counter = null;
public ChangeRequestHttpSyncer( public ChangeRequestHttpSyncer(
ObjectMapper smileMapper, ObjectMapper smileMapper,
HttpClient httpClient, HttpClient httpClient,

View File

@ -44,6 +44,8 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SegmentManager;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -240,14 +242,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
addSegments( addSegments(
cachedSegments, cachedSegments,
(the anonymous DataSegmentChangeCallback and its execute() override are replaced by an equivalent lambda:)
() -> log.info("Cache load took %,d ms", System.currentTimeMillis() - start)
); );
} }
@ -348,35 +343,30 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>(); final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) { for (final DataSegment segment : segments) {
loadingExecutor.submit( loadingExecutor.submit(
(the anonymous Runnable and its run() override are replaced by an equivalent lambda; the loading logic is unchanged:)
() -> {
  try {
    log.info(
        "Loading segment[%d/%d][%s]",
        counter.incrementAndGet(),
        numSegments,
        segment.getId()
    );
    loadSegment(segment, callback);
    try {
      backgroundSegmentAnnouncer.announceSegment(segment);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SegmentLoadingException(e, "Loading Interrupted");
    }
  }
  catch (SegmentLoadingException e) {
    log.error(e, "[%s] failed to load", segment.getId());
    failedSegments.add(segment);
  }
  finally {
    latch.countDown();
  }
} }
); );
} }
@ -427,28 +417,23 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
announcer.unannounceSegment(segment); announcer.unannounceSegment(segment);
segmentsToDelete.add(segment); segmentsToDelete.add(segment);
(the anonymous Runnable and its run() override are replaced by an equivalent lambda; the drop logic is unchanged:)
Runnable runnable = () -> {
  try {
    synchronized (segmentDeleteLock) {
      if (segmentsToDelete.remove(segment)) {
        segmentManager.dropSegment(segment);

        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getId().toString());
        if (!segmentInfoCacheFile.delete()) {
          log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
        }
      }
    }
  }
  catch (Exception e) {
    log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
       .addData("segment", segment)
       .emit();
} }
}; };
@ -543,7 +528,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
); );
} }
}, },
() -> resolveWaitingFutures() this::resolveWaitingFutures
); );
} }
return requestStatuses.getIfPresent(changeRequest); return requestStatuses.getIfPresent(changeRequest);
@ -588,7 +573,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final Object lock = new Object(); private final Object lock = new Object();
private volatile boolean finished = false; private volatile boolean finished = false;
@Nullable
private volatile ScheduledFuture startedAnnouncing = null; private volatile ScheduledFuture startedAnnouncing = null;
@Nullable
private volatile ScheduledFuture nextAnnoucement = null; private volatile ScheduledFuture nextAnnoucement = null;
public BackgroundSegmentAnnouncer( public BackgroundSegmentAnnouncer(
@ -755,6 +742,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
} }
private final STATE state; private final STATE state;
@Nullable
private final String failureCause; private final String failureCause;
public static final Status SUCCESS = new Status(STATE.SUCCESS, null); public static final Status SUCCESS = new Status(STATE.SUCCESS, null);
@ -763,7 +751,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
@JsonCreator @JsonCreator
Status( Status(
@JsonProperty("state") STATE state, @JsonProperty("state") STATE state,
@JsonProperty("failureCause") String failureCause @JsonProperty("failureCause") @Nullable String failureCause
) )
{ {
Preconditions.checkNotNull(state, "state must be non-null"); Preconditions.checkNotNull(state, "state must be non-null");
@ -782,6 +770,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
return state; return state;
} }
@Nullable
@JsonProperty @JsonProperty
public String getFailureCause() public String getFailureCause()
{ {
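The Status changes above show the nullable-property pattern end to end: the field, the @JsonCreator parameter, and the getter all carry @Nullable because a failure cause only exists for failed requests. A condensed sketch modeled on that hunk, with the STATE enum simplified to a String:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.google.common.base.Preconditions;
    import javax.annotation.Nullable;

    public class RequestStatus
    {
      private final String state;
      // Only populated for failed requests, so it is nullable on every path.
      @Nullable
      private final String failureCause;

      @JsonCreator
      public RequestStatus(
          @JsonProperty("state") String state,
          @JsonProperty("failureCause") @Nullable String failureCause
      )
      {
        this.state = Preconditions.checkNotNull(state, "state must be non-null");
        this.failureCause = failureCause;
      }

      @JsonProperty
      public String getState()
      {
        return state;
      }

      @Nullable
      @JsonProperty
      public String getFailureCause()
      {
        return failureCause;
      }
    }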

View File

@ -25,8 +25,6 @@ import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@ -35,6 +33,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.initialization.ZkPathsConfig;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -54,6 +54,7 @@ public class ZkCoordinator
private final DruidServerMetadata me; private final DruidServerMetadata me;
private final CuratorFramework curator; private final CuratorFramework curator;
@Nullable
private volatile PathChildrenCache loadQueueCache; private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false; private volatile boolean started = false;
private final ExecutorService segmentLoadUnloadService; private final ExecutorService segmentLoadUnloadService;
@ -107,22 +108,17 @@ public class ZkCoordinator
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadQueueCache.getListenable().addListener( loadQueueCache.getListenable().addListener(
(the anonymous PathChildrenCacheListener and its childEvent() override are replaced by an equivalent lambda:)
(client, event) -> {
  final ChildData child = event.getData();
  switch (event.getType()) {
    case CHILD_ADDED:
      childAdded(child);
      break;
    case CHILD_REMOVED:
      log.info("zNode[%s] was removed", event.getData().getPath());
      break;
    default:
      log.info("Ignoring event[%s]", event);
  }
}
@ -151,25 +147,20 @@ public class ZkCoordinator
finalRequest.go( finalRequest.go(
dataSegmentChangeHandler, dataSegmentChangeHandler,
(the anonymous DataSegmentChangeCallback and its execute() override are replaced by an equivalent lambda; the cleanup logic is unchanged:)
() -> {
  try {
    curator.delete().guaranteed().forPath(path);
    log.info("Completed request [%s]", finalRequest.asString());
  }
  catch (Exception e) {
    try {
      curator.delete().guaranteed().forPath(path);
    }
    catch (Exception e1) {
      log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
    }
    log.error(e, "Exception while removing zNode[%s]", path);
    throw new RuntimeException(e);
  }
}
); );