From 2058edc117c0fb14b570f46bffa4cdec397039c4 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 23 Jan 2014 14:41:25 +0100 Subject: [PATCH] Made sure rivers _meta documents are retrieved via get with preference _primary Closes #4864 --- .../org/elasticsearch/river/RiversService.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/elasticsearch/river/RiversService.java b/src/main/java/org/elasticsearch/river/RiversService.java index b7ce20f77aa..e924158d11a 100644 --- a/src/main/java/org/elasticsearch/river/RiversService.java +++ b/src/main/java/org/elasticsearch/river/RiversService.java @@ -27,6 +27,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; +import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; @@ -252,23 +253,29 @@ public class RiversService extends AbstractLifecycleComponent { for (final RiverRouting routing : state.routing()) { // not allocated if (routing.node() == null) { + logger.trace("river {} has no routing node", routing.riverName().getName()); continue; } // only apply changes to the local node if (!routing.node().equals(localNode)) { + logger.trace("river {} belongs to node {}", routing.riverName().getName(), routing.node()); continue; } // if its already created, ignore it if (rivers.containsKey(routing.riverName())) { + logger.trace("river {} is already allocated", routing.riverName().getName()); continue; } - client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").setListenerThreaded(true).execute(new ActionListener() { + prepareGetMetaDocument(routing.riverName().name()).execute(new ActionListener() { @Override public void onResponse(GetResponse getResponse) { if (!rivers.containsKey(routing.riverName())) { if (getResponse.isExists()) { // only create the river if it exists, otherwise, the indexing meta data has not been visible yet... createRiver(routing.riverName(), getResponse.getSourceAsMap()); + } else { + //this should never happen as we've just found the _meta document in RiversRouter + logger.warn("{}/{}/_meta document not found", riverIndexName, routing.riverName().getName()); } } } @@ -286,7 +293,7 @@ public class RiversService extends AbstractLifecycleComponent { threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() { @Override public void run() { - client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").setListenerThreaded(true).execute(listener); + prepareGetMetaDocument(routing.riverName().name()).execute(listener); } }); } catch (EsRejectedExecutionException ex) { @@ -299,5 +306,9 @@ public class RiversService extends AbstractLifecycleComponent { }); } } + + private GetRequestBuilder prepareGetMetaDocument(String riverName) { + return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary").setListenerThreaded(true); + } } }