From f8b643ec720bf34acf84fda01a3d876f98e81bb9 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 9 Jun 2020 20:12:36 -0700 Subject: [PATCH] make joinables closeable (#9982) * make joinables closeable * tests and adjustments * refactor to make join stuffs impelement ReferenceCountedObject instead of Closable, more tests * fixes * javadocs and stuff * fix bugs * more test * fix lgtm alert * simplify * fixup javadoc * review stuffs * safeguard against exceptions * i hate this checkstyle rule * make IndexedTable extend Closeable --- .../benchmark/JoinAndLookupBenchmark.java | 9 +- .../indexer/ITTestCoordinatorPausedTest.java | 2 +- .../{indexer => query}/ITUnionQueryTest.java | 9 +- .../{indexer => queries}/union_queries.json | 0 .../org/apache/druid/query/BaseQuery.java | 2 +- .../ReferenceCountingSegmentQueryRunner.java | 22 +- .../apache/druid/segment/AbstractSegment.java | 18 +- .../segment/IncrementalIndexSegment.java | 2 +- .../druid/segment/QueryableIndexSegment.java | 2 +- .../druid/segment/ReferenceCountedObject.java | 46 ++++ .../ReferenceCountingCloseableObject.java | 144 ++++++++++++ .../segment/ReferenceCountingSegment.java | 131 ++--------- .../apache/druid/segment/RowBasedSegment.java | 2 +- .../org/apache/druid/segment/Segment.java | 11 +- ...enceCounter.java => SegmentReference.java} | 24 +- .../druid/segment/join/HashJoinSegment.java | 45 +++- .../apache/druid/segment/join/Joinable.java | 3 +- .../druid/segment/join/JoinableClause.java | 11 +- .../apache/druid/segment/join/Joinables.java | 4 +- .../segment/join/lookup/LookupJoinable.java | 8 + .../segment/join/table/IndexedTable.java | 10 +- .../join/table/IndexedTableJoinable.java | 7 + .../join/table/RowBasedIndexedTable.java | 26 ++- .../segment/ReferenceCountingSegmentTest.java | 46 +++- .../segment/join/HashJoinSegmentTest.java | 207 +++++++++++++++-- .../druid/segment/join/JoinTestHelper.java | 8 +- .../druid/segment/join/JoinablesTest.java | 8 +- .../join/table/IndexedTableJoinableTest.java | 4 +- .../join/table/RowBasedIndexedTableTest.java | 7 + .../druid/client/CachingClusteredClient.java | 2 +- .../segment/join/InlineJoinableFactory.java | 4 +- .../druid/segment/realtime/FireHydrant.java | 90 ++++++-- .../appenderator/AppenderatorImpl.java | 4 +- .../appenderator/SinkQuerySegmentWalker.java | 26 ++- .../realtime/plumber/RealtimePlumber.java | 4 +- .../druid/server/LocalQuerySegmentWalker.java | 13 +- .../server/coordination/ServerManager.java | 11 +- .../loading/CacheTestSegmentLoader.java | 3 +- .../segment/realtime/FireHydrantTest.java | 213 ++++++++++++++++++ .../druid/server/SegmentManagerTest.java | 3 +- .../server/TestClusterQuerySegmentWalker.java | 7 +- .../coordination/ServerManagerTest.java | 3 +- .../druid/sql/calcite/rel/QueryMaker.java | 2 +- 43 files changed, 940 insertions(+), 263 deletions(-) rename integration-tests/src/test/java/org/apache/druid/tests/{indexer => query}/ITUnionQueryTest.java (96%) rename integration-tests/src/test/resources/{indexer => queries}/union_queries.json (100%) create mode 100644 processing/src/main/java/org/apache/druid/segment/ReferenceCountedObject.java create mode 100644 processing/src/main/java/org/apache/druid/segment/ReferenceCountingCloseableObject.java rename processing/src/main/java/org/apache/druid/segment/{ReferenceCounter.java => SegmentReference.java} (59%) create mode 100644 server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java index cb630a2e400..567a34842c9 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java @@ -38,6 +38,7 @@ import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnConfig; @@ -150,7 +151,7 @@ public class JoinAndLookupBenchmark 0 ); hashJoinLookupStringKeySegment = new HashJoinSegment( - baseSegment, + ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment), joinableClausesLookupStringKey, preAnalysisLookupStringKey ); @@ -177,7 +178,7 @@ public class JoinAndLookupBenchmark 0 ); hashJoinLookupLongKeySegment = new HashJoinSegment( - baseSegment, + ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment), joinableClausesLookupLongKey, preAnalysisLookupLongKey ); @@ -204,7 +205,7 @@ public class JoinAndLookupBenchmark 0 ); hashJoinIndexedTableStringKeySegment = new HashJoinSegment( - baseSegment, + ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment), joinableClausesIndexedTableStringKey, preAnalysisIndexedTableStringKey ); @@ -231,7 +232,7 @@ public class JoinAndLookupBenchmark 0 ); hashJoinIndexedTableLongKeySegment = new HashJoinSegment( - baseSegment, + ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment), joinableClausesIndexedTableLonggKey, preAnalysisIndexedTableLongKey ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java index 7d34b47d706..269c74d4cd1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit; @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest { - private static final Logger LOG = new Logger(ITUnionQueryTest.class); + private static final Logger LOG = new Logger(ITTestCoordinatorPausedTest.class); private static final String INDEX_DATASOURCE = "wikipedia_index_test"; private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITUnionQueryTest.java similarity index 96% rename from integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java rename to integration-tests/src/test/java/org/apache/druid/tests/query/ITUnionQueryTest.java index 4bf68ad8834..c720b1fab5b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITUnionQueryTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.tests.indexer; +package org.apache.druid.tests.query; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; @@ -39,6 +39,8 @@ import org.apache.druid.testing.guice.TestClient; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.ServerDiscoveryUtil; import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractITBatchIndexTest; +import org.apache.druid.tests.indexer.AbstractIndexerTest; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -62,7 +64,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json"; private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName"; private static final String UNION_DATA_FILE = "/data/union_query/wikipedia_index_data.json"; - private static final String UNION_QUERIES_RESOURCE = "/indexer/union_queries.json"; + private static final String UNION_QUERIES_RESOURCE = "/queries/union_queries.json"; private static final String UNION_DATASOURCE = "wikipedia_index_test"; @Inject @@ -92,7 +94,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest closer.register(unloader(fullDatasourceName + i)); } try { - // Load 4 datasources with same dimensions + // Load 3 datasources with same dimensions String task = setShutOffTime( getResourceAsString(UNION_TASK_RESOURCE), DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)) @@ -117,6 +119,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest () -> { for (int i = 0; i < numTasks; i++) { final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01"); + // there are 10 rows, but query only covers the first 5 if (countRows < 5) { LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i); return false; diff --git a/integration-tests/src/test/resources/indexer/union_queries.json b/integration-tests/src/test/resources/queries/union_queries.json similarity index 100% rename from integration-tests/src/test/resources/indexer/union_queries.json rename to integration-tests/src/test/resources/queries/union_queries.json diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index f95961cf40a..68be7ba48fd 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -123,7 +123,7 @@ public abstract class BaseQuery implements Query { return DataSourceAnalysis.forDataSource(query.getDataSource()) .getBaseQuerySegmentSpec() - .orElse(query.getQuerySegmentSpec()); + .orElseGet(query::getQuerySegmentSpec); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java index 8691441ec20..a92bbd6421b 100644 --- a/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -22,50 +22,42 @@ package org.apache.druid.query; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.segment.ReferenceCounter; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; public class ReferenceCountingSegmentQueryRunner implements QueryRunner { private final QueryRunnerFactory> factory; - private final Segment segment; - private final ReferenceCounter segmentReferenceCounter; + private final SegmentReference segment; private final SegmentDescriptor descriptor; public ReferenceCountingSegmentQueryRunner( QueryRunnerFactory> factory, - Segment segment, - ReferenceCounter segmentReferenceCounter, + SegmentReference segment, SegmentDescriptor descriptor ) { this.factory = factory; this.segment = segment; - this.segmentReferenceCounter = segmentReferenceCounter; this.descriptor = descriptor; } @Override public Sequence run(final QueryPlus queryPlus, ResponseContext responseContext) { - if (segmentReferenceCounter.increment()) { + return segment.acquireReferences().map(closeable -> { try { final Sequence baseSequence = factory.createRunner(segment).run(queryPlus, responseContext); - - return Sequences.withBaggage(baseSequence, segmentReferenceCounter.decrementOnceCloseable()); + return Sequences.withBaggage(baseSequence, closeable); } catch (Throwable t) { try { - segmentReferenceCounter.decrement(); + closeable.close(); } catch (Exception e) { t.addSuppressed(e); } throw t; } - } else { - // Segment was closed before we had a chance to increment the reference count - return new ReportTimelineMissingSegmentQueryRunner(descriptor).run(queryPlus, responseContext); - } + }).orElseGet(() -> new ReportTimelineMissingSegmentQueryRunner(descriptor).run(queryPlus, responseContext)); } } diff --git a/processing/src/main/java/org/apache/druid/segment/AbstractSegment.java b/processing/src/main/java/org/apache/druid/segment/AbstractSegment.java index 1518c44dcf0..c41be41ec4d 100644 --- a/processing/src/main/java/org/apache/druid/segment/AbstractSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/AbstractSegment.java @@ -19,19 +19,11 @@ package org.apache.druid.segment; -import javax.annotation.Nullable; - +/** + * @deprecated use {@link Segment} directly as this does nothing + */ +@Deprecated public abstract class AbstractSegment implements Segment { - @Override - @Nullable - public T as(Class clazz) - { - if (clazz.equals(QueryableIndex.class)) { - return (T) asQueryableIndex(); - } else if (clazz.equals(StorageAdapter.class)) { - return (T) asStorageAdapter(); - } - return null; - } + // i used to have a purpose } diff --git a/processing/src/main/java/org/apache/druid/segment/IncrementalIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/IncrementalIndexSegment.java index 683106cf387..b270b54c4d1 100644 --- a/processing/src/main/java/org/apache/druid/segment/IncrementalIndexSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/IncrementalIndexSegment.java @@ -26,7 +26,7 @@ import org.joda.time.Interval; /** */ -public class IncrementalIndexSegment extends AbstractSegment +public class IncrementalIndexSegment implements Segment { private final IncrementalIndex index; private final SegmentId segmentId; diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java index 30b3fcba4df..a829dfae184 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexSegment.java @@ -24,7 +24,7 @@ import org.joda.time.Interval; /** */ -public class QueryableIndexSegment extends AbstractSegment +public class QueryableIndexSegment implements Segment { private final QueryableIndex index; private final QueryableIndexStorageAdapter storageAdapter; diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountedObject.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountedObject.java new file mode 100644 index 00000000000..4770743f3bb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountedObject.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import java.io.Closeable; +import java.util.Optional; + +/** + * Interface for an object that may have a reference acquired in the form of a {@link Closeable}. This is intended to be + * used with an implementation of {@link ReferenceCountingCloseableObject}, or anything else that wishes to provide + * a method to account for the acquire and release of a reference to the object. + */ +public interface ReferenceCountedObject +{ + /** + * This method is expected to increment a reference count and provide a {@link Closeable} that decrements the + * reference count when closed. This is likely just a wrapper around + * {@link ReferenceCountingCloseableObject#incrementReferenceAndDecrementOnceCloseable()}, but may also include any + * other associated references which should be incremented when this method is called, and decremented/released by the + * closeable. + * + * IMPORTANT NOTE: to fulfill the contract of this method, implementors must return a closeable to indicate that the + * reference can be acquired, even if there is nothing to close. Implementors should avoid allowing this method or the + * {@link Closeable} it creates to throw exceptions. + * + * For callers: if this method returns non-empty, IT MUST BE CLOSED, else reference counts can potentially leak. + */ + Optional acquireReferences(); +} diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingCloseableObject.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingCloseableObject.java new file mode 100644 index 00000000000..db37fc59568 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingCloseableObject.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.Closeable; +import java.util.Optional; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * ReferenceCountingCloseableObject implements something like automatic reference count-based resource management, + * backed by a {@link Phaser}. + * + * ReferenceCountingCloseableObject allows consumers to call {@link #close()} before some other "users", which called + * {@link #increment()} or {@link #incrementReferenceAndDecrementOnceCloseable()}, but have not called + * {@link #decrement()} yet or the closer for {@link #incrementReferenceAndDecrementOnceCloseable()}, and the wrapped + * object won't be actually closed until that all references are released. + */ +public abstract class ReferenceCountingCloseableObject implements Closeable +{ + private static final Logger log = new Logger(ReferenceCountingCloseableObject.class); + + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Phaser referents = new Phaser(1) + { + @Override + protected boolean onAdvance(int phase, int registeredParties) + { + // Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen + if (registeredParties != 0) { + log.error("registeredParties[%s] is not 0", registeredParties); + } + try { + baseObject.close(); + } + catch (Exception e) { + try { + log.error(e, "Exception while closing reference counted object[%s]", baseObject); + } + catch (Exception e2) { + // ignore + } + } + // Always terminate. + return true; + } + }; + + protected final BaseObject baseObject; + + public ReferenceCountingCloseableObject(BaseObject object) + { + this.baseObject = object; + } + + public int getNumReferences() + { + return Math.max(referents.getRegisteredParties() - 1, 0); + } + + public boolean isClosed() + { + return referents.isTerminated(); + } + + /** + * Increment the reference count by one. + */ + public boolean increment() + { + // Negative return from referents.register() means the Phaser is terminated. + return referents.register() >= 0; + } + + /** + * Decrement the reference count by one. + */ + public void decrement() + { + referents.arriveAndDeregister(); + } + + /** + * Returns an {@link Optional} of a {@link Closeable} from {@link #decrementOnceCloseable}, if it is able to + * successfully {@link #increment}, else nothing indicating that the reference could not be acquired. + */ + public Optional incrementReferenceAndDecrementOnceCloseable() + { + final Closer closer; + if (increment()) { + closer = Closer.create(); + closer.register(decrementOnceCloseable()); + } else { + closer = null; + } + return Optional.ofNullable(closer); + } + + /** + * Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the + * returned Closeable object for the second time, it won't call {@link #decrement()} again. + */ + public Closeable decrementOnceCloseable() + { + AtomicBoolean decremented = new AtomicBoolean(false); + return () -> { + if (decremented.compareAndSet(false, true)) { + decrement(); + } else { + log.warn("close() is called more than once on ReferenceCountingCloseableObject.decrementOnceCloseable()"); + } + }; + } + + @Override + public void close() + { + if (closed.compareAndSet(false, true)) { + referents.arriveAndDeregister(); + } else { + log.warn("close() is called more than once on ReferenceCountingCloseableObject"); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java index ba1c34318e3..5b531316cd5 100644 --- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java @@ -20,7 +20,6 @@ package org.apache.druid.segment; import com.google.common.base.Preconditions; -import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.Overshadowable; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.ShardSpec; @@ -28,50 +27,20 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; -import java.util.concurrent.Phaser; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Optional; /** - * ReferenceCountingSegment allows to call {@link #close()} before some other "users", which called {@link - * #increment()}, has not called {@link #decrement()} yet, and the wrapped {@link Segment} won't be actually closed - * until that. So ReferenceCountingSegment implements something like automatic reference count-based resource - * management. + * {@link Segment} that is also a {@link ReferenceCountingSegment}, allowing query engines that operate directly on + * segments to track references so that dropping a {@link Segment} can be done safely to ensure there are no in-flight + * queries. */ -public class ReferenceCountingSegment extends AbstractSegment - implements Overshadowable, ReferenceCounter +public class ReferenceCountingSegment extends ReferenceCountingCloseableObject + implements SegmentReference, Overshadowable { - private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class); - - private final Segment baseSegment; private final short startRootPartitionId; private final short endRootPartitionId; private final short minorVersion; private final short atomicUpdateGroupSize; - private final AtomicBoolean closed = new AtomicBoolean(false); - private final Phaser referents = new Phaser(1) - { - @Override - protected boolean onAdvance(int phase, int registeredParties) - { - // Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen - if (registeredParties != 0) { - log.error("registeredParties[%s] is not 0", registeredParties); - } - try { - baseSegment.close(); - } - catch (Exception e) { - try { - log.error(e, "Exception while closing segment[%s]", baseSegment.getId()); - } - catch (Exception e2) { - // ignore - } - } - // Always terminate. - return true; - } - }; public static ReferenceCountingSegment wrapRootGenerationSegment(Segment baseSegment) { @@ -106,7 +75,7 @@ public class ReferenceCountingSegment extends AbstractSegment short atomicUpdateGroupSize ) { - this.baseSegment = baseSegment; + super(baseSegment); this.startRootPartitionId = (short) startRootPartitionId; this.endRootPartitionId = (short) endRootPartitionId; this.minorVersion = minorVersion; @@ -116,105 +85,43 @@ public class ReferenceCountingSegment extends AbstractSegment @Nullable public Segment getBaseSegment() { - return !isClosed() ? baseSegment : null; - } - - public int getNumReferences() - { - return Math.max(referents.getRegisteredParties() - 1, 0); - } - - public boolean isClosed() - { - return referents.isTerminated(); + return !isClosed() ? baseObject : null; } @Override @Nullable public SegmentId getId() { - return !isClosed() ? baseSegment.getId() : null; + return !isClosed() ? baseObject.getId() : null; } @Override @Nullable public Interval getDataInterval() { - return !isClosed() ? baseSegment.getDataInterval() : null; + return !isClosed() ? baseObject.getDataInterval() : null; } @Override @Nullable public QueryableIndex asQueryableIndex() { - return !isClosed() ? baseSegment.asQueryableIndex() : null; + return !isClosed() ? baseObject.asQueryableIndex() : null; } @Override @Nullable public StorageAdapter asStorageAdapter() { - return !isClosed() ? baseSegment.asStorageAdapter() : null; - } - - @Override - public void close() - { - if (closed.compareAndSet(false, true)) { - referents.arriveAndDeregister(); - } else { - log.warn("close() is called more than once on ReferenceCountingSegment"); - } - } - - public ReferenceCounter referenceCounter() - { - return this; - } - - @Override - public boolean increment() - { - // Negative return from referents.register() means the Phaser is terminated. - return referents.register() >= 0; - } - - /** - * Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the - * returned Closeable object for the second time, it won't call {@link #decrement()} again. - */ - @Override - public Closeable decrementOnceCloseable() - { - AtomicBoolean decremented = new AtomicBoolean(false); - return () -> { - if (decremented.compareAndSet(false, true)) { - decrement(); - } else { - log.warn("close() is called more than once on ReferenceCountingSegment.decrementOnceCloseable()"); - } - }; - } - - @Override - public void decrement() - { - referents.arriveAndDeregister(); - } - - @Override - public T as(Class clazz) - { - return getBaseSegment().as(clazz); + return !isClosed() ? baseObject.asStorageAdapter() : null; } @Override public boolean overshadows(ReferenceCountingSegment other) { - if (baseSegment.getId().getDataSource().equals(other.baseSegment.getId().getDataSource()) - && baseSegment.getId().getInterval().overlaps(other.baseSegment.getId().getInterval())) { - final int majorVersionCompare = baseSegment.getId().getVersion() - .compareTo(other.baseSegment.getId().getVersion()); + if (baseObject.getId().getDataSource().equals(other.baseObject.getId().getDataSource()) + && baseObject.getId().getInterval().overlaps(other.baseObject.getId().getInterval())) { + final int majorVersionCompare = baseObject.getId().getVersion().compareTo(other.baseObject.getId().getVersion()); if (majorVersionCompare > 0) { return true; } else if (majorVersionCompare == 0) { @@ -245,7 +152,7 @@ public class ReferenceCountingSegment extends AbstractSegment @Override public String getVersion() { - return baseSegment.getId().getVersion(); + return baseObject.getId().getVersion(); } @Override @@ -259,4 +166,10 @@ public class ReferenceCountingSegment extends AbstractSegment { return atomicUpdateGroupSize; } + + @Override + public Optional acquireReferences() + { + return incrementReferenceAndDecrementOnceCloseable(); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java b/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java index 512cbc22a0c..36e84e8bd0e 100644 --- a/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java @@ -30,7 +30,7 @@ import javax.annotation.Nullable; /** * A {@link Segment} that is based on a stream of objects. */ -public class RowBasedSegment extends AbstractSegment +public class RowBasedSegment implements Segment { private final SegmentId segmentId; private final StorageAdapter storageAdapter; diff --git a/processing/src/main/java/org/apache/druid/segment/Segment.java b/processing/src/main/java/org/apache/druid/segment/Segment.java index 8fb9b4e620d..245c776b0ac 100644 --- a/processing/src/main/java/org/apache/druid/segment/Segment.java +++ b/processing/src/main/java/org/apache/druid/segment/Segment.java @@ -57,6 +57,15 @@ public interface Segment extends Closeable * @param desired interface * @return instance of clazz, or null if the interface is not supported by this segment */ + @SuppressWarnings("unused") @Nullable - T as(Class clazz); + default T as(Class clazz) + { + if (clazz.equals(QueryableIndex.class)) { + return (T) asQueryableIndex(); + } else if (clazz.equals(StorageAdapter.class)) { + return (T) asStorageAdapter(); + } + return null; + } } diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCounter.java b/processing/src/main/java/org/apache/druid/segment/SegmentReference.java similarity index 59% rename from processing/src/main/java/org/apache/druid/segment/ReferenceCounter.java rename to processing/src/main/java/org/apache/druid/segment/SegmentReference.java index 970d487b770..fae2a7b36f6 100644 --- a/processing/src/main/java/org/apache/druid/segment/ReferenceCounter.java +++ b/processing/src/main/java/org/apache/druid/segment/SegmentReference.java @@ -19,26 +19,12 @@ package org.apache.druid.segment; -import java.io.Closeable; - /** - * An interface to reference-counted objects. Used by {@link ReferenceCountingSegment}. Thread-safe. + * A {@link Segment} with a associated references, such as {@link ReferenceCountingSegment} where the reference is + * the segment itself, and {@link org.apache.druid.segment.join.HashJoinSegment} which wraps a + * {@link ReferenceCountingSegment} and also includes the associated list of + * {@link org.apache.druid.segment.join.JoinableClause} */ -public interface ReferenceCounter +public interface SegmentReference extends Segment, ReferenceCountedObject { - /** - * Increment the reference count by one. - */ - boolean increment(); - - /** - * Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the - * returned Closeable object for the second time, it won't call {@link #decrement()} again. - */ - Closeable decrementOnceCloseable(); - - /** - * Decrement the reference count by one. - */ - void decrement(); } diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index d321af95800..3ca240604da 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -20,26 +20,29 @@ package org.apache.druid.segment.join; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.segment.AbstractSegment; +import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Optional; /** * Represents a deep, left-heavy join of a left-hand side baseSegment onto a series of right-hand side clauses. * * In other words, logically the operation is: join(join(join(baseSegment, clauses[0]), clauses[1]), clauses[2]) etc. */ -public class HashJoinSegment extends AbstractSegment +public class HashJoinSegment implements SegmentReference { - private final Segment baseSegment; + private final SegmentReference baseSegment; private final List clauses; private final JoinFilterPreAnalysis joinFilterPreAnalysis; @@ -50,7 +53,7 @@ public class HashJoinSegment extends AbstractSegment * @param joinFilterPreAnalysis Pre-analysis computed by {@link org.apache.druid.segment.join.filter.JoinFilterAnalyzer#computeJoinFilterPreAnalysis} */ public HashJoinSegment( - Segment baseSegment, + SegmentReference baseSegment, List clauses, JoinFilterPreAnalysis joinFilterPreAnalysis ) @@ -98,4 +101,36 @@ public class HashJoinSegment extends AbstractSegment { baseSegment.close(); } + + @Override + public Optional acquireReferences() + { + Closer closer = Closer.create(); + try { + boolean acquireFailed = baseSegment.acquireReferences().map(closeable -> { + closer.register(closeable); + return false; + }).orElse(true); + + for (JoinableClause joinClause : clauses) { + if (acquireFailed) { + break; + } + acquireFailed |= joinClause.acquireReferences().map(closeable -> { + closer.register(closeable); + return false; + }).orElse(true); + } + if (acquireFailed) { + CloseQuietly.close(closer); + return Optional.empty(); + } else { + return Optional.of(closer); + } + } + catch (Exception ex) { + CloseQuietly.close(closer); + return Optional.empty(); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java index 001a0fccd21..7ad7799a109 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.join; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ReferenceCountedObject; import org.apache.druid.segment.column.ColumnCapabilities; import javax.annotation.Nullable; @@ -33,7 +34,7 @@ import java.util.Set; * This class's most important method is {@link #makeJoinMatcher}. Its main user is * {@link HashJoinEngine#makeJoinCursor}. */ -public interface Joinable +public interface Joinable extends ReferenceCountedObject { int CARDINALITY_UNKNOWN = -1; diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java index a2ddefe1bf0..2f8bd3ac322 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableClause.java @@ -21,9 +21,12 @@ package org.apache.druid.segment.join; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.ReferenceCountedObject; +import java.io.Closeable; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -33,7 +36,7 @@ import java.util.stream.Collectors; * * Created from {@link org.apache.druid.query.planning.PreJoinableClause} by {@link Joinables#createSegmentMapFn}. */ -public class JoinableClause +public class JoinableClause implements ReferenceCountedObject { private final String prefix; private final Joinable joinable; @@ -151,4 +154,10 @@ public class JoinableClause ", condition=" + condition + '}'; } + + @Override + public Optional acquireReferences() + { + return joinable.acquireReferences(); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinables.java b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java index 76bd19e877d..c9a9d2fc247 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/Joinables.java +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinables.java @@ -22,7 +22,7 @@ package org.apache.druid.segment.join; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.planning.PreJoinableClause; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.join.filter.JoinFilterAnalyzer; @@ -88,7 +88,7 @@ public class Joinables * @param originalFilter The original filter from the query. * @param virtualColumns The virtual columns from the query. */ - public static Function createSegmentMapFn( + public static Function createSegmentMapFn( final List clauses, final JoinableFactory joinableFactory, final AtomicLong cpuTimeAccumulator, diff --git a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java index 7fae4e0db3b..9321a184ebb 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java @@ -31,6 +31,7 @@ import org.apache.druid.segment.join.JoinMatcher; import org.apache.druid.segment.join.Joinable; import javax.annotation.Nullable; +import java.io.Closeable; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -123,4 +124,11 @@ public class LookupJoinable implements Joinable } return Optional.of(correlatedValues); } + + @Override + public Optional acquireReferences() + { + // nothing to close for lookup joinables, they are managed externally and have no per query accounting of usage + return Optional.of(() -> {}); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java index cbc858112f3..b4721448729 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java @@ -20,9 +20,11 @@ package org.apache.druid.segment.join.table; import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.segment.ReferenceCountedObject; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; +import java.io.Closeable; import java.util.Set; /** @@ -30,8 +32,14 @@ import java.util.Set; * * The main user of this class is {@link IndexedTableJoinable}, and its main purpose is to participate in joins. */ -public interface IndexedTable +public interface IndexedTable extends ReferenceCountedObject, Closeable { + /** + * Returns the version of this table, used to compare against when loading a new version of the table + */ + @SuppressWarnings("unused") + String version(); + /** * Returns the columns of this table that have indexes. */ diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java index 66ef1213d6f..47166793ed2 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java @@ -27,6 +27,7 @@ import org.apache.druid.segment.join.JoinMatcher; import org.apache.druid.segment.join.Joinable; import javax.annotation.Nullable; +import java.io.Closeable; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -133,4 +134,10 @@ public class IndexedTableJoinable implements Joinable return Optional.of(correlatedValues); } } + + @Override + public Optional acquireReferences() + { + return table.acquireReferences(); + } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java index 87ac7d48e17..c18ec6a44b7 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java @@ -30,11 +30,13 @@ import org.apache.druid.segment.RowAdapter; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import java.io.Closeable; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -50,12 +52,14 @@ public class RowBasedIndexedTable implements IndexedTable private final RowSignature rowSignature; private final List> columnFunctions; private final Set keyColumns; + private final String version; public RowBasedIndexedTable( final List table, final RowAdapter rowAdapter, final RowSignature rowSignature, - final Set keyColumns + final Set keyColumns, + final String version ) { this.table = table; @@ -63,6 +67,7 @@ public class RowBasedIndexedTable implements IndexedTable this.columnFunctions = rowSignature.getColumnNames().stream().map(rowAdapter::columnFunction).collect(Collectors.toList()); this.keyColumns = keyColumns; + this.version = version; if (new HashSet<>(keyColumns).size() != keyColumns.size()) { throw new ISE("keyColumns[%s] must not contain duplicates", keyColumns); @@ -106,6 +111,12 @@ public class RowBasedIndexedTable implements IndexedTable } } + @Override + public String version() + { + return version; + } + @Override public Set keyColumns() { @@ -163,4 +174,17 @@ public class RowBasedIndexedTable implements IndexedTable { return table.size(); } + + @Override + public Optional acquireReferences() + { + // nothing to close by default, whatever loaded this thing (probably) lives on heap + return Optional.of(() -> {}); + } + + @Override + public void close() + { + // nothing to close + } } diff --git a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java index cee0a46e289..6566592e9d5 100644 --- a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java @@ -21,6 +21,7 @@ package org.apache.druid.segment; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.timeline.SegmentId; +import org.easymock.EasyMock; import org.joda.time.Days; import org.joda.time.Interval; import org.junit.Assert; @@ -38,39 +39,50 @@ public class ReferenceCountingSegmentTest private ReferenceCountingSegment segment; private ExecutorService exec; + private final SegmentId segmentId = SegmentId.dummy("test_segment"); + private final Interval dataInterval = new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc()); + private QueryableIndex index; + private StorageAdapter adapter; + private int underlyingSegmentClosedCount; + @Before public void setUp() { + underlyingSegmentClosedCount = 0; + index = EasyMock.createNiceMock(QueryableIndex.class); + adapter = EasyMock.createNiceMock(StorageAdapter.class); + segment = ReferenceCountingSegment.wrapRootGenerationSegment( - new AbstractSegment() + new Segment() { @Override public SegmentId getId() { - return SegmentId.dummy("test_segment"); + return segmentId; } @Override public Interval getDataInterval() { - return new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc()); + return dataInterval; } @Override public QueryableIndex asQueryableIndex() { - return null; + return index; } @Override public StorageAdapter asStorageAdapter() { - return null; + return adapter; } @Override public void close() { + underlyingSegmentClosedCount++; } } ); @@ -81,13 +93,17 @@ public class ReferenceCountingSegmentTest @Test public void testMultipleClose() throws Exception { + Assert.assertEquals(0, underlyingSegmentClosedCount); Assert.assertFalse(segment.isClosed()); Assert.assertTrue(segment.increment()); Assert.assertEquals(1, segment.getNumReferences()); Closeable closeable = segment.decrementOnceCloseable(); + Assert.assertEquals(0, underlyingSegmentClosedCount); closeable.close(); + Assert.assertEquals(0, underlyingSegmentClosedCount); closeable.close(); + Assert.assertEquals(0, underlyingSegmentClosedCount); exec.submit( () -> { try { @@ -99,10 +115,16 @@ public class ReferenceCountingSegmentTest } ).get(); Assert.assertEquals(0, segment.getNumReferences()); + Assert.assertEquals(0, underlyingSegmentClosedCount); Assert.assertFalse(segment.isClosed()); + // close for reals segment.close(); + Assert.assertTrue(segment.isClosed()); + Assert.assertEquals(1, underlyingSegmentClosedCount); + // ... but make sure it only happens once segment.close(); + Assert.assertEquals(1, underlyingSegmentClosedCount); exec.submit( () -> { try { @@ -116,13 +138,25 @@ public class ReferenceCountingSegmentTest Assert.assertEquals(0, segment.getNumReferences()); Assert.assertTrue(segment.isClosed()); + Assert.assertEquals(1, underlyingSegmentClosedCount); segment.increment(); segment.increment(); segment.increment(); Assert.assertEquals(0, segment.getNumReferences()); - + Assert.assertEquals(1, underlyingSegmentClosedCount); segment.close(); Assert.assertEquals(0, segment.getNumReferences()); + Assert.assertEquals(1, underlyingSegmentClosedCount); } + + @Test + public void testExposesWrappedSegment() + { + Assert.assertEquals(segmentId, segment.getId()); + Assert.assertEquals(dataInterval, segment.getDataInterval()); + Assert.assertEquals(index, segment.asQueryableIndex()); + Assert.assertEquals(adapter, segment.asStorageAdapter()); + } + } diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java index 3ff19e4d591..966d62023ff 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentTest.java @@ -20,29 +20,37 @@ package org.apache.druid.segment.join; import com.google.common.collect.ImmutableList; -import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.QueryContexts; +import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.join.filter.JoinFilterAnalyzer; import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis; import org.apache.druid.segment.join.filter.JoinableClauses; import org.apache.druid.segment.join.table.IndexedTableJoinable; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; import org.hamcrest.CoreMatchers; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; +import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Optional; -public class HashJoinSegmentTest +public class HashJoinSegmentTest extends InitializedNullHandlingTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -51,17 +59,30 @@ public class HashJoinSegmentTest public ExpectedException expectedException = ExpectedException.none(); private QueryableIndexSegment baseSegment; + private ReferenceCountingSegment referencedSegment; private HashJoinSegment hashJoinSegment; - @BeforeClass - public static void setUpStatic() - { - NullHandling.initializeForTests(); - } + private int allReferencesAcquireCount; + private int allReferencesCloseCount; + private int referencedSegmentAcquireCount; + private int referencedSegmentClosedCount; + private int indexedTableJoinableReferenceAcquireCount; + private int indexedTableJoinableReferenceCloseCount; + private boolean j0Closed; + private boolean j1Closed; @Before public void setUp() throws IOException { + allReferencesAcquireCount = 0; + allReferencesCloseCount = 0; + referencedSegmentAcquireCount = 0; + referencedSegmentClosedCount = 0; + indexedTableJoinableReferenceAcquireCount = 0; + indexedTableJoinableReferenceCloseCount = 0; + j0Closed = false; + j1Closed = false; + baseSegment = new QueryableIndexSegment( JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(), SegmentId.dummy("facts") @@ -70,13 +91,39 @@ public class HashJoinSegmentTest List joinableClauses = ImmutableList.of( new JoinableClause( "j0.", - new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()), + new IndexedTableJoinable(JoinTestHelper.createCountriesIndexedTable()) + { + @Override + public Optional acquireReferences() + { + if (!j0Closed) { + indexedTableJoinableReferenceAcquireCount++; + Closer closer = Closer.create(); + closer.register(() -> indexedTableJoinableReferenceCloseCount++); + return Optional.of(closer); + } + return Optional.empty(); + } + }, JoinType.LEFT, JoinConditionAnalysis.forExpression("1", "j0.", ExprMacroTable.nil()) ), new JoinableClause( "j1.", - new IndexedTableJoinable(JoinTestHelper.createRegionsIndexedTable()), + new IndexedTableJoinable(JoinTestHelper.createRegionsIndexedTable()) + { + @Override + public Optional acquireReferences() + { + if (!j1Closed) { + indexedTableJoinableReferenceAcquireCount++; + Closer closer = Closer.create(); + closer.register(() -> indexedTableJoinableReferenceCloseCount++); + return Optional.of(closer); + } + return Optional.empty(); + } + }, JoinType.LEFT, JoinConditionAnalysis.forExpression("1", "j1.", ExprMacroTable.nil()) ) @@ -92,11 +139,70 @@ public class HashJoinSegmentTest QueryContexts.DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE ); + referencedSegment = ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment); + SegmentReference testWrapper = new SegmentReference() + { + @Override + public Optional acquireReferences() + { + Closer closer = Closer.create(); + return referencedSegment.acquireReferences().map(closeable -> { + referencedSegmentAcquireCount++; + closer.register(closeable); + closer.register(() -> referencedSegmentClosedCount++); + return closer; + }); + } + + @Override + public SegmentId getId() + { + return referencedSegment.getId(); + } + + @Override + public Interval getDataInterval() + { + return referencedSegment.getDataInterval(); + } + + @Nullable + @Override + public QueryableIndex asQueryableIndex() + { + return referencedSegment.asQueryableIndex(); + } + + @Override + public StorageAdapter asStorageAdapter() + { + return referencedSegment.asStorageAdapter(); + } + + @Override + public void close() + { + referencedSegment.close(); + } + }; hashJoinSegment = new HashJoinSegment( - baseSegment, + testWrapper, joinableClauses, joinFilterPreAnalysis - ); + ) + { + @Override + public Optional acquireReferences() + { + Closer closer = Closer.create(); + return super.acquireReferences().map(closeable -> { + allReferencesAcquireCount++; + closer.register(closeable); + closer.register(() -> allReferencesCloseCount++); + return closer; + }); + } + }; } @Test @@ -118,7 +224,7 @@ public class HashJoinSegmentTest ); final HashJoinSegment ignored = new HashJoinSegment( - baseSegment, + ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment), joinableClauses, joinFilterPreAnalysis ); @@ -150,4 +256,79 @@ public class HashJoinSegmentTest CoreMatchers.instanceOf(HashJoinSegmentStorageAdapter.class) ); } + + @Test + public void testJoinableClausesAreClosedWhenReferencesUsed() throws IOException + { + Assert.assertFalse(referencedSegment.isClosed()); + + Optional maybeCloseable = hashJoinSegment.acquireReferences(); + Assert.assertTrue(maybeCloseable.isPresent()); + + Assert.assertEquals(1, referencedSegmentAcquireCount); + Assert.assertEquals(2, indexedTableJoinableReferenceAcquireCount); + Assert.assertEquals(1, allReferencesAcquireCount); + Assert.assertEquals(0, referencedSegmentClosedCount); + Assert.assertEquals(0, indexedTableJoinableReferenceCloseCount); + Assert.assertEquals(0, allReferencesCloseCount); + + Closeable closer = maybeCloseable.get(); + closer.close(); + + Assert.assertFalse(referencedSegment.isClosed()); + Assert.assertEquals(1, referencedSegmentClosedCount); + Assert.assertEquals(2, indexedTableJoinableReferenceCloseCount); + Assert.assertEquals(1, allReferencesCloseCount); + + } + + @Test + public void testJoinableClausesClosedIfSegmentIsAlreadyClosed() + { + Assert.assertFalse(referencedSegment.isClosed()); + + referencedSegment.close(); + Assert.assertTrue(referencedSegment.isClosed()); + + Optional maybeCloseable = hashJoinSegment.acquireReferences(); + Assert.assertFalse(maybeCloseable.isPresent()); + Assert.assertEquals(0, referencedSegmentAcquireCount); + Assert.assertEquals(0, indexedTableJoinableReferenceAcquireCount); + Assert.assertEquals(0, allReferencesAcquireCount); + Assert.assertEquals(0, referencedSegmentClosedCount); + Assert.assertEquals(0, indexedTableJoinableReferenceCloseCount); + Assert.assertEquals(0, allReferencesCloseCount); + } + + @Test + public void testJoinableClausesClosedIfJoinableZeroIsAlreadyClosed() + { + Assert.assertFalse(referencedSegment.isClosed()); + j0Closed = true; + + Optional maybeCloseable = hashJoinSegment.acquireReferences(); + Assert.assertFalse(maybeCloseable.isPresent()); + Assert.assertEquals(1, referencedSegmentAcquireCount); + Assert.assertEquals(0, indexedTableJoinableReferenceAcquireCount); + Assert.assertEquals(0, allReferencesAcquireCount); + Assert.assertEquals(1, referencedSegmentClosedCount); + Assert.assertEquals(0, indexedTableJoinableReferenceCloseCount); + Assert.assertEquals(0, allReferencesCloseCount); + } + + @Test + public void testJoinableClausesClosedIfJoinableOneIsAlreadyClosed() + { + Assert.assertFalse(referencedSegment.isClosed()); + j1Closed = true; + + Optional maybeCloseable = hashJoinSegment.acquireReferences(); + Assert.assertFalse(maybeCloseable.isPresent()); + Assert.assertEquals(1, referencedSegmentAcquireCount); + Assert.assertEquals(1, indexedTableJoinableReferenceAcquireCount); + Assert.assertEquals(0, allReferencesAcquireCount); + Assert.assertEquals(1, referencedSegmentClosedCount); + Assert.assertEquals(1, indexedTableJoinableReferenceCloseCount); + Assert.assertEquals(0, allReferencesCloseCount); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java b/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java index 0f960dee546..d33e2627a34 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java @@ -142,6 +142,8 @@ public class JoinTestHelper } }; + public static final String INDEXED_TABLE_VERSION = DateTimes.nowUtc().toString(); + private static RowAdapter> createMapRowAdapter(final RowSignature signature) { return new RowAdapter>() @@ -255,7 +257,8 @@ public class JoinTestHelper rows, createMapRowAdapter(COUNTRIES_SIGNATURE), COUNTRIES_SIGNATURE, - ImmutableSet.of("countryNumber", "countryIsoCode") + ImmutableSet.of("countryNumber", "countryIsoCode"), + INDEXED_TABLE_VERSION ) ); } @@ -268,7 +271,8 @@ public class JoinTestHelper rows, createMapRowAdapter(REGIONS_SIGNATURE), REGIONS_SIGNATURE, - ImmutableSet.of("regionIsoCode", "countryIsoCode") + ImmutableSet.of("regionIsoCode", "countryIsoCode"), + INDEXED_TABLE_VERSION ) ); } diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java index 4fa521dbcbc..ae36d175ff1 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java @@ -27,7 +27,7 @@ import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.extraction.MapLookupExtractor; import org.apache.druid.query.planning.PreJoinableClause; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.join.lookup.LookupJoinable; @@ -95,7 +95,7 @@ public class JoinablesTest @Test public void test_createSegmentMapFn_noClauses() { - final Function segmentMapFn = Joinables.createSegmentMapFn( + final Function segmentMapFn = Joinables.createSegmentMapFn( ImmutableList.of(), NoopJoinableFactory.INSTANCE, new AtomicLong(), @@ -124,7 +124,7 @@ public class JoinablesTest expectedException.expect(IllegalStateException.class); expectedException.expectMessage("dataSource is not joinable"); - final Function ignored = Joinables.createSegmentMapFn( + final Function ignored = Joinables.createSegmentMapFn( ImmutableList.of(clause), NoopJoinableFactory.INSTANCE, new AtomicLong(), @@ -153,7 +153,7 @@ public class JoinablesTest conditionAnalysis ); - final Function segmentMapFn = Joinables.createSegmentMapFn( + final Function segmentMapFn = Joinables.createSegmentMapFn( ImmutableList.of(clause), (dataSource, condition) -> { if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java index 13cc45fa590..c75232c9be9 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java @@ -22,6 +22,7 @@ package org.apache.druid.segment.join.table; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -96,7 +97,8 @@ public class IndexedTableJoinableTest inlineDataSource.getRowsAsList(), inlineDataSource.rowAdapter(), inlineDataSource.getRowSignature(), - ImmutableSet.of("str") + ImmutableSet.of("str"), + DateTimes.nowUtc().toString() ); private IndexedTableJoinable target; diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java index 49ebc36465b..31bdc56a0ec 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/RowBasedIndexedTableTest.java @@ -172,4 +172,11 @@ public class RowBasedIndexedTableTest expectedException.expect(IndexOutOfBoundsException.class); countriesTable.columnReader(99); } + + @Test + public void testVersion() + { + Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, countriesTable.version()); + Assert.assertEquals(JoinTestHelper.INDEXED_TABLE_VERSION, regionsTable.version()); + } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index c382be3493e..ded9cf09d4e 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -261,7 +261,7 @@ public class CachingClusteredClient implements QuerySegmentWalker // For nested queries, we need to look at the intervals of the inner most query. this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec() .map(QuerySegmentSpec::getIntervals) - .orElse(query.getIntervals()); + .orElseGet(() -> query.getIntervals()); } private ImmutableMap makeDownstreamQueryContext() diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java index 3890e6c3736..5945d42957c 100644 --- a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java +++ b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.join; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.DataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.segment.join.table.IndexedTable; @@ -49,7 +50,8 @@ public class InlineJoinableFactory implements JoinableFactory inlineDataSource.getRowsAsList(), inlineDataSource.rowAdapter(), inlineDataSource.getRowSignature(), - rightKeyColumns + rightKeyColumns, + DateTimes.nowUtc().toString() ) ) ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index fbd6e8d3988..bf5ba26c529 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -19,18 +19,22 @@ package org.apache.druid.segment.realtime; +import com.google.common.annotations.VisibleForTesting; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; /** */ @@ -71,27 +75,6 @@ public class FireHydrant return adapter.get().getDataInterval(); } - public ReferenceCountingSegment getIncrementedSegment() - { - ReferenceCountingSegment segment = adapter.get(); - while (true) { - if (segment.increment()) { - return segment; - } - // segment.increment() returned false, means it is closed. Since close() in swapSegment() happens after segment - // swap, the new segment should already be visible. - ReferenceCountingSegment newSegment = adapter.get(); - if (segment == newSegment) { - throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()"); - } - if (newSegment == null) { - throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment"); - } - segment = newSegment; - // Spin loop. - } - } - public int getCount() { return count; @@ -133,12 +116,75 @@ public class FireHydrant } } - public Pair getAndIncrementSegment() + public ReferenceCountingSegment getIncrementedSegment() + { + ReferenceCountingSegment segment = adapter.get(); + while (true) { + if (segment.increment()) { + return segment; + } + // segment.increment() returned false, means it is closed. Since close() in swapSegment() happens after segment + // swap, the new segment should already be visible. + ReferenceCountingSegment newSegment = adapter.get(); + if (segment == newSegment) { + throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()"); + } + if (newSegment == null) { + throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment"); + } + segment = newSegment; + // Spin loop. + } + } + + public Pair getAndIncrementSegment() { ReferenceCountingSegment segment = getIncrementedSegment(); return new Pair<>(segment, segment.decrementOnceCloseable()); } + /** + * This method is like a combined form of {@link #getIncrementedSegment} and {@link #getAndIncrementSegment} that + * deals in {@link SegmentReference} instead of directly with {@link ReferenceCountingSegment} in order to acquire + * reference count for both hydrant's segment and any tracked joinables taking part in the query. + */ + public Optional> getSegmentForQuery( + Function segmentMapFn + ) + { + ReferenceCountingSegment sinkSegment = adapter.get(); + SegmentReference segment = segmentMapFn.apply(sinkSegment); + while (true) { + Optional reference = segment.acquireReferences(); + if (reference.isPresent()) { + + return Optional.of(new Pair<>(segment, reference.get())); + } + // segment.acquireReferences() returned false, means it is closed. Since close() in swapSegment() happens after + // segment swap, the new segment should already be visible. + ReferenceCountingSegment newSinkSegment = adapter.get(); + if (newSinkSegment == null) { + throw new ISE("FireHydrant was 'closed' by swapping segment to null while acquiring a segment"); + } + if (sinkSegment == newSinkSegment) { + if (newSinkSegment.isClosed()) { + throw new ISE("segment.close() is called somewhere outside FireHydrant.swapSegment()"); + } + // if segment is not closed, but is same segment it means we are having trouble getting references for joinables + // of a HashJoinSegment created by segmentMapFn + return Optional.empty(); + } + segment = segmentMapFn.apply(newSinkSegment); + // Spin loop. + } + } + + @VisibleForTesting + public ReferenceCountingSegment getHydrantSegment() + { + return adapter.get(); + } + @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index fe6986d2f0d..24fcb67efa2 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -59,7 +59,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.indexing.DataSchema; @@ -756,7 +756,7 @@ public class AppenderatorImpl implements Appenderator Closer closer = Closer.create(); try { for (FireHydrant fireHydrant : sink) { - Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); + Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant); indexes.add(queryableIndex); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 504daf3d60a..4527287b4f6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -28,7 +28,6 @@ import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.CloseQuietly; @@ -57,7 +56,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.Joinables; import org.apache.druid.segment.realtime.FireHydrant; @@ -171,7 +170,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker } // segmentMapFn maps each base Segment into a joined Segment if necessary. - final Function segmentMapFn = Joinables.createSegmentMapFn( + final Function segmentMapFn = Joinables.createSegmentMapFn( analysis.getPreJoinableClauses(), joinableFactory, cpuTimeAccumulator, @@ -211,15 +210,24 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker final boolean hydrantDefinitelySwapped = hydrant.hasSwapped(); if (skipIncrementalSegment && !hydrantDefinitelySwapped) { - return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>()); + return new Pair<>(hydrant.getSegmentDataInterval(), new NoopQueryRunner<>()); } // Prevent the underlying segment from swapping when its being iterated - final Pair segmentAndCloseable = hydrant.getAndIncrementSegment(); - try { - final Segment mappedSegment = segmentMapFn.apply(segmentAndCloseable.lhs); + final Optional> maybeSegmentAndCloseable = + hydrant.getSegmentForQuery(segmentMapFn); - QueryRunner runner = factory.createRunner(mappedSegment); + // if optional isn't present, we failed to acquire reference to the segment or any joinables + if (!maybeSegmentAndCloseable.isPresent()) { + return new Pair<>( + hydrant.getSegmentDataInterval(), + new ReportTimelineMissingSegmentQueryRunner<>(descriptor) + ); + } + final Pair segmentAndCloseable = maybeSegmentAndCloseable.get(); + try { + + QueryRunner runner = factory.createRunner(segmentAndCloseable.lhs); // 1) Only use caching if data is immutable // 2) Hydrants are not the same between replicas, make sure cache is local @@ -245,7 +253,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker runner, segmentAndCloseable.rhs ); - return new Pair<>(mappedSegment.getDataInterval(), runner); + return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); } catch (RuntimeException e) { CloseQuietly.close(segmentAndCloseable.rhs); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 7a7db66d1f9..1fdc0b0f680 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -56,7 +56,7 @@ import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.indexing.DataSchema; @@ -424,7 +424,7 @@ public class RealtimePlumber implements Plumber Closer closer = Closer.create(); try { for (FireHydrant fireHydrant : sink) { - Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); + Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); log.info("Adding hydrant[%s]", fireHydrant); indexes.add(queryableIndex); diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index d7f39adaa4b..5e3928c799e 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -24,6 +24,7 @@ import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.Query; @@ -35,7 +36,9 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.Joinables; @@ -89,11 +92,15 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource()); } - final Iterable segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals); + // wrap in ReferenceCountingSegment, these aren't currently managed by SegmentManager so reference tracking doesn't + // matter, but at least some or all will be in a future PR + final Iterable segments = + FunctionalIterable.create(segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals)) + .transform(ReferenceCountingSegment::wrapRootGenerationSegment); final Query prioritizedAndLaned = prioritizeAndLaneQuery(query, segments); final AtomicLong cpuAccumulator = new AtomicLong(0L); - final Function segmentMapFn = Joinables.createSegmentMapFn( + final Function segmentMapFn = Joinables.createSegmentMapFn( analysis.getPreJoinableClauses(), joinableFactory, cpuAccumulator, @@ -129,7 +136,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker throw new ISE("Cannot run with specific segments"); } - private Query prioritizeAndLaneQuery(Query query, Iterable segments) + private Query prioritizeAndLaneQuery(Query query, Iterable segments) { Set segmentServerSelectors = new HashSet<>(); for (Segment s : segments) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index ca26e0dd4f7..086673fabc1 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -53,9 +53,8 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; -import org.apache.druid.segment.ReferenceCounter; import org.apache.druid.segment.ReferenceCountingSegment; -import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.Joinables; import org.apache.druid.server.SegmentManager; @@ -194,7 +193,7 @@ public class ServerManager implements QuerySegmentWalker } // segmentMapFn maps each base Segment into a joined Segment if necessary. - final Function segmentMapFn = Joinables.createSegmentMapFn( + final Function segmentMapFn = Joinables.createSegmentMapFn( analysis.getPreJoinableClauses(), joinableFactory, cpuTimeAccumulator, @@ -230,7 +229,6 @@ public class ServerManager implements QuerySegmentWalker factory, toolChest, segmentMapFn.apply(segment), - segment.referenceCounter(), descriptor, cpuTimeAccumulator ) @@ -253,8 +251,7 @@ public class ServerManager implements QuerySegmentWalker private QueryRunner buildAndDecorateQueryRunner( final QueryRunnerFactory> factory, final QueryToolChest> toolChest, - final Segment segment, - final ReferenceCounter segmentReferenceCounter, + final SegmentReference segment, final SegmentDescriptor segmentDescriptor, final AtomicLong cpuTimeAccumulator ) @@ -266,7 +263,7 @@ public class ServerManager implements QuerySegmentWalker MetricsEmittingQueryRunner metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>( emitter, toolChest, - new ReferenceCountingSegmentQueryRunner<>(factory, segment, segmentReferenceCounter, segmentDescriptor), + new ReferenceCountingSegmentQueryRunner<>(factory, segment, segmentDescriptor), QueryMetrics::reportSegmentTime, queryMetrics -> queryMetrics.segment(segmentIdString) ); diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index e604b29f763..a42c553bad5 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.loading; import org.apache.druid.java.util.common.MapUtils; -import org.apache.druid.segment.AbstractSegment; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; @@ -49,7 +48,7 @@ public class CacheTestSegmentLoader implements SegmentLoader @Override public Segment getSegment(final DataSegment segment, boolean lazy) { - return new AbstractSegment() + return new Segment() { @Override public SegmentId getId() diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java new file mode 100644 index 00000000000..4f288426e50 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.IncrementalIndexSegment; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +public class FireHydrantTest extends InitializedNullHandlingTest +{ + private IncrementalIndexSegment incrementalIndexSegment; + private QueryableIndexSegment queryableIndexSegment; + private FireHydrant hydrant; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() + { + incrementalIndexSegment = new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), SegmentId.dummy("test")); + queryableIndexSegment = new QueryableIndexSegment(TestIndex.getMMappedTestIndex(), SegmentId.dummy("test")); + + // hydrant starts out with incremental segment loaded + hydrant = new FireHydrant(incrementalIndexSegment, 0); + } + + @Test + public void testGetIncrementedSegmentNotSwapped() + { + Assert.assertEquals(0, hydrant.getHydrantSegment().getNumReferences()); + ReferenceCountingSegment segment = hydrant.getIncrementedSegment(); + Assert.assertNotNull(segment); + Assert.assertTrue(segment.getBaseSegment() == incrementalIndexSegment); + Assert.assertEquals(1, segment.getNumReferences()); + } + + @Test + public void testGetIncrementedSegmentSwapped() + { + ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + hydrant.swapSegment(queryableIndexSegment); + ReferenceCountingSegment segment = hydrant.getIncrementedSegment(); + Assert.assertNotNull(segment); + Assert.assertTrue(segment.getBaseSegment() == queryableIndexSegment); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + Assert.assertEquals(1, segment.getNumReferences()); + } + + @Test + public void testGetIncrementedSegmentClosed() + { + expectedException.expect(ISE.class); + expectedException.expectMessage("segment.close() is called somewhere outside FireHydrant.swapSegment()"); + hydrant.getHydrantSegment().close(); + Assert.assertEquals(0, hydrant.getHydrantSegment().getNumReferences()); + ReferenceCountingSegment segment = hydrant.getIncrementedSegment(); + } + + @Test + public void testGetAndIncrementSegment() throws IOException + { + ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + + Pair segmentAndCloseable = hydrant.getAndIncrementSegment(); + Assert.assertEquals(1, segmentAndCloseable.lhs.getNumReferences()); + segmentAndCloseable.rhs.close(); + Assert.assertEquals(0, segmentAndCloseable.lhs.getNumReferences()); + } + + @Test + public void testGetSegmentForQuery() throws IOException + { + ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + + Optional> maybeSegmentAndCloseable = hydrant.getSegmentForQuery( + Function.identity() + ); + Assert.assertTrue(maybeSegmentAndCloseable.isPresent()); + Assert.assertEquals(1, incrementalSegmentReference.getNumReferences()); + + Pair segmentAndCloseable = maybeSegmentAndCloseable.get(); + segmentAndCloseable.rhs.close(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + } + + @Test + public void testGetSegmentForQuerySwapped() throws IOException + { + ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment(); + hydrant.swapSegment(queryableIndexSegment); + ReferenceCountingSegment queryableSegmentReference = hydrant.getHydrantSegment(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + Assert.assertEquals(0, queryableSegmentReference.getNumReferences()); + + Optional> maybeSegmentAndCloseable = hydrant.getSegmentForQuery( + Function.identity() + ); + Assert.assertTrue(maybeSegmentAndCloseable.isPresent()); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + Assert.assertEquals(1, queryableSegmentReference.getNumReferences()); + + Pair segmentAndCloseable = maybeSegmentAndCloseable.get(); + segmentAndCloseable.rhs.close(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + Assert.assertEquals(0, queryableSegmentReference.getNumReferences()); + } + + @Test + public void testGetSegmentForQueryButNotAbleToAcquireReferences() + { + ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + + Optional> maybeSegmentAndCloseable = hydrant.getSegmentForQuery( + segmentReference -> new SegmentReference() + { + @Override + public Optional acquireReferences() + { + return Optional.empty(); + } + + @Override + public SegmentId getId() + { + return incrementalIndexSegment.getId(); + } + + @Override + public Interval getDataInterval() + { + return incrementalIndexSegment.getDataInterval(); + } + + @Nullable + @Override + public QueryableIndex asQueryableIndex() + { + return incrementalIndexSegment.asQueryableIndex(); + } + + @Override + public StorageAdapter asStorageAdapter() + { + return incrementalIndexSegment.asStorageAdapter(); + } + + @Override + public void close() + { + incrementalIndexSegment.close(); + } + } + ); + Assert.assertFalse(maybeSegmentAndCloseable.isPresent()); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + } + + @Test + public void testGetSegmentForQueryButNotAbleToAcquireReferencesSegmentClosed() + { + expectedException.expect(ISE.class); + expectedException.expectMessage("segment.close() is called somewhere outside FireHydrant.swapSegment()"); + ReferenceCountingSegment incrementalSegmentReference = hydrant.getHydrantSegment(); + Assert.assertEquals(0, incrementalSegmentReference.getNumReferences()); + incrementalSegmentReference.close(); + + Optional> maybeSegmentAndCloseable = hydrant.getSegmentForQuery( + Function.identity() + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 04f796e9918..562fc1dfffb 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.AbstractSegment; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; @@ -90,7 +89,7 @@ public class SegmentManagerTest } }; - private static class SegmentForTesting extends AbstractSegment + private static class SegmentForTesting implements Segment { private final String version; private final Interval interval; diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index cc3a406cfe7..e41ad268d16 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -45,6 +45,7 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.Joinables; import org.apache.druid.timeline.TimelineObjectHolder; @@ -139,7 +140,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker throw new ISE("Cannot handle subquery: %s", analysis.getDataSource()); } - final Function segmentMapFn = Joinables.createSegmentMapFn( + final Function segmentMapFn = Joinables.createSegmentMapFn( analysis.getPreJoinableClauses(), joinableFactory, new AtomicLong(), @@ -197,7 +198,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker final QueryToolChest> toolChest, final QueryRunnerFactory> factory, final Iterable segments, - final Function segmentMapFn + final Function segmentMapFn ) { final List segmentsList = Lists.newArrayList(segments); @@ -217,7 +218,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker .transform( segment -> new SpecificSegmentQueryRunner<>( - factory.createRunner(segmentMapFn.apply(segment.getSegment())), + factory.createRunner(segmentMapFn.apply(ReferenceCountingSegment.wrapRootGenerationSegment(segment.getSegment()))), new SpecificSegmentSpec(segment.getDescriptor()) ) ) diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 356302cee22..9ca113e2515 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -57,7 +57,6 @@ import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.search.SearchQuery; import org.apache.druid.query.search.SearchResultValue; -import org.apache.druid.segment.AbstractSegment; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; @@ -600,7 +599,7 @@ public class ServerManagerTest } } - private static class SegmentForTesting extends AbstractSegment + private static class SegmentForTesting implements Segment { private final String version; private final Interval interval; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java index c0e73163c02..f8c3dc2a152 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java @@ -126,7 +126,7 @@ public class QueryMaker return DataSourceAnalysis.forDataSource(query.getDataSource()) .getBaseQuerySegmentSpec() .map(QuerySegmentSpec::getIntervals) - .orElse(query.getIntervals()); + .orElseGet(query::getIntervals); } private Sequence execute(Query query, final List newFields, final List newTypes)