Remove Cluster State Status (elastic/elasticsearch#4313)

A companion PR for https://github.com/elastic/elasticsearch/pull/21817

Original commit: elastic/x-pack-elasticsearch@392235877f
This commit is contained in:
Boaz Leskes 2016-12-15 17:07:02 +01:00 committed by GitHub
parent e6ee905931
commit b2972a142c
13 changed files with 34 additions and 45 deletions

View File

@ -310,7 +310,7 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
@Override
protected void doStart() throws ElasticsearchException {
clusterService.add(this);
clusterService.addListener(this);
scheduler.start(Collections.emptyList());
logger.debug("initializing license state");
final ClusterState clusterState = clusterService.state();
@ -329,7 +329,7 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
@Override
protected void doStop() throws ElasticsearchException {
clusterService.remove(this);
clusterService.removeListener(this);
scheduler.stop();
// clear current license
currentLicense.set(null);

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.monitoring.exporter.local;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
@ -93,7 +92,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
}
}
clusterService.add(this);
clusterService.addListener(this);
cleanerService.add(this);
}
@ -120,7 +119,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
public void doClose() {
if (state.getAndSet(State.TERMINATED) != State.TERMINATED) {
logger.trace("stopped");
clusterService.remove(this);
clusterService.removeListener(this);
cleanerService.remove(this);
}
}

View File

@ -49,10 +49,10 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
// TODO: define a common interface for these and delegate from one place. nativeUserStore store is it's on
// cluster
// state listener , but is also activated from this clusterChanged method
clusterService.add(this);
clusterService.add(nativeUserStore);
clusterService.add(nativeRolesStore);
clusterService.add(new SecurityTemplateService(settings, clusterService, client));
clusterService.addListener(this);
clusterService.addListener(nativeUserStore);
clusterService.addListener(nativeRolesStore);
clusterService.addListener(new SecurityTemplateService(settings, client));
clusterService.addLifecycleListener(new LifecycleListener() {
@Override

View File

@ -21,7 +21,6 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedXContent;
@ -55,11 +54,9 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
final AtomicBoolean templateCreationPending = new AtomicBoolean(false);
final AtomicBoolean updateMappingPending = new AtomicBoolean(false);
public SecurityTemplateService(Settings settings, ClusterService clusterService,
InternalClient client) {
public SecurityTemplateService(Settings settings, InternalClient client) {
super(settings);
this.client = client;
clusterService.add(this);
}
@Override

View File

@ -82,21 +82,21 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import static org.elasticsearch.xpack.security.Security.setting;
import static org.elasticsearch.xpack.security.audit.AuditLevel.REALM_AUTHENTICATION_FAILED;
import static org.elasticsearch.xpack.security.audit.AuditUtil.indices;
import static org.elasticsearch.xpack.security.audit.AuditUtil.restRequestContent;
import static org.elasticsearch.xpack.security.audit.AuditLevel.ACCESS_DENIED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.ACCESS_GRANTED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.ANONYMOUS_ACCESS_DENIED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.AUTHENTICATION_FAILED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.AUTHENTICATION_SUCCESS;
import static org.elasticsearch.xpack.security.audit.AuditLevel.CONNECTION_DENIED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.CONNECTION_GRANTED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.REALM_AUTHENTICATION_FAILED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.RUN_AS_DENIED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.RUN_AS_GRANTED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.SYSTEM_ACCESS_GRANTED;
import static org.elasticsearch.xpack.security.audit.AuditLevel.TAMPERED_REQUEST;
import static org.elasticsearch.xpack.security.audit.AuditLevel.AUTHENTICATION_SUCCESS;
import static org.elasticsearch.xpack.security.audit.AuditLevel.parse;
import static org.elasticsearch.xpack.security.audit.AuditUtil.indices;
import static org.elasticsearch.xpack.security.audit.AuditUtil.restRequestContent;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.resolve;
/**
@ -317,7 +317,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
private void innerStart() {
if (indexToRemoteCluster == false) {
this.clusterService.add(this);
this.clusterService.addListener(this);
}
initializeBulkProcessor();
queueConsumer.start();

View File

@ -400,7 +400,7 @@ public class SecurityIndexSearcherWrapper extends IndexSearcherWrapper {
}
};
QueryRewriteContext copy = new QueryRewriteContext(original.getIndexSettings(), original.getMapperService(), scriptService, null,
client, original.getIndexReader(), original.getClusterState(), original::nowInMillis);
client, original.getIndexReader(), original::nowInMillis);
queryBuilder.rewrite(copy);
}
}

View File

@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -26,7 +26,7 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
import java.util.concurrent.CountDownLatch;
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateApplier {
private final ThreadPool threadPool;
private final WatcherService watcherService;
@ -40,7 +40,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
this.threadPool = threadPool;
this.watcherService = watcherService;
this.clusterService = clusterService;
clusterService.add(this);
clusterService.addStateApplier(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an watch is scheduled.
clusterService.addLifecycleListener(new LifecycleListener() {
@ -104,7 +104,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
public void applyClusterState(final ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .watches and
// a .triggered_watches index, but they may not have been restored from the cluster state on disk

View File

@ -68,7 +68,7 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexTemplates = TEMPLATE_CONFIGS;
clusterService.add(this);
clusterService.addListener(this);
Map<String, Settings> customIndexSettings = new HashMap<>();
for (TemplateConfig indexTemplate : indexTemplates) {

View File

@ -83,7 +83,7 @@ public class SecurityTemplateServiceTests extends ESTestCase {
}
}
client = new IClient(transportClient);
securityTemplateService = new SecurityTemplateService(Settings.EMPTY, clusterService, client);
securityTemplateService = new SecurityTemplateService(Settings.EMPTY, client);
listeners = new CopyOnWriteArrayList<>();
}

View File

@ -11,7 +11,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -173,7 +172,7 @@ public class SecurityTribeIT extends NativeRealmIntegTestCase {
final int cluster2Nodes = cluster2.size();
logger.info("waiting for [{}] nodes to be added to the tribe cluster state", cluster1Nodes + cluster2Nodes + 2);
final Predicate<ClusterState> nodeCountPredicate = state -> state.nodes().getSize() == cluster1Nodes + cluster2Nodes + 3;
if (nodeCountPredicate.test(observer.observedState().getClusterState()) == false) {
if (nodeCountPredicate.test(observer.observedState()) == false) {
CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
@ -192,12 +191,7 @@ public class SecurityTribeIT extends NativeRealmIntegTestCase {
fail("timed out waiting for nodes to be added to tribe's cluster state");
latch.countDown();
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterServiceState newState) {
return nodeCountPredicate.test(newState.getClusterState());
}
});
}, nodeCountPredicate);
latch.await();
}
}

View File

@ -75,7 +75,7 @@ public class SecurityIndexSearcherWrapperIntegrationTests extends ESTestCase {
when(client.settings()).thenReturn(Settings.EMPTY);
final long nowInMillis = randomPositiveLong();
QueryShardContext realQueryShardContext = new QueryShardContext(shardId.id(), indexSettings, null, null, mapperService, null,
null, indicesQueriesRegistry, client, null, null, () -> nowInMillis);
null, indicesQueriesRegistry, client, null, () -> nowInMillis);
QueryShardContext queryShardContext = spy(realQueryShardContext);
QueryParseContext queryParseContext = mock(QueryParseContext.class);
IndexSettings settings = IndexSettingsModule.newIndexSettings("_index", Settings.EMPTY);

View File

@ -815,8 +815,7 @@ public class SecurityIndexSearcherWrapperUnitTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
final long nowInMillis = randomPositiveLong();
QueryRewriteContext context = new QueryRewriteContext(null, mapperService, scriptService, null, client, null, null,
() -> nowInMillis);
QueryRewriteContext context = new QueryRewriteContext(null, mapperService, scriptService, null, client, null, () -> nowInMillis);
QueryBuilder queryBuilder1 = new TermsQueryBuilder("field", "val1", "val2");
SecurityIndexSearcherWrapper.failIfQueryUsesClient(scriptService, queryBuilder1, context);

View File

@ -68,13 +68,13 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
.nodes(nodes).build();
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
when(watcherService.validate(clusterState)).thenReturn(true);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, times(1)).start(clusterState);
verify(watcherService, never()).stop();
// Trying to start a second time, but that should have no affect.
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, times(1)).start(clusterState);
verify(watcherService, never()).stop();
@ -82,7 +82,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2");
ClusterState noMasterClusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", noMasterClusterState, noMasterClusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", noMasterClusterState, noMasterClusterState));
verify(watcherService, times(1)).stop();
verify(watcherService, times(1)).start(clusterState);
}
@ -93,7 +93,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
.blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK))
.nodes(nodes).build();
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, never()).start(any(ClusterState.class));
}
@ -116,7 +116,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
// Starting via cluster state update, we shouldn't start because we have been stopped manually.
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, times(1)).stop();
@ -130,7 +130,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, times(2)).start(any(ClusterState.class));
verify(watcherService, times(2)).stop();
@ -140,7 +140,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
.nodes(nodes).build();
when(watcherService.validate(clusterState)).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, times(3)).start(any(ClusterState.class));
verify(watcherService, times(2)).stop();
}
@ -185,7 +185,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(discoveryNodes).build();
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", newClusterState, oldClusterState));
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop();
verify(watcherService, times(1)).watchIndexDeletedOrClosed();
@ -207,7 +207,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
.nodes(discoveryNodes).build();
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState));
lifeCycleService.applyClusterState(new ClusterChangedEvent("any", newClusterState, oldClusterState));
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop();
verify(watcherService, times(1)).watchIndexDeletedOrClosed();