add internal force local flag, used by tribe node

tribe node to set it to true so all master read operations will automatically execute on the local tribe node
This commit is contained in:
Shay Banon 2014-01-20 22:40:18 +01:00
parent 78590a8b6b
commit e29659e36d
4 changed files with 21 additions and 9 deletions

View File

@ -41,8 +41,8 @@ However, there are a few exceptions:
clusters. It will pick one of them and discard the other.
* Master level read operations (eg <<cluster-state>>, <<cluster-health>>)
need to have the `local` flag set to `true` as the tribe node does not
have a single master node.
will automatically execute with a local flag set to true since there is
no master.
* Master level write operations (eg <<indices-create-index>>) are not
allowed. These should be performed on a single cluster.

View File

@ -31,11 +31,19 @@ import org.elasticsearch.transport.TransportService;
*/
public abstract class TransportMasterNodeReadOperationAction<Request extends MasterNodeReadOperationRequest, Response extends ActionResponse> extends TransportMasterNodeOperationAction<Request, Response> {
public static final String FORCE_LOCAL_SETTING = "action.master.force_local";
private Boolean forceLocal;
protected TransportMasterNodeReadOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, transportService, clusterService, threadPool);
this.forceLocal = settings.getAsBoolean(FORCE_LOCAL_SETTING, null);
}
protected final boolean localExecute(Request request) {
if (forceLocal != null) {
return forceLocal;
}
return request.local();
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -57,7 +58,7 @@ import java.util.concurrent.CountDownLatch;
* <p/>
* The tribe node settings make sure the discovery used is "local", but with no master elected. This means no
* write level master node operations will work ({@link org.elasticsearch.discovery.MasterNotDiscoveredException}
* will be thrown), and state level metadata operations should use the local flag.
* will be thrown), and state level metadata operations with automatically use the local flag.
* <p/>
* The state merged from different clusters include the list of nodes, metadata, and routing table. Each node merged
* will have in its tribe which tribe member it came from. Each index merged will have in its settings which tribe
@ -95,6 +96,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
}
sb.put("gateway.type", "none"); // we shouldn't store anything locally...
sb.put(TransportMasterNodeReadOperationAction.FORCE_LOCAL_SETTING, true);
return sb.build();
}

View File

@ -40,6 +40,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC
import static org.hamcrest.Matchers.equalTo;
/**
* Note, when talking to tribe client, no need to set the local flag on master read operations, it
* does it by default.
*/
public class TribeTests extends ElasticsearchIntegrationTest {
@ -84,7 +86,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
return tribeState.getMetaData().hasIndex("test1") && tribeState.getMetaData().hasIndex("test2") &&
tribeState.getRoutingTable().hasIndex("test1") && tribeState.getRoutingTable().hasIndex("test2");
}
@ -93,7 +95,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
logger.info("wait till tribe has the same nodes as the 2 clusters");
awaitSameNodeCounts();
assertThat(tribeClient.admin().cluster().prepareHealth().setLocal(true).setWaitForGreenStatus().get().getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(tribeClient.admin().cluster().prepareHealth().setWaitForGreenStatus().get().getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.info("create 2 docs through the tribe node");
tribeClient.prepareIndex("test1", "type1", "1").setSource("field1", "value1").get();
@ -106,7 +108,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
return tribeState.getMetaData().index("test1").mapping("type1") != null &&
tribeState.getMetaData().index("test2").mapping("type2") != null;
}
@ -125,7 +127,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
return tribeState.getMetaData().index("test1").mapping("type1") != null && tribeState.getMetaData().index("test1").mapping("type2") != null &&
tribeState.getMetaData().index("test2").mapping("type1") != null && tribeState.getMetaData().index("test2").mapping("type2") != null;
}
@ -144,7 +146,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState();
ClusterState tribeState = tribeNode.client().admin().cluster().prepareState().get().getState();
return tribeState.getMetaData().hasIndex("test1") && !tribeState.getMetaData().hasIndex("test2") &&
tribeState.getRoutingTable().hasIndex("test1") && !tribeState.getRoutingTable().hasIndex("test2");
}
@ -159,7 +161,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
DiscoveryNodes tribeNodes = tribeNode.client().admin().cluster().prepareState().setLocal(true).get().getState().getNodes();
DiscoveryNodes tribeNodes = tribeNode.client().admin().cluster().prepareState().get().getState().getNodes();
return countDataNodesForTribe("t1", tribeNodes) == cluster().client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size()
&& countDataNodesForTribe("t2", tribeNodes) == cluster2.client().admin().cluster().prepareState().get().getState().getNodes().dataNodes().size();
}