mirror of https://github.com/apache/druid.git
Fix another deadlock which can occur while acquiring merge buffers (#16372)
Fixes a deadlock while acquiring merge buffers
This commit is contained in:
parent 03566b0115
commit dded473ac0
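The diff below replaces the blocking ConcurrentHashMap.compute() call in GroupByResourcesReservationPool.reserve() with a two-step scheme: first reserve a spot in the map with a cheap putIfAbsent() of an empty AtomicReference, then perform the blocking merge-buffer acquisition outside of any map operation, and finally publish the result through the reference. As the comments in the new test describe, the old code could deadlock when two query ids hashed to the same map bin: the second query blocked on merge buffers while holding the bin lock inside compute(), and the first query's clean() then blocked on that same bin lock, so the buffers it held were never released. The sketch below is an editor-added, minimal illustration of the new pattern, not the actual Druid code; it uses simplified types (String keys, and a Supplier standing in for GroupingEngine.prepareResource()).

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class ReservationPoolSketch<R>
{
  private final ConcurrentHashMap<String, AtomicReference<R>> pool = new ConcurrentHashMap<>();

  void reserve(String id, Supplier<R> blockingAllocator)
  {
    // Step 1: claim the spot without blocking. No other thread can now reserve this id,
    // and no map lock is held beyond this call.
    AtomicReference<R> placeholder = new AtomicReference<>(null);
    if (pool.putIfAbsent(id, placeholder) != null) {
      throw new IllegalStateException("Resource already reserved for id " + id);
    }

    // Step 2: do the potentially long, blocking allocation outside the map's critical section.
    R resources;
    try {
      resources = blockingAllocator.get();
    }
    catch (Throwable t) {
      pool.remove(id); // give the spot back if allocation fails
      throw t;
    }

    // Step 3: publish the allocated resources through the previously inserted reference.
    placeholder.compareAndSet(null, resources);
  }
}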
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Reserves the {@link GroupByQueryResources} for a given group by query and maps them to the query's resource ID.
@@ -67,21 +68,25 @@ import java.util.concurrent.ConcurrentHashMap;
 * nested ones execute via an unoptimized way.
 * 3. There's some knowledge to the mergeResults that the query runner passed to it is the one created by the corresponding toolchest's
 * mergeRunners (which is the typical use case). This is encoded in the argument {@code willMergeRunner}, and is to be set by the callers.
 * The only production use case where this isn't true is when the broker is merging the results gathered from the historical)
 * The only production use case where this isn't true is when the broker is merging the results gathered from the historical
 * <p>
 * TESTING
 * Unit tests mimic the broker-historical interaction in many places, which can lead to the code not working as intended because the assumptions don't hold.
 * In many test cases, there are two nested mergeResults calls, the outer call mimics what the broker does, while the inner one mimics what the historical does,
 * and the assumption (1) fails. Therefore, the testing code should assign a unique resource id b/w each mergeResults call, and also make sure that the top level mergeResults
 * would have willMergeRunner = false, since it's being called on top of a mergeResults's runner, while the inner one would have willMergeRunner = true because it's being
 * called on actual runners (as it happens in the brokers, and the historicals)
 * called on actual runners (as it happens in the brokers, and the historicals).
 * <p>
 * There is a test in GroupByResourcesReservationPoolTest that checks for deadlocks when the operations are interleaved in a
 * certain manner. It is ignored because it sleeps and can increase the time when the test suite is run. Developers making any changes
 * to this class, or a related class, should manually verify that all the tests in the test class are running as expected.
 */
public class GroupByResourcesReservationPool
{
  /**
   * Map of query's resource id -> group by resources reserved for the query to execute
   */
  final ConcurrentHashMap<QueryResourceId, GroupByQueryResources> pool = new ConcurrentHashMap<>();
  final ConcurrentHashMap<QueryResourceId, AtomicReference<GroupByQueryResources>> pool = new ConcurrentHashMap<>();

  /**
   * Buffer pool from where the merge buffers are picked and reserved
@@ -104,19 +109,42 @@ public class GroupByResourcesReservationPool
  }

  /**
   * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map
   * Reserves appropriate resources, and maps it to the queryResourceId (usually the query's resource id) in the internal map.
   * This is a blocking call, and can block up to the given query's timeout
   */
  public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
  {
    if (queryResourceId == null) {
      throw DruidException.defensive("Query resource id must be populated");
    }
    pool.compute(queryResourceId, (id, existingResource) -> {
      if (existingResource != null) {
        throw DruidException.defensive("Resource with the given identifier [%s] is already present", id);
      }
      return GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
    });

    // First check if the query resource id is present in the map, and if not, populate a dummy reference. This will
    // block other threads from populating the map with the same query id, and is essentially the same as reserving a spot in
    // the map for the given query id. Since the actual allocation of the resource might take longer than expected, we
    // do it out of the critical section, once we have "reserved" the spot
    AtomicReference<GroupByQueryResources> reference = new AtomicReference<>(null);
    AtomicReference<GroupByQueryResources> existingResource = pool.putIfAbsent(queryResourceId, reference);

    // Multiple attempts made to allocate the query resource for a given resource id. Throw an exception
    //noinspection VariableNotUsedInsideIf
    if (existingResource != null) {
      throw DruidException.defensive("Resource with the given identifier [%s] is already present", queryResourceId);
    }

    GroupByQueryResources resources;
    try {
      // We have reserved a spot in the map. Now begin the blocking call.
      resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
    }
    catch (Throwable t) {
      // Unable to allocate the resources, perform cleanup and rethrow the exception
      pool.remove(queryResourceId);
      throw t;
    }

    // Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null' at this point.
    // Make it refer to the allocated resources
    reference.compareAndSet(null, resources);
  }

  /**
@@ -125,7 +153,19 @@ public class GroupByResourcesReservationPool
  @Nullable
  public GroupByQueryResources fetch(QueryResourceId queryResourceId)
  {
    return pool.get(queryResourceId);
    AtomicReference<GroupByQueryResources> resourcesReference = pool.get(queryResourceId);
    if (resourcesReference == null) {
      // There weren't any resources allocated corresponding to the provided resource id
      return null;
    }
    GroupByQueryResources resource = resourcesReference.get();
    if (resource == null) {
      throw DruidException.defensive(
          "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found",
          queryResourceId
      );
    }
    return resource;
  }

  /**
@@ -133,9 +173,17 @@ public class GroupByResourcesReservationPool
   */
  public void clean(QueryResourceId queryResourceId)
  {
    GroupByQueryResources resources = pool.remove(queryResourceId);
    if (resources != null) {
      resources.close();
    AtomicReference<GroupByQueryResources> resourcesReference = pool.remove(queryResourceId);
    if (resourcesReference != null) {
      GroupByQueryResources resource = resourcesReference.get();
      // Reference should refer to a non-empty resource
      if (resource == null) {
        throw DruidException.defensive(
            "Query id [%s] had a non-null reference in the resource reservation pool, but no resources were found",
            queryResourceId
        );
      }
      resource.close();
    }
  }
}
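For reference, here is a minimal, editor-added lifecycle sketch of the pool as it reads after this change, using only constructors and methods that appear in this diff (the DefaultBlockingPool setup mirrors the new unit test further down). The class name, helper method name, id string, and buffer size are illustrative, not part of the commit.

package org.apache.druid.query.groupby;

import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.query.QueryResourceId;

import java.nio.ByteBuffer;

class ReservationLifecycleSketch
{
  static void runWithReservedResources(GroupByQuery query)
  {
    // Single 100-byte merge buffer, as in the test below.
    BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
    GroupByResourcesReservationPool reservationPool =
        new GroupByResourcesReservationPool(mergeBufferPool, new GroupByQueryConfig());

    QueryResourceId resourceId = new QueryResourceId("example-query-id");
    reservationPool.reserve(resourceId, query, true); // blocking; may wait up to the query's timeout
    try {
      GroupByQueryResources resources = reservationPool.fetch(resourceId);
      // ... hand the resources to the merging query runner ...
    }
    finally {
      reservationPool.clean(resourceId); // releases the merge buffers for this resource id
    }
  }
}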
@@ -142,6 +142,8 @@ public class GroupingEngine
   * {@link GroupByMergingQueryRunner} for a particular query. The resources are to be acquired once throughout the
   * execution of the query, or need to be re-acquired (if needed). Users must ensure that throughout the execution,
   * a query already holding the resources shouldn't request more resources, because that can cause deadlocks.
   * <p>
   * This method throws an exception if it is not able to allocate sufficient resources required for the query to succeed
   */
  public static GroupByQueryResources prepareResource(
      GroupByQuery query,
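The javadoc above warns that a query which already holds its resources must not ask the pool for more. The reason is the usual bounded-pool self-deadlock: the waiter is also the only party that could free the capacity it is waiting for. Below is an editor-added, library-agnostic illustration, with a plain java.util.concurrent.Semaphore standing in for the bounded merge-buffer pool; nothing in it is Druid code.

import java.util.concurrent.Semaphore;

class HolderReacquireSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    // One permit plays the role of the single available merge buffer.
    Semaphore mergeBuffers = new Semaphore(1);

    mergeBuffers.acquire(); // the query acquires all its resources up front

    // If the same query asked for another buffer mid-execution, that acquire() would
    // block forever, because only this thread could release the permit it waits for:
    // mergeBuffers.acquire(); // <-- self-deadlock, which is why re-acquisition is forbidden

    mergeBuffers.release(); // resources are released once, when the query finishes
  }
}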
@@ -0,0 +1,227 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.query.groupby;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.QueryResourceId;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

public class GroupByResourcesReservationPoolTest
{

  /**
   * CONFIG + QUERY require exactly 1 merge buffer to succeed if 'willMergeRunners' is true while allocating the resources
   */
  private static final GroupByQueryConfig CONFIG = new GroupByQueryConfig();
  private static final GroupByQuery QUERY = GroupByQuery.builder()
      .setInterval(Intervals.ETERNITY)
      .setDataSource("foo")
      .setDimensions(
          ImmutableList.of(
              new DefaultDimensionSpec("dim2", "_d0")
          )
      )
      .setGranularity(Granularities.ALL)
      .setContext(
          ImmutableMap.of("timeout", 0)
      ) // Query can block indefinitely
      .build();

  /**
   * This test confirms that the interleaved GroupByResourcesReservationPool.reserve() and GroupByResourcesReservationPool.clean()
   * between multiple threads succeed. It is specifically designed to test the case when the operations are interleaved in the
   * following manner:
   * <p>
   * THREAD1                  THREAD2
   * pool.reserve(query1)
   *                          pool.reserve(query2)
   * pool.clean(query1)
   * <p>
   * This test assumes a few things about the implementation of the interfaces, which are laid out in the comments.
   * <p>
   * The test should complete under 10 seconds, and the majority of the time would be consumed by waiting for the thread
   * that sleeps for 5 seconds
   */
  @Ignore(
      "Isn't run as a part of CI since it sleeps for 5 seconds. Callers must run the test manually if any changes are made "
      + "to the corresponding class"
  )
  @Test(timeout = 100_000L)
  public void testInterleavedReserveAndRemove()
  {
    ExecutorService executor = Execs.multiThreaded(3, "group-by-resources-reservation-pool-test-%d");

    // Sanity checks that the query will acquire exactly one merge buffer. This safeguards the test being useful in
    // case the merge buffer acquisition code changes to acquire less than one merge buffer (the test would be
    // useless in that case) or more than one merge buffer (the test would incorrectly fail in that case)
    Assert.assertEquals(
        1,
        GroupByQueryResources.countRequiredMergeBufferNumForMergingQueryRunner(CONFIG, QUERY)
        + GroupByQueryResources.countRequiredMergeBufferNumForToolchestMerge(QUERY)
    );

    // Blocking pool with a single buffer, which means only one of the queries can succeed at a time
    BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
    GroupByResourcesReservationPool groupByResourcesReservationPool =
        new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);

    // Latch indicating that the first thread has called reservationPool.reserve()
    CountDownLatch reserveCalledByFirstThread = new CountDownLatch(1);
    // Latch indicating that the second thread has called reservationPool.reserve()
    CountDownLatch reserveCalledBySecondThread = new CountDownLatch(1);
    // Latch indicating that all the threads have been completed successfully. Main thread waits on this latch before exiting
    CountDownLatch threadsCompleted = new CountDownLatch(2);

    // THREAD 1
    executor.submit(() -> {

      QueryResourceId queryResourceId1 = new QueryResourceId("test-id-1")
      {
        @Override
        public int hashCode()
        {
          // IMPORTANT ASSUMPTION: For the test to be useful, it assumes that under the hood we are using a
          // ConcurrentHashMap<QueryResourceId, GroupByResources> (or a concurrent map with similar implementation) that
          // implements granular locking of the nodes
          // The hashCode of the queryResourceId used in Thread1 and Thread2 is the same. Therefore, both the queryIds
          // would be guarded by the same lock
          return 10;
        }

        @Override
        public boolean equals(Object o)
        {
          return super.equals(o);
        }
      };
      groupByResourcesReservationPool.reserve(queryResourceId1, QUERY, true);
      reserveCalledByFirstThread.countDown();
      try {
        reserveCalledBySecondThread.await();
      }
      catch (InterruptedException e) {
        Assert.fail("Interrupted while waiting for second reserve call to be made");
      }
      groupByResourcesReservationPool.clean(queryResourceId1);
      threadsCompleted.countDown();
    });

    // THREAD 2
    executor.submit(() -> {
      try {
        reserveCalledByFirstThread.await();
      }
      catch (InterruptedException e) {
        Assert.fail("Interrupted while waiting for first reserve call to be made");
      }

      QueryResourceId queryResourceId2 = new QueryResourceId("test-id-2")
      {
        @Override
        public int hashCode()
        {
          return 10;
        }

        @Override
        public boolean equals(Object o)
        {
          return super.equals(o);
        }
      };

      // Since the reserve() call is blocking, we need to execute it separately, so that we can count down the latch
      // and inform Thread1 the reserve call has been made by this thread
      executor.submit(() -> {
        groupByResourcesReservationPool.reserve(queryResourceId2, QUERY, true);
        threadsCompleted.countDown();
      });
      try {
        // This sleep call "ensures" that the statement pool.reserve(queryResourceId2) is called before we release the
        // latch (that will cause Thread1 to release the acquired resources). It still doesn't guarantee the previous
        // statement, however that's the best we can do, given that reserve() is blocking
        Thread.sleep(5_000);
      }
      catch (InterruptedException e) {
        Assert.fail("Interrupted while sleeping");
      }
      reserveCalledBySecondThread.countDown();
    });

    try {
      threadsCompleted.await();
    }
    catch (InterruptedException e) {
      Assert.fail("Interrupted while waiting for the threads to complete");
    }
  }

  @Test
  public void testMultipleSimultaneousAllocationAttemptsFail()
  {
    BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
    GroupByResourcesReservationPool groupByResourcesReservationPool =
        new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
    QueryResourceId queryResourceId = new QueryResourceId("test-id");

    groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);

    Assert.assertThrows(
        DruidException.class,
        () -> groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true)
    );
  }

  @Test
  public void testMultipleSequentialAllocationAttemptsSucceed()
  {
    BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1);
    GroupByResourcesReservationPool groupByResourcesReservationPool =
        new GroupByResourcesReservationPool(mergeBufferPool, CONFIG);
    QueryResourceId queryResourceId = new QueryResourceId("test-id");

    groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
    GroupByQueryResources oldResources = groupByResourcesReservationPool.fetch(queryResourceId);

    // Cleanup the resources
    groupByResourcesReservationPool.clean(queryResourceId);

    // Repeat the calls
    groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true);
    GroupByQueryResources newResources = groupByResourcesReservationPool.fetch(queryResourceId);
    Assert.assertNotNull(newResources);

    Assert.assertNotSame(oldResources, newResources);
  }
}