more shard relocation tests, create a cached executor that can be used to allocate threads that are not affected by the actual thread pool impl (can always fork, no blocking)

This commit is contained in:
kimchy 2010-07-19 01:45:09 +03:00
parent f9cd7cb932
commit d657d4447b
32 changed files with 222 additions and 226 deletions

View File

@ -65,6 +65,7 @@ import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
@ -211,6 +212,16 @@ public class TransportClient extends AbstractClient {
injector.getInstance(TimerService.class).close();
injector.getInstance(ThreadPool.class).shutdown();
try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
try {
injector.getInstance(ThreadPool.class).shutdownNow();
} catch (Exception e) {
// ignore
}
ThreadLocals.clearReferencesThreadLocals();
}

View File

@ -74,7 +74,7 @@ public class ShardStateAction extends AbstractComponent {
}
public void shardFailed(final ShardRouting shardRouting, final String reason) throws ElasticSearchException {
logger.warn("Sending failed shard for {}, reason [{}]", shardRouting, reason);
logger.warn("sending failed shard for {}, reason [{}]", shardRouting, reason);
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() {
@ -90,7 +90,7 @@ public class ShardStateAction extends AbstractComponent {
public void shardStarted(final ShardRouting shardRouting, final String reason) throws ElasticSearchException {
if (logger.isDebugEnabled()) {
logger.debug("Sending shard started for {}, reason [{}]", shardRouting, reason);
logger.debug("sending shard started for {}, reason [{}]", shardRouting, reason);
}
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
@ -106,7 +106,7 @@ public class ShardStateAction extends AbstractComponent {
}
private void innerShardFailed(final ShardRouting shardRouting, final String reason) {
logger.warn("Received shard failed for {}, reason [{}]", shardRouting, reason);
logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
@ -131,7 +131,7 @@ public class ShardStateAction extends AbstractComponent {
private void innerShardStarted(final ShardRouting shardRouting, final String reason) {
if (logger.isDebugEnabled()) {
logger.debug("Received shard started for {}, reason [{}]", shardRouting, reason);
logger.debug("received shard started for {}, reason [{}]", shardRouting, reason);
}
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {

View File

@ -66,7 +66,7 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
}
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executorService().execute(new Runnable() {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
FileInputStream is;

View File

@ -50,7 +50,7 @@ public class FsAppendableBlobContainer extends AbstractFsBlobContainer implement
}
@Override public void append(final AppendBlobListener listener) {
blobStore.executorService().execute(new Runnable() {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
RandomAccessFile raf = null;
try {

View File

@ -27,24 +27,20 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
import java.util.concurrent.Executor;
/**
* @author kimchy (shay.banon)
*/
public class FsBlobStore extends AbstractComponent implements BlobStore {
private final File path;
private final Executor executor;
private final ExecutorService executorService;
private final File path;
private final int bufferSizeInBytes;
public FsBlobStore(Settings settings, File path) {
public FsBlobStore(Settings settings, Executor executor, File path) {
super(settings);
this.path = path;
if (!path.exists()) {
@ -57,7 +53,7 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
throw new BlobStoreException("Path is not a directory at [" + path + "]");
}
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
this.executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "fs_blobstore"));
this.executor = executor;
}
@Override public String toString() {
@ -72,8 +68,8 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
return this.bufferSizeInBytes;
}
public ExecutorService executorService() {
return executorService;
public Executor executor() {
return executor;
}
@Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
@ -89,13 +85,7 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
}
@Override public void close() {
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
executorService.shutdownNow();
// nothing to do here...
}
private synchronized File buildAndCreate(BlobPath path) {

View File

@ -39,7 +39,7 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
}
@Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
blobStore.executorService().execute(new Runnable() {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
File file = new File(path, blobName);
RandomAccessFile raf;

View File

@ -80,7 +80,7 @@ public class HandlesStreamInput extends StreamInput {
} else if (b == 3) {
return identityHandles.get(in.readVInt());
} else {
throw new IOException("Expected handle header");
throw new IOException("Expected handle header, got [" + b + "]");
}
}

View File

@ -152,7 +152,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
pingService.start();
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
joinCluster();
}
@ -377,7 +377,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
clusterBlocks = ClusterBlocks.builder().blocks(clusterBlocks).addGlobalBlock(NO_MASTER_BLOCK).build();
masterFD.stop("no master elected since master left (reason = " + reason + ")");
// try and join the cluster again...
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
joinCluster();
}

View File

@ -214,7 +214,7 @@ public class MasterFaultDetection extends AbstractComponent {
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
if (notifiedMasterFailure.compareAndSet(false, true)) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
for (Listener listener : listeners) {
listener.onMasterFailure(masterNode, reason);

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.index.gateway.fs.FsIndexGatewayModule;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.io.IOException;
@ -36,7 +37,7 @@ import java.io.IOException;
*/
public class FsGateway extends BlobStoreGateway {
@Inject public FsGateway(Settings settings, Environment environment, ClusterName clusterName) throws IOException {
@Inject public FsGateway(Settings settings, Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException {
super(settings);
File gatewayFile;
@ -47,7 +48,7 @@ public class FsGateway extends BlobStoreGateway {
} else {
gatewayFile = new File(location);
}
initialize(new FsBlobStore(componentSettings, gatewayFile), clusterName, null);
initialize(new FsBlobStore(componentSettings, threadPool.cached(), gatewayFile), clusterName, null);
}
@Override public String type() {

View File

@ -119,7 +119,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
return;
}
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
indexShard.recovering();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.ElasticSearchWrapperException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
@ -26,7 +27,7 @@ import org.elasticsearch.index.shard.ShardId;
/**
* @author kimchy (shay.banon)
*/
public class RecoverFilesRecoveryException extends IndexShardException {
public class RecoverFilesRecoveryException extends IndexShardException implements ElasticSearchWrapperException {
private final int numberOfFiles;

View File

@ -142,7 +142,7 @@ public class RecoverySource extends AbstractComponent {
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : response.phase1FileNames) {
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
@ -286,8 +286,25 @@ public class RecoverySource extends AbstractComponent {
}
@Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
// we don't spawn, but we execute the expensive recovery process on a cached thread pool
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
} catch (Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
// ignore
}
}
}
});
}
@Override public boolean spawn() {
return false;
}
}
}

View File

@ -40,10 +40,7 @@ import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import javax.annotation.Nullable;
import java.io.IOException;
@ -103,7 +100,7 @@ public class RecoveryTarget extends AbstractComponent {
public void startRecovery(final StartRecoveryRequest request, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery("No node to recovery from, retry on next cluster state update");
listener.onIgnoreRecovery(false, "No node to recovery from, retry on next cluster state update");
return;
}
final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
@ -113,11 +110,11 @@ public class RecoveryTarget extends AbstractComponent {
preRecoveryState = shard.recovering();
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery("Already in recovering process, " + e.getMessage());
listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage());
return;
}
final IndexShardState fPreRecoveryState = preRecoveryState;
threadPool.execute(new Runnable() {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
doRecovery(shard, fPreRecoveryState, request, listener);
}
@ -128,7 +125,7 @@ public class RecoveryTarget extends AbstractComponent {
// we know we are on a thread, we can spin till we can engage in recovery
if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("shard closed, stop recovery");
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
shard.restoreRecoveryState(preRecoveryState);
@ -148,7 +145,7 @@ public class RecoveryTarget extends AbstractComponent {
}).txGet();
if (recoveryStatus.retry) {
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("shard closed, stop recovery");
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
@ -178,7 +175,7 @@ public class RecoveryTarget extends AbstractComponent {
} catch (Exception e) {
removeAndCleanOnGoingRecovery(request.shardId());
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("shard closed, stop recovery");
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
logger.trace("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
@ -187,11 +184,33 @@ public class RecoveryTarget extends AbstractComponent {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
try {
shard.restoreRecoveryState(preRecoveryState);
} catch (IndexShardNotRecoveringException e1) {
// ignore this, we might be closing...
}
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
if (cause instanceof ConnectTransportException) {
listener.onIgnoreRecovery(true, "source node disconnected");
return;
}
if (cause instanceof IndexShardClosedException) {
listener.onIgnoreRecovery(true, "source node disconnected");
return;
}
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
} finally {
recoveryThrottler.recoveryDone(shard.shardId(), "peer recovery target");
@ -203,7 +222,7 @@ public class RecoveryTarget extends AbstractComponent {
void onRetryRecovery(TimeValue retryAfter);
void onIgnoreRecovery(String reason);
void onIgnoreRecovery(boolean cleanShard, String reason);
void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure);
}

View File

@ -388,7 +388,19 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}, retryAfter);
}
@Override public void onIgnoreRecovery(String reason) {
@Override public void onIgnoreRecovery(boolean cleanShard, String reason) {
if (!cleanShard) {
return;
}
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
indexService.cleanShard(shardRouting.shardId().id());
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Exception e1) {
logger.warn("[{}][{}] failed to delete shard after ignore recovery", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
}
@Override public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
@ -401,6 +413,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
indexService.cleanShard(shardRouting.shardId().id());
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Exception e1) {
logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
}

View File

@ -67,9 +67,13 @@ public class IndicesMemoryCleaner extends AbstractComponent {
totalShards++;
Translog translog = ((InternalIndexShard) indexShard).translog();
if (translog.size() > translogNumberOfOperationsThreshold) {
try {
indexShard.flush(new Engine.Flush());
cleanedShards++;
cleaned = indexShard.estimateFlushableMemorySize().bytes();
indexShard.flush(new Engine.Flush());
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
}
}
}
}

View File

@ -79,15 +79,10 @@ public interface ThreadPool extends Executor {
boolean isStarted();
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks that were
* awaiting execution.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
* Returns a cached executor that will always allocate threads.
*/
Executor cached();
void shutdownNow();
/**
@ -101,55 +96,10 @@ public interface ThreadPool extends Executor {
void execute(Runnable command);
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's <tt>get</tt> method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* <tt>result = exec.submit(aCallable).get();</tt>
*
* <p> Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's <tt>get</tt> method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's <tt>get</tt> method will
* return <tt>null</tt> upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task, FutureListener<T> listener);
@ -158,86 +108,12 @@ public interface ThreadPool extends Executor {
Future<?> submit(Runnable task, FutureListener<?> listener);
/**
* Creates and executes a one-shot action that becomes enabled
* after the given delay.
*
* @param command the task to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @return a ScheduledFuture representing pending completion of
* the task and whose <tt>get()</tt> method will return
* <tt>null</tt> upon completion
* @throws java.util.concurrent.RejectedExecutionException
* if the task cannot be
* scheduled for execution
* @throws NullPointerException if command is null
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* Creates and executes a ScheduledFuture that becomes enabled after the
* given delay.
*
* @param callable the function to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @return a ScheduledFuture that can be used to extract result or cancel
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if callable is null
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the given
* period; that is executions will commence after
* <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
* <tt>initialDelay + 2 * period</tt>, and so on.
* If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor. If any execution of this task
* takes longer than its period, then subsequent executions
* may start late, but will not concurrently execute.
*
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param period the period between successive executions
* @param unit the time unit of the initialDelay and period parameters
* @return a ScheduledFuture representing pending completion of
* the task, and whose <tt>get()</tt> method will throw an
* exception upon cancellation
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if command is null
* @throws IllegalArgumentException if period less than or equal to zero
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the
* given delay between the termination of one execution and the
* commencement of the next. If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor.
*
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param delay the delay between the termination of one
* execution and the commencement of the next
* @param unit the time unit of the initialDelay and delay parameters
* @return a ScheduledFuture representing pending completion of
* the task, and whose <tt>get()</tt> method will throw an
* exception upon cancellation
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if command is null
* @throws IllegalArgumentException if delay less than or equal to zero
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command, TimeValue delay);

View File

@ -27,7 +27,7 @@ import org.elasticsearch.threadpool.cached.CachedThreadPoolModule;
import static org.elasticsearch.common.inject.ModulesFactory.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ThreadPoolModule extends AbstractModule {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -63,7 +64,8 @@ public class BlockingThreadPool extends AbstractThreadPool {
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], capacity[{}], wait_time[{}], scheduled_size[{}]", getType(), min, max, keepAlive, capacity, waitTime, scheduledSize);
executorService = TransferThreadPoolExecutor.newBlockingExecutor(min, max, keepAlive.millis(), TimeUnit.MILLISECONDS, waitTime.millis(), TimeUnit.MILLISECONDS, capacity, EsExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
cached = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true;
}

View File

@ -51,12 +51,13 @@ public class CachedThreadPool extends AbstractThreadPool {
super(settings);
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
logger.debug("Initializing {} thread pool with keep_alive[{}], scheduled_size[{}]", new Object[]{getType(), keepAlive, scheduledSize});
logger.debug("Initializing {} thread pool with keep_alive[{}], scheduled_size[{}]", getType(), keepAlive, scheduledSize);
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
EsExecutors.daemonThreadFactory(settings, "[tp]"));
scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
cached = executorService;
started = true;
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.TransferThreadPoolExecutor;
import org.elasticsearch.threadpool.support.AbstractThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -54,8 +55,9 @@ public class ScalingThreadPool extends AbstractThreadPool {
this.keepAlive = componentSettings.getAsTime("keep_alive", timeValueSeconds(60));
this.scheduledSize = componentSettings.getAsInt("scheduled_size", 20);
logger.debug("Initializing {} thread pool with min[{}], max[{}], keep_alive[{}], scheduled_size[{}]", getType(), min, max, keepAlive, scheduledSize);
scheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
scheduledExecutorService = Executors.newScheduledThreadPool(scheduledSize, EsExecutors.daemonThreadFactory(settings, "[sc]"));
executorService = TransferThreadPoolExecutor.newScalingExecutor(min, max, keepAlive.nanos(), TimeUnit.NANOSECONDS, EsExecutors.daemonThreadFactory(settings, "[tp]"));
cached = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, "[cached]"));
started = true;
}

View File

@ -40,6 +40,8 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
protected ScheduledExecutorService scheduledExecutorService;
protected ExecutorService cached;
protected AbstractThreadPool(Settings settings) {
super(settings);
}
@ -58,6 +60,10 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
return started;
}
@Override public Executor cached() {
return cached;
}
@Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return scheduledExecutorService.schedule(command, delay, unit);
}
@ -83,6 +89,9 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
logger.debug("shutting down {} thread pool", getType());
executorService.shutdown();
scheduledExecutorService.shutdown();
if (!cached.isShutdown()) {
cached.shutdown();
}
}
@Override public void shutdownNow() {
@ -93,10 +102,14 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
if (!executorService.isTerminated()) {
scheduledExecutorService.shutdownNow();
}
if (!cached.isTerminated()) {
cached.shutdownNow();
}
}
@Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
boolean result = executorService.awaitTermination(timeout, unit);
result &= cached.awaitTermination(timeout, unit);
result &= scheduledExecutorService.awaitTermination(timeout, unit);
return result;
}

View File

@ -42,7 +42,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
closeAllNodes();
}
@Test public void recoverWhileUnderLoadTest() throws Exception {
@Test public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception {
startNode("server1");
client("server1").admin().indices().prepareCreate("test").execute().actionGet();
@ -65,8 +65,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
writers[i].start();
}
// wait till we index 2000
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 2000) {
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) {
Thread.sleep(100);
client("server1").admin().indices().prepareRefresh().execute().actionGet();
}
@ -75,8 +74,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
client("server1").admin().indices().prepareFlush().execute().actionGet();
// wait till we index another 2000
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 4000) {
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) {
Thread.sleep(100);
client("server1").admin().indices().prepareRefresh().execute().actionGet();
}
@ -85,11 +83,70 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
startNode("server2");
// make sure the cluster state is green, and all has been recovered
assertThat(client("server1").admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("server1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
// wait till we index 10,0000
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 10000) {
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) {
Thread.sleep(100);
client("server1").admin().indices().prepareRefresh().execute().actionGet();
}
stop.set(true);
stopLatch.await();
client("server1").admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get()));
}
}
@Test public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception {
startNode("server1");
client("server1").admin().indices().prepareCreate("test").execute().actionGet();
final AtomicLong idGenerator = new AtomicLong();
final AtomicBoolean stop = new AtomicBoolean(false);
Thread[] writers = new Thread[5];
final CountDownLatch stopLatch = new CountDownLatch(writers.length);
for (int i = 0; i < writers.length; i++) {
writers[i] = new Thread() {
@Override public void run() {
while (!stop.get()) {
long id = idGenerator.incrementAndGet();
client("server1").prepareIndex("test", "type1", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
}
stopLatch.countDown();
}
};
writers[i].start();
}
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) {
Thread.sleep(100);
client("server1").admin().indices().prepareRefresh().execute().actionGet();
}
// now flush, just to make sure we have some data in the index, not just translog
client("server1").admin().indices().prepareFlush().execute().actionGet();
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) {
Thread.sleep(100);
client("server1").admin().indices().prepareRefresh().execute().actionGet();
}
// now start another node, while we index
startNode("server2");
startNode("server3");
startNode("server4");
assertThat(client("server1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
while (client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 150000) {
Thread.sleep(100);
client("server1").admin().indices().prepareRefresh().execute().actionGet();
}

View File

@ -94,7 +94,7 @@ public class AbstractCloudBlobContainer extends AbstractBlobContainer {
listener.onFailure(e);
}
}
}, cloudBlobStore.executorService());
}, cloudBlobStore.executor());
}
// inDirectory expects a directory, not a blob prefix

View File

@ -34,11 +34,7 @@ import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.domain.Location;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
import java.util.concurrent.Executor;
/**
* @author kimchy (shay.banon)
@ -51,17 +47,17 @@ public class CloudBlobStore extends AbstractComponent implements BlobStore {
private final Location location;
private final ExecutorService executorService;
private final Executor executor;
private final int bufferSizeInBytes;
public CloudBlobStore(Settings settings, BlobStoreContext blobStoreContext, String container, String location) {
public CloudBlobStore(Settings settings, BlobStoreContext blobStoreContext, Executor executor, String container, String location) {
super(settings);
this.blobStoreContext = blobStoreContext;
this.container = container;
this.executor = executor;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
this.executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "cloud_blobstore"));
if (location == null) {
this.location = null;
@ -91,8 +87,8 @@ public class CloudBlobStore extends AbstractComponent implements BlobStore {
return this.bufferSizeInBytes;
}
public ExecutorService executorService() {
return executorService;
public Executor executor() {
return executor;
}
public String container() {
@ -124,12 +120,5 @@ public class CloudBlobStore extends AbstractComponent implements BlobStore {
}
@Override public void close() {
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
executorService.shutdownNow();
}
}

View File

@ -56,7 +56,7 @@ public class CloudImmutableBlobContainer extends AbstractCloudBlobContainer impl
listener.onFailure(t);
}
}
}, cloudBlobStore.executorService());
}, cloudBlobStore.executor());
}
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.index.gateway.cloud.CloudIndexGatewayModule;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -38,7 +39,7 @@ import java.io.IOException;
*/
public class CloudGateway extends BlobStoreGateway {
@Inject public CloudGateway(Settings settings, ClusterName clusterName, CloudBlobStoreService blobStoreService) throws IOException {
@Inject public CloudGateway(Settings settings, ClusterName clusterName, ThreadPool threadPool, CloudBlobStoreService blobStoreService) throws IOException {
super(settings);
String location = componentSettings.get("location");
@ -47,7 +48,7 @@ public class CloudGateway extends BlobStoreGateway {
throw new ElasticSearchIllegalArgumentException("Cloud gateway requires 'container' setting");
}
initialize(new CloudBlobStore(settings, blobStoreService.context(), container, location), clusterName, new ByteSizeValue(100, ByteSizeUnit.MB));
initialize(new CloudBlobStore(settings, blobStoreService.context(), threadPool.cached(), container, location), clusterName, new ByteSizeValue(100, ByteSizeUnit.MB));
}
@Override public String type() {

View File

@ -87,7 +87,7 @@ public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
}
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executorService().execute(new Runnable() {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];

View File

@ -55,7 +55,7 @@ public class HdfsAppendableBlobContainer extends AbstractHdfsBlobContainer imple
}
@Override public void append(final AppendBlobListener listener) {
blobStore.executorService().execute(new Runnable() {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
try {
listener.withStream(out);

View File

@ -30,10 +30,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
import java.util.concurrent.Executor;
/**
* @author kimchy (shay.banon)
@ -44,11 +41,11 @@ public class HdfsBlobStore implements BlobStore {
private final Path path;
private final Executor executor;
private final int bufferSizeInBytes;
private final ExecutorService executorService;
public HdfsBlobStore(Settings settings, FileSystem fileSystem, Path path) throws IOException {
public HdfsBlobStore(Settings settings, FileSystem fileSystem, Executor executor, Path path) throws IOException {
this.fileSystem = fileSystem;
this.path = path;
@ -57,7 +54,7 @@ public class HdfsBlobStore implements BlobStore {
}
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
executorService = Executors.newCachedThreadPool(daemonThreadFactory(settings, "hdfs_blobstore"));
this.executor = executor;
}
@Override public String toString() {
@ -76,8 +73,8 @@ public class HdfsBlobStore implements BlobStore {
return path;
}
public ExecutorService executorService() {
return executorService;
public Executor executor() {
return executor;
}
@Override public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {

View File

@ -38,7 +38,7 @@ public class HdfsImmutableBlobContainer extends AbstractHdfsBlobContainer implem
}
@Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
blobStore.executorService().execute(new Runnable() {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
Path file = new Path(path, blobName);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.URI;
@ -44,7 +45,7 @@ public class HdfsGateway extends BlobStoreGateway {
private final FileSystem fileSystem;
@Inject public HdfsGateway(Settings settings, ClusterName clusterName) throws IOException {
@Inject public HdfsGateway(Settings settings, ClusterName clusterName, ThreadPool threadPool) throws IOException {
super(settings);
this.closeFileSystem = componentSettings.getAsBoolean("close_fs", true);
@ -68,7 +69,7 @@ public class HdfsGateway extends BlobStoreGateway {
fileSystem = FileSystem.get(URI.create(uri), conf);
initialize(new HdfsBlobStore(settings, fileSystem, hPath), clusterName, null);
initialize(new HdfsBlobStore(settings, fileSystem, threadPool.cached(), hPath), clusterName, null);
}
@Override public String type() {