mirror of https://github.com/apache/druid.git
Code cleanup from query profile project (#11822)
* Code cleanup from query profile project
* Fix spelling errors
* Fix Javadoc formatting
* Abstract out repeated test code
* Reuse constants in place of some string literals
* Fix up some parameterized types
* Reduce warnings reported by Eclipse
* Reverted change due to lack of tests
This commit is contained in:
parent f6e6ca2893
commit a66f10eea1
@@ -21,3 +21,6 @@ NOTICE.BINARY
 README.BINARY
 README
 *.lock
+**/.pmd
+**/.pmdruleset.xml
+.java-version
@@ -26,16 +26,13 @@ import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
-import org.apache.druid.collections.spatial.ImmutableRTree;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.extendedset.intset.ConciseSetUtils;
 import org.apache.druid.query.filter.BitmapIndexSelector;
 import org.apache.druid.query.filter.BoundDimFilter;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.column.BitmapIndex;
-import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
-import org.apache.druid.segment.data.CloseableIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.filter.BoundFilter;
@@ -187,50 +184,7 @@ public class BoundFilterBenchmark
         ),
         dictionary
     ).get();
-    selector = new BitmapIndexSelector()
-    {
-      @Override
-      public CloseableIndexed<String> getDimensionValues(String dimension)
-      {
-        return dictionary;
-      }
-
-      @Override
-      public ColumnCapabilities.Capable hasMultipleValues(final String dimension)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public int getNumRows()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public BitmapFactory getBitmapFactory()
-      {
-        return bitmapFactory;
-      }
-
-      @Override
-      public ImmutableBitmap getBitmapIndex(String dimension, String value)
-      {
-        return bitmapIndex.getBitmap(bitmapIndex.getIndex(value));
-      }
-
-      @Override
-      public BitmapIndex getBitmapIndex(String dimension)
-      {
-        return bitmapIndex;
-      }
-
-      @Override
-      public ImmutableRTree getSpatialIndex(String dimension)
-      {
-        throw new UnsupportedOperationException();
-      }
-    };
+    selector = new MockBitmapIndexSelector(dictionary, bitmapFactory, bitmapIndex);
   }

   @Benchmark
@@ -27,7 +27,6 @@ import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
-import org.apache.druid.collections.spatial.ImmutableRTree;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.filter.BitmapIndexSelector;
 import org.apache.druid.query.filter.DruidDoublePredicate;
@@ -35,9 +34,7 @@ import org.apache.druid.query.filter.DruidFloatPredicate;
 import org.apache.druid.query.filter.DruidLongPredicate;
 import org.apache.druid.query.filter.DruidPredicateFactory;
 import org.apache.druid.segment.column.BitmapIndex;
-import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
-import org.apache.druid.segment.data.CloseableIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.filter.DimensionPredicateFilter;
@@ -158,50 +155,7 @@ public class DimensionPredicateFilterBenchmark
         ),
         dictionary
     ).get();
-    selector = new BitmapIndexSelector()
-    {
-      @Override
-      public CloseableIndexed<String> getDimensionValues(String dimension)
-      {
-        return dictionary;
-      }
-
-      @Override
-      public ColumnCapabilities.Capable hasMultipleValues(final String dimension)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public int getNumRows()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public BitmapFactory getBitmapFactory()
-      {
-        return bitmapFactory;
-      }
-
-      @Override
-      public ImmutableBitmap getBitmapIndex(String dimension, String value)
-      {
-        return bitmapIndex.getBitmap(bitmapIndex.getIndex(value));
-      }
-
-      @Override
-      public BitmapIndex getBitmapIndex(String dimension)
-      {
-        return bitmapIndex;
-      }
-
-      @Override
-      public ImmutableRTree getSpatialIndex(String dimension)
-      {
-        throw new UnsupportedOperationException();
-      }
-    };
+    selector = new MockBitmapIndexSelector(dictionary, bitmapFactory, bitmapIndex);
   }

   @Benchmark
@@ -25,7 +25,6 @@ import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
-import org.apache.druid.collections.spatial.ImmutableRTree;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.query.filter.BitmapIndexSelector;
 import org.apache.druid.query.filter.BoundDimFilter;
@@ -35,9 +34,7 @@ import org.apache.druid.query.filter.RegexDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.column.BitmapIndex;
-import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
-import org.apache.druid.segment.data.CloseableIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.serde.StringBitmapIndexColumnPartSupplier;
@@ -158,50 +155,7 @@ public class LikeFilterBenchmark
         ),
         dictionary
     ).get();
-    selector = new BitmapIndexSelector()
-    {
-      @Override
-      public CloseableIndexed<String> getDimensionValues(String dimension)
-      {
-        return dictionary;
-      }
-
-      @Override
-      public ColumnCapabilities.Capable hasMultipleValues(final String dimension)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public int getNumRows()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public BitmapFactory getBitmapFactory()
-      {
-        return bitmapFactory;
-      }
-
-      @Override
-      public ImmutableBitmap getBitmapIndex(String dimension, String value)
-      {
-        return bitmapIndex.getBitmap(bitmapIndex.getIndex(value));
-      }
-
-      @Override
-      public BitmapIndex getBitmapIndex(String dimension)
-      {
-        return bitmapIndex;
-      }
-
-      @Override
-      public ImmutableRTree getSpatialIndex(String dimension)
-      {
-        throw new UnsupportedOperationException();
-      }
-    };
+    selector = new MockBitmapIndexSelector(dictionary, bitmapFactory, bitmapIndex);
   }

   @Benchmark
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.collections.spatial.ImmutableRTree;
+import org.apache.druid.query.filter.BitmapIndexSelector;
+import org.apache.druid.segment.column.BitmapIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.CloseableIndexed;
+import org.apache.druid.segment.data.GenericIndexed;
+
+public class MockBitmapIndexSelector implements BitmapIndexSelector
+{
+  private final GenericIndexed<String> dictionary;
+  private final BitmapFactory bitmapFactory;
+  private final BitmapIndex bitmapIndex;
+
+  public MockBitmapIndexSelector(
+      GenericIndexed<String> dictionary,
+      BitmapFactory bitmapFactory,
+      BitmapIndex bitmapIndex)
+  {
+    this.dictionary = dictionary;
+    this.bitmapFactory = bitmapFactory;
+    this.bitmapIndex = bitmapIndex;
+  }
+
+  @Override
+  public CloseableIndexed<String> getDimensionValues(String dimension)
+  {
+    return dictionary;
+  }
+
+  @Override
+  public ColumnCapabilities.Capable hasMultipleValues(final String dimension)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNumRows()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BitmapFactory getBitmapFactory()
+  {
+    return bitmapFactory;
+  }
+
+  @Override
+  public ImmutableBitmap getBitmapIndex(String dimension, String value)
+  {
+    return bitmapIndex.getBitmap(bitmapIndex.getIndex(value));
+  }
+
+  @Override
+  public BitmapIndex getBitmapIndex(String dimension)
+  {
+    return bitmapIndex;
+  }
+
+  @Override
+  public ImmutableRTree getSpatialIndex(String dimension)
+  {
+    throw new UnsupportedOperationException();
+  }
+}
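The three benchmark hunks above all collapse the same copy-pasted anonymous BitmapIndexSelector into this new shared class ("Abstract out repeated test code"). A minimal sketch of the resulting setup code, not a verbatim excerpt; the dictionary, bitmapFactory, and bitmapIndex fields are built by the existing benchmark code that the diff leaves unchanged:

  // Illustrative benchmark setup after the refactor.
  @Setup
  public void setup()
  {
    // dictionary, bitmapFactory and bitmapIndex initialization elided;
    // it is the unchanged code visible as context in the hunks above.
    selector = new MockBitmapIndexSelector(dictionary, bitmapFactory, bitmapIndex);
  }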
@@ -26,6 +26,7 @@ import java.util.function.Function;

 /**
  */
+@SuppressWarnings("serial")
 public class ISE extends IllegalStateException implements SanitizableException
 {
   public ISE(String formatText, Object... arguments)
@@ -28,6 +28,7 @@ public class ConcatSequence<T> implements Sequence<T>
 {
   private final Sequence<Sequence<T>> baseSequences;

+  @SuppressWarnings("unchecked")
   public ConcatSequence(
       Sequence<? extends Sequence<? extends T>> baseSequences
   )
@@ -34,6 +34,7 @@ public class MergeSequence<T> extends YieldingSequenceBase<T>
   private final Ordering<? super T> ordering;
   private final Sequence<? extends Sequence<T>> baseSequences;

+  @SuppressWarnings("unchecked")
   public MergeSequence(
       Ordering<? super T> ordering,
       Sequence<? extends Sequence<? extends T>> baseSequences
@@ -77,7 +77,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
   private final int queueSize;
   private final boolean hasTimeout;
   private final long timeoutAtNanos;
-  private final int queryPriority; // not currently used :(
   private final int yieldAfter;
   private final int batchSize;
   private final int parallelism;
@@ -107,7 +106,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
     this.combineFn = combineFn;
     this.hasTimeout = hasTimeout;
     this.timeoutAtNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS);
-    this.queryPriority = queryPriority;
     this.parallelism = parallelism;
     this.yieldAfter = yieldAfter;
     this.batchSize = batchSize;
@@ -267,6 +265,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
    * {@link MergeCombineAction} do a final merge combine of all the parallel computed results, again pushing
    * {@link ResultBatch} into a {@link BlockingQueue} with a {@link QueuePusher}.
    */
+  @SuppressWarnings("serial")
   private static class MergeCombinePartitioningAction<T> extends RecursiveAction
   {
     private final List<Sequence<T>> sequences;
@@ -321,7 +320,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
       try {
         final int parallelTaskCount = computeNumTasks();

-        // if we have a small number of sequences to merge, or computed paralellism is too low, do not run in parallel,
+        // if we have a small number of sequences to merge, or computed parallelism is too low, do not run in parallel,
         // just serially perform the merge-combine with a single task
         if (parallelTaskCount < 2) {
           LOG.debug(
@@ -360,6 +359,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
       catch (Throwable t) {
         closeAllCursors(sequenceCursors);
         cancellationGizmo.cancel(t);
+        // Should be the following, but can't change due to lack of
+        // unit tests.
+        // out.offer((ParallelMergeCombiningSequence.ResultBatch<T>) ResultBatch.TERMINAL);
         out.offer(ResultBatch.TERMINAL);
       }
     }
@@ -449,14 +451,14 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
       // running, minus 1 for the task that is running this calculation (as it will be replaced with the parallel tasks)
       final int utilizationEstimate = runningThreadCount + submissionCount - 1;

-      // 'computed parallelism' is the remaineder of the 'max parallelism' less current 'utilization estimate'
+      // 'computed parallelism' is the remainder of the 'max parallelism' less current 'utilization estimate'
       final int computedParallelismForUtilization = maxParallelism - utilizationEstimate;

       // try to balance partition size with partition count so we don't end up with layer 2 'final merge' task that has
       // significantly more work to do than the layer 1 'parallel' tasks.
       final int computedParallelismForSequences = (int) Math.floor(Math.sqrt(sequences.size()));

-      // compute total number of layer 1 'parallel' tasks, for the utilzation parallelism, subtract 1 as the final merge
+      // compute total number of layer 1 'parallel' tasks, for the utilization parallelism, subtract 1 as the final merge
       // task will take the remaining slot
       final int computedOptimalParallelism = Math.min(
           computedParallelismForSequences,
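To make the parallelism arithmetic in this hunk concrete, here is a standalone sketch with made-up inputs; none of these numbers come from the source:

public class ComputeNumTasksSketch
{
  public static void main(String[] args)
  {
    int runningThreadCount = 3;  // assumed: tasks currently running in the pool
    int submissionCount = 2;     // assumed: queued submissions
    int maxParallelism = 8;      // assumed: configured ceiling
    int sequenceCount = 100;     // assumed: input sequences to merge

    // minus 1 for the task that is running this calculation
    int utilizationEstimate = runningThreadCount + submissionCount - 1;               // 4
    int computedParallelismForUtilization = maxParallelism - utilizationEstimate;     // 4
    int computedParallelismForSequences = (int) Math.floor(Math.sqrt(sequenceCount)); // 10
    // the final merge task takes the remaining slot, hence the min()
    int computedOptimalParallelism =
        Math.min(computedParallelismForSequences, computedParallelismForUtilization); // 4
    System.out.println(computedOptimalParallelism);
  }
}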
@@ -484,7 +486,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
     }
   }

-
  /**
   * This {@link RecursiveAction} is the work-horse of the {@link ParallelMergeCombiningSequence}, it merge-combines
   * a set of {@link BatchedResultsCursor} and produces output to a {@link BlockingQueue} with the help of a
@@ -501,6 +502,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
   * how many times a task has continued executing, and utilized to compute a cumulative moving average of task run time
   * per amount yielded in order to 'smooth' out the continual adjustment.
   */
+  @SuppressWarnings("serial")
  private static class MergeCombineAction<T> extends RecursiveAction
  {
    private final PriorityQueue<BatchedResultsCursor<T>> pQueue;
@@ -539,6 +541,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
      this.cancellationGizmo = cancellationGizmo;
    }

+    @SuppressWarnings("unchecked")
    @Override
    protected void compute()
    {
@@ -682,6 +685,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
   * majority of its time will be spent managed blocking until results are ready for each cursor, or will be incredibly
   * short lived if all inputs are already available.
   */
+  @SuppressWarnings("serial")
  private static class PrepareMergeCombineInputsAction<T> extends RecursiveAction
  {
    private final List<BatchedResultsCursor<T>> partition;
|
||||||
this.cancellationGizmo = cancellationGizmo;
|
this.cancellationGizmo = cancellationGizmo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
protected void compute()
|
protected void compute()
|
||||||
{
|
{
|
||||||
|
@ -757,7 +762,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link ForkJoinPool} friendly {@link BlockingQueue} feeder, adapted from 'QueueTaker' of Java documentation on
|
* {@link ForkJoinPool} friendly {@link BlockingQueue} feeder, adapted from 'QueueTaker' of Java documentation on
|
||||||
* {@link ForkJoinPool.ManagedBlocker},
|
* {@link ForkJoinPool.ManagedBlocker},
|
||||||
|
@ -816,7 +820,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holder object for an ordered batch of results from a sequence. Batching the results vastly reduces the amount of
|
* Holder object for an ordered batch of results from a sequence. Batching the results vastly reduces the amount of
|
||||||
* blocking that is needed to move results between stages of {@link MergeCombineAction} done in parallel, allowing
|
* blocking that is needed to move results between stages of {@link MergeCombineAction} done in parallel, allowing
|
||||||
|
@ -824,6 +827,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
*/
|
*/
|
||||||
static class ResultBatch<E>
|
static class ResultBatch<E>
|
||||||
{
|
{
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
static final ResultBatch TERMINAL = new ResultBatch();
|
static final ResultBatch TERMINAL = new ResultBatch();
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
|
@ -893,7 +897,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link ForkJoinPool} friendly conversion of {@link Sequence} to {@link Yielder< ResultBatch >}
|
* {@link ForkJoinPool} friendly conversion of {@link Sequence} to {@link Yielder< ResultBatch >}
|
||||||
*/
|
*/
|
||||||
|
@@ -934,7 +937,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
    }
  }

-
  /**
   * Provides a higher level cursor interface to provide individual results out {@link ResultBatch} provided by
   * a {@link Yielder} or {@link BlockingQueue}. This is the mechanism that powers {@link MergeCombineAction}, where
|
||||||
return ordering.compare(get(), o.get());
|
return ordering.compare(get(), o.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o)
|
public boolean equals(Object o)
|
||||||
{
|
{
|
||||||
|
@ -1001,7 +1004,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link BatchedResultsCursor} that wraps a {@link Yielder} of {@link ResultBatch} to provide individual rows
|
* {@link BatchedResultsCursor} that wraps a {@link Yielder} of {@link ResultBatch} to provide individual rows
|
||||||
* of the result batch.
|
* of the result batch.
|
||||||
|
@ -1072,7 +1074,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link BatchedResultsCursor} that wraps a {@link BlockingQueue} of {@link ResultBatch} to provide individual
|
* {@link BatchedResultsCursor} that wraps a {@link BlockingQueue} of {@link ResultBatch} to provide individual
|
||||||
* rows from the result batch.
|
* rows from the result batch.
|
||||||
|
@ -1155,7 +1156,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Token to allow any {@link RecursiveAction} signal the others and the output sequence that something bad happened
|
* Token to allow any {@link RecursiveAction} signal the others and the output sequence that something bad happened
|
||||||
* and processing should cancel, such as a timeout or connection loss.
|
* and processing should cancel, such as a timeout or connection loss.
|
||||||
|
@ -1266,7 +1266,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holder to accumlate metrics for all work done {@link ParallelMergeCombiningSequence}, containing layer 1 task
|
* Holder to accumulate metrics for all work done {@link ParallelMergeCombiningSequence}, containing layer 1 task
|
||||||
* metrics in {@link #partitionMetrics} and final merge task metrics in {@link #mergeMetrics}, in order to compute
|
* metrics in {@link #partitionMetrics} and final merge task metrics in {@link #mergeMetrics}, in order to compute
|
||||||
* {@link MergeCombineMetrics} after the {@link ParallelMergeCombiningSequence} is completely consumed.
|
* {@link MergeCombineMetrics} after the {@link ParallelMergeCombiningSequence} is completely consumed.
|
||||||
*/
|
*/
|
||||||
|
|
|
@@ -29,7 +29,7 @@ import org.joda.time.Interval;
 * number. It's used where the data source, another essential part of {@link org.apache.druid.timeline.SegmentId}
 * is determined by the context (e. g. in org.apache.druid.client.CachingClusteredClient, where SegmentDescriptor is
 * used when Brokers tell data servers which segments to include for a particular query) and where having lean JSON
- * representations is important, because it's actively transferred detween Druid nodes. It's also for this reason that
+ * representations is important, because it's actively transferred between Druid nodes. It's also for this reason that
 * the JSON field names of SegmentDescriptor are abbreviated.
 */
public class SegmentDescriptor
@@ -107,7 +107,7 @@ public class JsonConfiguratorTest
  }

  @Test
-  public void testsimpleConfigurate()
+  public void testSimpleConfigurate()
  {
    final JsonConfigurator configurator = new JsonConfigurator(mapper, validator);
    properties.setProperty(PROP_PREFIX + "prop1", "prop1");
@@ -28,6 +28,7 @@ import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -295,7 +296,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
                      ImmutableMap.of(
                          TimeseriesQuery.SKIP_EMPTY_BUCKETS,
                          true,
-                          "sqlQueryId",
+                          BaseQuery.SQL_QUERY_ID,
                          "dummy"
                      ),
                      "d0"
@@ -1332,6 +1332,7 @@ public class CompactionTaskTest
        RETRY_POLICY_FACTORY
    );

+    @SuppressWarnings("unused")
    final CompactionTask task = builder
        .interval(Intervals.of("2000-01-01/2000-01-01"))
        .build();
@@ -2012,6 +2013,7 @@ public class CompactionTaskTest
      this.segments = segments;
    }

+    @SuppressWarnings("unchecked")
    @Override
    public <RetType> RetType submit(TaskAction<RetType> taskAction)
    {
@@ -2160,7 +2162,7 @@ public class CompactionTaskTest
    }

    @Override
-    public SettableColumnValueSelector makeNewSettableColumnValueSelector()
+    public SettableColumnValueSelector<?> makeNewSettableColumnValueSelector()
    {
      return null;
    }
@@ -25,6 +25,7 @@ import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.QueryException;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.sql.http.SqlQuery;
@@ -88,7 +89,7 @@ public class ITSqlCancelTest
      queryResponseFutures.add(
          sqlClient.queryAsync(
              sqlHelper.getQueryURL(config.getRouterUrl()),
-              new SqlQuery(QUERY, null, false, false, false, ImmutableMap.of("sqlQueryId", queryId), null)
+              new SqlQuery(QUERY, null, false, false, false, ImmutableMap.of(BaseQuery.SQL_QUERY_ID, queryId), null)
          )
      );
    }
@@ -125,7 +126,7 @@ public class ITSqlCancelTest
    final Future<StatusResponseHolder> queryResponseFuture = sqlClient
        .queryAsync(
            sqlHelper.getQueryURL(config.getRouterUrl()),
-            new SqlQuery(QUERY, null, false, false, false, ImmutableMap.of("sqlQueryId", "validId"), null)
+            new SqlQuery(QUERY, null, false, false, false, ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "validId"), null)
        );

    // Wait until the sqlLifecycle is authorized and registered
@@ -56,8 +56,8 @@ public class QueryContexts
  public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
  public static final String REWRITE_JOIN_TO_FILTER_ENABLE_KEY = "enableRewriteJoinToFilter";
  public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";
-  // This flag control whether a sql join query with left scan should be attempted to be run as direct table access
-  // instead of being wrapped inside a query. With direct table access enabled, druid can push down the join operation to
+  // This flag controls whether a SQL join query with left scan should be attempted to be run as direct table access
+  // instead of being wrapped inside a query. With direct table access enabled, Druid can push down the join operation to
  // data servers.
  public static final String SQL_JOIN_LEFT_SCAN_DIRECT = "enableJoinLeftTableScanDirect";
  public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
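For readers unfamiliar with these keys: each constant names a query-context flag. A hedged sketch of how the flag documented above would be supplied; the surrounding map construction is illustrative, not from this commit:

// Illustrative only: a query context map enabling the left-table
// direct-scan behavior that the comment above describes.
Map<String, Object> context = ImmutableMap.of(
    QueryContexts.SQL_JOIN_LEFT_SCAN_DIRECT, true
);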
@@ -262,31 +262,51 @@ public interface QueryMetrics<QueryType extends Query<?>>

  /**
   * Registers "query time" metric.
+   *
+   * Measures the time between a Jetty thread starting to handle a query, and the response being fully written to
+   * the response output stream. Does not include time spent waiting in a queue before the query runs.
   */
  QueryMetrics<QueryType> reportQueryTime(long timeNs);

  /**
   * Registers "query bytes" metric.
+   *
+   * Measures the total number of bytes written by the query server thread to the response output stream.
+   *
+   * Emitted once per query.
   */
  QueryMetrics<QueryType> reportQueryBytes(long byteCount);

  /**
-   * Registeres "segments queried count" metric.
+   * Registers "segments queried count" metric.
   */
  QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount);

  /**
   * Registers "wait time" metric.
+   *
+   * Measures the total time segment-processing runnables spent waiting for execution in the processing thread pool.
+   *
+   * Emitted once per segment.
   */
  QueryMetrics<QueryType> reportWaitTime(long timeNs);

  /**
   * Registers "segment time" metric.
+   *
+   * Measures the total wall-clock time spent operating on segments in processing threads.
+   *
+   * Emitted once per segment.
   */
  QueryMetrics<QueryType> reportSegmentTime(long timeNs);

  /**
   * Registers "segmentAndCache time" metric.
+   *
+   * Measures the total wall-clock time spent in processing threads, either operating on segments or retrieving items
+   * from cache.
+   *
+   * Emitted once per segment.
   */
  QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs);

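As a usage note, not part of the commit: the timing methods above take nanoseconds, so a caller would look roughly like this sketch, where queryMetrics is an existing QueryMetrics instance and bytesWritten is a hypothetical counter:

// Hypothetical call site for the methods whose Javadoc this hunk adds.
long startNs = System.nanoTime();
// ... handle the query and write the response ...
queryMetrics.reportQueryTime(System.nanoTime() - startNs); // "query time", once per query
queryMetrics.reportQueryBytes(bytesWritten);               // "query bytes", once per query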
@@ -83,7 +83,6 @@ public interface Filter
   */
  double estimateSelectivity(BitmapIndexSelector indexSelector);

-
  /**
   * Get a ValueMatcher that applies this filter to row values.
   *
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
 import java.util.Objects;

 /**
- * This class provides a mechansim to influence whether or not indexes are used for a {@link Filter} during processing
+ * This class provides a mechanism to influence whether or not indexes are used for a {@link Filter} during processing
 * by {@link org.apache.druid.segment.QueryableIndexStorageAdapter#analyzeFilter} (i.e. will a {@link Filter} be a "pre"
 * filter in which we union indexes for all values that match the filter to create a
 * {@link org.apache.druid.segment.BitmapOffset}/{@link org.apache.druid.segment.vector.BitmapVectorOffset}, or will it
@@ -34,11 +34,11 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
{
  /**
   * Segment id is stored as a String rather than {@link org.apache.druid.timeline.SegmentId}, because when a
-   * SegmentAnalysis object is sent across Druid nodes, on the reciever (deserialization) side it's impossible to
+   * SegmentAnalysis object is sent across Druid nodes, on the receiver (deserialization) side it's impossible to
   * unambiguously convert a segment id string (as transmitted in the JSON format) back into a {@code SegmentId} object
   * ({@link org.apache.druid.timeline.SegmentId#tryParse} javadoc explains that ambiguities in details). It would be
   * fine to have the type of this field of Object, setting it to {@code SegmentId} on the sender side and remaining as
-   * a String on the reciever side, but it's even less type-safe than always storing the segment id as a String.
+   * a String on the receiver side, but it's even less type-safe than always storing the segment id as a String.
   */
  private final String id;
  private final List<Interval> interval;
@@ -48,7 +48,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable
   * The close method shouldn't actually be here as this is nasty. We will adjust it in the future.
   * @throws IOException if an exception was thrown closing the index
   */
-  //@Deprecated // This is still required for SimpleQueryableIndex. It should not go away unitl SimpleQueryableIndex is fixed
+  //@Deprecated // This is still required for SimpleQueryableIndex. It should not go away until SimpleQueryableIndex is fixed
  @Override
  void close();
}
@@ -26,6 +26,13 @@ import org.apache.druid.query.monomorphicprocessing.HotLoopCallee;
 import javax.annotation.Nullable;
 import java.util.Comparator;

+/**
+ * Indexed is a fixed-size, immutable, indexed set of values which allows
+ * locating a specific index via an exact match, the semantics of which are defined
+ * by the implementation. The indexed is ordered, and may contain duplicate values.
+ *
+ * @param <T> the type of the value
+ */
 @PublicApi
 public interface Indexed<T> extends Iterable<T>, HotLoopCallee
 {
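A small sketch of the contract the new Javadoc describes, using GenericIndexed, the common Indexed implementation that also appears in the benchmark hunks above. The construction details are assumptions for illustration, not from this commit:

// Assumed construction; fromIterable and STRING_STRATEGY are GenericIndexed's
// existing factory pieces.
GenericIndexed<String> dict = GenericIndexed.fromIterable(
    Arrays.asList("apple", "banana", "cherry"),
    GenericIndexed.STRING_STRATEGY
);
dict.size();            // 3, fixed size
dict.get(1);            // "banana", positional lookup
dict.indexOf("cherry"); // 2, locating an index via an exact match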
@@ -257,6 +257,7 @@ public class ListFilteredVirtualColumn implements VirtualColumn
      {
        return delegate.getBitmap(idMapping.getReverseId(idx));
      }
+
      @Override
      public int getCardinality()
      {
@@ -62,12 +62,12 @@ public class DataSourceMetadataQueryTest
  @Test
  public void testQuerySerialization() throws IOException
  {
-    Query query = Druids.newDataSourceMetadataQueryBuilder()
+    Query<?> query = Druids.newDataSourceMetadataQueryBuilder()
                        .dataSource("testing")
                        .build();

    String json = JSON_MAPPER.writeValueAsString(query);
-    Query serdeQuery = JSON_MAPPER.readValue(json, Query.class);
+    Query<?> serdeQuery = JSON_MAPPER.readValue(json, Query.class);

    Assert.assertEquals(query, serdeQuery);
  }
@@ -93,7 +93,7 @@ public class DataSourceMetadataQueryTest

    final ObjectMapper mapper = new DefaultObjectMapper();

-    final Query serdeQuery = mapper.readValue(
+    final Query<?> serdeQuery = mapper.readValue(
        mapper.writeValueAsBytes(
            mapper.readValue(
                mapper.writeValueAsString(
@@ -103,7 +103,7 @@ public class DataSourceMetadataQueryTest
        ), Query.class
    );

-    Assert.assertEquals(1, serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY));
+    Assert.assertEquals((Integer) 1, serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY));
    Assert.assertEquals(true, serdeQuery.getContextValue("useCache"));
    Assert.assertEquals("true", serdeQuery.getContextValue("populateCache"));
    Assert.assertEquals(true, serdeQuery.getContextValue("finalize"));
@@ -232,7 +232,7 @@ public class DataSourceMetadataQueryTest
    );

    Assert.assertEquals(segments.size(), 2);
    // should only have the latest segments.
    List<LogicalSegment> expected = Arrays.asList(
        new LogicalSegment()
        {
@@ -24,7 +24,7 @@ import org.apache.druid.query.QueryRunner;

/**
 */
-public class QueryableDruidServer<T extends QueryRunner>
+public class QueryableDruidServer<T extends QueryRunner<?>>
{
  private final DruidServer server;
  private final T queryRunner;
@@ -22,6 +22,7 @@ package org.apache.druid.client.selector;
 import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
 import org.apache.druid.client.DataSegmentInterner;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public <T> QueryableDruidServer pick(@Nullable Query<T> query)
|
public <T> QueryableDruidServer<? extends QueryRunner<T>> pick(@Nullable Query<T> query)
|
||||||
{
|
{
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!historicalServers.isEmpty()) {
|
if (!historicalServers.isEmpty()) {
|
||||||
|
|
|
@@ -49,7 +49,6 @@ public class FileRequestLoggerProvider implements RequestLoggerProvider
  @NotNull
  private ScheduledExecutorFactory factory = null;

-
  @JacksonInject
  @NotNull
  @Json
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.server.RequestLogLine;
@@ -60,7 +61,7 @@ public class LoggingRequestLogger implements RequestLogger
    try {
      final Query query = requestLogLine.getQuery();
      MDC.put("queryId", query.getId());
-      MDC.put("sqlQueryId", StringUtils.nullToEmptyNonDruidDataString(query.getSqlQueryId()));
+      MDC.put(BaseQuery.SQL_QUERY_ID, StringUtils.nullToEmptyNonDruidDataString(query.getSqlQueryId()));
      MDC.put("dataSource", String.join(",", query.getDataSource().getTableNames()));
      MDC.put("queryType", query.getType());
      MDC.put("isNested", String.valueOf(!(query.getDataSource() instanceof TableDataSource)));
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.inject.Provider;

 /**
- * A Marker interface for things that can provide a RequestLogger. This can be combined with jackson polymorphic serde
+ * A Marker interface for things that can provide a RequestLogger. This can be combined with Jackson polymorphic serde
 * to provide new RequestLogger implementations as plugins.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -153,7 +153,7 @@ public class DruidPlanner implements Closeable
  }

  /**
-   * Prepare an SQL query for execution, including some initial parsing and validation and any dyanmic parameter type
+   * Prepare an SQL query for execution, including some initial parsing and validation and any dynamic parameter type
   * resolution, to support prepared statements via JDBC.
   *
   * In some future this could perhaps re-use some of the work done by {@link #validate()}
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Numbers;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.ResourceAction;
|
||||||
public class PlannerContext
|
public class PlannerContext
|
||||||
{
|
{
|
||||||
// query context keys
|
// query context keys
|
||||||
public static final String CTX_SQL_QUERY_ID = "sqlQueryId";
|
public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID;
|
||||||
public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp";
|
public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp";
|
||||||
public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone";
|
public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone";
|
||||||
public static final String CTX_SQL_STRINGIFY_ARRAYS = "sqlStringifyArrays";
|
public static final String CTX_SQL_STRINGIFY_ARRAYS = "sqlStringifyArrays";
|
||||||
|
|
|
@@ -48,6 +48,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryScheduler;
|
||||||
final Properties propertiesLosAngeles = new Properties();
|
final Properties propertiesLosAngeles = new Properties();
|
||||||
propertiesLosAngeles.setProperty("sqlTimeZone", "America/Los_Angeles");
|
propertiesLosAngeles.setProperty("sqlTimeZone", "America/Los_Angeles");
|
||||||
propertiesLosAngeles.setProperty("user", "regularUserLA");
|
propertiesLosAngeles.setProperty("user", "regularUserLA");
|
||||||
propertiesLosAngeles.setProperty("sqlQueryId", DUMMY_SQL_QUERY_ID);
|
propertiesLosAngeles.setProperty(BaseQuery.SQL_QUERY_ID, DUMMY_SQL_QUERY_ID);
|
||||||
clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles);
|
clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@@ -45,6 +45,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryCapacityExceededException;
 import org.apache.druid.query.QueryContexts;
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
ImmutableMap.of("maxMergingDictionarySize", 1, "sqlQueryId", "id"),
|
ImmutableMap.of("maxMergingDictionarySize", 1, BaseQuery.SQL_QUERY_ID, "id"),
|
||||||
null
|
null
|
||||||
)
|
)
|
||||||
).lhs;
|
).lhs;
|
||||||
|
@ -1147,7 +1148,7 @@ public class SqlResourceTest extends CalciteTestBase
|
||||||
String errorMessage = "This will be support in Druid 9999";
|
String errorMessage = "This will be support in Druid 9999";
|
||||||
SqlQuery badQuery = EasyMock.createMock(SqlQuery.class);
|
SqlQuery badQuery = EasyMock.createMock(SqlQuery.class);
|
||||||
EasyMock.expect(badQuery.getQuery()).andReturn("SELECT ANSWER TO LIFE");
|
EasyMock.expect(badQuery.getQuery()).andReturn("SELECT ANSWER TO LIFE");
|
||||||
EasyMock.expect(badQuery.getContext()).andReturn(ImmutableMap.of("sqlQueryId", "id"));
|
EasyMock.expect(badQuery.getContext()).andReturn(ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id"));
|
||||||
EasyMock.expect(badQuery.getParameterList()).andThrow(new QueryUnsupportedException(errorMessage));
|
EasyMock.expect(badQuery.getParameterList()).andThrow(new QueryUnsupportedException(errorMessage));
|
||||||
EasyMock.replay(badQuery);
|
EasyMock.replay(badQuery);
|
||||||
final QueryException exception = doPost(badQuery).lhs;
|
final QueryException exception = doPost(badQuery).lhs;
|
||||||
|
@ -1291,7 +1292,7 @@ public class SqlResourceTest extends CalciteTestBase
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
ImmutableMap.of("priority", -5, "sqlQueryId", sqlQueryId),
|
ImmutableMap.of("priority", -5, BaseQuery.SQL_QUERY_ID, sqlQueryId),
|
||||||
null
|
null
|
||||||
),
|
),
|
||||||
makeRegularUserReq()
|
makeRegularUserReq()
|
||||||
|
@ -1332,7 +1333,7 @@ public class SqlResourceTest extends CalciteTestBase
|
||||||
public void testQueryTimeoutException() throws Exception
|
public void testQueryTimeoutException() throws Exception
|
||||||
{
|
{
|
||||||
final String sqlQueryId = "timeoutTest";
|
final String sqlQueryId = "timeoutTest";
|
||||||
Map<String, Object> queryContext = ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1, "sqlQueryId", sqlQueryId);
|
Map<String, Object> queryContext = ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1, BaseQuery.SQL_QUERY_ID, sqlQueryId);
|
||||||
final QueryException timeoutException = doPost(
|
final QueryException timeoutException = doPost(
|
||||||
new SqlQuery(
|
new SqlQuery(
|
||||||
"SELECT CAST(__time AS DATE), dim1, dim2, dim3 FROM druid.foo GROUP by __time, dim1, dim2, dim3 ORDER BY dim2 DESC",
|
"SELECT CAST(__time AS DATE), dim1, dim2, dim3 FROM druid.foo GROUP by __time, dim1, dim2, dim3 ORDER BY dim2 DESC",
|
||||||
|
@ -1483,7 +1484,7 @@ public class SqlResourceTest extends CalciteTestBase
|
||||||
|
|
||||||
private static SqlQuery createSimpleQueryWithId(String sqlQueryId, String sql)
|
private static SqlQuery createSimpleQueryWithId(String sqlQueryId, String sql)
|
||||||
{
|
{
|
||||||
return new SqlQuery(sql, null, false, false, false, ImmutableMap.of("sqlQueryId", sqlQueryId), null);
|
return new SqlQuery(sql, null, false, false, false, ImmutableMap.of(BaseQuery.SQL_QUERY_ID, sqlQueryId), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Pair<QueryException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
|
private Pair<QueryException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
|
||||||
|
|
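The recurring "sqlQueryId" to BaseQuery.SQL_QUERY_ID substitution in the test hunks above is the commit's "reuse constants in place of some string literals" bullet in action. The idea, as a minimal hypothetical sketch:

// Before: the key is retyped at every call site, so a typo still compiles.
Map<String, Object> before = ImmutableMap.of("sqlQueryId", "id");
// After: one shared constant; typos become compile errors and call sites are findable.
Map<String, Object> after = ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id");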