mirror of https://github.com/apache/druid.git
Fix a resource leak with Window processing (#14573)
* Fix a resource leak with Window processing Additionally, in order to find the leak, there were adjustments to the StupidPool to track leaks a bit better. It would appear that the pool objects get GC'd during testing for some reason which was causing some incorrect identification of leaks from objects that had been returned but were GC'd along with the pool. * Suppress unused warning
This commit is contained in:
parent
12ce187ae4
commit
65e1b27aa7
|
@ -24,12 +24,13 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.base.Supplier;
|
||||
import org.apache.druid.java.util.common.Cleaners;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.RE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -105,7 +106,8 @@ public class StupidPool<T> implements NonBlockingPool<T>
|
|||
private final AtomicLong createdObjectsCounter = new AtomicLong(0);
|
||||
private final AtomicLong leakedObjectsCounter = new AtomicLong(0);
|
||||
|
||||
private final AtomicReference<RuntimeException> capturedException = new AtomicReference<>(null);
|
||||
private final AtomicReference<CopyOnWriteArrayList<LeakedException>> capturedException =
|
||||
new AtomicReference<>(null);
|
||||
|
||||
//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
|
||||
private final int objectsCacheMaxCount;
|
||||
|
@ -149,30 +151,41 @@ public class StupidPool<T> implements NonBlockingPool<T>
|
|||
ObjectResourceHolder resourceHolder = objects.poll();
|
||||
if (resourceHolder == null) {
|
||||
if (POISONED.get() && capturedException.get() != null) {
|
||||
throw capturedException.get();
|
||||
throw makeExceptionForLeaks(capturedException.get());
|
||||
}
|
||||
return makeObjectWithHandler();
|
||||
} else {
|
||||
poolSize.decrementAndGet();
|
||||
if (POISONED.get()) {
|
||||
final RuntimeException exception = capturedException.get();
|
||||
if (exception == null) {
|
||||
resourceHolder.notifier.except = new RE("Thread[%s]: leaky leak!", Thread.currentThread().getName());
|
||||
final CopyOnWriteArrayList<LeakedException> exceptionList = capturedException.get();
|
||||
if (exceptionList == null) {
|
||||
resourceHolder.notifier.except = new LeakedException(Thread.currentThread().getName());
|
||||
} else {
|
||||
throw exception;
|
||||
throw makeExceptionForLeaks(exceptionList);
|
||||
}
|
||||
}
|
||||
return resourceHolder;
|
||||
}
|
||||
}
|
||||
|
||||
private RuntimeException makeExceptionForLeaks(CopyOnWriteArrayList<LeakedException> exceptionList)
|
||||
{
|
||||
RuntimeException toThrow = new RuntimeException(
|
||||
"Leaks happened, each suppressed exception represents one code path that checked out an object and didn't return it."
|
||||
);
|
||||
for (LeakedException exception : exceptionList) {
|
||||
toThrow.addSuppressed(exception);
|
||||
}
|
||||
return toThrow;
|
||||
}
|
||||
|
||||
private ObjectResourceHolder makeObjectWithHandler()
|
||||
{
|
||||
T object = generator.get();
|
||||
createdObjectsCounter.incrementAndGet();
|
||||
ObjectId objectId = new ObjectId();
|
||||
ObjectLeakNotifier notifier = new ObjectLeakNotifier(this, POISONED.get());
|
||||
// Using objectId as referent for Cleaner, because if the object itself (e. g. ByteBuffer) is leaked after taken
|
||||
// Using objectId as referent for Cleaner, because if the object itself (e.g. ByteBuffer) is leaked after taken
|
||||
// from the pool, and the ResourceHolder is not closed, Cleaner won't notify about the leak.
|
||||
return new ObjectResourceHolder(object, objectId, Cleaners.register(objectId, notifier), notifier);
|
||||
}
|
||||
|
@ -198,6 +211,7 @@ public class StupidPool<T> implements NonBlockingPool<T>
|
|||
private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier)
|
||||
{
|
||||
long currentPoolSize;
|
||||
notifier.except = null;
|
||||
do {
|
||||
currentPoolSize = poolSize.get();
|
||||
if (currentPoolSize >= objectsCacheMaxCount) {
|
||||
|
@ -310,14 +324,14 @@ public class StupidPool<T> implements NonBlockingPool<T>
|
|||
final AtomicLong leakedObjectsCounter;
|
||||
final AtomicBoolean disabled = new AtomicBoolean(false);
|
||||
|
||||
private RuntimeException except;
|
||||
private LeakedException except;
|
||||
|
||||
ObjectLeakNotifier(StupidPool<?> pool, boolean poisoned)
|
||||
{
|
||||
poolReference = new WeakReference<>(pool);
|
||||
leakedObjectsCounter = pool.leakedObjectsCounter;
|
||||
|
||||
except = poisoned ? new RE("Thread[%s]: drip drip", Thread.currentThread().getName()) : null;
|
||||
except = poisoned ? new LeakedException(Thread.currentThread().getName()) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -329,7 +343,12 @@ public class StupidPool<T> implements NonBlockingPool<T>
|
|||
final StupidPool<?> pool = poolReference.get();
|
||||
log.warn("Not closed! Object leaked from %s. Allowing gc to prevent leak.", pool);
|
||||
if (except != null && pool != null) {
|
||||
pool.capturedException.set(except);
|
||||
CopyOnWriteArrayList<LeakedException> exceptions = pool.capturedException.get();
|
||||
if (exceptions == null) {
|
||||
pool.capturedException.compareAndSet(null, new CopyOnWriteArrayList<>());
|
||||
}
|
||||
exceptions = pool.capturedException.get();
|
||||
exceptions.add(except);
|
||||
log.error(except, "notifier[%s], dumping stack trace from object checkout and poisoning pool", this);
|
||||
}
|
||||
}
|
||||
|
@ -357,4 +376,25 @@ public class StupidPool<T> implements NonBlockingPool<T>
|
|||
private static class ObjectId
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* This exception exists primarily to defer string interpolation to when getMessage is called instead of
|
||||
* interpolating on constructor. While not a primary bottleneck, the string interpolation for poisoned stupid
|
||||
* pools does show up in profiling so avoiding it is just good hygiene.
|
||||
*/
|
||||
private static class LeakedException extends RuntimeException
|
||||
{
|
||||
private final String threadName;
|
||||
|
||||
public LeakedException(String threadName)
|
||||
{
|
||||
this.threadName = threadName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMessage()
|
||||
{
|
||||
return StringUtils.format("Originally checked out by thread [%s]", threadName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,16 +19,13 @@
|
|||
|
||||
package org.apache.druid.query.operator;
|
||||
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.query.QueryPlus;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
|
||||
public class LimitTimeIntervalOperator implements Operator
|
||||
{
|
||||
|
@ -37,16 +34,11 @@ public class LimitTimeIntervalOperator implements Operator
|
|||
|
||||
public LimitTimeIntervalOperator(
|
||||
Operator segmentOperator,
|
||||
QueryPlus<RowsAndColumns> queryPlus
|
||||
Interval interval
|
||||
)
|
||||
{
|
||||
this.segmentOperator = segmentOperator;
|
||||
|
||||
final List<Interval> intervals = queryPlus.getQuery().getIntervals();
|
||||
if (intervals.size() != 1) {
|
||||
throw new ISE("Can only handle a single interval, got[%s]", intervals);
|
||||
}
|
||||
interval = intervals.get(0);
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -61,11 +53,13 @@ public class LimitTimeIntervalOperator implements Operator
|
|||
@Override
|
||||
public Signal push(RowsAndColumns rac)
|
||||
{
|
||||
final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac);
|
||||
if (!Intervals.isEternity(interval)) {
|
||||
if (Intervals.isEternity(interval)) {
|
||||
return receiver.push(rac);
|
||||
} else {
|
||||
final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac);
|
||||
decor.limitTimeRange(interval);
|
||||
return receiver.push(decor.toRowsAndColumns());
|
||||
}
|
||||
return receiver.push(decor.toRowsAndColumns());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,16 +20,25 @@
|
|||
package org.apache.druid.query.operator;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.query.QueryProcessingPool;
|
||||
import org.apache.druid.query.QueryRunner;
|
||||
import org.apache.druid.query.QueryRunnerFactory;
|
||||
import org.apache.druid.query.QueryToolChest;
|
||||
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
|
||||
import org.apache.druid.segment.Segment;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
|
||||
public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory<RowsAndColumns, WindowOperatorQuery>
|
||||
{
|
||||
|
@ -41,10 +50,21 @@ public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory
|
|||
return (queryPlus, responseContext) ->
|
||||
new OperatorSequence(() -> {
|
||||
Operator op = new SegmentToRowsAndColumnsOperator(segment);
|
||||
op = new LimitTimeIntervalOperator(op, queryPlus);
|
||||
|
||||
final List<Interval> intervals = queryPlus.getQuery().getIntervals();
|
||||
if (intervals.size() != 1) {
|
||||
throw DruidException.defensive("Can only handle a single interval, got [%s]", intervals);
|
||||
}
|
||||
|
||||
final Interval interval = intervals.get(0);
|
||||
if (!Intervals.isEternity(interval)) {
|
||||
op = new LimitTimeIntervalOperator(op, interval);
|
||||
}
|
||||
|
||||
for (OperatorFactory leaf : ((WindowOperatorQuery) queryPlus.getQuery()).getLeafOperators()) {
|
||||
op = leaf.wrap(op);
|
||||
}
|
||||
|
||||
return op;
|
||||
});
|
||||
}
|
||||
|
@ -58,20 +78,47 @@ public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory
|
|||
// This merge is extremely naive, there is no ordering being imposed over the data, nor is there any attempt
|
||||
// to shrink the size of the data before pushing it across the wire. This code implementation is intended more
|
||||
// to make this work for tests and less to work in production. That's why the WindowOperatorQuery forces
|
||||
// a super-secrete context parameter to be set to actually allow it to run a query that pushes all the way down
|
||||
// a super-secret context parameter to be set to actually allow it to run a query that pushes all the way down
|
||||
// like this. When this gets fixed, we can remove that parameter.
|
||||
return (queryPlus, responseContext) -> Sequences.concat(
|
||||
Sequences.map(
|
||||
Sequences.simple(queryRunners),
|
||||
new Function<QueryRunner<RowsAndColumns>, Sequence<RowsAndColumns>>()
|
||||
{
|
||||
@SuppressWarnings("ConstantConditions")
|
||||
@Nullable
|
||||
@Override
|
||||
public Sequence<RowsAndColumns> apply(
|
||||
@Nullable QueryRunner<RowsAndColumns> input
|
||||
)
|
||||
{
|
||||
return input.run(queryPlus, responseContext);
|
||||
return Sequences.map(
|
||||
input.run(queryPlus, responseContext),
|
||||
new Function<RowsAndColumns, RowsAndColumns>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public RowsAndColumns apply(@Nullable RowsAndColumns input)
|
||||
{
|
||||
// This is interim code to force a materialization by synthesizing the wire transfer
|
||||
// that will need to naturally happen as we flesh out this code more. For now, we
|
||||
// materialize the bytes on-heap and then read them back in as a frame.
|
||||
if (input instanceof LazilyDecoratedRowsAndColumns) {
|
||||
final WireTransferable wire = WireTransferable.fromRAC(input);
|
||||
final byte[] frameBytes = wire.bytesToTransfer();
|
||||
|
||||
RowSignature.Builder sigBob = RowSignature.builder();
|
||||
for (String column : input.getColumnNames()) {
|
||||
sigBob.add(column, input.findColumn(column).toAccessor().getType());
|
||||
}
|
||||
|
||||
return new FrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
|
||||
}
|
||||
return input;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
}
|
||||
}
|
||||
)
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.druid.frame.write.FrameWriterFactory;
|
|||
import org.apache.druid.frame.write.FrameWriters;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.UOE;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
|
|||
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
|
||||
import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
|
||||
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
|
||||
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
import org.apache.druid.segment.Cursor;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
|
@ -56,11 +58,16 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
|
||||
{
|
||||
private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP =
|
||||
RowsAndColumns.makeAsMap(LazilyDecoratedRowsAndColumns.class);
|
||||
|
||||
private RowsAndColumns base;
|
||||
private Interval interval;
|
||||
private Filter filter;
|
||||
|
@ -118,33 +125,50 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
|
|||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
{
|
||||
if (RowsAndColumnsDecorator.class.equals(clazz)) {
|
||||
// If we don't have a projection defined, then it's safe to continue collecting more decorations as we
|
||||
// can meaningfully merge them together.
|
||||
if (viewableColumns == null || viewableColumns.isEmpty()) {
|
||||
return (T) new DefaultRowsAndColumnsDecorator(
|
||||
base,
|
||||
interval,
|
||||
filter,
|
||||
virtualColumns,
|
||||
limit,
|
||||
ordering
|
||||
);
|
||||
} else {
|
||||
return (T) new DefaultRowsAndColumnsDecorator(this);
|
||||
}
|
||||
//noinspection ReturnOfNull
|
||||
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@SemanticCreator
|
||||
public RowsAndColumnsDecorator toRowsAndColumnsDecorator()
|
||||
{
|
||||
// If we don't have a projection defined, then it's safe to continue collecting more decorations as we
|
||||
// can meaningfully merge them together.
|
||||
if (viewableColumns == null || viewableColumns.isEmpty()) {
|
||||
return new DefaultRowsAndColumnsDecorator(base, interval, filter, virtualColumns, limit, ordering);
|
||||
} else {
|
||||
return new DefaultRowsAndColumnsDecorator(this);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@SemanticCreator
|
||||
public WireTransferable toWireTransferable()
|
||||
{
|
||||
return () -> {
|
||||
final Pair<byte[], RowSignature> materialized = materialize();
|
||||
if (materialized == null) {
|
||||
return new byte[]{};
|
||||
} else {
|
||||
return materialized.lhs;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void maybeMaterialize()
|
||||
{
|
||||
if (!(interval == null && filter == null && limit == -1 && ordering == null)) {
|
||||
materialize();
|
||||
final Pair<byte[], RowSignature> thePair = materialize();
|
||||
if (thePair == null) {
|
||||
reset(new EmptyRowsAndColumns());
|
||||
} else {
|
||||
reset(new FrameRowsAndColumns(Frame.wrap(thePair.lhs), thePair.rhs));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void materialize()
|
||||
private Pair<byte[], RowSignature> materialize()
|
||||
{
|
||||
if (ordering != null) {
|
||||
throw new ISE("Cannot reorder[%s] scan data right now", ordering);
|
||||
|
@ -152,13 +176,26 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
|
|||
|
||||
final StorageAdapter as = base.as(StorageAdapter.class);
|
||||
if (as == null) {
|
||||
reset(naiveMaterialize(base));
|
||||
return naiveMaterialize(base);
|
||||
} else {
|
||||
reset(materializeStorageAdapter(as));
|
||||
return materializeStorageAdapter(as);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private RowsAndColumns materializeStorageAdapter(StorageAdapter as)
|
||||
private void reset(RowsAndColumns rac)
|
||||
{
|
||||
base = rac;
|
||||
interval = null;
|
||||
filter = null;
|
||||
virtualColumns = null;
|
||||
limit = -1;
|
||||
viewableColumns = null;
|
||||
ordering = null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private Pair<byte[], RowSignature> materializeStorageAdapter(StorageAdapter as)
|
||||
{
|
||||
final Sequence<Cursor> cursors = as.makeCursors(
|
||||
filter,
|
||||
|
@ -231,25 +268,15 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
|
|||
// This means that the accumulate was never called, which can only happen if we didn't have any cursors.
|
||||
// We would only have zero cursors if we essentially didn't match anything, meaning that our RowsAndColumns
|
||||
// should be completely empty.
|
||||
return new EmptyRowsAndColumns();
|
||||
return null;
|
||||
} else {
|
||||
final byte[] bytes = writer.toByteArray();
|
||||
return new FrameRowsAndColumns(Frame.wrap(bytes), siggy.get());
|
||||
return Pair.of(bytes, siggy.get());
|
||||
}
|
||||
}
|
||||
|
||||
private void reset(RowsAndColumns rac)
|
||||
{
|
||||
base = rac;
|
||||
interval = null;
|
||||
filter = null;
|
||||
virtualColumns = null;
|
||||
limit = -1;
|
||||
viewableColumns = null;
|
||||
ordering = null;
|
||||
}
|
||||
|
||||
private RowsAndColumns naiveMaterialize(RowsAndColumns rac)
|
||||
@Nullable
|
||||
private Pair<byte[], RowSignature> naiveMaterialize(RowsAndColumns rac)
|
||||
{
|
||||
final int numRows = rac.numRows();
|
||||
|
||||
|
@ -264,7 +291,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
|
|||
// long as is required by the time filter produces all 0s, so either 0 is included and matches all rows or
|
||||
// it's not and we skip all rows.
|
||||
if (!interval.contains(0)) {
|
||||
return new EmptyRowsAndColumns();
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
final ColumnAccessor accessor = racColumn.toAccessor();
|
||||
|
@ -355,7 +382,6 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
|
|||
frameWriter.addSelection();
|
||||
}
|
||||
|
||||
return new FrameRowsAndColumns(Frame.wrap(frameWriter.toByteArray()), sigBob.build());
|
||||
return Pair.of(frameWriter.toByteArray(), sigBob.build());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,13 +19,19 @@
|
|||
|
||||
package org.apache.druid.query.rowsandcols;
|
||||
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* An interface representing a chunk of RowsAndColumns. Essentially a RowsAndColumns is just a batch of rows
|
||||
|
@ -69,6 +75,31 @@ public interface RowsAndColumns
|
|||
return retVal;
|
||||
}
|
||||
|
||||
static <T> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
|
||||
{
|
||||
Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();
|
||||
|
||||
for (Method method : clazz.getMethods()) {
|
||||
if (method.isAnnotationPresent(SemanticCreator.class)) {
|
||||
if (method.getParameterCount() != 0) {
|
||||
throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
|
||||
}
|
||||
|
||||
retVal.put(method.getReturnType(), arg -> {
|
||||
try {
|
||||
return method.invoke(arg);
|
||||
}
|
||||
catch (InvocationTargetException | IllegalAccessException e) {
|
||||
throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The set of column names available from the RowsAndColumns
|
||||
*
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.rowsandcols;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
* Annotation used to indicate that the method is used as a creator for a semantic interface.
|
||||
*
|
||||
* Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of
|
||||
* the {@link RowsAndColumns#as(Class)} method.
|
||||
*/
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.METHOD)
|
||||
public @interface SemanticCreator
|
||||
{
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.query.rowsandcols.concrete;
|
||||
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
||||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
|
@ -28,10 +29,12 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
|||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
@ -41,6 +44,7 @@ public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseab
|
|||
|
||||
private final QueryableIndex index;
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private final Closer closer = Closer.create();
|
||||
private final AtomicInteger numRows = new AtomicInteger(-1);
|
||||
|
||||
|
@ -72,6 +76,9 @@ public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseab
|
|||
@Override
|
||||
public Column findColumn(String name)
|
||||
{
|
||||
if (closed.get()) {
|
||||
throw DruidException.defensive("Cannot be accessed after being closed!?");
|
||||
}
|
||||
final ColumnHolder columnHolder = index.getColumnHolder(name);
|
||||
if (columnHolder == null) {
|
||||
return null;
|
||||
|
@ -83,15 +90,18 @@ public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseab
|
|||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
@Override
|
||||
public <T> T as(Class<T> clazz)
|
||||
public <T> T as(@Nonnull Class<T> clazz)
|
||||
{
|
||||
//noinspection ReturnOfNull
|
||||
return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
closer.close();
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
closer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static HashMap<Class<?>, Function<QueryableIndexRowsAndColumns, ?>> makeAsMap()
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
|
|||
import org.apache.druid.query.rowsandcols.column.Column;
|
||||
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class DefaultNaiveSortMaker implements NaiveSortMaker
|
||||
|
@ -61,6 +62,7 @@ public class DefaultNaiveSortMaker implements NaiveSortMaker
|
|||
}
|
||||
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public RowsAndColumns moreData(RowsAndColumns rac)
|
||||
{
|
||||
|
@ -75,7 +77,12 @@ public class DefaultNaiveSortMaker implements NaiveSortMaker
|
|||
return new EmptyRowsAndColumns();
|
||||
}
|
||||
|
||||
ConcatRowsAndColumns rac = new ConcatRowsAndColumns(racBuffer);
|
||||
final RowsAndColumns rac;
|
||||
if (racBuffer.size() == 1) {
|
||||
rac = racBuffer.get(0);
|
||||
} else {
|
||||
rac = new ConcatRowsAndColumns(racBuffer);
|
||||
}
|
||||
|
||||
// One int for the racBuffer, another for the rowIndex
|
||||
int[] sortedPointers = new int[rac.numRows()];
|
||||
|
|
|
@ -62,6 +62,7 @@ import java.util.function.Function;
|
|||
/**
|
||||
* Helps tests make segments.
|
||||
*/
|
||||
@SuppressWarnings({"NotNullFieldNotInitialized", "FieldMayBeFinal", "ConstantConditions", "NullableProblems"})
|
||||
public class IndexBuilder
|
||||
{
|
||||
private static final int ROWS_PER_INDEX_FOR_MERGING = 1;
|
||||
|
@ -261,7 +262,7 @@ public class IndexBuilder
|
|||
tmpDir,
|
||||
schema.getDimensionsSpec(),
|
||||
indexSpec,
|
||||
Integer.MAX_VALUE
|
||||
-1
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
|
|
@ -109,7 +109,6 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
|
|||
this.filename = filename;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
@Override
|
||||
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
|
||||
QueryRunnerFactoryConglomerate conglomerate,
|
||||
|
@ -152,7 +151,13 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
|
|||
@Test
|
||||
public void windowQueryTest()
|
||||
{
|
||||
Thread thread = null;
|
||||
String oldName = null;
|
||||
try {
|
||||
thread = Thread.currentThread();
|
||||
oldName = thread.getName();
|
||||
thread.setName("drillWindowQuery-" + filename);
|
||||
|
||||
final String query = getQueryString();
|
||||
final String results = getExpectedResults();
|
||||
|
||||
|
@ -160,14 +165,17 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
|
|||
.skipVectorize(true)
|
||||
.sql(query)
|
||||
.queryContext(ImmutableMap.of("windowsAreForClosers", true, "windowsAllTheWayDown", true))
|
||||
.expectedResults((sql, results1) -> {
|
||||
Assert.assertEquals(results, results1);
|
||||
})
|
||||
.expectedResults((sql, results1) -> Assert.assertEquals(results, String.valueOf(results1)))
|
||||
.run();
|
||||
}
|
||||
catch (Throwable e) {
|
||||
log.info(e, "Got a throwable, here it is. Ignoring for now.");
|
||||
}
|
||||
finally {
|
||||
if (thread != null && oldName != null) {
|
||||
thread.setName(oldName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
|
@ -182,6 +190,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
|
|||
return readStringFromResource(".e");
|
||||
}
|
||||
|
||||
@SuppressWarnings({"UnstableApiUsage", "ConstantConditions"})
|
||||
@Nonnull
|
||||
private String readStringFromResource(String s) throws IOException
|
||||
{
|
||||
|
@ -192,6 +201,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
|
|||
return query;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private void attachIndex(SpecificSegmentsQuerySegmentWalker texasRanger, String dataSource, DimensionSchema... dims)
|
||||
throws IOException
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue