Load metadata at start time not construction time (#46326)
Today we load the metadata from disk while constructing the node. However there is no real need to do so, and this commit moves that code to run later while the node is starting instead.
This commit is contained in:
parent
b40ac6dee7
commit
6c67b53932
|
@ -28,7 +28,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.RerouteService;
|
import org.elasticsearch.cluster.routing.RerouteService;
|
||||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
|
||||||
import org.elasticsearch.cluster.service.MasterService;
|
import org.elasticsearch.cluster.service.MasterService;
|
||||||
import org.elasticsearch.common.Randomness;
|
import org.elasticsearch.common.Randomness;
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
|
@ -152,12 +151,12 @@ public class DiscoveryModule {
|
||||||
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
|
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
|
||||||
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
|
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
|
||||||
settings, clusterSettings,
|
settings, clusterSettings,
|
||||||
transportService, namedWriteableRegistry, allocationService, masterService,
|
transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState,
|
||||||
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
|
seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService,
|
||||||
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
|
electionStrategy);
|
||||||
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
|
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
|
||||||
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
|
||||||
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, rerouteService);
|
clusterSettings, seedHostsProvider, allocationService, joinValidators, rerouteService);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
|
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||||
import org.elasticsearch.cluster.NotMasterException;
|
import org.elasticsearch.cluster.NotMasterException;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
|
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
|
||||||
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
|
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
|
||||||
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
|
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
|
||||||
import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
|
import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
|
||||||
|
@ -40,7 +41,6 @@ import org.elasticsearch.cluster.routing.RerouteService;
|
||||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||||
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
|
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
|
||||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
|
||||||
import org.elasticsearch.cluster.service.MasterService;
|
import org.elasticsearch.cluster.service.MasterService;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
|
@ -59,10 +59,8 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.DiscoveryStats;
|
import org.elasticsearch.discovery.DiscoveryStats;
|
||||||
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
|
|
||||||
import org.elasticsearch.discovery.SeedHostsProvider;
|
import org.elasticsearch.discovery.SeedHostsProvider;
|
||||||
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
|
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
|
||||||
import org.elasticsearch.gateway.GatewayMetaState;
|
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||||
|
@ -165,8 +163,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||||
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
|
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||||
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
|
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
|
||||||
ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService,
|
ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService,
|
||||||
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState,
|
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, RerouteService rerouteService) {
|
||||||
RerouteService rerouteService) {
|
|
||||||
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
|
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
|
||||||
this.masterService = masterService;
|
this.masterService = masterService;
|
||||||
this.clusterApplier = clusterApplier;
|
this.clusterApplier = clusterApplier;
|
||||||
|
@ -234,10 +231,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||||
|
|
||||||
transportService.registerRequestHandler(
|
transportService.registerRequestHandler(
|
||||||
DISCOVERY_REJOIN_ACTION_NAME, ThreadPool.Names.SAME, RejoinClusterRequest::new, new RejoinClusterRequestHandler());
|
DISCOVERY_REJOIN_ACTION_NAME, ThreadPool.Names.SAME, RejoinClusterRequest::new, new RejoinClusterRequestHandler());
|
||||||
|
|
||||||
if (clusterApplier instanceof ClusterApplierService) {
|
|
||||||
((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// protected to allow overriding in tests
|
// protected to allow overriding in tests
|
||||||
|
|
|
@ -23,12 +23,12 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
import org.apache.lucene.util.SetOnce;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
|
||||||
import org.elasticsearch.cluster.coordination.CoordinationState;
|
|
||||||
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
|
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
|
||||||
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
|
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
@ -38,12 +38,12 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.discovery.DiscoveryModule;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.plugins.MetaDataUpgrader;
|
import org.elasticsearch.plugins.MetaDataUpgrader;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
@ -71,41 +71,71 @@ import java.util.function.UnaryOperator;
|
||||||
* elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the
|
* elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the
|
||||||
* gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster.
|
* gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster.
|
||||||
*/
|
*/
|
||||||
public class GatewayMetaState implements ClusterStateApplier, CoordinationState.PersistedState {
|
public class GatewayMetaState implements PersistedState {
|
||||||
protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class);
|
protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class);
|
||||||
|
|
||||||
private final MetaStateService metaStateService;
|
private final MetaStateService metaStateService;
|
||||||
private final Settings settings;
|
private final Settings settings;
|
||||||
private final ClusterService clusterService;
|
|
||||||
private final TransportService transportService;
|
|
||||||
|
|
||||||
//there is a single thread executing updateClusterState calls, hence no volatile modifier
|
// On master-eligible Zen2 nodes, we use this very object for the PersistedState (so that the state is actually persisted); on other
|
||||||
|
// nodes we use an InMemoryPersistedState instead and persist using a cluster applier if needed. In all cases it's an error to try and
|
||||||
|
// use this object as a PersistedState before calling start(). TODO stop implementing PersistedState at the top level.
|
||||||
|
private final SetOnce<PersistedState> persistedState = new SetOnce<>();
|
||||||
|
|
||||||
|
// on master-eligible nodes we call updateClusterState under the Coordinator's mutex; on master-ineligible data nodes we call
|
||||||
|
// updateClusterState on the (unique) cluster applier thread; on other nodes we never call updateClusterState. In all cases there's no
|
||||||
|
// need to synchronize access to these variables.
|
||||||
protected Manifest previousManifest;
|
protected Manifest previousManifest;
|
||||||
protected ClusterState previousClusterState;
|
protected ClusterState previousClusterState;
|
||||||
protected boolean incrementalWrite;
|
protected boolean incrementalWrite;
|
||||||
|
|
||||||
public GatewayMetaState(Settings settings, MetaStateService metaStateService,
|
public GatewayMetaState(Settings settings, MetaStateService metaStateService) {
|
||||||
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader,
|
|
||||||
TransportService transportService, ClusterService clusterService) throws IOException {
|
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.metaStateService = metaStateService;
|
this.metaStateService = metaStateService;
|
||||||
this.transportService = transportService;
|
|
||||||
this.clusterService = clusterService;
|
|
||||||
|
|
||||||
upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
|
|
||||||
initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
|
|
||||||
incrementalWrite = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public PersistedState getPersistedState(Settings settings, ClusterApplierService clusterApplierService) {
|
public void start(TransportService transportService, ClusterService clusterService,
|
||||||
applyClusterStateUpdaters();
|
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) {
|
||||||
if (DiscoveryNode.isMasterNode(settings) == false) {
|
assert previousClusterState == null : "should only start once, but already have " + previousClusterState;
|
||||||
// use Zen1 way of writing cluster state for non-master-eligible nodes
|
try {
|
||||||
// this avoids concurrent manipulating of IndexMetadata with IndicesStore
|
upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
|
||||||
clusterApplierService.addLowPriorityApplier(this);
|
initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
|
||||||
return new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState());
|
} catch (IOException e) {
|
||||||
|
throw new ElasticsearchException("failed to load metadata", e);
|
||||||
|
}
|
||||||
|
incrementalWrite = false;
|
||||||
|
|
||||||
|
applyClusterStateUpdaters(transportService, clusterService);
|
||||||
|
if (DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings).equals(DiscoveryModule.ZEN_DISCOVERY_TYPE)) {
|
||||||
|
// only for tests that simulate a mixed Zen1/Zen2 clusters, see Zen1IT
|
||||||
|
if (isMasterOrDataNode()) {
|
||||||
|
clusterService.addLowPriorityApplier(this::applyClusterState);
|
||||||
|
}
|
||||||
|
persistedState.set(new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState()));
|
||||||
|
} else {
|
||||||
|
if (DiscoveryNode.isMasterNode(settings) == false) {
|
||||||
|
if (DiscoveryNode.isDataNode(settings)) {
|
||||||
|
// Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's
|
||||||
|
// vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata
|
||||||
|
// when _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool.
|
||||||
|
//
|
||||||
|
// In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards
|
||||||
|
// of an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory,
|
||||||
|
// including the metadata, and does so on the cluster applier thread.
|
||||||
|
//
|
||||||
|
// This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a
|
||||||
|
// race between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the
|
||||||
|
// updated metadata into it. We could probably solve this with careful synchronization, but in fact there is no need.
|
||||||
|
// The persisted state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index
|
||||||
|
// imports, which is inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes
|
||||||
|
// until applying the cluster state, which is what this does:
|
||||||
|
clusterService.addLowPriorityApplier(this::applyClusterState);
|
||||||
|
}
|
||||||
|
persistedState.set(new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState()));
|
||||||
|
} else {
|
||||||
|
persistedState.set(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeClusterState(ClusterName clusterName) throws IOException {
|
private void initializeClusterState(ClusterName clusterName) throws IOException {
|
||||||
|
@ -122,7 +152,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
|
||||||
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
|
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void applyClusterStateUpdaters() {
|
protected void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) {
|
||||||
assert previousClusterState.nodes().getLocalNode() == null : "applyClusterStateUpdaters must only be called once";
|
assert previousClusterState.nodes().getLocalNode() == null : "applyClusterStateUpdaters must only be called once";
|
||||||
assert transportService.getLocalNode() != null : "transport service is not yet started";
|
assert transportService.getLocalNode() != null : "transport service is not yet started";
|
||||||
|
|
||||||
|
@ -181,15 +211,18 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState.
|
||||||
return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
|
return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public PersistedState getPersistedState() {
|
||||||
|
final PersistedState persistedState = this.persistedState.get();
|
||||||
|
assert persistedState != null : "not started";
|
||||||
|
return persistedState;
|
||||||
|
}
|
||||||
|
|
||||||
public MetaData getMetaData() {
|
public MetaData getMetaData() {
|
||||||
return previousClusterState.metaData();
|
return previousClusterState.metaData();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void applyClusterState(ClusterChangedEvent event) {
|
||||||
public void applyClusterState(ClusterChangedEvent event) {
|
assert isMasterOrDataNode();
|
||||||
if (isMasterOrDataNode() == false) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.state().blocks().disableStatePersistence()) {
|
if (event.state().blocks().disableStatePersistence()) {
|
||||||
incrementalWrite = false;
|
incrementalWrite = false;
|
||||||
|
|
|
@ -158,7 +158,6 @@ import org.elasticsearch.usage.UsageService;
|
||||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
|
|
||||||
import javax.net.ssl.SNIHostName;
|
import javax.net.ssl.SNIHostName;
|
||||||
|
|
||||||
import java.io.BufferedWriter;
|
import java.io.BufferedWriter;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -483,8 +482,7 @@ public class Node implements Closeable {
|
||||||
).collect(Collectors.toSet());
|
).collect(Collectors.toSet());
|
||||||
final TransportService transportService = newTransportService(settings, transport, threadPool,
|
final TransportService transportService = newTransportService(settings, transport, threadPool,
|
||||||
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
|
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
|
||||||
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService,
|
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService);
|
||||||
metaDataIndexUpgradeService, metaDataUpgrader, transportService, clusterService);
|
|
||||||
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
|
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
|
||||||
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
|
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
|
||||||
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
|
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
|
||||||
|
@ -699,14 +697,14 @@ public class Node implements Closeable {
|
||||||
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
|
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
|
||||||
: "transportService has a different local node than the factory provided";
|
: "transportService has a different local node than the factory provided";
|
||||||
injector.getInstance(PeerRecoverySourceService.class).start();
|
injector.getInstance(PeerRecoverySourceService.class).start();
|
||||||
final MetaData onDiskMetadata;
|
|
||||||
|
// Load (and maybe upgrade) the metadata stored on disk
|
||||||
|
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
|
||||||
|
gatewayMetaState.start(transportService, clusterService,
|
||||||
|
injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class));
|
||||||
// we load the global state here (the persistent part of the cluster state stored on disk) to
|
// we load the global state here (the persistent part of the cluster state stored on disk) to
|
||||||
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
|
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
|
||||||
if (DiscoveryNode.isMasterNode(settings()) || DiscoveryNode.isDataNode(settings())) {
|
final MetaData onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metaData();
|
||||||
onDiskMetadata = injector.getInstance(GatewayMetaState.class).getMetaData();
|
|
||||||
} else {
|
|
||||||
onDiskMetadata = MetaData.EMPTY_META_DATA;
|
|
||||||
}
|
|
||||||
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
|
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
|
||||||
validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
|
validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
|
||||||
pluginsService.filterPlugins(Plugin.class).stream()
|
pluginsService.filterPlugins(Plugin.class).stream()
|
||||||
|
|
|
@ -28,9 +28,11 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.apache.lucene.util.TestUtil;
|
import org.apache.lucene.util.TestUtil;
|
||||||
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
|
import org.elasticsearch.gateway.CorruptStateException;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
@ -42,7 +44,7 @@ public class RecoveryWithUnsupportedIndicesIT extends ESIntegTestCase {
|
||||||
/**
|
/**
|
||||||
* Return settings that could be used to start a node that has the given zipped home directory.
|
* Return settings that could be used to start a node that has the given zipped home directory.
|
||||||
*/
|
*/
|
||||||
protected Settings prepareBackwardsDataDir(Path backwardsIndex) throws IOException {
|
private Settings prepareBackwardsDataDir(Path backwardsIndex) throws IOException {
|
||||||
Path indexDir = createTempDir();
|
Path indexDir = createTempDir();
|
||||||
Path dataDir = indexDir.resolve("data");
|
Path dataDir = indexDir.resolve("data");
|
||||||
try (InputStream stream = Files.newInputStream(backwardsIndex)) {
|
try (InputStream stream = Files.newInputStream(backwardsIndex)) {
|
||||||
|
@ -86,7 +88,8 @@ public class RecoveryWithUnsupportedIndicesIT extends ESIntegTestCase {
|
||||||
|
|
||||||
logger.info("Checking static index {}", indexName);
|
logger.info("Checking static index {}", indexName);
|
||||||
Settings nodeSettings = prepareBackwardsDataDir(getDataPath("/indices/bwc").resolve(indexName + ".zip"));
|
Settings nodeSettings = prepareBackwardsDataDir(getDataPath("/indices/bwc").resolve(indexName + ".zip"));
|
||||||
assertThat(expectThrows(Exception.class, () -> internalCluster().startNode(nodeSettings))
|
assertThat(ExceptionsHelper.unwrap(
|
||||||
.getCause().getCause().getMessage(), containsString("Format version is not supported"));
|
expectThrows(Exception.class, () -> internalCluster().startNode(nodeSettings)), CorruptStateException.class).getMessage(),
|
||||||
|
containsString("Format version is not supported"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
|
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
|
||||||
import org.elasticsearch.discovery.zen.ZenDiscovery.ZenNodeRemovalClusterStateTaskExecutor;
|
import org.elasticsearch.discovery.zen.ZenDiscovery.ZenNodeRemovalClusterStateTaskExecutor;
|
||||||
import org.elasticsearch.gateway.GatewayMetaState;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.test.ClusterServiceUtils;
|
import org.elasticsearch.test.ClusterServiceUtils;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -369,8 +368,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
|
||||||
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service,
|
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service,
|
||||||
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
|
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
|
||||||
masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(),
|
masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(),
|
||||||
ESAllocationTestCase.createAllocationService(),
|
ESAllocationTestCase.createAllocationService(), Collections.emptyList(), (s, p, r) -> {});
|
||||||
Collections.emptyList(), mock(GatewayMetaState.class), (s, p, r) -> {});
|
|
||||||
zenDiscovery.start();
|
zenDiscovery.start();
|
||||||
return zenDiscovery;
|
return zenDiscovery;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,9 +63,9 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
private MockGatewayMetaState newGateway() throws IOException {
|
private MockGatewayMetaState newGateway() {
|
||||||
MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode);
|
final MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode);
|
||||||
gateway.applyClusterStateUpdaters();
|
gateway.start();
|
||||||
return gateway;
|
return gateway;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -703,8 +703,10 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
||||||
if (rarely()) {
|
if (rarely()) {
|
||||||
nodeEnvironment = newNodeEnvironment();
|
nodeEnvironment = newNodeEnvironment();
|
||||||
nodeEnvironments.add(nodeEnvironment);
|
nodeEnvironments.add(nodeEnvironment);
|
||||||
delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode)
|
final MockGatewayMetaState gatewayMetaState
|
||||||
.getPersistedState(Settings.EMPTY, null);
|
= new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode);
|
||||||
|
gatewayMetaState.start();
|
||||||
|
delegate = gatewayMetaState.getPersistedState();
|
||||||
} else {
|
} else {
|
||||||
nodeEnvironment = null;
|
nodeEnvironment = null;
|
||||||
delegate = new InMemoryPersistedState(0L,
|
delegate = new InMemoryPersistedState(0L,
|
||||||
|
@ -734,8 +736,10 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
||||||
new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(),
|
new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(),
|
||||||
manifest.getIndexGenerations()));
|
manifest.getIndexGenerations()));
|
||||||
}
|
}
|
||||||
delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode)
|
final MockGatewayMetaState gatewayMetaState
|
||||||
.getPersistedState(Settings.EMPTY, null);
|
= new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode);
|
||||||
|
gatewayMetaState.start();
|
||||||
|
delegate = gatewayMetaState.getPersistedState();
|
||||||
} else {
|
} else {
|
||||||
nodeEnvironment = null;
|
nodeEnvironment = null;
|
||||||
BytesStreamOutput outStream = new BytesStreamOutput();
|
BytesStreamOutput outStream = new BytesStreamOutput();
|
||||||
|
|
|
@ -28,10 +28,6 @@ import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.plugins.MetaDataUpgrader;
|
import org.elasticsearch.plugins.MetaDataUpgrader;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link GatewayMetaState} constructor accepts a lot of arguments.
|
* {@link GatewayMetaState} constructor accepts a lot of arguments.
|
||||||
* It's not always easy / convenient to construct these dependencies.
|
* It's not always easy / convenient to construct these dependencies.
|
||||||
|
@ -42,10 +38,8 @@ public class MockGatewayMetaState extends GatewayMetaState {
|
||||||
private final DiscoveryNode localNode;
|
private final DiscoveryNode localNode;
|
||||||
|
|
||||||
public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment,
|
public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment,
|
||||||
NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) throws IOException {
|
NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) {
|
||||||
super(settings, new MetaStateService(nodeEnvironment, xContentRegistry),
|
super(settings, new MetaStateService(nodeEnvironment, xContentRegistry));
|
||||||
mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class),
|
|
||||||
mock(TransportService.class), mock(ClusterService.class));
|
|
||||||
this.localNode = localNode;
|
this.localNode = localNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,8 +49,12 @@ public class MockGatewayMetaState extends GatewayMetaState {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applyClusterStateUpdaters() {
|
public void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) {
|
||||||
// Just set localNode here, not to mess with ClusterService and IndicesService mocking
|
// Just set localNode here, not to mess with ClusterService and IndicesService mocking
|
||||||
previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode);
|
previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
start(null, null, null, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -958,10 +958,16 @@ public final class InternalTestCluster extends TestCluster {
|
||||||
}
|
}
|
||||||
|
|
||||||
void startNode() {
|
void startNode() {
|
||||||
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
node.start();
|
node.start();
|
||||||
|
success = true;
|
||||||
} catch (NodeValidationException e) {
|
} catch (NodeValidationException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
} finally {
|
||||||
|
if (success == false) {
|
||||||
|
IOUtils.closeWhileHandlingException(node);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue