allow to specify which executor to use when warming up

This commit is contained in:
Shay Banon 2012-04-29 17:13:52 +03:00
parent 70268a6c41
commit b379225238
2 changed files with 28 additions and 7 deletions

View File

@ -28,6 +28,9 @@ import org.elasticsearch.index.shard.ShardId;
public interface IndicesWarmer { public interface IndicesWarmer {
static interface Listener { static interface Listener {
String executor();
void warm(ShardId shardId, IndexMetaData indexMetaData, Engine.Searcher search); void warm(ShardId shardId, IndexMetaData indexMetaData, Engine.Searcher search);
} }

View File

@ -30,14 +30,18 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
*/ */
public class InternalIndicesWarmer extends AbstractComponent implements IndicesWarmer { public class InternalIndicesWarmer extends AbstractComponent implements IndicesWarmer {
private final ThreadPool threadPool;
private final ClusterService clusterService; private final ClusterService clusterService;
private final IndicesService indicesService; private final IndicesService indicesService;
@ -45,8 +49,9 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>(); private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
@Inject @Inject
public InternalIndicesWarmer(Settings settings, ClusterService clusterService, IndicesService indicesService) { public InternalIndicesWarmer(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) {
super(settings); super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService = indicesService; this.indicesService = indicesService;
} }
@ -61,8 +66,8 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
listeners.remove(listener); listeners.remove(listener);
} }
public void warm(ShardId shardId, Engine.Searcher searcher) { public void warm(final ShardId shardId, final Engine.Searcher searcher) {
IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.index().name()); final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.index().name());
if (indexMetaData == null) { if (indexMetaData == null) {
return; return;
} }
@ -82,11 +87,24 @@ public class InternalIndicesWarmer extends AbstractComponent implements IndicesW
} }
indexShard.warmerService().onPreWarm(); indexShard.warmerService().onPreWarm();
long time = System.nanoTime(); long time = System.nanoTime();
for (Listener listener : listeners) { for (final Listener listener : listeners) {
final CountDownLatch latch = new CountDownLatch(1);
threadPool.executor(listener.executor()).execute(new Runnable() {
@Override
public void run() {
try {
listener.warm(shardId, indexMetaData, searcher);
} catch (Exception e) {
logger.warn("[{}][{}] failed to warm [{}]", shardId.index().name(), shardId.id(), listener);
} finally {
latch.countDown();
}
}
});
try { try {
listener.warm(shardId, indexMetaData, searcher); latch.await();
} catch (Exception e) { } catch (InterruptedException e) {
logger.warn("[{}][{}] failed to warm [{}]", shardId.index().name(), shardId.id(), listener); return;
} }
} }
long took = System.nanoTime() - time; long took = System.nanoTime() - time;