Add optimize thread pool (size 1) dedicated to perform explicit optimize API
Have a dedicated thread pool for explicit optimize calls (shard level optimize operations). By default, the size should be 1 to work the same with how things work currently allowing for only 1 shard level optimize on a node. The change allows to see the optimize thread pool stats now, and potentially increase the thread pool size for beefy machines. closes #3366
This commit is contained in:
parent
f9ce791578
commit
92a7030558
|
@ -50,8 +50,6 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
|
|||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final Object optimizeMutex = new Object();
|
||||
|
||||
@Inject
|
||||
public TransportOptimizeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, IndicesService indicesService) {
|
||||
|
@ -61,7 +59,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
|
|||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.MERGE;
|
||||
return ThreadPool.Names.OPTIMIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -118,17 +116,15 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
|
|||
|
||||
@Override
|
||||
protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) throws ElasticSearchException {
|
||||
synchronized (optimizeMutex) {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
|
||||
indexShard.optimize(new Engine.Optimize()
|
||||
.waitForMerge(request.waitForMerge())
|
||||
.maxNumSegments(request.maxNumSegments())
|
||||
.onlyExpungeDeletes(request.onlyExpungeDeletes())
|
||||
.flush(request.flush())
|
||||
.refresh(request.refresh())
|
||||
);
|
||||
return new ShardOptimizeResponse(request.index(), request.shardId());
|
||||
}
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
|
||||
indexShard.optimize(new Engine.Optimize()
|
||||
.waitForMerge(request.waitForMerge())
|
||||
.maxNumSegments(request.maxNumSegments())
|
||||
.onlyExpungeDeletes(request.onlyExpungeDeletes())
|
||||
.flush(request.flush())
|
||||
.refresh(request.refresh())
|
||||
);
|
||||
return new ShardOptimizeResponse(request.index(), request.shardId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -72,6 +72,7 @@ public class ThreadPool extends AbstractComponent {
|
|||
public static final String REFRESH = "refresh";
|
||||
public static final String WARMER = "warmer";
|
||||
public static final String SNAPSHOT = "snapshot";
|
||||
public static final String OPTIMIZE = "optimize";
|
||||
}
|
||||
|
||||
public static final String THREADPOOL_GROUP = "threadpool.";
|
||||
|
@ -112,6 +113,7 @@ public class ThreadPool extends AbstractComponent {
|
|||
.put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build())
|
||||
.put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
|
||||
.put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
|
||||
.put(Names.OPTIMIZE, settingsBuilder().put("type", "fixed").put("size", 1).build())
|
||||
.build();
|
||||
|
||||
Map<String, ExecutorHolder> executors = Maps.newHashMap();
|
||||
|
|
Loading…
Reference in New Issue