diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc index 219abe8ad41..8e1a4db8bdb 100644 --- a/docs/reference/modules/snapshots.asciidoc +++ b/docs/reference/modules/snapshots.asciidoc @@ -67,7 +67,6 @@ on all data and master nodes. The following settings are supported: [horizontal] `location`:: Location of the snapshots. Mandatory. `compress`:: Turns on compression of the snapshot files. Defaults to `true`. -`concurrent_streams`:: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5` `chunk_size`:: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. 1g, 10m, 5k. Defaults to `null` (unlimited chunk size). `max_restore_bytes_per_sec`:: Throttles per node restore rate. Defaults to `20mb` per second. @@ -83,8 +82,6 @@ point to the root of the shared filesystem repository. The following settings ar [horizontal] `url`:: Location of the snapshots. Mandatory. -`concurrent_streams`:: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5` - [float] ===== Repository plugins diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 50d7a923dfe..21b07dc4d26 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -36,13 +36,23 @@ pools, but the important ones include: size `# of available processors`. queue_size `1000`. -`warmer`:: +`snapshot`:: + For snapshot/restore operations, defaults to `scaling` + keep-alive `5m`, + size `(# of available processors)/2`. + +`snapshot_data`:: + For snapshot/restore operations on data files, defaults to `scaling` + with a `5m` keep-alive, + size `5`. + +`warmer`:: For segment warm-up operations, defaults to `scaling` with a `5m` keep-alive. `refresh`:: For refresh operations, defaults to `scaling` - with a `5m` keep-alive. + with a `5m` keep-alive. Changing a specific thread pool can be done by setting its type and specific type parameters, for example, changing the `index` thread pool diff --git a/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java index a2a99bd2d86..ebdb810b27e 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java +++ b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.threadpool.ThreadPool; import java.io.File; import java.util.concurrent.Executor; @@ -37,15 +38,16 @@ import java.util.concurrent.Executor; */ public class FsBlobStore extends AbstractComponent implements BlobStore { - private final Executor executor; + private final ThreadPool threadPool; private final File path; private final int bufferSizeInBytes; - public FsBlobStore(Settings settings, Executor executor, File path) { + public FsBlobStore(Settings settings, ThreadPool threadPool, File path) { super(settings); this.path = path; + this.threadPool = threadPool; if (!path.exists()) { boolean b = FileSystemUtils.mkdirs(path); if (!b) { @@ -56,7 +58,6 @@ public class FsBlobStore extends AbstractComponent implements BlobStore { throw new BlobStoreException("Path is not a directory at [" + path + "]"); } this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); - this.executor = executor; } @Override @@ -73,7 +74,7 @@ public class FsBlobStore extends AbstractComponent implements BlobStore { } public Executor executor() { - return executor; + return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA); } @Override diff --git a/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java index 42cdb9d5f34..3124fd9accc 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java +++ b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.threadpool.ThreadPool; import java.net.MalformedURLException; import java.net.URL; @@ -37,7 +38,7 @@ import java.util.concurrent.Executor; */ public class URLBlobStore extends AbstractComponent implements BlobStore { - private final Executor executor; + private final ThreadPool threadPool; private final URL path; @@ -53,14 +54,14 @@ public class URLBlobStore extends AbstractComponent implements BlobStore { * * * @param settings settings - * @param executor executor for read operations + * @param threadPool thread pool for read operations * @param path base URL */ - public URLBlobStore(Settings settings, Executor executor, URL path) { + public URLBlobStore(Settings settings, ThreadPool threadPool, URL path) { super(settings); this.path = path; this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes(); - this.executor = executor; + this.threadPool = threadPool; } /** @@ -95,7 +96,7 @@ public class URLBlobStore extends AbstractComponent implements BlobStore { * @return executor */ public Executor executor() { - return executor; + return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA); } /** diff --git a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index aae81bc8b7b..aed5fe0edc7 100644 --- a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; -import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShardState; @@ -41,6 +40,7 @@ import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.fs.FsTranslog; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; @@ -274,7 +274,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen return; } if (indexShard.state() == IndexShardState.STARTED && indexShard.translog().syncNeeded()) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() { + threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() { @Override public void run() { try { diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 1d5a6c1b988..f1bc08bb690 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -89,6 +89,7 @@ import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchService; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.transport.TransportModule; @@ -223,6 +224,7 @@ public final class InternalNode implements Node { injector.getInstance(IndicesClusterStateService.class).start(); injector.getInstance(IndicesTTLService.class).start(); injector.getInstance(RiversManager.class).start(); + injector.getInstance(SnapshotsService.class).start(); injector.getInstance(ClusterService.class).start(); injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); @@ -263,6 +265,7 @@ public final class InternalNode implements Node { injector.getInstance(RiversManager.class).stop(); + injector.getInstance(SnapshotsService.class).stop(); // stop any changes happening as a result of cluster state changes injector.getInstance(IndicesClusterStateService.class).stop(); // we close indices first, so operations won't be allowed on it @@ -317,6 +320,8 @@ public final class InternalNode implements Node { stopWatch.stop().start("rivers"); injector.getInstance(RiversManager.class).close(); + stopWatch.stop().start("snapshot_service"); + injector.getInstance(SnapshotsService.class).close(); stopWatch.stop().start("client"); injector.getInstance(Client.class).close(); stopWatch.stop().start("indices_cluster"); diff --git a/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index eae5972caff..d7fa2011b8a 100644 --- a/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -24,17 +24,15 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.File; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * Shared file system implementation of the BlobStoreRepository @@ -68,7 +66,7 @@ public class FsRepository extends BlobStoreRepository { * @throws IOException */ @Inject - public FsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { + public FsRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException { super(name.getName(), repositorySettings, indexShardRepository); File locationFile; String location = repositorySettings.settings().get("location", componentSettings.get("location")); @@ -78,9 +76,7 @@ public class FsRepository extends BlobStoreRepository { } else { locationFile = new File(location); } - int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5)); - ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]")); - blobStore = new FsBlobStore(componentSettings, concurrentStreamPool, locationFile); + blobStore = new FsBlobStore(componentSettings, threadPool, locationFile); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null)); this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false)); this.basePath = BlobPath.cleanPath(); diff --git a/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java b/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java index e9341940a7d..023fdb8d74e 100644 --- a/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java +++ b/src/main/java/org/elasticsearch/repositories/uri/URLRepository.java @@ -25,17 +25,15 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.url.URLBlobStore; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.URL; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * Read-only URL-based implementation of the BlobStoreRepository @@ -65,7 +63,7 @@ public class URLRepository extends BlobStoreRepository { * @throws IOException */ @Inject - public URLRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { + public URLRepository(RepositoryName name, RepositorySettings repositorySettings, ThreadPool threadPool, IndexShardRepository indexShardRepository) throws IOException { super(name.getName(), repositorySettings, indexShardRepository); URL url; String path = repositorySettings.settings().get("url", componentSettings.get("url")); @@ -74,10 +72,8 @@ public class URLRepository extends BlobStoreRepository { } else { url = new URL(path); } - int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5)); - ExecutorService concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[fs_stream]")); listDirectories = repositorySettings.settings().getAsBoolean("list_directories", componentSettings.getAsBoolean("list_directories", true)); - blobStore = new URLBlobStore(componentSettings, concurrentStreamPool, url); + blobStore = new URLBlobStore(componentSettings, threadPool, url); basePath = BlobPath.cleanPath(); } diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index 5cd58a4e647..d43c7a04aef 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -65,6 +65,7 @@ public class RestThreadPoolAction extends AbstractCatAction { ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, + ThreadPool.Names.SNAPSHOT_DATA, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER }; @@ -82,6 +83,7 @@ public class RestThreadPoolAction extends AbstractCatAction { "r", "s", "sn", + "sd", "su", "w" }; diff --git a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 8df7d0c2884..85cb1a62b76 100644 --- a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -35,7 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -56,6 +56,10 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Maps.newHashMap; @@ -79,7 +83,7 @@ import static com.google.common.collect.Sets.newHashSet; * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId, SnapshotInfo, Throwable)} to remove snapshot from cluster state * */ -public class SnapshotsService extends AbstractComponent implements ClusterStateListener { +public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateListener { private final ClusterService clusterService; @@ -93,6 +97,10 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL private volatile ImmutableMap shardSnapshots = ImmutableMap.of(); + private final Lock shutdownLock = new ReentrantLock(); + + private final Condition shutdownCondition = shutdownLock.newCondition(); + private final CopyOnWriteArrayList snapshotCompletionListeners = new CopyOnWriteArrayList<>(); @@ -678,7 +686,16 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL // Update the list of snapshots that we saw and tried to started // If startup of these shards fails later, we don't want to try starting these shards again - shardSnapshots = ImmutableMap.copyOf(survivors); + shutdownLock.lock(); + try { + shardSnapshots = ImmutableMap.copyOf(survivors); + if (shardSnapshots.isEmpty()) { + // Notify all waiting threads that no more snapshots + shutdownCondition.signalAll(); + } + } finally { + shutdownLock.unlock(); + } // We have new snapshots to process - if (newSnapshots != null) { @@ -1101,6 +1118,30 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL this.snapshotCompletionListeners.remove(listener); } + @Override + protected void doStart() throws ElasticsearchException { + + } + + @Override + protected void doStop() throws ElasticsearchException { + shutdownLock.lock(); + try { + while(!shardSnapshots.isEmpty() && shutdownCondition.await(5, TimeUnit.SECONDS)) { + // Wait for at most 5 second for locally running snapshots to finish + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + shutdownLock.unlock(); + } + } + + @Override + protected void doClose() throws ElasticsearchException { + + } + /** * Listener for create snapshot operation */ diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 169b2b5e729..3d5473b4aee 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -74,6 +74,7 @@ public class ThreadPool extends AbstractComponent { public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; + public static final String SNAPSHOT_DATA = "snapshot_data"; public static final String OPTIMIZE = "optimize"; public static final String BENCH = "bench"; } @@ -117,6 +118,7 @@ public class ThreadPool extends AbstractComponent { .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()) .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) + .put(Names.SNAPSHOT_DATA, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()) .put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build()) .put(Names.BENCH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .build(); diff --git a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 294a944d152..6347e114681 100644 --- a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.snapshots.IndexShardRepository; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -71,8 +72,8 @@ public class MockRepository extends FsRepository { private volatile boolean blocked = false; @Inject - public MockRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { - super(name, repositorySettings, indexShardRepository); + public MockRepository(RepositoryName name, ThreadPool threadPool, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException { + super(name, repositorySettings, threadPool, indexShardRepository); randomControlIOExceptionRate = repositorySettings.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = repositorySettings.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); blockOnControlFiles = repositorySettings.settings().getAsBoolean("block_on_control", false); diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index 8a8e34ca8b3..6aaa278e02e 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -294,7 +294,7 @@ public final class TestCluster extends ImmutableTestCluster { for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE, ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { + ThreadPool.Names.SNAPSHOT_DATA, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { if (random.nextBoolean()) { final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling")); builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);