Refactoring of Gateway*** classes (#26706)

- Removes mutual dependency between GatewayMetaState and TransportNodesListGatewayMetaState
- Deguices MetaDataIndexUpgradeService
- Deguices GatewayMetaState
- Makes Gateway the master-level component that is only responsible for coordinating the state recovery
This commit is contained in:
Yannick Welsch 2017-09-20 12:51:58 +02:00 committed by GitHub
parent b3819e7f30
commit 5f407062ad
9 changed files with 21 additions and 40 deletions

View File

@ -18,13 +18,11 @@
*/ */
package org.elasticsearch.cluster.metadata; package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -35,7 +33,6 @@ import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.Plugin;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Collection; import java.util.Collection;
@ -59,7 +56,6 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
private final IndexScopedSettings indexScopedSettings; private final IndexScopedSettings indexScopedSettings;
private final UnaryOperator<IndexMetaData> upgraders; private final UnaryOperator<IndexMetaData> upgraders;
@Inject
public MetaDataIndexUpgradeService(Settings settings, NamedXContentRegistry xContentRegistry, MapperRegistry mapperRegistry, public MetaDataIndexUpgradeService(Settings settings, NamedXContentRegistry xContentRegistry, MapperRegistry mapperRegistry,
IndexScopedSettings indexScopedSettings, IndexScopedSettings indexScopedSettings,
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders) { Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders) {

View File

@ -23,9 +23,7 @@ import com.carrotsearch.hppc.ObjectFloatHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
@ -39,27 +37,23 @@ import org.elasticsearch.indices.IndicesService;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
public class Gateway extends AbstractComponent implements ClusterStateApplier { public class Gateway extends AbstractComponent {
private final ClusterService clusterService; private final ClusterService clusterService;
private final GatewayMetaState metaState;
private final TransportNodesListGatewayMetaState listGatewayMetaState; private final TransportNodesListGatewayMetaState listGatewayMetaState;
private final int minimumMasterNodes; private final int minimumMasterNodes;
private final IndicesService indicesService; private final IndicesService indicesService;
public Gateway(Settings settings, ClusterService clusterService, GatewayMetaState metaState, public Gateway(Settings settings, ClusterService clusterService,
TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayMetaState listGatewayMetaState,
IndicesService indicesService) { IndicesService indicesService) {
super(settings); super(settings);
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.metaState = metaState;
this.listGatewayMetaState = listGatewayMetaState; this.listGatewayMetaState = listGatewayMetaState;
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
clusterService.addLowPriorityApplier(this);
} }
public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException { public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
@ -174,13 +168,6 @@ public class Gateway extends AbstractComponent implements ClusterStateApplier {
ex); ex);
} }
@Override
public void applyClusterState(final ClusterChangedEvent event) {
// order is important, first metaState, and then shardsState
// so dangling indices will be recorded
metaState.applyClusterState(event);
}
public interface GatewayStateRecoveredListener { public interface GatewayStateRecoveredListener {
void onSuccess(ClusterState build); void onSuccess(ClusterState build);

View File

@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.IndexFolderUpgrader; import org.elasticsearch.common.util.IndexFolderUpgrader;
@ -69,15 +68,11 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
private volatile Set<Index> previouslyWrittenIndices = emptySet(); private volatile Set<Index> previouslyWrittenIndices = emptySet();
@Inject
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService, public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
TransportNodesListGatewayMetaState nodesListGatewayMetaState, MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) throws IOException {
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader)
throws Exception {
super(settings); super(settings);
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService; this.metaStateService = metaStateService;
nodesListGatewayMetaState.init(this);
if (DiscoveryNode.isDataNode(settings)) { if (DiscoveryNode.isDataNode(settings)) {
ensureNoPre019ShardState(nodeEnv); ensureNoPre019ShardState(nodeEnv);
@ -210,7 +205,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
/** /**
* Throws an IAE if a pre 0.19 state is detected * Throws an IAE if a pre 0.19 state is detected
*/ */
private void ensureNoPre019State() throws Exception { private void ensureNoPre019State() throws IOException {
for (Path dataLocation : nodeEnv.nodeDataPaths()) { for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME); final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (!Files.exists(stateLocation)) { if (!Files.exists(stateLocation)) {
@ -242,7 +237,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
*/ */
static MetaData upgradeMetaData(MetaData metaData, static MetaData upgradeMetaData(MetaData metaData,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) throws Exception { MetaDataUpgrader metaDataUpgrader) throws IOException {
// upgrade index meta data // upgrade index meta data
boolean changed = false; boolean changed = false;
final MetaData.Builder upgradedMetaData = MetaData.builder(metaData); final MetaData.Builder upgradedMetaData = MetaData.builder(metaData);
@ -288,7 +283,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
} }
// shard state BWC // shard state BWC
private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws Exception { private void ensureNoPre019ShardState(NodeEnvironment nodeEnv) throws IOException {
for (Path dataLocation : nodeEnv.nodeDataPaths()) { for (Path dataLocation : nodeEnv.nodeDataPaths()) {
final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME); final Path stateLocation = dataLocation.resolve(MetaDataStateFormat.STATE_DIR_NAME);
if (Files.exists(stateLocation)) { if (Files.exists(stateLocation)) {

View File

@ -30,7 +30,6 @@ public class GatewayModule extends AbstractModule {
bind(DanglingIndicesState.class).asEagerSingleton(); bind(DanglingIndicesState.class).asEagerSingleton();
bind(GatewayService.class).asEagerSingleton(); bind(GatewayService.class).asEagerSingleton();
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton(); bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
bind(GatewayMetaState.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton(); bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
bind(LocalAllocateDangledIndices.class).asEagerSingleton(); bind(LocalAllocateDangledIndices.class).asEagerSingleton();
} }

View File

@ -95,7 +95,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
TransportNodesListGatewayMetaState listGatewayMetaState, TransportNodesListGatewayMetaState listGatewayMetaState,
IndicesService indicesService) { IndicesService indicesService) {
super(settings); super(settings);
this.gateway = new Gateway(settings, clusterService, metaState, listGatewayMetaState, this.gateway = new Gateway(settings, clusterService, listGatewayMetaState,
indicesService); indicesService);
this.allocationService = allocationService; this.allocationService = allocationService;
this.clusterService = clusterService; this.clusterService = clusterService;
@ -121,6 +121,8 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
// TODO: change me once the minimum_master_nodes is changed too // TODO: change me once the minimum_master_nodes is changed too
recoverAfterMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1); recoverAfterMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
} }
clusterService.addLowPriorityApplier(metaState);
} }
@Override @Override

View File

@ -133,7 +133,7 @@ public class MetaStateService extends AbstractComponent {
/** /**
* Writes the global state, *without* the indices states. * Writes the global state, *without* the indices states.
*/ */
void writeGlobalState(String reason, MetaData metaData) throws Exception { void writeGlobalState(String reason, MetaData metaData) throws IOException {
logger.trace("[_global] writing state, reason [{}]", reason); logger.trace("[_global] writing state, reason [{}]", reason);
try { try {
MetaData.FORMAT.write(metaData, nodeEnv.nodeDataPaths()); MetaData.FORMAT.write(metaData, nodeEnv.nodeDataPaths());

View File

@ -52,19 +52,16 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
public static final String ACTION_NAME = "internal:gateway/local/meta_state"; public static final String ACTION_NAME = "internal:gateway/local/meta_state";
private GatewayMetaState metaState; private final GatewayMetaState metaState;
@Inject @Inject
public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPool, public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
GatewayMetaState metaState) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters, super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class); indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
}
TransportNodesListGatewayMetaState init(GatewayMetaState metaState) {
this.metaState = metaState; this.metaState = metaState;
return this;
} }
public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) { public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {

View File

@ -100,6 +100,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
@ -416,6 +417,10 @@ public class Node implements Closeable {
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList()); .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,
metaDataIndexUpgradeService, metaDataUpgrader);
new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get(); final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool, final TransportService transportService = newTransportService(settings, transport, threadPool,
@ -475,9 +480,9 @@ public class Node implements Closeable {
b.bind(TransportService.class).toInstance(transportService); b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService); b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService())); b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings, xContentRegistry, b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders));
b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{ {
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());

View File

@ -91,7 +91,7 @@ public class RecoveryWithUnsupportedIndicesIT extends ESIntegTestCase {
internalCluster().startNode(nodeSettings); internalCluster().startNode(nodeSettings);
fail(); fail();
} catch (Exception ex) { } catch (Exception ex) {
assertThat(ex.getMessage(), containsString(" was created before v2.0.0.beta1 and wasn't upgraded")); assertThat(ex.getCause().getCause().getMessage(), containsString(" was created before v2.0.0.beta1 and wasn't upgraded"));
} }
} }
} }