Fix size blocking queue to not lie about its weight

Today when offering an item to a size blocking queue that is at
capacity, we first increment the size of the queue and then check if the
capacity is exceeded or not. If the capacity is indeed exceeded, we do
not add the item to the queue and immediately decrement the size of the
queue. However, this incremented size is exposed externally even though
the offered item was never added to the queue (this is effectively a
race on the size of the queue). This can lead to misleading statistics
such as the size of a queue backing a thread pool. This commit fixes
this issue so that such a size is never exposed. To do this, we replace
the hidden CAS loop that increments the size of the queue with a CAS
loop that only increments the size of the queue if we are going to be
successful in adding the item to the queue.

Relates #28557
This commit is contained in:
Jason Tedor 2018-02-08 06:54:39 -05:00 committed by GitHub
parent 16f7e00514
commit 86fd48e5f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 4 deletions

View File

@ -130,10 +130,14 @@ public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQu
@Override @Override
public boolean offer(E e) { public boolean offer(E e) {
int count = size.incrementAndGet(); while (true) {
if (count > capacity()) { final int current = size.get();
size.decrementAndGet(); if (current >= capacity()) {
return false; return false;
}
if (size.compareAndSet(current, 1 + current)) {
break;
}
} }
boolean offered = queue.offer(e); boolean offered = queue.offer(e);
if (!offered) { if (!offered) {

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.equalTo;
public class SizeBlockingQueueTests extends ESTestCase {
/*
* Tests that the size of a queue remains at most the capacity while offers are made to a queue when at capacity. This test would have
* previously failed when the size of the queue was incremented and exposed externally even though the item offered to the queue was
* never actually added to the queue.
*/
public void testQueueSize() throws InterruptedException {
final int capacity = randomIntBetween(1, 32);
final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(capacity);
final SizeBlockingQueue<Integer> sizeBlockingQueue = new SizeBlockingQueue<>(blockingQueue, capacity);
// fill the queue to capacity
for (int i = 0; i < capacity; i++) {
sizeBlockingQueue.offer(i);
}
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean spin = new AtomicBoolean(true);
final AtomicInteger maxSize = new AtomicInteger();
// this thread will repeatedly poll the size of the queue keeping track of the maximum size that it sees
final Thread queueSizeThread = new Thread(() -> {
try {
latch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
while (spin.get()) {
maxSize.set(Math.max(maxSize.get(), sizeBlockingQueue.size()));
}
});
queueSizeThread.start();
// this thread will try to offer items to the queue while the queue size thread is polling the size
final Thread queueOfferThread = new Thread(() -> {
try {
latch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < 4096; i++) {
sizeBlockingQueue.offer(capacity + i);
}
});
queueOfferThread.start();
// synchronize the start of the two threads
latch.countDown();
// wait for the offering thread to finish
queueOfferThread.join();
// stop the queue size thread
spin.set(false);
queueSizeThread.join();
// the maximum size of the queue should be equal to the capacity
assertThat(maxSize.get(), equalTo(capacity));
}
}