Remove NoneGateway, NoneGatewayAllocator, & NoneGatewayModule

Always use the LocalGateway* equivalents

We already check in the LocalGateway whether a node is a client node, or
is not master-eligible, and skip writing the state there. This allows us
to remove this code that was previously used only for tribe nodes (which
are not master eligible anyway and wouldn't write state) and in
tests (which can shake more bugs out)
This commit is contained in:
Lee Hinman 2014-11-13 11:56:59 +01:00
parent ad408eee85
commit 45408844e7
31 changed files with 55 additions and 605 deletions

View File

@ -239,13 +239,9 @@
# in the gateway, and when the cluster starts up for the first time,
# it will read its state from the gateway.
# There are several types of gateway implementations. For more information, see
# For more information, see
# <http://elasticsearch.org/guide/en/elasticsearch/reference/current/modules-gateway.html>.
# The default gateway type is the "local" gateway (recommended):
#
#gateway.type: local
# Settings below control how and when to start the initial recovery process on
# a full cluster restart (to reuse as much local data as possible when using shared
# gateway).

View File

@ -11,16 +11,16 @@ added or deleted), those changes will be persisted using the gateway.
When the cluster first starts up, the state will be read from the
gateway and applied.
The gateway set on the node level will automatically control the index
gateway that will be used. For example, if the `local` gateway is used,
then automatically, each index created on the node will also use its own
respective index level `local` gateway. In this case, if an index should
not persist its state, it should be explicitly set to `none` (which is
the only other value it can be set to).
The gateway set on the node level will automatically control the index gateway
that will be used. For example, if the `local` gateway is used (the default),
then each index created on the node will automatically use its own respective
index level `local` gateway.
The default gateway used is the
<<modules-gateway-local,local>> gateway.
The `none` gateway option was removed in Elasticsearch 2.0.
[float]
[[recover-after]]
=== Recovery After Nodes / Time

View File

@ -21,7 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
import org.elasticsearch.gateway.local.LocalGatewayAllocator;
/**
*/
@ -37,7 +37,7 @@ public class ShardsAllocatorModule extends AbstractModule {
private Class<? extends ShardsAllocator> shardsAllocator;
private Class<? extends GatewayAllocator> gatewayAllocator = NoneGatewayAllocator.class;
private Class<? extends GatewayAllocator> gatewayAllocator = LocalGatewayAllocator.class;
public ShardsAllocatorModule(Settings settings) {
this.settings = settings;

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -29,7 +28,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
/**
* The {@link ShardsAllocator} class offers methods for allocating shard within a cluster.
@ -41,12 +39,12 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
private final GatewayAllocator gatewayAllocator;
private final ShardsAllocator allocator;
public ShardsAllocators() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
public ShardsAllocators(GatewayAllocator allocator) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, allocator);
}
public ShardsAllocators(Settings settings) {
this(settings, new NoneGatewayAllocator(), new BalancedShardsAllocator(settings));
public ShardsAllocators(Settings settings, GatewayAllocator allocator) {
this(settings, allocator, new BalancedShardsAllocator(settings));
}
@Inject

View File

@ -32,6 +32,8 @@ import org.elasticsearch.gateway.local.LocalGatewayModule;
*/
public class GatewayModule extends AbstractModule implements SpawnModules {
public static String GATEWAY_TYPE_SETTING = "gateway.type";
private final Settings settings;
public GatewayModule(Settings settings) {
@ -40,7 +42,8 @@ public class GatewayModule extends AbstractModule implements SpawnModules {
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(settings.getAsClass("gateway.type", LocalGatewayModule.class, "org.elasticsearch.gateway.", "GatewayModule"), settings));
Class gateway = settings.getAsClass(GATEWAY_TYPE_SETTING, LocalGatewayModule.class, "org.elasticsearch.gateway.", "GatewayModule");
return ImmutableList.of(Modules.createModule(gateway, settings));
}
@Override

View File

@ -1,134 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.none;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule;
/**
*
*/
public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {
public static final String TYPE = "none";
private final NodeEnvironment nodeEnv;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final ClusterName clusterName;
@Nullable
private volatile MetaData currentMetaData;
@Inject
public NoneGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, NodeIndexDeletedAction nodeIndexDeletedAction, ClusterName clusterName) {
super(settings);
this.nodeEnv = nodeEnv;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.clusterName = clusterName;
clusterService.addLast(this);
}
@Override
public String type() {
return TYPE;
}
@Override
public String toString() {
return "_none_";
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
}
@Override
public void performStateRecovery(GatewayStateRecoveredListener listener) throws GatewayException {
logger.debug("performing state recovery");
listener.onSuccess(ClusterState.builder(clusterName).build());
}
@Override
public Class<? extends Module> suggestIndexGateway() {
return NoneIndexGatewayModule.class;
}
@Override
public void reset() {
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
// reset the current metadata, we need to start fresh...
this.currentMetaData = null;
return;
}
MetaData newMetaData = event.state().metaData();
// delete indices that were there before, but are deleted now
// we need to do it so they won't be detected as dangling
if (currentMetaData != null) {
// only delete indices when we already received a state (currentMetaData != null)
for (IndexMetaData current : currentMetaData) {
if (!newMetaData.hasIndex(current.index())) {
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys());
if (nodeEnv.hasNodeFile()) {
try {
nodeEnv.deleteIndexDirectorySafe(new Index(current.index()));
} catch (Exception ex) {
logger.debug("failed to delete shard locations", ex);
}
}
try {
nodeIndexDeletedAction.nodeIndexStoreDeleted(event.state(), current.index(), event.state().nodes().localNodeId());
} catch (Exception e) {
logger.debug("[{}] failed to notify master on local index store deletion", e, current.index());
}
}
}
}
currentMetaData = newMetaData;
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.none;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.gateway.Gateway;
/**
*/
public class NoneGatewayModule extends AbstractModule implements PreProcessModule {
@Override
public void processModule(Module module) {
if (module instanceof ShardsAllocatorModule) {
((ShardsAllocatorModule) module).setGatewayAllocator(NoneGatewayAllocator.class);
}
}
@Override
protected void configure() {
bind(Gateway.class).to(NoneGateway.class).asEagerSingleton();
}
}

View File

@ -1,58 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.none;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.settings.IndexSettings;
/**
*
*/
public class NoneIndexGateway extends AbstractIndexComponent implements IndexGateway {
@Inject
public NoneIndexGateway(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
}
@Override
public String type() {
return "none";
}
@Override
public Class<? extends IndexShardGateway> shardGatewayClass() {
return NoneIndexShardGateway.class;
}
@Override
public String toString() {
return "_none_";
}
@Override
public void close() {
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.none;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.gateway.IndexGateway;
/**
*
*/
public class NoneIndexGatewayModule extends AbstractModule {
@Override
protected void configure() {
bind(IndexGateway.class).to(NoneIndexGateway.class).asEagerSingleton();
}
}

View File

@ -1,89 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.none;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import java.io.IOException;
/**
*
*/
public class NoneIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
private final InternalIndexShard indexShard;
private final RecoveryState recoveryState = new RecoveryState();
@Inject
public NoneIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) {
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
}
@Override
public String toString() {
return "_none_";
}
@Override
public RecoveryState recoveryState() {
return recoveryState;
}
@Override
public void recover(boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardGatewayRecoveryException {
recoveryState.getIndex().startTime(System.currentTimeMillis());
// in the none case, we simply start the shard
// clean the store, there should be nothing there...
indexShard.store().incRef();
try {
logger.debug("cleaning shard content before creation");
indexShard.store().deleteContent();
} catch (IOException e) {
logger.warn("failed to clean store before starting shard", e);
} finally {
indexShard.store().decRef();
}
indexShard.postRecovery("post recovery from gateway");
recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
recoveryState.getTranslog().startTime(System.currentTimeMillis());
recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
}
@Override
public String type() {
return NoneGateway.TYPE;
}
@Override
public void close() {
}
}

View File

@ -100,7 +100,6 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
if (sb.get("cluster.name") == null) {
sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
}
sb.put("gateway.type", "none"); // we shouldn't store anything locally...
sb.put(TransportMasterNodeReadOperationAction.FORCE_LOCAL_SETTING, true);
return sb.build();
}

View File

@ -48,7 +48,6 @@ public class PercolatorStressBenchmark {
public static void main(String[] args) throws Exception {
Settings settings = settingsBuilder()
.put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
.put("gateway.type", "none")
.put(SETTING_NUMBER_OF_SHARDS, 4)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.build();

View File

@ -56,7 +56,6 @@ public class SingleThreadBulkStress {
.put("index.refresh_interval", "1s")
.put("index.merge.async", true)
.put("index.translog.flush_threshold_ops", 5000)
.put("gateway.type", "none")
.put(SETTING_NUMBER_OF_SHARDS, shardsCount)
.put(SETTING_NUMBER_OF_REPLICAS, replicaCount)
.build();

View File

@ -45,7 +45,6 @@ public class SingleThreadIndexingStress {
.put("index.refresh_interval", "1s")
.put("index.merge.async", true)
.put("index.translog.flush_threshold_ops", 5000)
.put("gateway.type", "none")
.put(SETTING_NUMBER_OF_SHARDS, 2)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.build();

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@ -60,7 +59,6 @@ public class TransportClientTests extends ElasticsearchIntegrationTest {
.put("http.enabled", false)
.put("index.store.type", "ram")
.put("config.ignore_system_properties", true) // make sure we get what we set :)
.put("gateway.type", "none")
.build()).clusterName("foobar").build();
node.start();
try {

View File

@ -35,7 +35,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.hamcrest.Matchers;
@ -354,7 +354,7 @@ public class BalanceConfigurationTests extends ElasticsearchAllocationTestCase {
ImmutableSettings.Builder settings = settingsBuilder();
AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(),
new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS), getRandom()), new ShardsAllocators(settings.build(),
new NoneGatewayAllocator(), new ShardsAllocator() {
NoopGatewayAllocator.INSTANCE, new ShardsAllocator() {
@Override
public boolean rebalance(RoutingAllocation allocation) {

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
@ -60,7 +61,7 @@ public class RandomAllocationDeciderTests extends ElasticsearchAllocationTestCas
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom());
AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(ImmutableSettings.EMPTY),
randomAllocationDecider))), new ShardsAllocators(), ClusterInfoService.EMPTY);
randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), ClusterInfoService.EMPTY);
int indices = scaledRandomIntBetween(1, 20);
Builder metaBuilder = MetaData.builder();
int maxNumReplicas = 1;

View File

@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -54,6 +55,10 @@ import static org.hamcrest.Matchers.equalTo;
public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
private static ShardsAllocators makeShardsAllocators() {
return new ShardsAllocators(NoopGatewayAllocator.INSTANCE);
}
@Test
public void diskThresholdTest() {
Settings diskSettings = settingsBuilder()
@ -94,7 +99,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@ -179,7 +184,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -210,7 +215,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -290,7 +295,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
@ -345,7 +350,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -412,7 +417,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -443,7 +448,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -551,7 +556,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
@ -618,7 +623,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
@ -722,7 +727,7 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
.build(), deciders, makeShardsAllocators(), cis);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))

View File

@ -25,7 +25,6 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -82,7 +81,6 @@ public class FileBasedMappingsTests extends ElasticsearchTestCase {
.put("node.name", NAME)
.put("path.conf", configDir.getAbsolutePath())
.put("http.enabled", false)
.put("gateway.type", "none")
.build();
try (Node node = NodeBuilder.nodeBuilder().local(true).data(true).settings(settings).node()) {

View File

@ -19,7 +19,6 @@
package org.elasticsearch.indexlifecycle;
import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@ -41,12 +40,13 @@ import org.junit.Test;
import java.util.Set;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.*;
@ -58,17 +58,6 @@ public class IndexLifecycleActionTests extends ElasticsearchIntegrationTest {
@Slow
@Test
public void testIndexLifecycleActions() throws Exception {
if (randomBoolean()) { // both run with @Nightly
testIndexLifecycleActionsWith11Shards0Backup();
} else {
testIndexLifecycleActionsWith11Shards1Backup();
}
}
@Slow
@Nightly
@Test
public void testIndexLifecycleActionsWith11Shards1Backup() throws Exception {
Settings settings = settingsBuilder()
.put(SETTING_NUMBER_OF_SHARDS, 11)
@ -105,7 +94,6 @@ public class IndexLifecycleActionTests extends ElasticsearchIntegrationTest {
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
setMinimumMasterNodes(2);
final String node2 = getLocalNodeId(server_2);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
@ -230,168 +218,6 @@ public class IndexLifecycleActionTests extends ElasticsearchIntegrationTest {
return nodeId;
}
@Slow
@Nightly
@Test
public void testIndexLifecycleActionsWith11Shards0Backup() throws Exception {
Settings settings = settingsBuilder()
.put(SETTING_NUMBER_OF_SHARDS, 11)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put("cluster.routing.schedule", "20ms") // reroute every 20ms so we identify new nodes fast
.build();
// start one server
logger.info("Starting server1");
final String server_1 = internalCluster().startNode(settings);
final String node1 = getLocalNodeId(server_1);
logger.info("Creating index [test]");
CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test")).actionGet();
assertThat(createIndexResponse.isAcknowledged(), equalTo(true));
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
assertThat(clusterHealth.getActiveShards(), equalTo(11));
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(11));
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertNodesPresent(clusterState.readOnlyRoutingNodes(), node1);
RoutingNode routingNodeEntry1 = clusterState.readOnlyRoutingNodes().node(node1);
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(11));
// start another server
logger.info("Starting server2");
final String server_2 = internalCluster().startNode(settings);
// first wait for 2 nodes in the cluster
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
setMinimumMasterNodes(2);
final String node2 = getLocalNodeId(server_2);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
client().admin().cluster().prepareReroute().execute().actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2));
assertThat(clusterHealth.getInitializingShards(), equalTo(0));
assertThat(clusterHealth.getUnassignedShards(), equalTo(0));
assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
assertThat(clusterHealth.getActiveShards(), equalTo(11));
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(11));
clusterState = client().admin().cluster().prepareState().get().getState();
assertNodesPresent(clusterState.readOnlyRoutingNodes(), node1, node2);
routingNodeEntry1 = clusterState.readOnlyRoutingNodes().node(node1);
assertThat(routingNodeEntry1.numberOfShardsWithState(RELOCATING), equalTo(0));
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(6), equalTo(5)));
RoutingNode routingNodeEntry2 = clusterState.readOnlyRoutingNodes().node(node2);
assertThat(routingNodeEntry2.numberOfShardsWithState(INITIALIZING), equalTo(0));
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6)));
// start another server
logger.info("Starting server3");
final String server_3 = internalCluster().startNode();
// first wait for 3 nodes in the cluster
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3")).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
final String node3 = getLocalNodeId(server_3);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
client().admin().cluster().prepareReroute().execute().actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(3));
assertThat(clusterHealth.getInitializingShards(), equalTo(0));
assertThat(clusterHealth.getUnassignedShards(), equalTo(0));
assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
assertThat(clusterHealth.getActiveShards(), equalTo(11));
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(11));
clusterState = client().admin().cluster().prepareState().get().getState();
assertNodesPresent(clusterState.readOnlyRoutingNodes(), node1, node2, node3);
routingNodeEntry1 = clusterState.readOnlyRoutingNodes().node(node1);
routingNodeEntry2 = clusterState.readOnlyRoutingNodes().node(node2);
RoutingNode routingNodeEntry3 = clusterState.readOnlyRoutingNodes().node(node3);
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED) + routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11));
assertThat(routingNodeEntry1.numberOfShardsWithState(RELOCATING), equalTo(0));
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(4), equalTo(3)));
assertThat(routingNodeEntry2.numberOfShardsWithState(RELOCATING), equalTo(0));
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(4), equalTo(3)));
assertThat(routingNodeEntry3.numberOfShardsWithState(INITIALIZING), equalTo(0));
assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(3));
logger.info("Closing server1");
// kill the first server
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(server_1));
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
client().admin().cluster().prepareReroute().get();
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
assertThat(clusterHealth.getActiveShards(), equalTo(11));
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(11));
clusterState = client().admin().cluster().prepareState().get().getState();
assertNodesPresent(clusterState.readOnlyRoutingNodes(), node3, node2);
routingNodeEntry2 = clusterState.readOnlyRoutingNodes().node(node2);
routingNodeEntry3 = clusterState.readOnlyRoutingNodes().node(node3);
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED) + routingNodeEntry3.numberOfShardsWithState(STARTED), equalTo(11));
assertThat(routingNodeEntry2.numberOfShardsWithState(RELOCATING), equalTo(0));
assertThat(routingNodeEntry2.numberOfShardsWithState(INITIALIZING), equalTo(0));
assertThat(routingNodeEntry2.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6)));
assertThat(routingNodeEntry3.numberOfShardsWithState(RELOCATING), equalTo(0));
assertThat(routingNodeEntry3.numberOfShardsWithState(INITIALIZING), equalTo(0));
assertThat(routingNodeEntry3.numberOfShardsWithState(STARTED), anyOf(equalTo(5), equalTo(6)));
logger.info("Deleting index [test]");
// last, lets delete the index
DeleteIndexResponse deleteIndexResponse = client().admin().indices().delete(deleteIndexRequest("test")).actionGet();
assertThat(deleteIndexResponse.isAcknowledged(), equalTo(true));
clusterState = client().admin().cluster().prepareState().get().getState();
assertNodesPresent(clusterState.readOnlyRoutingNodes(), node3, node2);
routingNodeEntry2 = clusterState.readOnlyRoutingNodes().node(node2);
assertThat(routingNodeEntry2.isEmpty(), equalTo(true));
routingNodeEntry3 = clusterState.readOnlyRoutingNodes().node(node3);
assertThat(routingNodeEntry3.isEmpty(), equalTo(true));
}
private void assertNodesPresent(RoutingNodes routingNodes, String... nodes) {
final Set<String> keySet = Sets.newHashSet(Iterables.transform(routingNodes, new Function<RoutingNode, String>() {
@Override

View File

@ -146,7 +146,6 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
.put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
.put("http.enabled", false)
.put("config.ignore_system_properties", true) // make sure we get what we set :)
.put("gateway.type", "none")
.put("indices.memory.interval", "100ms")
.put(settings)
);

View File

@ -35,9 +35,7 @@ public class FilterCacheGcStress {
public static void main(String[] args) {
Settings settings = ImmutableSettings.settingsBuilder()
.put("gateway.type", "none")
.build();
Settings settings = ImmutableSettings.EMPTY;
Node node = NodeBuilder.nodeBuilder().settings(settings).node();
final Client client = node.client();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.stresstest.indexing;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.index.query.QueryBuilders;
@ -29,7 +30,6 @@ import org.elasticsearch.node.Node;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
/**
@ -39,9 +39,7 @@ public class ConcurrentIndexingVersioningStressTest {
public static void main(String[] args) throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "none")
.build();
Settings settings = ImmutableSettings.EMPTY;
Node node1 = nodeBuilder().settings(settings).node();
Node node2 = nodeBuilder().settings(settings).node();

View File

@ -38,14 +38,12 @@ public class RefreshStressTest1 {
Node node = NodeBuilder.nodeBuilder().local(true).loadConfigSettings(false).clusterName("testCluster").settings(
ImmutableSettings.settingsBuilder()
.put("node.name", "node1")
.put("gateway.type", "none")
.put("index.number_of_shards", numberOfShards)
//.put("path.data", new File("target/data").getAbsolutePath())
.build()).node();
Node node2 = NodeBuilder.nodeBuilder().local(true).loadConfigSettings(false).clusterName("testCluster").settings(
ImmutableSettings.settingsBuilder()
.put("node.name", "node2")
.put("gateway.type", "none")
.put("index.number_of_shards", numberOfShards)
//.put("path.data", new File("target/data").getAbsolutePath())
.build()).node();

View File

@ -336,7 +336,6 @@ public class RollingRestartStressTest {
Settings settings = settingsBuilder()
.put("index.shard.check_on_startup", true)
.put("gateway.type", "none")
.put("path.data", "data/data1,data/data2")
.build();

View File

@ -24,8 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
@ -45,11 +43,9 @@ public class ConcurrentSearchSerializationTests {
public static void main(String[] args) throws Exception {
Settings settings = ImmutableSettings.settingsBuilder().put("gateway.type", "none").build();
Node node1 = NodeBuilder.nodeBuilder().settings(settings).node();
Node node2 = NodeBuilder.nodeBuilder().settings(settings).node();
Node node3 = NodeBuilder.nodeBuilder().settings(settings).node();
Node node1 = NodeBuilder.nodeBuilder().node();
Node node2 = NodeBuilder.nodeBuilder().node();
Node node3 = NodeBuilder.nodeBuilder().node();
final Client client = node1.client();

View File

@ -402,13 +402,8 @@ public class Search1StressTest {
}
public static void main(String[] args) throws Exception {
Settings settings = ImmutableSettings.settingsBuilder()
.put("gateway.type", "none")
.build();
Search1StressTest test = new Search1StressTest()
.setPeriod(TimeValue.timeValueMinutes(10))
.setSettings(settings)
.setNumberOfNodes(2)
.setPreIndexDocs(SizeValue.parseSizeValue("100"))
.setIndexers(2)

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.lang.reflect.Constructor;
@ -63,7 +64,7 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC
public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
return new AllocationService(settings,
randomAllocationDeciders(settings, nodeSettingsService, random),
new ShardsAllocators(settings), ClusterInfoService.EMPTY);
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), ClusterInfoService.EMPTY);
}

View File

@ -103,7 +103,7 @@ public abstract class ElasticsearchSingleNodeTest extends ElasticsearchTestCase
.put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
.put("http.enabled", false)
.put("config.ignore_system_properties", true) // make sure we get what we set :)
.put("gateway.type", "none")).build();
).build();
build.start();
assertThat(DiscoveryNode.localNode(build.settings()), is(true));
return build;

View File

@ -356,8 +356,6 @@ public final class InternalTestCluster extends TestCluster {
Builder builder = ImmutableSettings.settingsBuilder()
// decrease the routing schedule so new nodes will be added quickly - some random value between 30 and 80 ms
.put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms")
// default to non gateway
.put("gateway.type", "none")
.put(SETTING_CLUSTER_NODE_SEED, seed);
if (ENABLE_MOCK_MODULES && usually(random)) {
builder.put("index.store.type", MockFSIndexStoreModule.class.getName()); // no RAM dir for now!

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.gateway.none;
package org.elasticsearch.test.gateway;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -25,15 +25,20 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
/**
* An allocator used for tests that doesn't do anything
*/
public class NoneGatewayAllocator implements GatewayAllocator {
public class NoopGatewayAllocator implements GatewayAllocator {
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {
// noop
}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {
// noop
}
@Override