Made sure rivers _meta documents are retrieved via get with preference _primary

Closes #4864
This commit is contained in:
Luca Cavanna 2014-01-23 14:41:25 +01:00
parent a7a2d9f806
commit 2058edc117
1 changed files with 13 additions and 2 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
@ -252,23 +253,29 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
for (final RiverRouting routing : state.routing()) {
// not allocated
if (routing.node() == null) {
logger.trace("river {} has no routing node", routing.riverName().getName());
continue;
}
// only apply changes to the local node
if (!routing.node().equals(localNode)) {
logger.trace("river {} belongs to node {}", routing.riverName().getName(), routing.node());
continue;
}
// if its already created, ignore it
if (rivers.containsKey(routing.riverName())) {
logger.trace("river {} is already allocated", routing.riverName().getName());
continue;
}
client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").setListenerThreaded(true).execute(new ActionListener<GetResponse>() {
prepareGetMetaDocument(routing.riverName().name()).execute(new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
if (!rivers.containsKey(routing.riverName())) {
if (getResponse.isExists()) {
// only create the river if it exists, otherwise, the indexing meta data has not been visible yet...
createRiver(routing.riverName(), getResponse.getSourceAsMap());
} else {
//this should never happen as we've just found the _meta document in RiversRouter
logger.warn("{}/{}/_meta document not found", riverIndexName, routing.riverName().getName());
}
}
}
@ -286,7 +293,7 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() {
@Override
public void run() {
client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").setListenerThreaded(true).execute(listener);
prepareGetMetaDocument(routing.riverName().name()).execute(listener);
}
});
} catch (EsRejectedExecutionException ex) {
@ -299,5 +306,9 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
});
}
}
private GetRequestBuilder prepareGetMetaDocument(String riverName) {
return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary").setListenerThreaded(true);
}
}
}