$expunge operation ignoring ExpungeThreadCount setting in certain cases (#5637)

* $expunge operation ignoring ExpungeThreadCount setting in certain cases - implementation
This commit is contained in:
volodymyr-korzh 2024-01-30 07:17:49 -07:00 committed by GitHub
parent c7fd015195
commit 500490761b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 42 additions and 6 deletions

View File

@ -0,0 +1,7 @@
---
type: fix
issue: 5636
jira: SMILE-7648
title: "Previously, the number of threads allocated to the $expunge operation in certain cases could be more
than configured, this would cause hundreds of threads to be created and all available database connections
to be consumed. This has been fixed."

View File

@ -18,7 +18,8 @@ import java.util.Set;
import java.util.function.Consumer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -98,10 +99,10 @@ public class PartitionRunnerTest {
getPartitionRunner(5).runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
assertThat(partitionCall1.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall1.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(5, partitionCall1.size);
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
assertThat(partitionCall2.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall2.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(5, partitionCall2.size);
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
}
@ -119,14 +120,38 @@ public class PartitionRunnerTest {
getPartitionRunner(5).runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
assertThat(partitionCall1.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall1.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(true, nums.remove(partitionCall1.size));
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
assertThat(partitionCall2.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall2.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(true, nums.remove(partitionCall2.size));
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
}
/**
* See #5636 $expunge operation ignoring ExpungeThreadCount setting in certain cases
*/
@Test
public void testExpunge_withTasksSizeBiggerThanExecutorQueue_usesConfiguredNumberOfThreads() throws InterruptedException {
// setup
List<IResourcePersistentId> resourceIds = buildPidList(2500);
Consumer<List<IResourcePersistentId>> partitionConsumer = buildPartitionConsumer(myLatch);
// with batch size = 2 we expect 2500/2 runnableTasks to be created
myLatch.setExpectedCount(1250);
// execute
getPartitionRunner(2, 2).runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
// validate - only two threads should be used for execution
for (int i = 0; i < 1250; i++) {
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, i);
assertThat(partitionCall.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
}
}
@Test
public void tenItemsOneThread() throws InterruptedException {
List<IResourcePersistentId> resourceIds = buildPidList(10);

View File

@ -184,9 +184,13 @@ public class PartitionRunner {
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
};
// setting corePoolSize and maximumPoolSize to be the same as threadCount
// to ensure that the number of allocated threads for the expunge operation does not exceed the configured limit
// see ThreadPoolExecutor documentation for details
return new ThreadPoolExecutor(
threadCount,
MAX_POOL_SIZE,
threadCount,
0L,
TimeUnit.MILLISECONDS,
executorQueue,