make joinables closeable (#9982)

* make joinables closeable

* tests and adjustments

* refactor to make join stuffs impelement ReferenceCountedObject instead of Closable, more tests

* fixes

* javadocs and stuff

* fix bugs

* more test

* fix lgtm alert

* simplify

* fixup javadoc

* review stuffs

* safeguard against exceptions

* i hate this checkstyle rule

* make IndexedTable extend Closeable
This commit is contained in:
Clint Wylie 2020-06-09 20:12:36 -07:00 committed by GitHub
parent 1c9ca55247
commit f8b643ec72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 940 additions and 263 deletions

View File

@ -38,6 +38,7 @@ import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ColumnConfig;
@ -150,7 +151,7 @@ public class JoinAndLookupBenchmark
0 0
); );
hashJoinLookupStringKeySegment = new HashJoinSegment( hashJoinLookupStringKeySegment = new HashJoinSegment(
baseSegment, ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupStringKey, joinableClausesLookupStringKey,
preAnalysisLookupStringKey preAnalysisLookupStringKey
); );
@ -177,7 +178,7 @@ public class JoinAndLookupBenchmark
0 0
); );
hashJoinLookupLongKeySegment = new HashJoinSegment( hashJoinLookupLongKeySegment = new HashJoinSegment(
baseSegment, ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupLongKey, joinableClausesLookupLongKey,
preAnalysisLookupLongKey preAnalysisLookupLongKey
); );
@ -204,7 +205,7 @@ public class JoinAndLookupBenchmark
0 0
); );
hashJoinIndexedTableStringKeySegment = new HashJoinSegment( hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
baseSegment, ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableStringKey, joinableClausesIndexedTableStringKey,
preAnalysisIndexedTableStringKey preAnalysisIndexedTableStringKey
); );
@ -231,7 +232,7 @@ public class JoinAndLookupBenchmark
0 0
); );
hashJoinIndexedTableLongKeySegment = new HashJoinSegment( hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
baseSegment, ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableLonggKey, joinableClausesIndexedTableLonggKey,
preAnalysisIndexedTableLongKey preAnalysisIndexedTableLongKey
); );

View File

@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
@Guice(moduleFactory = DruidTestModuleFactory.class) @Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest
{ {
private static final Logger LOG = new Logger(ITUnionQueryTest.class); private static final Logger LOG = new Logger(ITTestCoordinatorPausedTest.class);
private static final String INDEX_DATASOURCE = "wikipedia_index_test"; private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.apache.druid.tests.indexer; package org.apache.druid.tests.query;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -39,6 +39,8 @@ import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.ServerDiscoveryUtil; import org.apache.druid.testing.utils.ServerDiscoveryUtil;
import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -62,7 +64,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json"; private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json";
private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName"; private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName";
private static final String UNION_DATA_FILE = "/data/union_query/wikipedia_index_data.json"; private static final String UNION_DATA_FILE = "/data/union_query/wikipedia_index_data.json";
private static final String UNION_QUERIES_RESOURCE = "/indexer/union_queries.json"; private static final String UNION_QUERIES_RESOURCE = "/queries/union_queries.json";
private static final String UNION_DATASOURCE = "wikipedia_index_test"; private static final String UNION_DATASOURCE = "wikipedia_index_test";
@Inject @Inject
@ -92,7 +94,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
closer.register(unloader(fullDatasourceName + i)); closer.register(unloader(fullDatasourceName + i));
} }
try { try {
// Load 4 datasources with same dimensions // Load 3 datasources with same dimensions
String task = setShutOffTime( String task = setShutOffTime(
getResourceAsString(UNION_TASK_RESOURCE), getResourceAsString(UNION_TASK_RESOURCE),
DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)) DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
@ -117,6 +119,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
() -> { () -> {
for (int i = 0; i < numTasks; i++) { for (int i = 0; i < numTasks; i++) {
final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01"); final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01");
// there are 10 rows, but query only covers the first 5
if (countRows < 5) { if (countRows < 5) {
LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i); LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i);
return false; return false;

View File

@ -123,7 +123,7 @@ public abstract class BaseQuery<T> implements Query<T>
{ {
return DataSourceAnalysis.forDataSource(query.getDataSource()) return DataSourceAnalysis.forDataSource(query.getDataSource())
.getBaseQuerySegmentSpec() .getBaseQuerySegmentSpec()
.orElse(query.getQuerySegmentSpec()); .orElseGet(query::getQuerySegmentSpec);
} }
@Override @Override

View File

@ -22,50 +22,42 @@ package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.ReferenceCounter; import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.Segment;
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T> public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{ {
private final QueryRunnerFactory<T, Query<T>> factory; private final QueryRunnerFactory<T, Query<T>> factory;
private final Segment segment; private final SegmentReference segment;
private final ReferenceCounter segmentReferenceCounter;
private final SegmentDescriptor descriptor; private final SegmentDescriptor descriptor;
public ReferenceCountingSegmentQueryRunner( public ReferenceCountingSegmentQueryRunner(
QueryRunnerFactory<T, Query<T>> factory, QueryRunnerFactory<T, Query<T>> factory,
Segment segment, SegmentReference segment,
ReferenceCounter segmentReferenceCounter,
SegmentDescriptor descriptor SegmentDescriptor descriptor
) )
{ {
this.factory = factory; this.factory = factory;
this.segment = segment; this.segment = segment;
this.segmentReferenceCounter = segmentReferenceCounter;
this.descriptor = descriptor; this.descriptor = descriptor;
} }
@Override @Override
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext) public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{ {
if (segmentReferenceCounter.increment()) { return segment.acquireReferences().map(closeable -> {
try { try {
final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext); final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext);
return Sequences.withBaggage(baseSequence, closeable);
return Sequences.withBaggage(baseSequence, segmentReferenceCounter.decrementOnceCloseable());
} }
catch (Throwable t) { catch (Throwable t) {
try { try {
segmentReferenceCounter.decrement(); closeable.close();
} }
catch (Exception e) { catch (Exception e) {
t.addSuppressed(e); t.addSuppressed(e);
} }
throw t; throw t;
} }
} else { }).orElseGet(() -> new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(queryPlus, responseContext));
// Segment was closed before we had a chance to increment the reference count
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(queryPlus, responseContext);
}
} }
} }

View File

@ -19,19 +19,11 @@
package org.apache.druid.segment; package org.apache.druid.segment;
import javax.annotation.Nullable; /**
* @deprecated use {@link Segment} directly as this does nothing
*/
@Deprecated
public abstract class AbstractSegment implements Segment public abstract class AbstractSegment implements Segment
{ {
@Override // i used to have a purpose
@Nullable
public <T> T as(Class<T> clazz)
{
if (clazz.equals(QueryableIndex.class)) {
return (T) asQueryableIndex();
} else if (clazz.equals(StorageAdapter.class)) {
return (T) asStorageAdapter();
}
return null;
}
} }

View File

@ -26,7 +26,7 @@ import org.joda.time.Interval;
/** /**
*/ */
public class IncrementalIndexSegment extends AbstractSegment public class IncrementalIndexSegment implements Segment
{ {
private final IncrementalIndex index; private final IncrementalIndex index;
private final SegmentId segmentId; private final SegmentId segmentId;

View File

@ -24,7 +24,7 @@ import org.joda.time.Interval;
/** /**
*/ */
public class QueryableIndexSegment extends AbstractSegment public class QueryableIndexSegment implements Segment
{ {
private final QueryableIndex index; private final QueryableIndex index;
private final QueryableIndexStorageAdapter storageAdapter; private final QueryableIndexStorageAdapter storageAdapter;

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import java.io.Closeable;
import java.util.Optional;
/**
* Interface for an object that may have a reference acquired in the form of a {@link Closeable}. This is intended to be
* used with an implementation of {@link ReferenceCountingCloseableObject}, or anything else that wishes to provide
* a method to account for the acquire and release of a reference to the object.
*/
public interface ReferenceCountedObject
{
/**
* This method is expected to increment a reference count and provide a {@link Closeable} that decrements the
* reference count when closed. This is likely just a wrapper around
* {@link ReferenceCountingCloseableObject#incrementReferenceAndDecrementOnceCloseable()}, but may also include any
* other associated references which should be incremented when this method is called, and decremented/released by the
* closeable.
*
* IMPORTANT NOTE: to fulfill the contract of this method, implementors must return a closeable to indicate that the
* reference can be acquired, even if there is nothing to close. Implementors should avoid allowing this method or the
* {@link Closeable} it creates to throw exceptions.
*
* For callers: if this method returns non-empty, IT MUST BE CLOSED, else reference counts can potentially leak.
*/
Optional<Closeable> acquireReferences();
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* ReferenceCountingCloseableObject implements something like automatic reference count-based resource management,
* backed by a {@link Phaser}.
*
* ReferenceCountingCloseableObject allows consumers to call {@link #close()} before some other "users", which called
* {@link #increment()} or {@link #incrementReferenceAndDecrementOnceCloseable()}, but have not called
* {@link #decrement()} yet or the closer for {@link #incrementReferenceAndDecrementOnceCloseable()}, and the wrapped
* object won't be actually closed until that all references are released.
*/
public abstract class ReferenceCountingCloseableObject<BaseObject extends Closeable> implements Closeable
{
private static final Logger log = new Logger(ReferenceCountingCloseableObject.class);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Phaser referents = new Phaser(1)
{
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
if (registeredParties != 0) {
log.error("registeredParties[%s] is not 0", registeredParties);
}
try {
baseObject.close();
}
catch (Exception e) {
try {
log.error(e, "Exception while closing reference counted object[%s]", baseObject);
}
catch (Exception e2) {
// ignore
}
}
// Always terminate.
return true;
}
};
protected final BaseObject baseObject;
public ReferenceCountingCloseableObject(BaseObject object)
{
this.baseObject = object;
}
public int getNumReferences()
{
return Math.max(referents.getRegisteredParties() - 1, 0);
}
public boolean isClosed()
{
return referents.isTerminated();
}
/**
* Increment the reference count by one.
*/
public boolean increment()
{
// Negative return from referents.register() means the Phaser is terminated.
return referents.register() >= 0;
}
/**
* Decrement the reference count by one.
*/
public void decrement()
{
referents.arriveAndDeregister();
}
/**
* Returns an {@link Optional} of a {@link Closeable} from {@link #decrementOnceCloseable}, if it is able to
* successfully {@link #increment}, else nothing indicating that the reference could not be acquired.
*/
public Optional<Closeable> incrementReferenceAndDecrementOnceCloseable()
{
final Closer closer;
if (increment()) {
closer = Closer.create();
closer.register(decrementOnceCloseable());
} else {
closer = null;
}
return Optional.ofNullable(closer);
}
/**
* Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object for the second time, it won't call {@link #decrement()} again.
*/
public Closeable decrementOnceCloseable()
{
AtomicBoolean decremented = new AtomicBoolean(false);
return () -> {
if (decremented.compareAndSet(false, true)) {
decrement();
} else {
log.warn("close() is called more than once on ReferenceCountingCloseableObject.decrementOnceCloseable()");
}
};
}
@Override
public void close()
{
if (closed.compareAndSet(false, true)) {
referents.arriveAndDeregister();
} else {
log.warn("close() is called more than once on ReferenceCountingCloseableObject");
}
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.segment; package org.apache.druid.segment;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.Overshadowable; import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.ShardSpec;
@ -28,50 +27,20 @@ import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.util.concurrent.Phaser; import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* ReferenceCountingSegment allows to call {@link #close()} before some other "users", which called {@link * {@link Segment} that is also a {@link ReferenceCountingSegment}, allowing query engines that operate directly on
* #increment()}, has not called {@link #decrement()} yet, and the wrapped {@link Segment} won't be actually closed * segments to track references so that dropping a {@link Segment} can be done safely to ensure there are no in-flight
* until that. So ReferenceCountingSegment implements something like automatic reference count-based resource * queries.
* management.
*/ */
public class ReferenceCountingSegment extends AbstractSegment public class ReferenceCountingSegment extends ReferenceCountingCloseableObject<Segment>
implements Overshadowable<ReferenceCountingSegment>, ReferenceCounter implements SegmentReference, Overshadowable<ReferenceCountingSegment>
{ {
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);
private final Segment baseSegment;
private final short startRootPartitionId; private final short startRootPartitionId;
private final short endRootPartitionId; private final short endRootPartitionId;
private final short minorVersion; private final short minorVersion;
private final short atomicUpdateGroupSize; private final short atomicUpdateGroupSize;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Phaser referents = new Phaser(1)
{
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
if (registeredParties != 0) {
log.error("registeredParties[%s] is not 0", registeredParties);
}
try {
baseSegment.close();
}
catch (Exception e) {
try {
log.error(e, "Exception while closing segment[%s]", baseSegment.getId());
}
catch (Exception e2) {
// ignore
}
}
// Always terminate.
return true;
}
};
public static ReferenceCountingSegment wrapRootGenerationSegment(Segment baseSegment) public static ReferenceCountingSegment wrapRootGenerationSegment(Segment baseSegment)
{ {
@ -106,7 +75,7 @@ public class ReferenceCountingSegment extends AbstractSegment
short atomicUpdateGroupSize short atomicUpdateGroupSize
) )
{ {
this.baseSegment = baseSegment; super(baseSegment);
this.startRootPartitionId = (short) startRootPartitionId; this.startRootPartitionId = (short) startRootPartitionId;
this.endRootPartitionId = (short) endRootPartitionId; this.endRootPartitionId = (short) endRootPartitionId;
this.minorVersion = minorVersion; this.minorVersion = minorVersion;
@ -116,105 +85,43 @@ public class ReferenceCountingSegment extends AbstractSegment
@Nullable @Nullable
public Segment getBaseSegment() public Segment getBaseSegment()
{ {
return !isClosed() ? baseSegment : null; return !isClosed() ? baseObject : null;
}
public int getNumReferences()
{
return Math.max(referents.getRegisteredParties() - 1, 0);
}
public boolean isClosed()
{
return referents.isTerminated();
} }
@Override @Override
@Nullable @Nullable
public SegmentId getId() public SegmentId getId()
{ {
return !isClosed() ? baseSegment.getId() : null; return !isClosed() ? baseObject.getId() : null;
} }
@Override @Override
@Nullable @Nullable
public Interval getDataInterval() public Interval getDataInterval()
{ {
return !isClosed() ? baseSegment.getDataInterval() : null; return !isClosed() ? baseObject.getDataInterval() : null;
} }
@Override @Override
@Nullable @Nullable
public QueryableIndex asQueryableIndex() public QueryableIndex asQueryableIndex()
{ {
return !isClosed() ? baseSegment.asQueryableIndex() : null; return !isClosed() ? baseObject.asQueryableIndex() : null;
} }
@Override @Override
@Nullable @Nullable
public StorageAdapter asStorageAdapter() public StorageAdapter asStorageAdapter()
{ {
return !isClosed() ? baseSegment.asStorageAdapter() : null; return !isClosed() ? baseObject.asStorageAdapter() : null;
}
@Override
public void close()
{
if (closed.compareAndSet(false, true)) {
referents.arriveAndDeregister();
} else {
log.warn("close() is called more than once on ReferenceCountingSegment");
}
}
public ReferenceCounter referenceCounter()
{
return this;
}
@Override
public boolean increment()
{
// Negative return from referents.register() means the Phaser is terminated.
return referents.register() >= 0;
}
/**
* Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object for the second time, it won't call {@link #decrement()} again.
*/
@Override
public Closeable decrementOnceCloseable()
{
AtomicBoolean decremented = new AtomicBoolean(false);
return () -> {
if (decremented.compareAndSet(false, true)) {
decrement();
} else {
log.warn("close() is called more than once on ReferenceCountingSegment.decrementOnceCloseable()");
}
};
}
@Override
public void decrement()
{
referents.arriveAndDeregister();
}
@Override
public <T> T as(Class<T> clazz)
{
return getBaseSegment().as(clazz);
} }
@Override @Override
public boolean overshadows(ReferenceCountingSegment other) public boolean overshadows(ReferenceCountingSegment other)
{ {
if (baseSegment.getId().getDataSource().equals(other.baseSegment.getId().getDataSource()) if (baseObject.getId().getDataSource().equals(other.baseObject.getId().getDataSource())
&& baseSegment.getId().getInterval().overlaps(other.baseSegment.getId().getInterval())) { && baseObject.getId().getInterval().overlaps(other.baseObject.getId().getInterval())) {
final int majorVersionCompare = baseSegment.getId().getVersion() final int majorVersionCompare = baseObject.getId().getVersion().compareTo(other.baseObject.getId().getVersion());
.compareTo(other.baseSegment.getId().getVersion());
if (majorVersionCompare > 0) { if (majorVersionCompare > 0) {
return true; return true;
} else if (majorVersionCompare == 0) { } else if (majorVersionCompare == 0) {
@ -245,7 +152,7 @@ public class ReferenceCountingSegment extends AbstractSegment
@Override @Override
public String getVersion() public String getVersion()
{ {
return baseSegment.getId().getVersion(); return baseObject.getId().getVersion();
} }
@Override @Override
@ -259,4 +166,10 @@ public class ReferenceCountingSegment extends AbstractSegment
{ {
return atomicUpdateGroupSize; return atomicUpdateGroupSize;
} }
@Override
public Optional<Closeable> acquireReferences()
{
return incrementReferenceAndDecrementOnceCloseable();
}
} }

View File

@ -30,7 +30,7 @@ import javax.annotation.Nullable;
/** /**
* A {@link Segment} that is based on a stream of objects. * A {@link Segment} that is based on a stream of objects.
*/ */
public class RowBasedSegment<RowType> extends AbstractSegment public class RowBasedSegment<RowType> implements Segment
{ {
private final SegmentId segmentId; private final SegmentId segmentId;
private final StorageAdapter storageAdapter; private final StorageAdapter storageAdapter;

View File

@ -57,6 +57,15 @@ public interface Segment extends Closeable
* @param <T> desired interface * @param <T> desired interface
* @return instance of clazz, or null if the interface is not supported by this segment * @return instance of clazz, or null if the interface is not supported by this segment
*/ */
@SuppressWarnings("unused")
@Nullable @Nullable
<T> T as(Class<T> clazz); default <T> T as(Class<T> clazz)
{
if (clazz.equals(QueryableIndex.class)) {
return (T) asQueryableIndex();
} else if (clazz.equals(StorageAdapter.class)) {
return (T) asStorageAdapter();
}
return null;
}
} }

View File

@ -19,26 +19,12 @@
package org.apache.druid.segment; package org.apache.druid.segment;
import java.io.Closeable;
/** /**
* An interface to reference-counted objects. Used by {@link ReferenceCountingSegment}. Thread-safe. * A {@link Segment} with a associated references, such as {@link ReferenceCountingSegment} where the reference is
* the segment itself, and {@link org.apache.druid.segment.join.HashJoinSegment} which wraps a
* {@link ReferenceCountingSegment} and also includes the associated list of
* {@link org.apache.druid.segment.join.JoinableClause}
*/ */
public interface ReferenceCounter public interface SegmentReference extends Segment, ReferenceCountedObject
{ {
/**
* Increment the reference count by one.
*/
boolean increment();
/**
* Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object for the second time, it won't call {@link #decrement()} again.
*/
Closeable decrementOnceCloseable();
/**
* Decrement the reference count by one.
*/
void decrement();
} }

View File

@ -20,26 +20,29 @@
package org.apache.druid.segment.join; package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.AbstractSegment; import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional;
/** /**
* Represents a deep, left-heavy join of a left-hand side baseSegment onto a series of right-hand side clauses. * Represents a deep, left-heavy join of a left-hand side baseSegment onto a series of right-hand side clauses.
* *
* In other words, logically the operation is: join(join(join(baseSegment, clauses[0]), clauses[1]), clauses[2]) etc. * In other words, logically the operation is: join(join(join(baseSegment, clauses[0]), clauses[1]), clauses[2]) etc.
*/ */
public class HashJoinSegment extends AbstractSegment public class HashJoinSegment implements SegmentReference
{ {
private final Segment baseSegment; private final SegmentReference baseSegment;
private final List<JoinableClause> clauses; private final List<JoinableClause> clauses;
private final JoinFilterPreAnalysis joinFilterPreAnalysis; private final JoinFilterPreAnalysis joinFilterPreAnalysis;
@ -50,7 +53,7 @@ public class HashJoinSegment extends AbstractSegment
* @param joinFilterPreAnalysis Pre-analysis computed by {@link org.apache.druid.segment.join.filter.JoinFilterAnalyzer#computeJoinFilterPreAnalysis} * @param joinFilterPreAnalysis Pre-analysis computed by {@link org.apache.druid.segment.join.filter.JoinFilterAnalyzer#computeJoinFilterPreAnalysis}
*/ */
public HashJoinSegment( public HashJoinSegment(
Segment baseSegment, SegmentReference baseSegment,
List<JoinableClause> clauses, List<JoinableClause> clauses,
JoinFilterPreAnalysis joinFilterPreAnalysis JoinFilterPreAnalysis joinFilterPreAnalysis
) )
@ -98,4 +101,36 @@ public class HashJoinSegment extends AbstractSegment
{ {
baseSegment.close(); baseSegment.close();
} }
@Override
public Optional<Closeable> acquireReferences()
{
Closer closer = Closer.create();
try {
boolean acquireFailed = baseSegment.acquireReferences().map(closeable -> {
closer.register(closeable);
return false;
}).orElse(true);
for (JoinableClause joinClause : clauses) {
if (acquireFailed) {
break;
}
acquireFailed |= joinClause.acquireReferences().map(closeable -> {
closer.register(closeable);
return false;
}).orElse(true);
}
if (acquireFailed) {
CloseQuietly.close(closer);
return Optional.empty();
} else {
return Optional.of(closer);
}
}
catch (Exception ex) {
CloseQuietly.close(closer);
return Optional.empty();
}
}
} }

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.join; package org.apache.druid.segment.join;
import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ReferenceCountedObject;
import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -33,7 +34,7 @@ import java.util.Set;
* This class's most important method is {@link #makeJoinMatcher}. Its main user is * This class's most important method is {@link #makeJoinMatcher}. Its main user is
* {@link HashJoinEngine#makeJoinCursor}. * {@link HashJoinEngine#makeJoinCursor}.
*/ */
public interface Joinable public interface Joinable extends ReferenceCountedObject
{ {
int CARDINALITY_UNKNOWN = -1; int CARDINALITY_UNKNOWN = -1;

View File

@ -21,9 +21,12 @@ package org.apache.druid.segment.join;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.ReferenceCountedObject;
import java.io.Closeable;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -33,7 +36,7 @@ import java.util.stream.Collectors;
* *
* Created from {@link org.apache.druid.query.planning.PreJoinableClause} by {@link Joinables#createSegmentMapFn}. * Created from {@link org.apache.druid.query.planning.PreJoinableClause} by {@link Joinables#createSegmentMapFn}.
*/ */
public class JoinableClause public class JoinableClause implements ReferenceCountedObject
{ {
private final String prefix; private final String prefix;
private final Joinable joinable; private final Joinable joinable;
@ -151,4 +154,10 @@ public class JoinableClause
", condition=" + condition + ", condition=" + condition +
'}'; '}';
} }
@Override
public Optional<Closeable> acquireReferences()
{
return joinable.acquireReferences();
}
} }

View File

@ -22,7 +22,7 @@ package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.planning.PreJoinableClause; import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer; import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
@ -88,7 +88,7 @@ public class Joinables
* @param originalFilter The original filter from the query. * @param originalFilter The original filter from the query.
* @param virtualColumns The virtual columns from the query. * @param virtualColumns The virtual columns from the query.
*/ */
public static Function<Segment, Segment> createSegmentMapFn( public static Function<SegmentReference, SegmentReference> createSegmentMapFn(
final List<PreJoinableClause> clauses, final List<PreJoinableClause> clauses,
final JoinableFactory joinableFactory, final JoinableFactory joinableFactory,
final AtomicLong cpuTimeAccumulator, final AtomicLong cpuTimeAccumulator,

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.join.JoinMatcher;
import org.apache.druid.segment.join.Joinable; import org.apache.druid.segment.join.Joinable;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -123,4 +124,11 @@ public class LookupJoinable implements Joinable
} }
return Optional.of(correlatedValues); return Optional.of(correlatedValues);
} }
@Override
public Optional<Closeable> acquireReferences()
{
// nothing to close for lookup joinables, they are managed externally and have no per query accounting of usage
return Optional.of(() -> {});
}
} }

View File

@ -20,9 +20,11 @@
package org.apache.druid.segment.join.table; package org.apache.druid.segment.join.table;
import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.segment.ReferenceCountedObject;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.Set; import java.util.Set;
/** /**
@ -30,8 +32,14 @@ import java.util.Set;
* *
* The main user of this class is {@link IndexedTableJoinable}, and its main purpose is to participate in joins. * The main user of this class is {@link IndexedTableJoinable}, and its main purpose is to participate in joins.
*/ */
public interface IndexedTable public interface IndexedTable extends ReferenceCountedObject, Closeable
{ {
/**
* Returns the version of this table, used to compare against when loading a new version of the table
*/
@SuppressWarnings("unused")
String version();
/** /**
* Returns the columns of this table that have indexes. * Returns the columns of this table that have indexes.
*/ */

View File

@ -27,6 +27,7 @@ import org.apache.druid.segment.join.JoinMatcher;
import org.apache.druid.segment.join.Joinable; import org.apache.druid.segment.join.Joinable;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -133,4 +134,10 @@ public class IndexedTableJoinable implements Joinable
return Optional.of(correlatedValues); return Optional.of(correlatedValues);
} }
} }
@Override
public Optional<Closeable> acquireReferences()
{
return table.acquireReferences();
}
} }

View File

@ -30,11 +30,13 @@ import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.column.ValueType;
import java.io.Closeable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -50,12 +52,14 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
private final RowSignature rowSignature; private final RowSignature rowSignature;
private final List<Function<RowType, Object>> columnFunctions; private final List<Function<RowType, Object>> columnFunctions;
private final Set<String> keyColumns; private final Set<String> keyColumns;
private final String version;
public RowBasedIndexedTable( public RowBasedIndexedTable(
final List<RowType> table, final List<RowType> table,
final RowAdapter<RowType> rowAdapter, final RowAdapter<RowType> rowAdapter,
final RowSignature rowSignature, final RowSignature rowSignature,
final Set<String> keyColumns final Set<String> keyColumns,
final String version
) )
{ {
this.table = table; this.table = table;
@ -63,6 +67,7 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
this.columnFunctions = this.columnFunctions =
rowSignature.getColumnNames().stream().map(rowAdapter::columnFunction).collect(Collectors.toList()); rowSignature.getColumnNames().stream().map(rowAdapter::columnFunction).collect(Collectors.toList());
this.keyColumns = keyColumns; this.keyColumns = keyColumns;
this.version = version;
if (new HashSet<>(keyColumns).size() != keyColumns.size()) { if (new HashSet<>(keyColumns).size() != keyColumns.size()) {
throw new ISE("keyColumns[%s] must not contain duplicates", keyColumns); throw new ISE("keyColumns[%s] must not contain duplicates", keyColumns);
@ -106,6 +111,12 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
} }
} }
@Override
public String version()
{
return version;
}
@Override @Override
public Set<String> keyColumns() public Set<String> keyColumns()
{ {
@ -163,4 +174,17 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
{ {
return table.size(); return table.size();
} }
@Override
public Optional<Closeable> acquireReferences()
{
// nothing to close by default, whatever loaded this thing (probably) lives on heap
return Optional.of(() -> {});
}
@Override
public void close()
{
// nothing to close
}
} }

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.easymock.EasyMock;
import org.joda.time.Days; import org.joda.time.Days;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
@ -38,39 +39,50 @@ public class ReferenceCountingSegmentTest
private ReferenceCountingSegment segment; private ReferenceCountingSegment segment;
private ExecutorService exec; private ExecutorService exec;
private final SegmentId segmentId = SegmentId.dummy("test_segment");
private final Interval dataInterval = new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc());
private QueryableIndex index;
private StorageAdapter adapter;
private int underlyingSegmentClosedCount;
@Before @Before
public void setUp() public void setUp()
{ {
underlyingSegmentClosedCount = 0;
index = EasyMock.createNiceMock(QueryableIndex.class);
adapter = EasyMock.createNiceMock(StorageAdapter.class);
segment = ReferenceCountingSegment.wrapRootGenerationSegment( segment = ReferenceCountingSegment.wrapRootGenerationSegment(
new AbstractSegment() new Segment()
{ {
@Override @Override
public SegmentId getId() public SegmentId getId()
{ {
return SegmentId.dummy("test_segment"); return segmentId;
} }
@Override @Override
public Interval getDataInterval() public Interval getDataInterval()
{ {
return new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc()); return dataInterval;
} }
@Override @Override
public QueryableIndex asQueryableIndex() public QueryableIndex asQueryableIndex()
{ {
return null; return index;
} }
@Override @Override
public StorageAdapter asStorageAdapter() public StorageAdapter asStorageAdapter()
{ {
return null; return adapter;
} }
@Override @Override
public void close() public void close()
{ {
underlyingSegmentClosedCount++;
} }
} }
); );
@ -81,13 +93,17 @@ public class ReferenceCountingSegmentTest
@Test @Test
public void testMultipleClose() throws Exception public void testMultipleClose() throws Exception
{ {
Assert.assertEquals(0, underlyingSegmentClosedCount);
Assert.assertFalse(segment.isClosed()); Assert.assertFalse(segment.isClosed());
Assert.assertTrue(segment.increment()); Assert.assertTrue(segment.increment());
Assert.assertEquals(1, segment.getNumReferences()); Assert.assertEquals(1, segment.getNumReferences());
Closeable closeable = segment.decrementOnceCloseable(); Closeable closeable = segment.decrementOnceCloseable();
Assert.assertEquals(0, underlyingSegmentClosedCount);
closeable.close(); closeable.close();
Assert.assertEquals(0, underlyingSegmentClosedCount);
closeable.close(); closeable.close();
Assert.assertEquals(0, underlyingSegmentClosedCount);
exec.submit( exec.submit(
() -> { () -> {
try { try {
@ -99,10 +115,16 @@ public class ReferenceCountingSegmentTest
} }
).get(); ).get();
Assert.assertEquals(0, segment.getNumReferences()); Assert.assertEquals(0, segment.getNumReferences());
Assert.assertEquals(0, underlyingSegmentClosedCount);
Assert.assertFalse(segment.isClosed()); Assert.assertFalse(segment.isClosed());
// close for reals
segment.close(); segment.close();
Assert.assertTrue(segment.isClosed());
Assert.assertEquals(1, underlyingSegmentClosedCount);
// ... but make sure it only happens once
segment.close(); segment.close();
Assert.assertEquals(1, underlyingSegmentClosedCount);
exec.submit( exec.submit(
() -> { () -> {
try { try {
@ -116,13 +138,25 @@ public class ReferenceCountingSegmentTest
Assert.assertEquals(0, segment.getNumReferences()); Assert.assertEquals(0, segment.getNumReferences());
Assert.assertTrue(segment.isClosed()); Assert.assertTrue(segment.isClosed());
Assert.assertEquals(1, underlyingSegmentClosedCount);
segment.increment(); segment.increment();
segment.increment(); segment.increment();
segment.increment(); segment.increment();
Assert.assertEquals(0, segment.getNumReferences()); Assert.assertEquals(0, segment.getNumReferences());
Assert.assertEquals(1, underlyingSegmentClosedCount);
segment.close(); segment.close();
Assert.assertEquals(0, segment.getNumReferences()); Assert.assertEquals(0, segment.getNumReferences());
Assert.assertEquals(1, underlyingSegmentClosedCount);
} }
@Test
public void testExposesWrappedSegment()
{
Assert.assertEquals(segmentId, segment.getId());
Assert.assertEquals(dataInterval, segment.getDataInterval());
Assert.assertEquals(index, segment.asQueryableIndex());
Assert.assertEquals(adapter, segment.asStorageAdapter());
}
} }

View File

@ -20,29 +20,37 @@
package org.apache.druid.segment.join; package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.filter.JoinFilterAnalyzer; import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
import org.apache.druid.segment.join.filter.JoinableClauses; import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.table.IndexedTableJoinable; import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers; import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional;
public class HashJoinSegmentTest public class HashJoinSegmentTest extends InitializedNullHandlingTest
{ {
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -51,17 +59,30 @@ public class HashJoinSegmentTest
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();
private QueryableIndexSegment baseSegment; private QueryableIndexSegment baseSegment;
private ReferenceCountingSegment referencedSegment;
private HashJoinSegment hashJoinSegment; private HashJoinSegment hashJoinSegment;
@BeforeClass private int allReferencesAcquireCount;
public static void setUpStatic() private int allReferencesCloseCount;
{ private int referencedSegmentAcquireCount;
NullHandling.initializeForTests(); private int referencedSegmentClosedCount;
} private int indexedTableJoinableReferenceAcquireCount;
private int indexedTableJoinableReferenceCloseCount;
private boolean j0Closed;
private boolean j1Closed;
@Before @Before
public void setUp() throws IOException public void setUp() throws IOException
{ {
allReferencesAcquireCount = 0;
allReferencesCloseCount = 0;
referencedSegmentAcquireCount = 0;
referencedSegmentClosedCount = 0;
indexedTableJoinableReferenceAcquireCount = 0;
indexedTableJoinableReferenceCloseCount = 0;
j0Closed = false;
j1Closed = false;
baseSegment = new QueryableIndexSegment( baseSegment = new QueryableIndexSegment(
JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(), JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(),
SegmentId.dummy("facts") SegmentId.dummy("facts")
@ -70,13 +91,39 @@ public class HashJoinSegmentTest
List<JoinableClause> joinableClauses = ImmutableList.of( List<JoinableClause> joinableClauses = ImmutableList.of(
new JoinableClause( new JoinableClause(
"j0.", "j0.",
new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()), new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable())
{
@Override
public Optional<Closeable> acquireReferences()
{
if (!j0Closed) {
indexedTableJoinableReferenceAcquireCount++;
Closer closer = Closer.create();
closer.register(() -> indexedTableJoinableReferenceCloseCount++);
return Optional.of(closer);
}
return Optional.empty();
}
},
JoinType.LEFT, JoinType.LEFT,
JoinConditionAnalysis.forExpression("1", "j0.", ExprMacroTable.nil()) JoinConditionAnalysis.forExpression("1", "j0.", ExprMacroTable.nil())
), ),
new JoinableClause( new JoinableClause(
"j1.", "j1.",
new IndexedTableJoinable(JoinTestHelper.createRegionsIndexedTable()), new IndexedTableJoinable(JoinTestHelper.createRegionsIndexedTable())
{
@Override
public Optional<Closeable> acquireReferences()
{
if (!j1Closed) {
indexedTableJoinableReferenceAcquireCount++;
Closer closer = Closer.create();
closer.register(() -> indexedTableJoinableReferenceCloseCount++);
return Optional.of(closer);
}
return Optional.empty();
}
},
JoinType.LEFT, JoinType.LEFT,
JoinConditionAnalysis.forExpression("1", "j1.", ExprMacroTable.nil()) JoinConditionAnalysis.forExpression("1", "j1.", ExprMacroTable.nil())
) )
@ -92,11 +139,70 @@ public class HashJoinSegmentTest
QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE
); );
referencedSegment = ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment);
SegmentReference testWrapper = new SegmentReference()
{
@Override
public Optional<Closeable> acquireReferences()
{
Closer closer = Closer.create();
return referencedSegment.acquireReferences().map(closeable -> {
referencedSegmentAcquireCount++;
closer.register(closeable);
closer.register(() -> referencedSegmentClosedCount++);
return closer;
});
}
@Override
public SegmentId getId()
{
return referencedSegment.getId();
}
@Override
public Interval getDataInterval()
{
return referencedSegment.getDataInterval();
}
@Nullable
@Override
public QueryableIndex asQueryableIndex()
{
return referencedSegment.asQueryableIndex();
}
@Override
public StorageAdapter asStorageAdapter()
{
return referencedSegment.asStorageAdapter();
}
@Override
public void close()
{
referencedSegment.close();
}
};
hashJoinSegment = new HashJoinSegment( hashJoinSegment = new HashJoinSegment(
baseSegment, testWrapper,
joinableClauses, joinableClauses,
joinFilterPreAnalysis joinFilterPreAnalysis
); )
{
@Override
public Optional<Closeable> acquireReferences()
{
Closer closer = Closer.create();
return super.acquireReferences().map(closeable -> {
allReferencesAcquireCount++;
closer.register(closeable);
closer.register(() -> allReferencesCloseCount++);
return closer;
});
}
};
} }
@Test @Test
@ -118,7 +224,7 @@ public class HashJoinSegmentTest
); );
final HashJoinSegment ignored = new HashJoinSegment( final HashJoinSegment ignored = new HashJoinSegment(
baseSegment, ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClauses, joinableClauses,
joinFilterPreAnalysis joinFilterPreAnalysis
); );
@ -150,4 +256,79 @@ public class HashJoinSegmentTest
CoreMatchers.instanceOf(HashJoinSegmentStorageAdapter.class) CoreMatchers.instanceOf(HashJoinSegmentStorageAdapter.class)
); );
} }
@Test
public void testJoinableClausesAreClosedWhenReferencesUsed() throws IOException
{
Assert.assertFalse(referencedSegment.isClosed());
Optional<Closeable> maybeCloseable = hashJoinSegment.acquireReferences();
Assert.assertTrue(maybeCloseable.isPresent());
Assert.assertEquals(1, referencedSegmentAcquireCount);
Assert.assertEquals(2, indexedTableJoinableReferenceAcquireCount);
Assert.assertEquals(1, allReferencesAcquireCount);
Assert.assertEquals(0, referencedSegmentClosedCount);
Assert.assertEquals(0, indexedTableJoinableReferenceCloseCount);
Assert.assertEquals(0, allReferencesCloseCount);
Closeable closer = maybeCloseable.get();
closer.close();
Assert.assertFalse(referencedSegment.isClosed());
Assert.assertEquals(1, referencedSegmentClosedCount);
Assert.assertEquals(2, indexedTableJoinableReferenceCloseCount);
Assert.assertEquals(1, allReferencesCloseCount);
}
@Test
public void testJoinableClausesClosedIfSegmentIsAlreadyClosed()
{
Assert.assertFalse(referencedSegment.isClosed());
referencedSegment.close();
Assert.assertTrue(referencedSegment.isClosed());
Optional<Closeable> maybeCloseable = hashJoinSegment.acquireReferences();
Assert.assertFalse(maybeCloseable.isPresent());
Assert.assertEquals(0, referencedSegmentAcquireCount);
Assert.assertEquals(0, indexedTableJoinableReferenceAcquireCount);
Assert.assertEquals(0, allReferencesAcquireCount);
Assert.assertEquals(0, referencedSegmentClosedCount);
Assert.assertEquals(0, indexedTableJoinableReferenceCloseCount);
Assert.assertEquals(0, allReferencesCloseCount);
}
@Test
public void testJoinableClausesClosedIfJoinableZeroIsAlreadyClosed()
{
Assert.assertFalse(referencedSegment.isClosed());
j0Closed = true;
Optional<Closeable> maybeCloseable = hashJoinSegment.acquireReferences();
Assert.assertFalse(maybeCloseable.isPresent());
Assert.assertEquals(1, referencedSegmentAcquireCount);
Assert.assertEquals(0, indexedTableJoinableReferenceAcquireCount);
Assert.assertEquals(0, allReferencesAcquireCount);
Assert.assertEquals(1, referencedSegmentClosedCount);
Assert.assertEquals(0, indexedTableJoinableReferenceCloseCount);
Assert.assertEquals(0, allReferencesCloseCount);
}
@Test
public void testJoinableClausesClosedIfJoinableOneIsAlreadyClosed()
{
Assert.assertFalse(referencedSegment.isClosed());
j1Closed = true;
Optional<Closeable> maybeCloseable = hashJoinSegment.acquireReferences();
Assert.assertFalse(maybeCloseable.isPresent());
Assert.assertEquals(1, referencedSegmentAcquireCount);
Assert.assertEquals(1, indexedTableJoinableReferenceAcquireCount);
Assert.assertEquals(0, allReferencesAcquireCount);
Assert.assertEquals(1, referencedSegmentClosedCount);
Assert.assertEquals(1, indexedTableJoinableReferenceCloseCount);
Assert.assertEquals(0, allReferencesCloseCount);
}
} }

View File

@ -142,6 +142,8 @@ public class JoinTestHelper
} }
}; };
public static final String INDEXED_TABLE_VERSION = DateTimes.nowUtc().toString();
private static RowAdapter<Map<String, Object>> createMapRowAdapter(final RowSignature signature) private static RowAdapter<Map<String, Object>> createMapRowAdapter(final RowSignature signature)
{ {
return new RowAdapter<Map<String, Object>>() return new RowAdapter<Map<String, Object>>()
@ -255,7 +257,8 @@ public class JoinTestHelper
rows, rows,
createMapRowAdapter(COUNTRIES_SIGNATURE), createMapRowAdapter(COUNTRIES_SIGNATURE),
COUNTRIES_SIGNATURE, COUNTRIES_SIGNATURE,
ImmutableSet.of("countryNumber", "countryIsoCode") ImmutableSet.of("countryNumber", "countryIsoCode"),
INDEXED_TABLE_VERSION
) )
); );
} }
@ -268,7 +271,8 @@ public class JoinTestHelper
rows, rows,
createMapRowAdapter(REGIONS_SIGNATURE), createMapRowAdapter(REGIONS_SIGNATURE),
REGIONS_SIGNATURE, REGIONS_SIGNATURE,
ImmutableSet.of("regionIsoCode", "countryIsoCode") ImmutableSet.of("regionIsoCode", "countryIsoCode"),
INDEXED_TABLE_VERSION
) )
); );
} }

View File

@ -27,7 +27,7 @@ import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.extraction.MapLookupExtractor; import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.planning.PreJoinableClause; import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.join.lookup.LookupJoinable; import org.apache.druid.segment.join.lookup.LookupJoinable;
@ -95,7 +95,7 @@ public class JoinablesTest
@Test @Test
public void test_createSegmentMapFn_noClauses() public void test_createSegmentMapFn_noClauses()
{ {
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn( final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
ImmutableList.of(), ImmutableList.of(),
NoopJoinableFactory.INSTANCE, NoopJoinableFactory.INSTANCE,
new AtomicLong(), new AtomicLong(),
@ -124,7 +124,7 @@ public class JoinablesTest
expectedException.expect(IllegalStateException.class); expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("dataSource is not joinable"); expectedException.expectMessage("dataSource is not joinable");
final Function<Segment, Segment> ignored = Joinables.createSegmentMapFn( final Function<SegmentReference, SegmentReference> ignored = Joinables.createSegmentMapFn(
ImmutableList.of(clause), ImmutableList.of(clause),
NoopJoinableFactory.INSTANCE, NoopJoinableFactory.INSTANCE,
new AtomicLong(), new AtomicLong(),
@ -153,7 +153,7 @@ public class JoinablesTest
conditionAnalysis conditionAnalysis
); );
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn( final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
ImmutableList.of(clause), ImmutableList.of(clause),
(dataSource, condition) -> { (dataSource, condition) -> {
if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.join.table;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -96,7 +97,8 @@ public class IndexedTableJoinableTest
inlineDataSource.getRowsAsList(), inlineDataSource.getRowsAsList(),
inlineDataSource.rowAdapter(), inlineDataSource.rowAdapter(),
inlineDataSource.getRowSignature(), inlineDataSource.getRowSignature(),
ImmutableSet.of("str") ImmutableSet.of("str"),
DateTimes.nowUtc().toString()
); );
private IndexedTableJoinable target; private IndexedTableJoinable target;

View File

@ -172,4 +172,11 @@ public class RowBasedIndexedTableTest
expectedException.expect(IndexOutOfBoundsException.class); expectedException.expect(IndexOutOfBoundsException.class);
countriesTable.columnReader(99); countriesTable.columnReader(99);
} }
@Test
public void testVersion()
{
Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, countriesTable.version());
Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, regionsTable.version());
}
} }

View File

@ -261,7 +261,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
// For nested queries, we need to look at the intervals of the inner most query. // For nested queries, we need to look at the intervals of the inner most query.
this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec() this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec()
.map(QuerySegmentSpec::getIntervals) .map(QuerySegmentSpec::getIntervals)
.orElse(query.getIntervals()); .orElseGet(() -> query.getIntervals());
} }
private ImmutableMap<String, Object> makeDownstreamQueryContext() private ImmutableMap<String, Object> makeDownstreamQueryContext()

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.join; package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.DataSource; import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.IndexedTable;
@ -49,7 +50,8 @@ public class InlineJoinableFactory implements JoinableFactory
inlineDataSource.getRowsAsList(), inlineDataSource.getRowsAsList(),
inlineDataSource.rowAdapter(), inlineDataSource.rowAdapter(),
inlineDataSource.getRowSignature(), inlineDataSource.getRowSignature(),
rightKeyColumns rightKeyColumns,
DateTimes.nowUtc().toString()
) )
) )
); );

View File

@ -19,18 +19,22 @@
package org.apache.druid.segment.realtime; package org.apache.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/** /**
*/ */
@ -71,27 +75,6 @@ public class FireHydrant
return adapter.get().getDataInterval(); return adapter.get().getDataInterval();
} }
public ReferenceCountingSegment getIncrementedSegment()
{
ReferenceCountingSegment segment = adapter.get();
while (true) {
if (segment.increment()) {
return segment;
}
// segment.increment() returned false, means it is closed. Since close() in swapSegment() happens after segment
// swap, the new segment should already be visible.
ReferenceCountingSegment newSegment = adapter.get();
if (segment == newSegment) {
throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()");
}
if (newSegment == null) {
throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
}
segment = newSegment;
// Spin loop.
}
}
public int getCount() public int getCount()
{ {
return count; return count;
@ -133,12 +116,75 @@ public class FireHydrant
} }
} }
public Pair<Segment, Closeable> getAndIncrementSegment() public ReferenceCountingSegment getIncrementedSegment()
{
ReferenceCountingSegment segment = adapter.get();
while (true) {
if (segment.increment()) {
return segment;
}
// segment.increment() returned false, means it is closed. Since close() in swapSegment() happens after segment
// swap, the new segment should already be visible.
ReferenceCountingSegment newSegment = adapter.get();
if (segment == newSegment) {
throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()");
}
if (newSegment == null) {
throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
}
segment = newSegment;
// Spin loop.
}
}
public Pair<ReferenceCountingSegment, Closeable> getAndIncrementSegment()
{ {
ReferenceCountingSegment segment = getIncrementedSegment(); ReferenceCountingSegment segment = getIncrementedSegment();
return new Pair<>(segment, segment.decrementOnceCloseable()); return new Pair<>(segment, segment.decrementOnceCloseable());
} }
/**
* This method is like a combined form of {@link #getIncrementedSegment} and {@link #getAndIncrementSegment} that
* deals in {@link SegmentReference} instead of directly with {@link ReferenceCountingSegment} in order to acquire
* reference count for both hydrant's segment and any tracked joinables taking part in the query.
*/
public Optional<Pair<SegmentReference, Closeable>> getSegmentForQuery(
Function<SegmentReference, SegmentReference> segmentMapFn
)
{
ReferenceCountingSegment sinkSegment = adapter.get();
SegmentReference segment = segmentMapFn.apply(sinkSegment);
while (true) {
Optional<Closeable> reference = segment.acquireReferences();
if (reference.isPresent()) {
return Optional.of(new Pair<>(segment, reference.get()));
}
// segment.acquireReferences() returned false, means it is closed. Since close() in swapSegment() happens after
// segment swap, the new segment should already be visible.
ReferenceCountingSegment newSinkSegment = adapter.get();
if (newSinkSegment == null) {
throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment");
}
if (sinkSegment == newSinkSegment) {
if (newSinkSegment.isClosed()) {
throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()");
}
// if segment is not closed, but is same segment it means we are having trouble getting references for joinables
// of a HashJoinSegment created by segmentMapFn
return Optional.empty();
}
segment = segmentMapFn.apply(newSinkSegment);
// Spin loop.
}
}
@VisibleForTesting
public ReferenceCountingSegment getHydrantSegment()
{
return adapter.get();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -59,7 +59,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
@ -756,7 +756,7 @@ public class AppenderatorImpl implements Appenderator
Closer closer = Closer.create(); Closer closer = Closer.create();
try { try {
for (FireHydrant fireHydrant : sink) { for (FireHydrant fireHydrant : sink) {
Pair<Segment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment(); Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant); log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
indexes.add(queryableIndex); indexes.add(queryableIndex);

View File

@ -28,7 +28,6 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.CloseQuietly;
@ -57,7 +56,7 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables; import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.FireHydrant;
@ -171,7 +170,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
} }
// segmentMapFn maps each base Segment into a joined Segment if necessary. // segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn( final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(), analysis.getPreJoinableClauses(),
joinableFactory, joinableFactory,
cpuTimeAccumulator, cpuTimeAccumulator,
@ -211,15 +210,24 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped(); final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) { if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>()); return new Pair<>(hydrant.getSegmentDataInterval(), new NoopQueryRunner<>());
} }
// Prevent the underlying segment from swapping when its being iterated // Prevent the underlying segment from swapping when its being iterated
final Pair<Segment, Closeable> segmentAndCloseable = hydrant.getAndIncrementSegment(); final Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable =
try { hydrant.getSegmentForQuery(segmentMapFn);
final Segment mappedSegment = segmentMapFn.apply(segmentAndCloseable.lhs);
QueryRunner<T> runner = factory.createRunner(mappedSegment); // if optional isn't present, we failed to acquire reference to the segment or any joinables
if (!maybeSegmentAndCloseable.isPresent()) {
return new Pair<>(
hydrant.getSegmentDataInterval(),
new ReportTimelineMissingSegmentQueryRunner<>(descriptor)
);
}
final Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
try {
QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);
// 1) Only use caching if data is immutable // 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local // 2) Hydrants are not the same between replicas, make sure cache is local
@ -245,7 +253,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
runner, runner,
segmentAndCloseable.rhs segmentAndCloseable.rhs
); );
return new Pair<>(mappedSegment.getDataInterval(), runner); return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
CloseQuietly.close(segmentAndCloseable.rhs); CloseQuietly.close(segmentAndCloseable.rhs);

View File

@ -56,7 +56,7 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.Metadata; import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
@ -424,7 +424,7 @@ public class RealtimePlumber implements Plumber
Closer closer = Closer.create(); Closer closer = Closer.create();
try { try {
for (FireHydrant fireHydrant : sink) { for (FireHydrant fireHydrant : sink) {
Pair<Segment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment(); Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant); log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex); indexes.add(queryableIndex);

View File

@ -24,6 +24,7 @@ import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
@ -35,7 +36,9 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables; import org.apache.druid.segment.join.Joinables;
@ -89,11 +92,15 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource()); throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
} }
final Iterable<Segment> segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals); // wrap in ReferenceCountingSegment, these aren't currently managed by SegmentManager so reference tracking doesn't
// matter, but at least some or all will be in a future PR
final Iterable<ReferenceCountingSegment> segments =
FunctionalIterable.create(segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals))
.transform(ReferenceCountingSegment::wrapRootGenerationSegment);
final Query<T> prioritizedAndLaned = prioritizeAndLaneQuery(query, segments); final Query<T> prioritizedAndLaned = prioritizeAndLaneQuery(query, segments);
final AtomicLong cpuAccumulator = new AtomicLong(0L); final AtomicLong cpuAccumulator = new AtomicLong(0L);
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn( final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(), analysis.getPreJoinableClauses(),
joinableFactory, joinableFactory,
cpuAccumulator, cpuAccumulator,
@ -129,7 +136,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
throw new ISE("Cannot run with specific segments"); throw new ISE("Cannot run with specific segments");
} }
private <T> Query<T> prioritizeAndLaneQuery(Query<T> query, Iterable<Segment> segments) private <T> Query<T> prioritizeAndLaneQuery(Query<T> query, Iterable<? extends Segment> segments)
{ {
Set<SegmentServerSelector> segmentServerSelectors = new HashSet<>(); Set<SegmentServerSelector> segmentServerSelectors = new HashSet<>();
for (Segment s : segments) { for (Segment s : segments) {

View File

@ -53,9 +53,8 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCounter;
import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables; import org.apache.druid.segment.join.Joinables;
import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SegmentManager;
@ -194,7 +193,7 @@ public class ServerManager implements QuerySegmentWalker
} }
// segmentMapFn maps each base Segment into a joined Segment if necessary. // segmentMapFn maps each base Segment into a joined Segment if necessary.
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn( final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(), analysis.getPreJoinableClauses(),
joinableFactory, joinableFactory,
cpuTimeAccumulator, cpuTimeAccumulator,
@ -230,7 +229,6 @@ public class ServerManager implements QuerySegmentWalker
factory, factory,
toolChest, toolChest,
segmentMapFn.apply(segment), segmentMapFn.apply(segment),
segment.referenceCounter(),
descriptor, descriptor,
cpuTimeAccumulator cpuTimeAccumulator
) )
@ -253,8 +251,7 @@ public class ServerManager implements QuerySegmentWalker
private <T> QueryRunner<T> buildAndDecorateQueryRunner( private <T> QueryRunner<T> buildAndDecorateQueryRunner(
final QueryRunnerFactory<T, Query<T>> factory, final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest, final QueryToolChest<T, Query<T>> toolChest,
final Segment segment, final SegmentReference segment,
final ReferenceCounter segmentReferenceCounter,
final SegmentDescriptor segmentDescriptor, final SegmentDescriptor segmentDescriptor,
final AtomicLong cpuTimeAccumulator final AtomicLong cpuTimeAccumulator
) )
@ -266,7 +263,7 @@ public class ServerManager implements QuerySegmentWalker
MetricsEmittingQueryRunner<T> metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>( MetricsEmittingQueryRunner<T> metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>(
emitter, emitter,
toolChest, toolChest,
new ReferenceCountingSegmentQueryRunner<>(factory, segment, segmentReferenceCounter, segmentDescriptor), new ReferenceCountingSegmentQueryRunner<>(factory, segment, segmentDescriptor),
QueryMetrics::reportSegmentTime, QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(segmentIdString) queryMetrics -> queryMetrics.segment(segmentIdString)
); );

View File

@ -20,7 +20,6 @@
package org.apache.druid.segment.loading; package org.apache.druid.segment.loading;
import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.segment.AbstractSegment;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.StorageAdapter;
@ -49,7 +48,7 @@ public class CacheTestSegmentLoader implements SegmentLoader
@Override @Override
public Segment getSegment(final DataSegment segment, boolean lazy) public Segment getSegment(final DataSegment segment, boolean lazy)
{ {
return new AbstractSegment() return new Segment()
{ {
@Override @Override
public SegmentId getId() public SegmentId getId()

View File

@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
public class FireHydrantTest extends InitializedNullHandlingTest
{
private IncrementalIndexSegment incrementalIndexSegment;
private QueryableIndexSegment queryableIndexSegment;
private FireHydrant hydrant;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
incrementalIndexSegment = new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), SegmentId.dummy("test"));
queryableIndexSegment = new QueryableIndexSegment(TestIndex.getMMappedTestIndex(), SegmentId.dummy("test"));
// hydrant starts out with incremental segment loaded
hydrant = new FireHydrant(incrementalIndexSegment, 0);
}
@Test
public void testGetIncrementedSegmentNotSwapped()
{
Assert.assertEquals(0, hydrant.getHydrantSegment().getNumReferences());
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
Assert.assertNotNull(segment);
Assert.assertTrue(segment.getBaseSegment() == incrementalIndexSegment);
Assert.assertEquals(1, segment.getNumReferences());
}
@Test
public void testGetIncrementedSegmentSwapped()
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
hydrant.swapSegment(queryableIndexSegment);
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
Assert.assertNotNull(segment);
Assert.assertTrue(segment.getBaseSegment() == queryableIndexSegment);
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(1, segment.getNumReferences());
}
@Test
public void testGetIncrementedSegmentClosed()
{
expectedException.expect(ISE.class);
expectedException.expectMessage("segment.close() is called somewhere outside FireHydrant.swapSegment()");
hydrant.getHydrantSegment().close();
Assert.assertEquals(0, hydrant.getHydrantSegment().getNumReferences());
ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
}
@Test
public void testGetAndIncrementSegment() throws IOException
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = hydrant.getAndIncrementSegment();
Assert.assertEquals(1, segmentAndCloseable.lhs.getNumReferences());
segmentAndCloseable.rhs.close();
Assert.assertEquals(0, segmentAndCloseable.lhs.getNumReferences());
}
@Test
public void testGetSegmentForQuery() throws IOException
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
Function.identity()
);
Assert.assertTrue(maybeSegmentAndCloseable.isPresent());
Assert.assertEquals(1, incrementalSegmentReference.getNumReferences());
Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
segmentAndCloseable.rhs.close();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
}
@Test
public void testGetSegmentForQuerySwapped() throws IOException
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
hydrant.swapSegment(queryableIndexSegment);
ReferenceCountingSegment queryableSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(0, queryableSegmentReference.getNumReferences());
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
Function.identity()
);
Assert.assertTrue(maybeSegmentAndCloseable.isPresent());
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(1, queryableSegmentReference.getNumReferences());
Pair<SegmentReference, Closeable> segmentAndCloseable = maybeSegmentAndCloseable.get();
segmentAndCloseable.rhs.close();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Assert.assertEquals(0, queryableSegmentReference.getNumReferences());
}
@Test
public void testGetSegmentForQueryButNotAbleToAcquireReferences()
{
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
segmentReference -> new SegmentReference()
{
@Override
public Optional<Closeable> acquireReferences()
{
return Optional.empty();
}
@Override
public SegmentId getId()
{
return incrementalIndexSegment.getId();
}
@Override
public Interval getDataInterval()
{
return incrementalIndexSegment.getDataInterval();
}
@Nullable
@Override
public QueryableIndex asQueryableIndex()
{
return incrementalIndexSegment.asQueryableIndex();
}
@Override
public StorageAdapter asStorageAdapter()
{
return incrementalIndexSegment.asStorageAdapter();
}
@Override
public void close()
{
incrementalIndexSegment.close();
}
}
);
Assert.assertFalse(maybeSegmentAndCloseable.isPresent());
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
}
@Test
public void testGetSegmentForQueryButNotAbleToAcquireReferencesSegmentClosed()
{
expectedException.expect(ISE.class);
expectedException.expectMessage("segment.close() is called somewhere outside FireHydrant.swapSegment()");
ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment();
Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
incrementalSegmentReference.close();
Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = hydrant.getSegmentForQuery(
Function.identity()
);
}
}

View File

@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.AbstractSegment;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
@ -90,7 +89,7 @@ public class SegmentManagerTest
} }
}; };
private static class SegmentForTesting extends AbstractSegment private static class SegmentForTesting implements Segment
{ {
private final String version; private final String version;
private final Interval interval; private final Interval interval;

View File

@ -45,6 +45,7 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables; import org.apache.druid.segment.join.Joinables;
import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.TimelineObjectHolder;
@ -139,7 +140,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource()); throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
} }
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn( final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(), analysis.getPreJoinableClauses(),
joinableFactory, joinableFactory,
new AtomicLong(), new AtomicLong(),
@ -197,7 +198,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest, final QueryToolChest<T, Query<T>> toolChest,
final QueryRunnerFactory<T, Query<T>> factory, final QueryRunnerFactory<T, Query<T>> factory,
final Iterable<WindowedSegment> segments, final Iterable<WindowedSegment> segments,
final Function<Segment, Segment> segmentMapFn final Function<SegmentReference, SegmentReference> segmentMapFn
) )
{ {
final List<WindowedSegment> segmentsList = Lists.newArrayList(segments); final List<WindowedSegment> segmentsList = Lists.newArrayList(segments);
@ -217,7 +218,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
.transform( .transform(
segment -> segment ->
new SpecificSegmentQueryRunner<>( new SpecificSegmentQueryRunner<>(
factory.createRunner(segmentMapFn.apply(segment.getSegment())), factory.createRunner(segmentMapFn.apply(ReferenceCountingSegment.wrapRootGenerationSegment(segment.getSegment()))),
new SpecificSegmentSpec(segment.getDescriptor()) new SpecificSegmentSpec(segment.getDescriptor())
) )
) )

View File

@ -57,7 +57,6 @@ import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.search.SearchQuery; import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchResultValue; import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.segment.AbstractSegment;
import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.ReferenceCountingSegment;
@ -600,7 +599,7 @@ public class ServerManagerTest
} }
} }
private static class SegmentForTesting extends AbstractSegment private static class SegmentForTesting implements Segment
{ {
private final String version; private final String version;
private final Interval interval; private final Interval interval;

View File

@ -126,7 +126,7 @@ public class QueryMaker
return DataSourceAnalysis.forDataSource(query.getDataSource()) return DataSourceAnalysis.forDataSource(query.getDataSource())
.getBaseQuerySegmentSpec() .getBaseQuerySegmentSpec()
.map(QuerySegmentSpec::getIntervals) .map(QuerySegmentSpec::getIntervals)
.orElse(query.getIntervals()); .orElseGet(query::getIntervals);
} }
private <T> Sequence<Object[]> execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes) private <T> Sequence<Object[]> execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)