refactor new thread pools in, remove dynamic ones
This commit is contained in:
parent
de4b21e986
commit
703657699a
|
@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import static java.util.concurrent.Executors.*;
|
||||
import static org.elasticsearch.cluster.ClusterState.*;
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
|
|
@ -30,7 +30,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
|
|
@ -1,208 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* Factory and utility methods for handling {@link DynamicThreadPoolExecutor}.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class DynamicExecutors {
|
||||
/**
|
||||
* Creates a thread pool that creates new threads as needed, but will reuse
|
||||
* previously constructed threads when they are available. Calls to
|
||||
* <tt>execute</tt> will reuse previously constructed threads if
|
||||
* available. If no existing thread is available, a new thread will be
|
||||
* created and added to the pool. No more than <tt>max</tt> threads will
|
||||
* be created. Threads that have not been used for a <tt>keepAlive</tt>
|
||||
* timeout are terminated and removed from the cache. Thus, a pool that
|
||||
* remains idle for long enough will not consume any resources other than
|
||||
* the <tt>min</tt> specified.
|
||||
*
|
||||
* @param min the number of threads to keep in the pool, even if they are
|
||||
* idle.
|
||||
* @param max the maximum number of threads to allow in the pool.
|
||||
* @param keepAliveTime when the number of threads is greater than the min,
|
||||
* this is the maximum time that excess idle threads will wait
|
||||
* for new tasks before terminating (in milliseconds).
|
||||
* @return the newly created thread pool
|
||||
*/
|
||||
public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) {
|
||||
return newScalingThreadPool(min, max, keepAliveTime, Executors.defaultThreadFactory());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a thread pool, same as in
|
||||
* {@link #newScalingThreadPool(int, int, long)}, using the provided
|
||||
* ThreadFactory to create new threads when needed.
|
||||
*
|
||||
* @param min the number of threads to keep in the pool, even if they are
|
||||
* idle.
|
||||
* @param max the maximum number of threads to allow in the pool.
|
||||
* @param keepAliveTime when the number of threads is greater than the min,
|
||||
* this is the maximum time that excess idle threads will wait
|
||||
* for new tasks before terminating (in milliseconds).
|
||||
* @param threadFactory the factory to use when creating new threads.
|
||||
* @return the newly created thread pool
|
||||
*/
|
||||
public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime, ThreadFactory threadFactory) {
|
||||
DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>();
|
||||
ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
|
||||
executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.ForceQueuePolicy());
|
||||
queue.setThreadPoolExecutor(executor);
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a thread pool similar to that constructed by
|
||||
* {@link #newScalingThreadPool(int, int, long)}, but blocks the call to
|
||||
* <tt>execute</tt> if the queue has reached it's capacity, and all
|
||||
* <tt>max</tt> threads are busy handling requests.
|
||||
* <p/>
|
||||
* If the wait time of this queue has elapsed, a
|
||||
* {@link RejectedExecutionException} will be thrown.
|
||||
*
|
||||
* @param min the number of threads to keep in the pool, even if they are
|
||||
* idle.
|
||||
* @param max the maximum number of threads to allow in the pool.
|
||||
* @param keepAliveTime when the number of threads is greater than the min,
|
||||
* this is the maximum time that excess idle threads will wait
|
||||
* for new tasks before terminating (in milliseconds).
|
||||
* @param capacity the fixed capacity of the underlying queue (resembles
|
||||
* backlog).
|
||||
* @param waitTime the wait time (in milliseconds) for space to become
|
||||
* available in the queue.
|
||||
* @return the newly created thread pool
|
||||
*/
|
||||
public static ExecutorService newBlockingThreadPool(int min, int max, long keepAliveTime, int capacity, long waitTime) {
|
||||
return newBlockingThreadPool(min, max, keepAliveTime, capacity, waitTime, Executors.defaultThreadFactory());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a thread pool, same as in
|
||||
* {@link #newBlockingThreadPool(int, int, long, int, long)}, using the
|
||||
* provided ThreadFactory to create new threads when needed.
|
||||
*
|
||||
* @param min the number of threads to keep in the pool, even if they are
|
||||
* idle.
|
||||
* @param max the maximum number of threads to allow in the pool.
|
||||
* @param keepAliveTime when the number of threads is greater than the min,
|
||||
* this is the maximum time that excess idle threads will wait
|
||||
* for new tasks before terminating (in milliseconds).
|
||||
* @param capacity the fixed capacity of the underlying queue (resembles
|
||||
* backlog).
|
||||
* @param waitTime the wait time (in milliseconds) for space to become
|
||||
* available in the queue.
|
||||
* @param threadFactory the factory to use when creating new threads.
|
||||
* @return the newly created thread pool
|
||||
*/
|
||||
public static ExecutorService newBlockingThreadPool(int min, int max,
|
||||
long keepAliveTime, int capacity, long waitTime,
|
||||
ThreadFactory threadFactory) {
|
||||
DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>(capacity);
|
||||
ThreadPoolExecutor executor = new DynamicThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue, threadFactory);
|
||||
executor.setRejectedExecutionHandler(new DynamicThreadPoolExecutor.TimedBlockingPolicy(waitTime));
|
||||
queue.setThreadPoolExecutor(executor);
|
||||
return executor;
|
||||
}
|
||||
|
||||
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
|
||||
String name = settings.get("name");
|
||||
if (name == null) {
|
||||
name = "elasticsearch";
|
||||
} else {
|
||||
name = "elasticsearch[" + name + "]";
|
||||
}
|
||||
return daemonThreadFactory(name + namePrefix);
|
||||
}
|
||||
|
||||
/**
|
||||
* A priority based thread factory, for all Thread priority constants:
|
||||
* <tt>Thread.MIN_PRIORITY, Thread.NORM_PRIORITY, Thread.MAX_PRIORITY</tt>;
|
||||
* <p/>
|
||||
* This factory is used instead of Executers.DefaultThreadFactory to allow
|
||||
* manipulation of priority and thread owner name.
|
||||
*
|
||||
* @param namePrefix a name prefix for this thread
|
||||
* @return a thread factory based on given priority.
|
||||
*/
|
||||
public static ThreadFactory daemonThreadFactory(String namePrefix) {
|
||||
final ThreadFactory f = Executors.defaultThreadFactory();
|
||||
final String o = namePrefix + "-";
|
||||
|
||||
return new ThreadFactory() {
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = f.newThread(r);
|
||||
|
||||
/*
|
||||
* Thread name: owner-pool-N-thread-M, where N is the sequence
|
||||
* number of this factory, and M is the sequence number of the
|
||||
* thread created by this factory.
|
||||
*/
|
||||
t.setName(o + t.getName());
|
||||
|
||||
/* override default definition t.setDaemon(false); */
|
||||
t.setDaemon(true);
|
||||
|
||||
return t;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* A priority based thread factory, for all Thread priority constants:
|
||||
* <tt>Thread.MIN_PRIORITY, Thread.NORM_PRIORITY, Thread.MAX_PRIORITY</tt>;
|
||||
* <p/>
|
||||
* This factory is used instead of Executers.DefaultThreadFactory to allow
|
||||
* manipulation of priority and thread owner name.
|
||||
*
|
||||
* @param priority The priority to be assigned to each thread;
|
||||
* can be either <tt>Thread.MIN_PRIORITY, Thread.NORM_PRIORITY</tt>
|
||||
* or Thread.MAX_PRIORITY.
|
||||
* @param namePrefix a name prefix for this thread
|
||||
* @return a thread factory based on given priority.
|
||||
*/
|
||||
public static ThreadFactory priorityThreadFactory(int priority, String namePrefix) {
|
||||
final ThreadFactory f = DynamicExecutors.daemonThreadFactory(namePrefix);
|
||||
final int p = priority;
|
||||
|
||||
return new ThreadFactory() {
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = f.newThread(r);
|
||||
|
||||
/* override default thread priority of Thread.NORM_PRIORITY */
|
||||
if (p != Thread.NORM_PRIORITY)
|
||||
t.setPriority(p);
|
||||
|
||||
return t;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cannot instantiate.
|
||||
*/
|
||||
private DynamicExecutors() {
|
||||
}
|
||||
}
|
|
@ -1,164 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* An {@link ExecutorService} that executes each submitted task using one of
|
||||
* possibly several pooled threads, normally configured using
|
||||
* {@link DynamicExecutors} factory methods.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
/**
|
||||
* number of threads that are actively executing tasks
|
||||
*/
|
||||
private final AtomicInteger activeCount = new AtomicInteger();
|
||||
|
||||
public DynamicThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
|
||||
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
|
||||
ThreadFactory threadFactory) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
||||
}
|
||||
|
||||
@Override public int getActiveCount() {
|
||||
return activeCount.get();
|
||||
}
|
||||
|
||||
@Override protected void beforeExecute(Thread t, Runnable r) {
|
||||
activeCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override protected void afterExecute(Runnable r, Throwable t) {
|
||||
activeCount.decrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Much like a {@link SynchronousQueue} which acts as a rendezvous channel. It
|
||||
* is well suited for handoff designs, in which a tasks is only queued if there
|
||||
* is an available thread to pick it up.
|
||||
* <p/>
|
||||
* This queue is correlated with a thread-pool, and allows insertions to the
|
||||
* queue only if there is a free thread that can poll this task. Otherwise, the
|
||||
* task is rejected and the decision is left up to one of the
|
||||
* {@link RejectedExecutionHandler} policies:
|
||||
* <ol>
|
||||
* <li> {@link ForceQueuePolicy} - forces the queue to accept the rejected task. </li>
|
||||
* <li> {@link TimedBlockingPolicy} - waits for a given time for the task to be
|
||||
* executed.</li>
|
||||
* </ol>
|
||||
*
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public static class DynamicQueue<E> extends LinkedBlockingQueue<E> {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* The executor this Queue belongs to
|
||||
*/
|
||||
private transient ThreadPoolExecutor executor;
|
||||
|
||||
/**
|
||||
* Creates a <tt>DynamicQueue</tt> with a capacity of
|
||||
* {@link Integer#MAX_VALUE}.
|
||||
*/
|
||||
public DynamicQueue() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a <tt>DynamicQueue</tt> with the given (fixed) capacity.
|
||||
*
|
||||
* @param capacity the capacity of this queue.
|
||||
*/
|
||||
public DynamicQueue(int capacity) {
|
||||
super(capacity);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the executor this queue belongs to.
|
||||
*/
|
||||
public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts the specified element at the tail of this queue if there is at
|
||||
* least one available thread to run the current task. If all pool threads
|
||||
* are actively busy, it rejects the offer.
|
||||
*
|
||||
* @param o the element to add.
|
||||
* @return <tt>true</tt> if it was possible to add the element to this
|
||||
* queue, else <tt>false</tt>
|
||||
* @see ThreadPoolExecutor#execute(Runnable)
|
||||
*/
|
||||
@Override
|
||||
public boolean offer(E o) {
|
||||
int allWorkingThreads = executor.getActiveCount() + super.size();
|
||||
return allWorkingThreads < executor.getPoolSize() && super.offer(o);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A handler for rejected tasks that adds the specified element to this queue,
|
||||
* waiting if necessary for space to become available.
|
||||
*/
|
||||
public static class ForceQueuePolicy implements RejectedExecutionHandler {
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
try {
|
||||
executor.getQueue().put(r);
|
||||
} catch (InterruptedException e) {
|
||||
//should never happen since we never wait
|
||||
throw new RejectedExecutionException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A handler for rejected tasks that inserts the specified element into this
|
||||
* queue, waiting if necessary up to the specified wait time for space to become
|
||||
* available.
|
||||
*/
|
||||
public static class TimedBlockingPolicy implements RejectedExecutionHandler {
|
||||
private final long waitTime;
|
||||
|
||||
/**
|
||||
* @param waitTime wait time in milliseconds for space to become available.
|
||||
*/
|
||||
public TimedBlockingPolicy(long waitTime) {
|
||||
this.waitTime = waitTime;
|
||||
}
|
||||
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
try {
|
||||
boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
|
||||
if (!successful)
|
||||
throw new RejectedExecutionException("Rejected execution after waiting "
|
||||
+ waitTime + " ms for task [" + r.getClass() + "] to be executed.");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RejectedExecutionException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class Executors {
|
||||
|
||||
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
|
||||
String name = settings.get("name");
|
||||
if (name == null) {
|
||||
name = "elasticsearch";
|
||||
} else {
|
||||
name = "elasticsearch[" + name + "]";
|
||||
}
|
||||
return daemonThreadFactory(name + namePrefix);
|
||||
}
|
||||
|
||||
/**
|
||||
* A priority based thread factory, for all Thread priority constants:
|
||||
* <tt>Thread.MIN_PRIORITY, Thread.NORM_PRIORITY, Thread.MAX_PRIORITY</tt>;
|
||||
* <p/>
|
||||
* This factory is used instead of Executers.DefaultThreadFactory to allow
|
||||
* manipulation of priority and thread owner name.
|
||||
*
|
||||
* @param namePrefix a name prefix for this thread
|
||||
* @return a thread factory based on given priority.
|
||||
*/
|
||||
public static ThreadFactory daemonThreadFactory(String namePrefix) {
|
||||
final ThreadFactory f = java.util.concurrent.Executors.defaultThreadFactory();
|
||||
final String o = namePrefix + "-";
|
||||
|
||||
return new ThreadFactory() {
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = f.newThread(r);
|
||||
|
||||
/*
|
||||
* Thread name: owner-pool-N-thread-M, where N is the sequence
|
||||
* number of this factory, and M is the sequence number of the
|
||||
* thread created by this factory.
|
||||
*/
|
||||
t.setName(o + t.getName());
|
||||
|
||||
/* override default definition t.setDaemon(false); */
|
||||
t.setDaemon(true);
|
||||
|
||||
return t;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cannot instantiate.
|
||||
*/
|
||||
private Executors() {
|
||||
}
|
||||
}
|
|
@ -52,7 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
|
|
@ -43,7 +43,7 @@ import static java.util.concurrent.Executors.*;
|
|||
import static org.elasticsearch.cluster.ClusterState.*;
|
||||
import static org.elasticsearch.cluster.metadata.MetaData.*;
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
|
|
@ -52,7 +52,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
|
|
@ -34,7 +34,7 @@ import javax.management.remote.JMXServiceURL;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static java.util.concurrent.Executors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
|
|
|
@ -24,11 +24,10 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.Executors;
|
||||
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
|
||||
import org.elasticsearch.threadpool.support.AbstractThreadPool;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -63,8 +62,8 @@ public class BlockingThreadPool extends AbstractThreadPool {
|
|||
this.waitTime = componentSettings.getAsTime("wait_time", timeValueSeconds(60));
|
||||
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
|
||||
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize);
|
||||
executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, DynamicExecutors.daemonThreadFactory(settings, "[tp]"));
|
||||
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, DynamicExecutors.daemonThreadFactory(settings, "[sc]"));
|
||||
executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, Executors.daemonThreadFactory(settings, "[tp]"));
|
||||
scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, Executors.daemonThreadFactory(settings, "[sc]"));
|
||||
started = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,10 +22,9 @@ package org.elasticsearch.threadpool.cached;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.Executors;
|
||||
import org.elasticsearch.threadpool.support.AbstractThreadPool;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -56,8 +55,8 @@ public class CachedThreadPool extends AbstractThreadPool {
|
|||
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
|
||||
keepAlive.millis(), TimeUnit.MILLISECONDS,
|
||||
new SynchronousQueue<Runnable>(),
|
||||
DynamicExecutors.daemonThreadFactory(settings, "[tp]"));
|
||||
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, DynamicExecutors.daemonThreadFactory(settings, "[sc]"));
|
||||
Executors.daemonThreadFactory(settings, "[tp]"));
|
||||
scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, Executors.daemonThreadFactory(settings, "[sc]"));
|
||||
started = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,11 +22,10 @@ package org.elasticsearch.threadpool.scaling;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.Executors;
|
||||
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
|
||||
import org.elasticsearch.threadpool.support.AbstractThreadPool;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -55,8 +54,8 @@ public class ScalingThreadPool extends AbstractThreadPool {
|
|||
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
|
||||
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
|
||||
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize);
|
||||
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, DynamicExecutors.daemonThreadFactory(settings, "[sc]"));
|
||||
executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, DynamicExecutors.daemonThreadFactory(settings, "[tp]"));
|
||||
scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, Executors.daemonThreadFactory(settings, "[sc]"));
|
||||
executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, Executors.daemonThreadFactory(settings, "[tp]"));
|
||||
started = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledFuture;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
|
|
|
@ -68,7 +68,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
|
|||
import static org.elasticsearch.common.transport.NetworkExceptionHelper.*;
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
import static org.elasticsearch.transport.Transport.Helper.*;
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,11 +23,9 @@ import com.google.inject.Module;
|
|||
import org.elasticsearch.cloud.jclouds.logging.JCloudsLoggingModule;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.Executors;
|
||||
import org.jclouds.concurrent.config.ExecutorServiceModule;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
|
@ -36,7 +34,7 @@ public class JCloudsUtils {
|
|||
public static Iterable<? extends Module> buildModules(Settings settings) {
|
||||
return ImmutableList.of(new JCloudsLoggingModule(settings),
|
||||
new ExecutorServiceModule(
|
||||
Executors.newCachedThreadPool(DynamicExecutors.daemonThreadFactory(settings, "jclouds-user")),
|
||||
Executors.newCachedThreadPool(DynamicExecutors.daemonThreadFactory(settings, "jclouds-io"))));
|
||||
java.util.concurrent.Executors.newCachedThreadPool(Executors.daemonThreadFactory(settings, "jclouds-user")),
|
||||
java.util.concurrent.Executors.newCachedThreadPool(Executors.daemonThreadFactory(settings, "jclouds-io"))));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import java.io.IOException;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
|
|
@ -47,7 +47,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
|
||||
import static org.elasticsearch.common.util.concurrent.DynamicExecutors.*;
|
||||
import static org.elasticsearch.common.util.concurrent.Executors.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
|
Loading…
Reference in New Issue