add a merge thread pool that only does async merges (optimize), so it can be controlled by itself

This commit is contained in:
kimchy 2011-03-04 01:04:59 +02:00
parent 06ad13f373
commit b629d36d8b
2 changed files with 13 additions and 11 deletions

View File

@ -94,11 +94,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private volatile IndexShardState state; private volatile IndexShardState state;
private final TimeValue refreshInterval; private final TimeValue refreshInterval;
private final TimeValue optimizeInterval; private final TimeValue mergeInterval;
private volatile ScheduledFuture refreshScheduledFuture; private volatile ScheduledFuture refreshScheduledFuture;
private volatile ScheduledFuture optimizeScheduleFuture; private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting; private volatile ShardRouting shardRouting;
@ -125,7 +125,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} else { } else {
refreshInterval = new TimeValue(-2); refreshInterval = new TimeValue(-2);
} }
optimizeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1)); mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1));
logger.debug("state: [CREATED]"); logger.debug("state: [CREATED]");
@ -444,9 +444,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
refreshScheduledFuture.cancel(true); refreshScheduledFuture.cancel(true);
refreshScheduledFuture = null; refreshScheduledFuture = null;
} }
if (optimizeScheduleFuture != null) { if (mergeScheduleFuture != null) {
optimizeScheduleFuture.cancel(true); mergeScheduleFuture.cancel(true);
optimizeScheduleFuture = null; mergeScheduleFuture = null;
} }
} }
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason); logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
@ -561,9 +561,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
// so, make sure we periodically call it, this need to be a small enough value so mergine will actually // so, make sure we periodically call it, this need to be a small enough value so mergine will actually
// happen and reduce the number of segments // happen and reduce the number of segments
if (optimizeInterval.millis() > 0) { if (mergeInterval.millis() > 0) {
optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, new EngineOptimizer()); mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, new EngineMerger());
logger.debug("scheduling optimizer / merger every {}", optimizeInterval); logger.debug("scheduling optimizer / merger every {}", mergeInterval);
} else { } else {
logger.debug("scheduled optimizer / merger disabled"); logger.debug("scheduled optimizer / merger disabled");
} }
@ -614,7 +614,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
} }
private class EngineOptimizer implements Runnable { private class EngineMerger implements Runnable {
@Override public void run() { @Override public void run() {
try { try {
// -1 means maybe merge // -1 means maybe merge
@ -635,7 +635,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
logger.warn("Failed to perform scheduled engine optimize/merge", e); logger.warn("Failed to perform scheduled engine optimize/merge", e);
} }
if (state != IndexShardState.CLOSED) { if (state != IndexShardState.CLOSED) {
optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, this); mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, this);
} }
} }
} }

View File

@ -52,6 +52,7 @@ public class ThreadPool extends AbstractComponent {
public static final String SEARCH = "search"; public static final String SEARCH = "search";
public static final String PERCOLATE = "percolate"; public static final String PERCOLATE = "percolate";
public static final String MANAGEMENT = "management"; public static final String MANAGEMENT = "management";
public static final String MERGE = "merge";
public static final String SNAPSHOT = "snapshot"; public static final String SNAPSHOT = "snapshot";
} }
@ -74,6 +75,7 @@ public class ThreadPool extends AbstractComponent {
executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS)); executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS)); executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), settingsBuilder().put("keep_alive", "30s").put("size", 20).build())); executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), settingsBuilder().put("keep_alive", "30s").put("size", 20).build()));
executors.put(Names.MERGE, build(Names.MERGE, "scaling", groupSettings.get(Names.MERGE), settingsBuilder().put("keep_alive", "30s").put("size", 20).build()));
executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), ImmutableSettings.Builder.EMPTY_SETTINGS)); executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SAME, MoreExecutors.sameThreadExecutor()); executors.put(Names.SAME, MoreExecutors.sameThreadExecutor());
this.executors = ImmutableMap.copyOf(executors); this.executors = ImmutableMap.copyOf(executors);