clean up some thread pools in tests (#17421)

This commit is contained in:
Clint Wylie 2024-10-28 09:05:15 -07:00 committed by GitHub
parent 65acc86756
commit 73675d0671
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 542 additions and 471 deletions

View File

@ -75,46 +75,53 @@ public class GuavaUtilsTest
int tasks = 3;
ExecutorService service = Execs.multiThreaded(tasks, "GuavaUtilsTest-%d");
ListeningExecutorService exc = MoreExecutors.listeningDecorator(service);
//a flag what time to throw exception.
AtomicBoolean someoneFailed = new AtomicBoolean(false);
List<CountDownLatch> latches = new ArrayList<>(tasks);
Function<Integer, List<ListenableFuture<Object>>> function = (taskCount) -> {
List<ListenableFuture<Object>> futures = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
final CountDownLatch latch = new CountDownLatch(1);
latches.add(latch);
ListenableFuture<Object> future = exc.submit(new Callable<Object>() {
@Override
public Object call() throws RuntimeException, InterruptedException
try {
//a flag what time to throw exception.
AtomicBoolean someoneFailed = new AtomicBoolean(false);
List<CountDownLatch> latches = new ArrayList<>(tasks);
Function<Integer, List<ListenableFuture<Object>>> function = (taskCount) -> {
List<ListenableFuture<Object>> futures = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
final CountDownLatch latch = new CountDownLatch(1);
latches.add(latch);
ListenableFuture<Object> future = exc.submit(new Callable<Object>()
{
latch.await(60, TimeUnit.SECONDS);
if (someoneFailed.compareAndSet(false, true)) {
throw new RuntimeException("This exception simulates an error");
@Override
public Object call() throws RuntimeException, InterruptedException
{
latch.await(60, TimeUnit.SECONDS);
if (someoneFailed.compareAndSet(false, true)) {
throw new RuntimeException("This exception simulates an error");
}
return null;
}
return null;
}
});
futures.add(future);
});
futures.add(future);
}
return futures;
};
List<ListenableFuture<Object>> futures = function.apply(tasks);
Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count());
// "release" the last tasks, which will cause it to fail as someoneFailed will still be false
latches.get(tasks - 1).countDown();
ListenableFuture<List<Object>> future = Futures.allAsList(futures);
ExecutionException thrown = Assert.assertThrows(
ExecutionException.class,
future::get
);
Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage());
GuavaUtils.cancelAll(true, future, futures);
Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count());
for (CountDownLatch latch : latches) {
latch.countDown();
}
return futures;
};
List<ListenableFuture<Object>> futures = function.apply(tasks);
Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count());
// "release" the last tasks, which will cause it to fail as someoneFailed will still be false
latches.get(tasks - 1).countDown();
ListenableFuture<List<Object>> future = Futures.allAsList(futures);
ExecutionException thrown = Assert.assertThrows(
ExecutionException.class,
future::get
);
Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage());
GuavaUtils.cancelAll(true, future, futures);
Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count());
for (CountDownLatch latch : latches) {
latch.countDown();
}
finally {
exc.shutdownNow();
service.shutdownNow();
}
}
}

View File

@ -72,21 +72,27 @@ public class HttpEmitterTest
.setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor)
.setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER);
try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER)) {
long startMs = System.currentTimeMillis();
emitter.start();
emitter.emitAndReturnBatch(new IntEvent());
emitter.flush();
long fillTimeMs = System.currentTimeMillis() - startMs;
MatcherAssert.assertThat(
(double) timeoutUsed.get(),
Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))
);
long startMs = System.currentTimeMillis();
emitter.start();
emitter.emitAndReturnBatch(new IntEvent());
emitter.flush();
long fillTimeMs = System.currentTimeMillis() - startMs;
MatcherAssert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
startMs = System.currentTimeMillis();
final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
Thread.sleep(1000);
batch.seal();
emitter.flush();
fillTimeMs = System.currentTimeMillis() - startMs;
MatcherAssert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)));
startMs = System.currentTimeMillis();
final Batch batch = emitter.emitAndReturnBatch(new IntEvent());
Thread.sleep(1000);
batch.seal();
emitter.flush();
fillTimeMs = System.currentTimeMillis() - startMs;
MatcherAssert.assertThat(
(double) timeoutUsed.get(),
Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))
);
}
}
}

View File

@ -52,61 +52,63 @@ public class HttpPostEmitterLoggerStressTest
.setBatchQueueSizeLimit(10)
.setMinHttpTimeoutMillis(100)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());
try (HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper())) {
emitter.start();
emitter.start();
httpClient.setGoHandler(new GoHandler() {
@Override
protected ListenableFuture<Response> go(Request request)
httpClient.setGoHandler(new GoHandler()
{
return GoHandlers.immediateFuture(EmitterTest.okResponse());
@Override
protected ListenableFuture<Response> go(Request request)
{
return GoHandlers.immediateFuture(EmitterTest.okResponse());
}
});
Event smallEvent = ServiceMetricEvent.builder()
.setFeed("smallEvents")
.setDimension("test", "hi")
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
for (int i = 0; i < 1000; i++) {
emitter.emit(smallEvent);
Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
});
Event smallEvent = ServiceMetricEvent.builder()
.setFeed("smallEvents")
.setDimension("test", "hi")
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
// by the end of this test, there should be no outstanding failed buffers
for (int i = 0; i < 1000; i++) {
emitter.emit(smallEvent);
// with a flush time of 5s, min timeout of 100ms, 20s should be
// easily enough to get through all of the events
Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
while (emitter.getTotalFailedBuffers() > 0) {
Thread.sleep(500);
}
// there is also no reason to have too many log events
// refer to: https://github.com/apache/druid/issues/11279;
long countOfTimeouts = logCapture.getLogEvents().stream()
.filter(ev -> ev.getLevel() == Level.DEBUG)
.filter(ev -> ev.getThrown() instanceof TimeoutException)
.count();
// 1000 events limit, implies we should have no more than
// 1000 rejected send events within the expected 20sec
// duration of the test
long limitTimeoutEvents = 1000;
Assert.assertTrue(
String.format(
Locale.getDefault(),
"too many timeouts (%d), expect less than (%d)",
countOfTimeouts,
limitTimeoutEvents
),
countOfTimeouts < limitTimeoutEvents
);
}
// by the end of this test, there should be no outstanding failed buffers
// with a flush time of 5s, min timeout of 100ms, 20s should be
// easily enough to get through all of the events
while (emitter.getTotalFailedBuffers() > 0) {
Thread.sleep(500);
}
// there is also no reason to have too many log events
// refer to: https://github.com/apache/druid/issues/11279;
long countOfTimeouts = logCapture.getLogEvents().stream()
.filter(ev -> ev.getLevel() == Level.DEBUG)
.filter(ev -> ev.getThrown() instanceof TimeoutException)
.count();
// 1000 events limit, implies we should have no more than
// 1000 rejected send events within the expected 20sec
// duration of the test
long limitTimeoutEvents = 1000;
Assert.assertTrue(
String.format(
Locale.getDefault(),
"too many timeouts (%d), expect less than (%d)",
countOfTimeouts,
limitTimeoutEvents),
countOfTimeouts < limitTimeoutEvents);
emitter.close();
}
}

View File

@ -65,78 +65,80 @@ public class HttpPostEmitterStressTest
// For this test, we don't need any batches to be dropped, i. e. "gaps" in data
.setBatchQueueSizeLimit(1000)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER);
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
for (int i = 0; i < nThreads; i++) {
eventsPerThread.add(new IntArrayList());
eventBatchesPerThread.add(new ArrayList<Batch>());
}
for (int i = 0; i < N; i++) {
eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
}
final BitSet emittedEvents = new BitSet(N);
httpClient.setGoHandler(new GoHandler()
{
@Override
protected ListenableFuture<Response> go(Request request)
{
ByteBuffer batch = request.getByteBufferData().slice();
while (batch.remaining() > 0) {
emittedEvents.set(batch.getInt());
}
return GoHandlers.immediateFuture(EmitterTest.okResponse());
try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER)) {
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
for (int i = 0; i < nThreads; i++) {
eventsPerThread.add(new IntArrayList());
eventBatchesPerThread.add(new ArrayList<Batch>());
}
});
emitter.start();
final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
final int threadIndex = i;
new Thread() {
for (int i = 0; i < N; i++) {
eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
}
final BitSet emittedEvents = new BitSet(N);
httpClient.setGoHandler(new GoHandler()
{
@Override
public void run()
protected ListenableFuture<Response> go(Request request)
{
IntList events = eventsPerThread.get(threadIndex);
List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
IntEvent event = new IntEvent();
for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
event.index = events.getInt(i);
eventBatches.add(emitter.emitAndReturnBatch(event));
if (i % 16 == 0) {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
ByteBuffer batch = request.getByteBufferData().slice();
while (batch.remaining() > 0) {
emittedEvents.set(batch.getInt());
}
return GoHandlers.immediateFuture(EmitterTest.okResponse());
}
});
emitter.start();
final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
final int threadIndex = i;
new Thread()
{
@Override
public void run()
{
IntList events = eventsPerThread.get(threadIndex);
List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
IntEvent event = new IntEvent();
for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
event.index = events.getInt(i);
eventBatches.add(emitter.emitAndReturnBatch(event));
if (i % 16 == 0) {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
threadsCompleted.countDown();
}
threadsCompleted.countDown();
}
}.start();
}
threadsCompleted.await();
emitter.flush();
System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers());
for (int eventIndex = 0; eventIndex < N; eventIndex++) {
if (!emittedEvents.get(eventIndex)) {
for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) {
IntList threadEvents = eventsPerThread.get(threadIndex);
int indexOfEvent = threadEvents.indexOf(eventIndex);
if (indexOfEvent >= 0) {
Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
System.err.println(batch);
int bufferWatermark = batch.getSealedBufferWatermark();
ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
batchBuffer.limit(bufferWatermark);
while (batchBuffer.remaining() > 0) {
System.err.println(batchBuffer.getInt());
}.start();
}
threadsCompleted.await();
emitter.flush();
System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers());
for (int eventIndex = 0; eventIndex < N; eventIndex++) {
if (!emittedEvents.get(eventIndex)) {
for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) {
IntList threadEvents = eventsPerThread.get(threadIndex);
int indexOfEvent = threadEvents.indexOf(eventIndex);
if (indexOfEvent >= 0) {
Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
System.err.println(batch);
int bufferWatermark = batch.getSealedBufferWatermark();
ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
batchBuffer.limit(bufferWatermark);
while (batchBuffer.remaining() > 0) {
System.err.println(batchBuffer.getInt());
}
break;
}
break;
}
throw new AssertionError("event " + eventIndex);
}
throw new AssertionError("event " + eventIndex);
}
}
}
@ -151,34 +153,36 @@ public class HttpPostEmitterStressTest
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());
try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper())) {
emitter.start();
emitter.start();
httpClient.setGoHandler(new GoHandler() {
@Override
protected ListenableFuture<Response> go(Request request)
httpClient.setGoHandler(new GoHandler()
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
@Override
protected ListenableFuture<Response> go(Request request)
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});
char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);
Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
for (int i = 0; i < 1000; i++) {
emitter.emit(bigEvent);
Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
}
});
char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);
Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
for (int i = 0; i < 1000; i++) {
emitter.emit(bigEvent);
Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11);
emitter.flush();
}
emitter.flush();
}
@Test
@ -191,64 +195,67 @@ public class HttpPostEmitterStressTest
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(10)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper());
try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper())) {
emitter.start();
emitter.start();
httpClient.setGoHandler(new GoHandler() {
@Override
protected ListenableFuture<Response> go(Request request)
httpClient.setGoHandler(new GoHandler()
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
});
char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);
Event smallEvent = ServiceMetricEvent.builder()
.setFeed("smallEvents")
.setDimension("test", "hi")
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
final CountDownLatch threadsCompleted = new CountDownLatch(2);
new Thread() {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {
emitter.emit(smallEvent);
Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
@Override
protected ListenableFuture<Response> go(Request request)
{
return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE);
}
threadsCompleted.countDown();
}
}.start();
new Thread() {
@Override
public void run()
});
char[] chars = new char[600000];
Arrays.fill(chars, '*');
String bigString = new String(chars);
Event smallEvent = ServiceMetricEvent.builder()
.setFeed("smallEvents")
.setDimension("test", "hi")
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
Event bigEvent = ServiceMetricEvent.builder()
.setFeed("bigEvents")
.setDimension("test", bigString)
.setMetric("metric", 10)
.build("qwerty", "asdfgh");
final CountDownLatch threadsCompleted = new CountDownLatch(2);
new Thread()
{
for (int i = 0; i < 1000; i++) {
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {
emitter.emit(bigEvent);
emitter.emit(smallEvent);
Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
threadsCompleted.countDown();
}
}.start();
threadsCompleted.await();
emitter.flush();
}.start();
new Thread()
{
@Override
public void run()
{
for (int i = 0; i < 1000; i++) {
emitter.emit(bigEvent);
Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10);
Assert.assertTrue(emitter.getBuffersToEmit() <= 12);
}
threadsCompleted.countDown();
}
}.start();
threadsCompleted.await();
emitter.flush();
}
}
}

View File

@ -73,26 +73,26 @@ public class HttpPostEmitterTest
.setMaxBatchSize(1024 * 1024)
.setBatchQueueSizeLimit(1000)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER);
emitter.start();
try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER)) {
emitter.start();
// emit first event
emitter.emitAndReturnBatch(new IntEvent());
Thread.sleep(1000L);
// emit first event
emitter.emitAndReturnBatch(new IntEvent());
Thread.sleep(1000L);
// get concurrentBatch reference and set value to lon as if it would fail while
// HttpPostEmitter#onSealExclusive method invocation.
Field concurrentBatch = emitter.getClass().getDeclaredField("concurrentBatch");
concurrentBatch.setAccessible(true);
((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
// something terrible happened previously so that batch has to recover
// get concurrentBatch reference and set value to lon as if it would fail while
// HttpPostEmitter#onSealExclusive method invocation.
Field concurrentBatch = emitter.getClass().getDeclaredField("concurrentBatch");
concurrentBatch.setAccessible(true);
((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
// something terrible happened previously so that batch has to recover
// emit second event
emitter.emitAndReturnBatch(new IntEvent());
// emit second event
emitter.emitAndReturnBatch(new IntEvent());
emitter.flush();
emitter.close();
emitter.flush();
Assert.assertEquals(2, emitter.getTotalEmittedEvents());
Assert.assertEquals(2, emitter.getTotalEmittedEvents());
}
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
@ -50,6 +51,12 @@ public class BasicMonitorSchedulerTest
exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest");
}
@After
public void teardown()
{
exec.shutdownNow();
}
@Test
public void testStart_RepeatScheduling() throws InterruptedException
{

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -150,67 +151,72 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
temporaryFolder.newFolder(),
1024 * 1024
);
final ListeningExecutorService service = MoreExecutors.listeningDecorator(exec);
try {
final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
temporaryStorage,
new DefaultObjectMapper(),
concurrencyHint,
null,
false,
service,
0,
false,
0,
4,
parallelCombineThreads,
mergeThreadLocal
);
closer.register(grouper);
grouper.init();
final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
temporaryStorage,
new DefaultObjectMapper(),
concurrencyHint,
null,
false,
MoreExecutors.listeningDecorator(exec),
0,
false,
0,
4,
parallelCombineThreads,
mergeThreadLocal
);
closer.register(grouper);
grouper.init();
final int numRows = 1000;
final int numRows = 1000;
Future<?>[] futures = new Future[concurrencyHint];
Future<?>[] futures = new Future[concurrencyHint];
for (int i = 0; i < concurrencyHint; i++) {
futures[i] = exec.submit(() -> {
for (long j = 0; j < numRows; j++) {
if (!grouper.aggregate(new LongKey(j)).isOk()) {
throw new ISE("Grouper is full");
for (int i = 0; i < concurrencyHint; i++) {
futures[i] = exec.submit(() -> {
for (long j = 0; j < numRows; j++) {
if (!grouper.aggregate(new LongKey(j)).isOk()) {
throw new ISE("Grouper is full");
}
}
}
});
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
final List<Entry<LongKey>> expected = new ArrayList<>();
for (long i = 0; i < numRows; i++) {
expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long) concurrencyHint}));
}
final CloseableIterator<Entry<LongKey>> iterator = closer.register(grouper.iterator(true));
if (parallelCombineThreads > 1 && (mergeThreadLocal || temporaryStorage.currentSize() > 0)) {
// Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly
// configured, or due to spilling).
Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
} else {
Assert.assertFalse(TEST_RESOURCE_HOLDER.taken);
}
GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator);
}
for (Future eachFuture : futures) {
eachFuture.get();
finally {
service.shutdownNow();
}
final List<Entry<LongKey>> expected = new ArrayList<>();
for (long i = 0; i < numRows; i++) {
expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long) concurrencyHint}));
}
final CloseableIterator<Entry<LongKey>> iterator = closer.register(grouper.iterator(true));
if (parallelCombineThreads > 1 && (mergeThreadLocal || temporaryStorage.currentSize() > 0)) {
// Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly
// configured, or due to spilling).
Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
} else {
Assert.assertFalse(TEST_RESOURCE_HOLDER.taken);
}
GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator);
}
@Test
@ -221,56 +227,62 @@ public class ConcurrentGrouperTest extends InitializedNullHandlingTest
return;
}
final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
new DefaultObjectMapper(),
concurrencyHint,
null,
false,
MoreExecutors.listeningDecorator(exec),
0,
true,
1,
4,
parallelCombineThreads,
mergeThreadLocal
);
closer.register(grouper);
grouper.init();
ListeningExecutorService service = MoreExecutors.listeningDecorator(exec);
try {
final ConcurrentGrouper<LongKey> grouper = new ConcurrentGrouper<>(
bufferSupplier,
TEST_RESOURCE_HOLDER,
KEY_SERDE_FACTORY,
KEY_SERDE_FACTORY,
NULL_FACTORY,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
1024,
0.7f,
1,
new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
new DefaultObjectMapper(),
concurrencyHint,
null,
false,
service,
0,
true,
1,
4,
parallelCombineThreads,
mergeThreadLocal
);
closer.register(grouper);
grouper.init();
final int numRows = 1000;
final int numRows = 1000;
Future<?>[] futures = new Future[concurrencyHint];
Future<?>[] futures = new Future[concurrencyHint];
for (int i = 0; i < concurrencyHint; i++) {
futures[i] = exec.submit(() -> {
for (long j = 0; j < numRows; j++) {
if (!grouper.aggregate(new LongKey(j)).isOk()) {
throw new ISE("Grouper is full");
for (int i = 0; i < concurrencyHint; i++) {
futures[i] = exec.submit(() -> {
for (long j = 0; j < numRows; j++) {
if (!grouper.aggregate(new LongKey(j)).isOk()) {
throw new ISE("Grouper is full");
}
}
}
});
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
final QueryTimeoutException e = Assert.assertThrows(
QueryTimeoutException.class,
() -> closer.register(grouper.iterator(true))
);
Assert.assertEquals("Query timeout", e.getErrorCode());
}
for (Future eachFuture : futures) {
eachFuture.get();
finally {
service.shutdownNow();
}
final QueryTimeoutException e = Assert.assertThrows(
QueryTimeoutException.class,
() -> closer.register(grouper.iterator(true))
);
Assert.assertEquals("Query timeout", e.getErrorCode());
}
static class TestResourceHolder extends ReferenceCountingResourceHolder<ByteBuffer>

View File

@ -299,30 +299,35 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
Execs.multiThreaded(threads, "NestedColumnSupplierTest-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
smokeTest(column);
try {
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
smokeTest(column);
}
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
finally {
executorService.shutdownNow();
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(NestedDataComplexColumn column) throws IOException

View File

@ -255,30 +255,35 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d")
Execs.multiThreaded(threads, "NestedDataColumnSupplierV4Test-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
smokeTest(column);
try {
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
smokeTest(column);
}
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
finally {
executorService.shutdownNow();
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
@Test

View File

@ -219,30 +219,35 @@ public class ScalarDoubleColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
Execs.multiThreaded(threads, "ScalarDoubleColumnSupplierTest-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) {
smokeTest(supplier, column);
try {
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) {
smokeTest(supplier, column);
}
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
finally {
executorService.shutdownNow();
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(ScalarDoubleColumnAndIndexSupplier supplier, ScalarDoubleColumn column)

View File

@ -219,30 +219,35 @@ public class ScalarLongColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
Execs.multiThreaded(threads, "ScalarLongColumnSupplierTest-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) {
smokeTest(supplier, column);
try {
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) {
smokeTest(supplier, column);
}
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
finally {
executorService.shutdownNow();
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(ScalarLongColumnAndIndexSupplier supplier, ScalarLongColumn column)

View File

@ -217,30 +217,35 @@ public class ScalarStringColumnSupplierTest extends InitializedNullHandlingTest
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
Execs.multiThreaded(threads, "ScalarStringColumnSupplierTest-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) {
smokeTest(supplier, column);
try {
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) {
smokeTest(supplier, column);
}
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
finally {
executorService.shutdownNow();
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(ScalarStringColumnAndIndexSupplier supplier, StringUtf8DictionaryEncodedColumn column)

View File

@ -346,28 +346,33 @@ public class VariantColumnSupplierTest extends InitializedNullHandlingTest
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (VariantColumn column = (VariantColumn) supplier.get()) {
smokeTest(supplier, column, data, expectedTypes);
try {
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (VariantColumn column = (VariantColumn) supplier.get()) {
smokeTest(supplier, column, data, expectedTypes);
}
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
finally {
executorService.shutdownNow();
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(