Setting index/bulk thread pools with queue_size can cause replica shard failures

closes #3526
This commit is contained in:
Shay Banon 2013-08-23 19:24:47 +02:00
parent 19cce0b329
commit 86c95ab2ab
9 changed files with 398 additions and 77 deletions

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.indices.IndicesService;
@ -695,7 +696,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (request.operationThreaded()) {
request.beforeLocalFork();
try {
threadPool.executor(executor).execute(new Runnable() {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void run() {
try {
@ -710,6 +711,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
listener.onResponse(response.response());
}
}
// we must never reject because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}
});
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {

View File

@ -0,0 +1,33 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
/**
 * A {@link Runnable} base class that additionally lets a task declare whether
 * it wants to be executed even when the executor would otherwise reject it.
 */
public abstract class AbstractRunnable implements Runnable {

    /**
     * Tells whether this task asks for forced execution when it gets rejected.
     *
     * @return {@code false} unless a subclass overrides this method
     */
    public boolean isForceExecution() {
        return false;
    }
}

View File

@ -19,8 +19,11 @@
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.common.metrics.CounterMetric;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -31,6 +34,20 @@ public class EsAbortPolicy implements XRejectedExecutionHandler {
/**
 * Handles a task the executor could not accept.
 * <p>
 * A task that is an {@link AbstractRunnable} reporting {@code isForceExecution() == true}
 * is placed on the executor's queue regardless of capacity; this requires the queue to be
 * a {@link SizeBlockingQueue}. Every other task is counted as rejected and refused.
 *
 * @param r        the task that was rejected
 * @param executor the executor that rejected it
 * @throws ElasticSearchIllegalStateException if forced execution is requested but the
 *                                            executor's queue is not a {@link SizeBlockingQueue}
 * @throws ElasticSearchInterruptedException  if the thread is interrupted while force-queueing
 * @throws EsRejectedExecutionException       if the task does not request forced execution
 */
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (r instanceof AbstractRunnable && ((AbstractRunnable) r).isForceExecution()) {
        BlockingQueue<Runnable> queue = executor.getQueue();
        if (!(queue instanceof SizeBlockingQueue)) {
            throw new ElasticSearchIllegalStateException("forced execution, but expected a size queue");
        }
        try {
            ((SizeBlockingQueue) queue).forcePut(r);
        } catch (InterruptedException e) {
            // restore the interrupt status so callers further up the stack can still observe it
            Thread.currentThread().interrupt();
            throw new ElasticSearchInterruptedException(e.getMessage(), e);
        }
        return;
    }
    rejected.inc();
    throw new EsRejectedExecutionException("rejected execution of [" + r.getClass().getName() + "]");
}

View File

@ -57,14 +57,16 @@ public class EsExecutors {
return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy());
}
public static EsThreadPoolExecutor newFixed(int size, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
/**
 * Creates an executor whose core and maximum pool size are both {@code size}.
 *
 * @param size          the number of threads (used as both core and maximum pool size)
 * @param queueCapacity the capacity enforced on the work queue; a negative value
 *                      selects a plain blocking queue without a size wrapper
 * @param threadFactory the factory used to create worker threads
 * @return the executor, configured with an {@link EsAbortPolicy} rejection handler
 */
public static EsThreadPoolExecutor newFixed(int size, int queueCapacity, ThreadFactory threadFactory) {
    final BlockingQueue<Runnable> workQueue = queueCapacity >= 0
            ? new SizeBlockingQueue<Runnable>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity)
            : ConcurrentCollections.<Runnable>newBlockingQueue();
    return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, workQueue, threadFactory, new EsAbortPolicy());
}
public static EsThreadPoolExecutor newFixed(int size, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, XRejectedExecutionHandler rejectedExecutionHandler) {
return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, rejectedExecutionHandler);
}
public static String threadName(Settings settings, String namePrefix) {
String name = settings.get("name");
if (name == null) {
@ -111,6 +113,7 @@ public class EsExecutors {
private EsExecutors() {
}
static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
ThreadPoolExecutor executor;

View File

@ -0,0 +1,204 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticSearchIllegalStateException;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * A size based queue wrapping another blocking queue to provide (somewhat relaxed) capacity checks.
 * Mainly makes sense to use with blocking queues that are unbounded to provide the ability to do
 * capacity verification.
 * <p>
 * The element count is tracked in a separate counter that is updated around each call to the
 * wrapped queue, so {@link #size()} and {@link #remainingCapacity()} report the counter rather
 * than the wrapped queue's own size. {@link #forcePut(Object)} bypasses the capacity check, so
 * the counter may exceed the configured capacity.
 */
public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    /** The wrapped queue that actually stores the elements. */
    private final BlockingQueue<E> queue;

    /** Maximum number of elements accepted through {@link #offer(Object)}. */
    private final int capacity;

    /** Counter of elements added through this wrapper and not yet removed. */
    private final AtomicInteger size = new AtomicInteger();

    /**
     * @param queue    the queue to delegate storage to
     * @param capacity the capacity to enforce on {@link #offer(Object)}; asserted to be non-negative
     */
    public SizeBlockingQueue(BlockingQueue<E> queue, int capacity) {
        assert capacity >= 0;
        this.queue = queue;
        this.capacity = capacity;
    }

    /**
     * @return the current value of the element counter
     */
    @Override
    public int size() {
        return size.get();
    }

    /**
     * Returns an iterator over the wrapped queue whose {@code remove()} keeps the element
     * counter in sync.
     */
    @Override
    public Iterator<E> iterator() {
        final Iterator<E> it = queue.iterator();
        return new Iterator<E>() {
            // element returned by the last next() call that has not been removed yet
            E current;

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public E next() {
                current = it.next();
                return current;
            }

            @Override
            public void remove() {
                if (current == null) {
                    // either next() was never called or remove() was already called for this
                    // element; removing again could drop a different, equal element
                    throw new IllegalStateException("next() must be called before remove()");
                }
                E toRemove = current;
                current = null;
                // note, we can't call #remove on the iterator because we need to know
                // if it was removed or not
                if (queue.remove(toRemove)) {
                    size.decrementAndGet();
                }
            }
        };
    }

    @Override
    public E peek() {
        return queue.peek();
    }

    /**
     * Polls the wrapped queue and decrements the counter when an element was returned.
     */
    @Override
    public E poll() {
        E e = queue.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    /**
     * Polls the wrapped queue with a timeout and decrements the counter when an element
     * was returned.
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = queue.poll(timeout, unit);
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    /**
     * Removes the element from the wrapped queue and decrements the counter on success.
     */
    @Override
    public boolean remove(Object o) {
        boolean removed = queue.remove(o);
        if (removed) {
            size.decrementAndGet();
        }
        return removed;
    }

    /**
     * Forces adding an element to the queue, without doing size checks.
     *
     * @throws InterruptedException if the wrapped queue's {@code put} is interrupted; the
     *                              counter is rolled back in that case
     */
    public void forcePut(E e) throws InterruptedException {
        size.incrementAndGet();
        try {
            queue.put(e);
        } catch (InterruptedException ie) {
            size.decrementAndGet();
            throw ie;
        }
    }

    /**
     * Adds the element if the counter stays within capacity.
     *
     * @return {@code false} if the capacity would be exceeded or the wrapped queue refuses
     *         the element; the counter is rolled back in both cases
     */
    @Override
    public boolean offer(E e) {
        // reserve a slot first; undo the reservation if it does not fit or the delegate refuses
        int count = size.incrementAndGet();
        if (count > capacity) {
            size.decrementAndGet();
            return false;
        }
        boolean offered = queue.offer(e);
        if (!offered) {
            size.decrementAndGet();
        }
        return offered;
    }

    /**
     * Not supported.
     *
     * @throws ElasticSearchIllegalStateException always
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new ElasticSearchIllegalStateException("offer with timeout not allowed on size queue");
    }

    /**
     * Not supported; see {@link #forcePut(Object)} for adding without a capacity check.
     *
     * @throws ElasticSearchIllegalStateException always
     */
    @Override
    public void put(E e) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new ElasticSearchIllegalStateException("put not allowed on size queue");
    }

    /**
     * Takes from the wrapped queue and decrements the counter. If the take is interrupted
     * the counter is left untouched.
     */
    @Override
    public E take() throws InterruptedException {
        E e = queue.take();
        size.decrementAndGet();
        return e;
    }

    /**
     * @return the number of elements that still fit under the capacity, never negative even
     *         when {@link #forcePut(Object)} pushed the counter past the capacity
     */
    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size.get());
    }

    /**
     * Drains the wrapped queue and subtracts the number of drained elements from the counter.
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        int drained = queue.drainTo(c);
        size.addAndGet(-drained);
        return drained;
    }

    /**
     * Drains at most {@code maxElements} from the wrapped queue and subtracts the number of
     * drained elements from the counter.
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        int drained = queue.drainTo(c, maxElements);
        size.addAndGet(-drained);
        return drained;
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return queue.toArray(a);
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
@ -298,19 +297,17 @@ public class ThreadPool extends AbstractComponent {
} else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors());
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
String defaultQueueType = defaultSettings.get("queue_type", "linked");
if (previousExecutorHolder != null) {
if ("fixed".equals(previousInfo.getType())) {
SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.getQueueSize())));
String updatedQueueType = settings.get("queue_type", previousInfo.getQueueType());
if (Objects.equal(previousInfo.getQueueSize(), updatedQueueSize) && previousInfo.getQueueType().equals(updatedQueueType)) {
if (Objects.equal(previousInfo.getQueueSize(), updatedQueueSize)) {
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
if (previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}], queue_type [{}]", name, type, updatedSize, updatedQueueSize, updatedQueueType);
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setCorePoolSize(updatedSize);
((EsThreadPoolExecutor) previousExecutorHolder.executor).setMaximumPoolSize(updatedSize);
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize, updatedQueueType));
return new ExecutorHolder(previousExecutorHolder.executor, new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize));
}
return previousExecutorHolder;
}
@ -319,18 +316,13 @@ public class ThreadPool extends AbstractComponent {
defaultSize = previousInfo.getMax();
}
defaultQueueSize = previousInfo.getQueueSize();
if (previousInfo.getQueueType() != null) {
defaultQueueType = previousInfo.getQueueType();
}
}
int size = settings.getAsInt("size", defaultSize);
SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
String queueType = settings.get("queue_type", defaultQueueType);
BlockingQueue<Runnable> workQueue = newQueue(queueSize, queueType);
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], queue_type [{}]", name, type, size, queueSize, queueType);
Executor executor = EsExecutors.newFixed(size, workQueue, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize, queueType));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
Executor executor = EsExecutors.newFixed(size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize));
} else if ("scaling".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
@ -403,24 +395,6 @@ public class ThreadPool extends AbstractComponent {
}
}
private BlockingQueue<Runnable> newQueue(SizeValue queueSize, String queueType) {
if (queueSize == null) {
return ConcurrentCollections.newBlockingQueue();
} else if (queueSize.singles() == 0) {
return new SynchronousQueue<Runnable>();
} else if (queueSize.singles() > 0) {
if ("linked".equals(queueType)) {
return new LinkedBlockingQueue<Runnable>((int) queueSize.singles());
} else if ("array".equals(queueType)) {
return new ArrayBlockingQueue<Runnable>((int) queueSize.singles());
} else {
throw new ElasticSearchIllegalArgumentException("illegal queue_type set to [" + queueType + "], should be either linked or array");
}
} else { // queueSize.singles() < 0, just treat it as unbounded queue
return ConcurrentCollections.newBlockingQueue();
}
}
class ExecutorShutdownListener implements EsThreadPoolExecutor.ShutdownListener {
private ExecutorHolder holder;
@ -555,7 +529,6 @@ public class ThreadPool extends AbstractComponent {
private int max;
private TimeValue keepAlive;
private SizeValue queueSize;
private String queueType;
Info() {
@ -570,17 +543,12 @@ public class ThreadPool extends AbstractComponent {
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) {
this(name, type, min, max, keepAlive, queueSize, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize, String queueType) {
this.name = name;
this.type = type;
this.min = min;
this.max = max;
this.keepAlive = keepAlive;
this.queueSize = queueSize;
this.queueType = queueType;
}
public String getName() {
@ -609,12 +577,6 @@ public class ThreadPool extends AbstractComponent {
return this.queueSize;
}
@Nullable
public String getQueueType() {
return this.queueType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
@ -629,7 +591,7 @@ public class ThreadPool extends AbstractComponent {
}
in.readBoolean(); // here to conform with removed waitTime
in.readBoolean(); // here to conform with removed rejected setting
queueType = in.readOptionalString();
in.readBoolean(); // here to conform with queue type
}
@Override
@ -650,9 +612,9 @@ public class ThreadPool extends AbstractComponent {
out.writeBoolean(true);
queueSize.writeTo(out);
}
out.writeBoolean(false); // here to conform with remobed waitTime
out.writeBoolean(false); // here to conform with removed waitTime
out.writeBoolean(false); // here to conform with removed rejected setting
out.writeOptionalString(queueType);
out.writeBoolean(false); // here to conform with queue type
}
@Override
@ -671,9 +633,6 @@ public class ThreadPool extends AbstractComponent {
if (queueSize != null) {
builder.field(Fields.QUEUE_SIZE, queueSize.toString());
}
if (queueType != null) {
builder.field(Fields.QUEUE_TYPE, queueType);
}
builder.endObject();
return builder;
}
@ -684,7 +643,6 @@ public class ThreadPool extends AbstractComponent {
static final XContentBuilderString MAX = new XContentBuilderString("max");
static final XContentBuilderString KEEP_ALIVE = new XContentBuilderString("keep_alive");
static final XContentBuilderString QUEUE_SIZE = new XContentBuilderString("queue_size");
static final XContentBuilderString QUEUE_TYPE = new XContentBuilderString("queue_type");
}
}

View File

@ -96,7 +96,6 @@ public class SimpleThreadPoolTests extends AbstractNodesTests {
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("fixed"));
assertThat(info.getQueueType(), equalTo("linked"));
found = true;
break;
}
@ -104,7 +103,6 @@ public class SimpleThreadPoolTests extends AbstractNodesTests {
assertThat(found, equalTo(true));
Map<String, Object> poolMap = getPoolSettingsThroughJson(nodeInfo.getThreadPool(), Names.SEARCH);
assertThat(poolMap.get("queue_type").toString(), equalTo("linked"));
}
}

View File

@ -20,14 +20,14 @@
package org.elasticsearch.test.unit.common.util.concurrent;
import com.google.common.base.Predicate;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadBarrier;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.test.integration.ElasticsearchTestCase;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
@ -40,6 +40,119 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
return TimeUnit.values()[between(0, TimeUnit.values().length - 1)];
}
/**
 * Checks that, on a fixed executor with one thread and a queue capacity of one, a third
 * task that reports {@code isForceExecution() == true} is still executed even though the
 * single thread is busy and the queue already holds one task.
 */
@Test
public void testFixedForcedExecution() throws Exception {
    EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test"));
    try {
        final CountDownLatch wait = new CountDownLatch(1);

        // first task occupies the only thread until "wait" is released
        final CountDownLatch exec1Wait = new CountDownLatch(1);
        final AtomicBoolean executed1 = new AtomicBoolean();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    wait.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                executed1.set(true);
                exec1Wait.countDown();
            }
        });

        // second task fills the single queue slot
        final CountDownLatch exec2Wait = new CountDownLatch(1);
        final AtomicBoolean executed2 = new AtomicBoolean();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                executed2.set(true);
                exec2Wait.countDown();
            }
        });

        // third task exceeds the capacity but asks for forced execution
        final AtomicBoolean executed3 = new AtomicBoolean();
        final CountDownLatch exec3Wait = new CountDownLatch(1);
        executor.execute(new AbstractRunnable() {
            @Override
            public void run() {
                executed3.set(true);
                exec3Wait.countDown();
            }

            @Override
            public boolean isForceExecution() {
                return true;
            }
        });

        wait.countDown();
        exec1Wait.await();
        exec2Wait.await();
        exec3Wait.await();

        assertThat(executed1.get(), equalTo(true));
        assertThat(executed2.get(), equalTo(true));
        assertThat(executed3.get(), equalTo(true));
    } finally {
        // always stop the pool, also when an await or assertion above fails
        executor.shutdownNow();
    }
}
/**
 * Checks that, on a fixed executor with one thread and a queue capacity of one, a third
 * plain {@link Runnable} is rejected with an {@link EsRejectedExecutionException} and is
 * never run, while the first two tasks complete.
 */
@Test
public void testFixedRejected() throws Exception {
    EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test"));
    try {
        final CountDownLatch wait = new CountDownLatch(1);

        // first task occupies the only thread until "wait" is released
        final CountDownLatch exec1Wait = new CountDownLatch(1);
        final AtomicBoolean executed1 = new AtomicBoolean();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    wait.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                executed1.set(true);
                exec1Wait.countDown();
            }
        });

        // second task fills the single queue slot
        final CountDownLatch exec2Wait = new CountDownLatch(1);
        final AtomicBoolean executed2 = new AtomicBoolean();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                executed2.set(true);
                exec2Wait.countDown();
            }
        });

        // third task does not ask for forced execution, so it must be rejected
        final AtomicBoolean executed3 = new AtomicBoolean();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    executed3.set(true);
                }
            });
            // thrown explicitly so the check also fires when JVM assertions are disabled
            throw new AssertionError("should be rejected...");
        } catch (EsRejectedExecutionException e) {
            // all is well
        }

        wait.countDown();
        exec1Wait.await();
        exec2Wait.await();

        assertThat(executed1.get(), equalTo(true));
        assertThat(executed2.get(), equalTo(true));
        assertThat(executed3.get(), equalTo(false));
    } finally {
        // always stop the pool, also when an await or assertion above fails
        executor.shutdownNow();
    }
}
@Test
public void testScaleUp() throws Exception {
final int min = between(1, 3);

View File

@ -26,7 +26,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.junit.Test;
import java.util.concurrent.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
@ -101,7 +104,6 @@ public class UpdateThreadPoolSettingsTests {
@Test
public void testFixedExecutorType() {
ThreadPool threadPool = new ThreadPool(settingsBuilder().put("threadpool.search.type", "fixed").build(), null);
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("linked"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
@ -151,20 +153,6 @@ public class UpdateThreadPoolSettingsTests {
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue", "500")
.build());
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("linked"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(LinkedBlockingQueue.class));
// Set different queue and size type
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue_type", "array")
.put("threadpool.search.size", "12")
.build());
// Make sure keep size changed
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(12));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(12));
assertThat(info(threadPool, Names.SEARCH).getQueueType(), equalTo("array"));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue(), instanceOf(ArrayBlockingQueue.class));
threadPool.shutdown();
}