Merge remote-tracking branch 'upstream/master' into feature/seq_no
commit 2c0338fa87
@ -280,5 +280,16 @@ export BATS_UTILS=/project/build/bats/utils
|
|||
export BATS_TESTS=/project/build/bats/tests
|
||||
export BATS_ARCHIVES=/project/build/bats/archives
|
||||
VARS
|
||||
cat \<\<SUDOERS_VARS > /etc/sudoers.d/elasticsearch_vars
|
||||
Defaults env_keep += "ZIP"
|
||||
Defaults env_keep += "TAR"
|
||||
Defaults env_keep += "RPM"
|
||||
Defaults env_keep += "DEB"
|
||||
Defaults env_keep += "BATS"
|
||||
Defaults env_keep += "BATS_UTILS"
|
||||
Defaults env_keep += "BATS_TESTS"
|
||||
Defaults env_keep += "BATS_ARCHIVES"
|
||||
SUDOERS_VARS
|
||||
chmod 0440 /etc/sudoers.d/elasticsearch_vars
|
||||
SHELL
|
||||
end
|
||||
|
|
|
@ -40,7 +40,7 @@ class VagrantTestPlugin implements Plugin<Project> {
|
|||
static List<String> UPGRADE_FROM_ARCHIVES = ['rpm', 'deb']
|
||||
|
||||
private static final BATS = 'bats'
|
||||
private static final String BATS_TEST_COMMAND ="cd \$BATS_ARCHIVES && sudo -E bats --tap \$BATS_TESTS/*.$BATS"
|
||||
private static final String BATS_TEST_COMMAND ="cd \$BATS_ARCHIVES && sudo bats --tap \$BATS_TESTS/*.$BATS"
|
||||
|
||||
@Override
|
||||
void apply(Project project) {
|
||||
|
|
|
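For context on the two hunks above: the `Defaults env_keep += "..."` lines whitelist exactly those variables so they survive `sudo`, which is why `BATS_TEST_COMMAND` can drop the blanket `-E` (preserve the whole environment) flag. That is standard sudoers behaviour, not something introduced by this commit.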
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -112,8 +111,22 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
|
|||
@Override
|
||||
protected void afterExecute(Runnable r, Throwable t) {
|
||||
super.afterExecute(r, t);
|
||||
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
|
||||
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
|
||||
assert assertDefaultContext(r);
|
||||
}
|
||||
|
||||
private boolean assertDefaultContext(Runnable r) {
|
||||
try {
|
||||
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
|
||||
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
|
||||
} catch (IllegalStateException ex) {
|
||||
// sometimes we execute on a closed context and isDefaultContext doesn't bypass the ensureOpen checks
|
||||
// this must not trigger an exception here since we only assert if the default is restored and
|
||||
// we don't really care if we are closed
|
||||
if (contextHolder.isClosed() == false) {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
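The `assert assertDefaultContext(r)` call above uses the "assertion helper that returns true" idiom: the helper always returns `true`, so the outer `assert` can never fail by itself, and the whole check, including the try/catch for an already-closed context, is skipped entirely when assertions are disabled. A minimal sketch of the idiom follows; the `Context` interface and the class name are illustrative stand-ins, not the real EsThreadPoolExecutor/ThreadContext types.

[source,java]
--------------------------------------------------
// Sketch of the "assert helper returns true" idiom used in the hunk above.
public class AssertHelperIdiom {

    interface Context {
        boolean isDefault();  // may throw IllegalStateException once closed
        boolean isClosed();
    }

    private final Context context;

    AssertHelperIdiom(Context context) {
        this.context = context;
    }

    void afterExecute(Runnable task) {
        // With assertions disabled (-da) this call is never made, so the
        // try/catch in the helper adds no cost on the hot path.
        assert assertDefaultContext(task);
    }

    private boolean assertDefaultContext(Runnable task) {
        try {
            assert context.isDefault() : "context not restored after executing [" + task + "]";
        } catch (IllegalStateException ex) {
            // A closed context may reject the check; only rethrow while it is still open.
            if (context.isClosed() == false) {
                throw ex;
            }
        }
        return true; // always true, so the outer assert only reports the helper's own assertion
    }

    public static void main(String[] args) {
        Context ctx = new Context() {
            public boolean isDefault() { return true; }
            public boolean isClosed() { return false; }
        };
        new AssertHelperIdiom(ctx).afterExecute(() -> {}); // passes with -ea, no-op with -da
    }
}
--------------------------------------------------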
@ -253,6 +253,13 @@ public final class ThreadContext implements Closeable, Writeable {
|
|||
return threadLocal.get() == DEFAULT_CONTEXT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if the context is closed, otherwise <code>false</code>
|
||||
*/
|
||||
boolean isClosed() {
|
||||
return threadLocal.closed.get();
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface StoredContext extends AutoCloseable {
|
||||
@Override
|
||||
|
|
|
@ -19,30 +19,9 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
|
@ -53,6 +32,8 @@ import org.elasticsearch.common.UUIDs;
|
|||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -75,6 +56,25 @@ import org.elasticsearch.transport.TransportResponse;
|
|||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
|
@ -186,9 +186,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public void close() {
|
||||
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
|
||||
IOUtils.close(receivedResponses.values());
|
||||
Releasables.close(receivedResponses.values());
|
||||
closed = true;
|
||||
}
|
||||
|
||||
|
@ -272,7 +272,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
}
|
||||
}
|
||||
|
||||
class SendPingsHandler implements Closeable {
|
||||
class SendPingsHandler implements Releasable {
|
||||
private final int id;
|
||||
private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet();
|
||||
private final PingCollection pingCollection;
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.inject.internal.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
|
@ -240,6 +241,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
joinThreadControl.stop();
|
||||
masterFD.stop("zen disco stop");
|
||||
nodesFD.stop();
|
||||
Releasables.close(zenPing); // stop any ongoing pinging
|
||||
DiscoveryNodes nodes = nodes();
|
||||
if (sendLeaveRequest) {
|
||||
if (nodes.getMasterNode() == null) {
|
||||
|
@ -269,7 +271,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
@Override
|
||||
protected void doClose() throws IOException {
|
||||
IOUtils.close(masterFD, nodesFD, zenPing);
|
||||
IOUtils.close(masterFD, nodesFD);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,15 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import java.io.Closeable;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -28,17 +36,9 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
||||
public interface ZenPing extends Closeable {
|
||||
public interface ZenPing extends Releasable {
|
||||
|
||||
void start(PingContextProvider contextProvider);
|
||||
|
||||
|
|
|
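The `UnicastZenPing.close()`, `SendPingsHandler` and `ZenPing` changes above all replace `java.io.Closeable` with `Releasable` and `IOUtils.close` with `Releasables.close`. The practical effect is that closing no longer declares a checked `IOException`, which is what lets `ZenDiscovery.doStop()` call `Releasables.close(zenPing)` inline and drop `zenPing` from the `IOUtils.close(...)` call in `doClose()`. A rough sketch of that contrast, using a stand-in interface rather than the real `org.elasticsearch.common.lease.Releasable`:

[source,java]
--------------------------------------------------
import java.io.Closeable;
import java.io.IOException;

// Stand-in for the Releasable idea: close() is overridden without the checked
// IOException, so callers (and bulk helpers like Releasables.close) need no
// try/catch. This mirrors, but is not, org.elasticsearch.common.lease.Releasable.
public class ReleasableSketch {

    interface Releasable extends Closeable {
        @Override
        void close(); // narrowed: no "throws IOException"
    }

    public static void main(String[] args) {
        Closeable io = () -> { throw new IOException("close failed"); };
        Releasable lease = () -> System.out.println("released");

        try {
            io.close();      // checked exception: caller must handle or declare it
        } catch (IOException e) {
            System.out.println("had to handle: " + e.getMessage());
        }

        lease.close();       // no ceremony: this is what the doStop() call relies on
    }
}
--------------------------------------------------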
@ -94,7 +94,6 @@ public abstract class Mapper implements ToXContent, Iterable<Mapper> {
|
|||
private final ParseFieldMatcher parseFieldMatcher;
|
||||
|
||||
private final Supplier<QueryShardContext> queryShardContextSupplier;
|
||||
private QueryShardContext queryShardContext;
|
||||
|
||||
public ParserContext(String type, IndexAnalyzers indexAnalyzers, Function<String, SimilarityProvider> similarityLookupService,
|
||||
MapperService mapperService, Function<String, TypeParser> typeParsers,
|
||||
|
@ -138,12 +137,8 @@ public abstract class Mapper implements ToXContent, Iterable<Mapper> {
|
|||
return parseFieldMatcher;
|
||||
}
|
||||
|
||||
public QueryShardContext queryShardContext() {
|
||||
// No need for synchronization, this class must be used in a single thread
|
||||
if (queryShardContext == null) {
|
||||
queryShardContext = queryShardContextSupplier.get();
|
||||
}
|
||||
return queryShardContext;
|
||||
public Supplier<QueryShardContext> queryShardContextSupplier() {
|
||||
return queryShardContextSupplier;
|
||||
}
|
||||
|
||||
public boolean isWithinMultiField() { return false; }
|
||||
|
@ -161,7 +156,7 @@ public abstract class Mapper implements ToXContent, Iterable<Mapper> {
|
|||
|
||||
static class MultiFieldParserContext extends ParserContext {
|
||||
MultiFieldParserContext(ParserContext in) {
|
||||
super(in.type(), in.indexAnalyzers, in.similarityLookupService(), in.mapperService(), in.typeParsers(), in.indexVersionCreated(), in.parseFieldMatcher(), in::queryShardContext);
|
||||
super(in.type(), in.indexAnalyzers, in.similarityLookupService(), in.mapperService(), in.typeParsers(), in.indexVersionCreated(), in.parseFieldMatcher(), in.queryShardContextSupplier());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
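The `Mapper.ParserContext` hunks above remove the lazy `queryShardContext()` getter, which created the context once from the supplier and cached it, and instead expose `queryShardContextSupplier()` directly, so `MultiFieldParserContext` now forwards the raw supplier rather than `in::queryShardContext`. The removed getter was essentially a memoizing supplier; a self-contained sketch of that pattern, with illustrative names:

[source,java]
--------------------------------------------------
import java.util.function.Supplier;

// Sketch of the lazy, caching getter removed from ParserContext. The patch
// hands callers the raw Supplier instead, so each caller now decides whether
// (and when) to create the underlying value.
public final class MemoizingSupplier<T> implements Supplier<T> {

    private final Supplier<T> delegate;
    private T value; // single-threaded use assumed, as the original comment noted

    public MemoizingSupplier(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T get() {
        if (value == null) {
            value = delegate.get(); // created at most once, on first request
        }
        return value;
    }

    public static void main(String[] args) {
        Supplier<String> expensive = () -> {
            System.out.println("building context");
            return "context";
        };
        MemoizingSupplier<String> cached = new MemoizingSupplier<>(expensive);
        System.out.println(cached.get()); // prints "building context", then "context"
        System.out.println(cached.get()); // prints only "context"
    }
}
--------------------------------------------------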
@ -75,9 +75,6 @@ import org.elasticsearch.common.util.BigArrays;
|
|||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
|
@ -655,11 +652,13 @@ public class Node implements Closeable {
|
|||
injector.getInstance(SnapshotShardsService.class).stop();
|
||||
// stop any changes happening as a result of cluster state changes
|
||||
injector.getInstance(IndicesClusterStateService.class).stop();
|
||||
// close discovery early to not react to pings anymore.
|
||||
// This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
|
||||
injector.getInstance(Discovery.class).stop();
|
||||
// we close indices first, so operations won't be allowed on it
|
||||
injector.getInstance(IndicesTTLService.class).stop();
|
||||
injector.getInstance(RoutingService.class).stop();
|
||||
injector.getInstance(ClusterService.class).stop();
|
||||
injector.getInstance(Discovery.class).stop();
|
||||
injector.getInstance(NodeConnectionsService.class).stop();
|
||||
injector.getInstance(MonitorService.class).stop();
|
||||
injector.getInstance(GatewayService.class).stop();
|
||||
|
|
|
@ -31,7 +31,8 @@ import java.io.IOException;
|
|||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
||||
|
||||
@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
|
||||
@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0,
|
||||
autoMinMasterNodes = false)
|
||||
public class IndicesExistsIT extends ESIntegTestCase {
|
||||
|
||||
public void testIndexExistsWithBlocksInPlace() throws IOException {
|
||||
|
|
|
@ -43,7 +43,7 @@ import java.util.concurrent.CyclicBarrier;
|
|||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
|
||||
public class IndexingMasterFailoverIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -63,7 +63,7 @@ import static org.hamcrest.Matchers.isOneOf;
|
|||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE")
|
||||
public class MinimumMasterNodesIT extends ESIntegTestCase {
|
||||
|
||||
|
@ -275,12 +275,14 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
|
|||
.put("discovery.initial_state_timeout", "500ms")
|
||||
.build();
|
||||
|
||||
logger.info("--> start 2 nodes");
|
||||
internalCluster().startNodesAsync(2, settings).get();
|
||||
logger.info("--> start first node and wait for it to be a master");
|
||||
internalCluster().startNode(settings);
|
||||
ensureClusterSizeConsistency();
|
||||
|
||||
// wait until second node join the cluster
|
||||
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
|
||||
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
|
||||
logger.info("--> start second node and wait for it to join");
|
||||
internalCluster().startNode(settings);
|
||||
ensureClusterSizeConsistency();
|
||||
|
||||
logger.info("--> setting minimum master node to 2");
|
||||
setMinimumMasterNodes(2);
|
||||
|
@ -298,8 +300,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
|
|||
|
||||
logger.info("--> bringing another node up");
|
||||
internalCluster().startNode(Settings.builder().put(settings).put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build());
|
||||
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
|
||||
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
|
||||
ensureClusterSizeConsistency();
|
||||
}
|
||||
|
||||
private void assertNoMasterBlockOnAllNodes() throws InterruptedException {
|
||||
|
|
|
@ -49,7 +49,7 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
|
||||
public class NoMasterNodeIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -69,7 +69,10 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase {
|
|||
private void removePublishTimeout() {
|
||||
//to test that the acknowledgement mechanism is working we better disable the wait for publish
|
||||
//otherwise the operation is most likely acknowledged even if it doesn't support ack
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0")));
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0")
|
||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")
|
||||
));
|
||||
}
|
||||
|
||||
public void testClusterUpdateSettingsAcknowledgement() {
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.RoutingNode;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||
import org.elasticsearch.common.collect.Iterators;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
@ -61,7 +60,9 @@ public class AckIT extends ESIntegTestCase {
|
|||
//to test that the acknowledgement mechanism is working we better disable the wait for publish
|
||||
//otherwise the operation is most likely acknowledged even if it doesn't support ack
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), 0).build();
|
||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit to check acking
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testUpdateSettingsAcknowledgement() {
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationD
|
|||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
@ -104,7 +103,6 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
|
|||
Settings commonSettings = Settings.builder()
|
||||
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
|
||||
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
|
||||
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 3)
|
||||
.put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "10s")
|
||||
.build();
|
||||
|
||||
|
|
|
@ -20,32 +20,20 @@ package org.elasticsearch.cluster.service;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalNodeMasterListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Singleton;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -56,17 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
public class ClusterServiceIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(TestPlugin.class);
|
||||
}
|
||||
|
||||
public void testAckedUpdateTask() throws Exception {
|
||||
internalCluster().startNode();
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
|
||||
|
@ -482,141 +463,4 @@ public class ClusterServiceIT extends ESIntegTestCase {
|
|||
assertTrue(controlSources.isEmpty());
|
||||
block2.countDown();
|
||||
}
|
||||
|
||||
public void testLocalNodeMasterListenerCallbacks() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put("discovery.zen.minimum_master_nodes", 1)
|
||||
.put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms")
|
||||
.put("discovery.initial_state_timeout", "500ms")
|
||||
.build();
|
||||
|
||||
String node_0 = internalCluster().startNode(settings);
|
||||
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
|
||||
MasterAwareService testService = internalCluster().getInstance(MasterAwareService.class);
|
||||
|
||||
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
|
||||
.setWaitForNodes("1").get();
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
|
||||
// the first node should be a master as the minimum required is 1
|
||||
assertThat(clusterService.state().nodes().getMasterNode(), notNullValue());
|
||||
assertThat(clusterService.state().nodes().isLocalNodeElectedMaster(), is(true));
|
||||
assertThat(testService.master(), is(true));
|
||||
String node_1 = internalCluster().startNode(settings);
|
||||
final ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class, node_1);
|
||||
MasterAwareService testService1 = internalCluster().getInstance(MasterAwareService.class, node_1);
|
||||
|
||||
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get();
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
|
||||
// the second node should not be the master as node1 is already the master.
|
||||
assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(false));
|
||||
assertThat(testService1.master(), is(false));
|
||||
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").get();
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
|
||||
// now that node0 is closed, node1 should be elected as master
|
||||
assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(true));
|
||||
assertThat(testService1.master(), is(true));
|
||||
|
||||
// start another node and set min_master_node
|
||||
internalCluster().startNode(Settings.builder().put(settings));
|
||||
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
|
||||
|
||||
Settings transientSettings = Settings.builder()
|
||||
.put("discovery.zen.minimum_master_nodes", 2)
|
||||
.build();
|
||||
client().admin().cluster().prepareUpdateSettings().setTransientSettings(transientSettings).get();
|
||||
|
||||
// and shutdown the second node
|
||||
internalCluster().stopRandomNonMasterNode();
|
||||
|
||||
// there should not be any master as the minimum number of required eligible masters is not met
|
||||
awaitBusy(() -> clusterService1.state().nodes().getMasterNode() == null &&
|
||||
clusterService1.clusterServiceState().getClusterStateStatus() == ClusterStateStatus.APPLIED);
|
||||
assertThat(testService1.master(), is(false));
|
||||
|
||||
// bring the node back up
|
||||
String node_2 = internalCluster().startNode(Settings.builder().put(settings).put(transientSettings));
|
||||
ClusterService clusterService2 = internalCluster().getInstance(ClusterService.class, node_2);
|
||||
MasterAwareService testService2 = internalCluster().getInstance(MasterAwareService.class, node_2);
|
||||
|
||||
// make sure both nodes see each other, otherwise the master node below could be null if node 2 is master and node 1 didn't receive
|
||||
// the updated cluster state...
|
||||
assertThat(internalCluster().client(node_1).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true)
|
||||
.setWaitForNodes("2").get().isTimedOut(), is(false));
|
||||
assertThat(internalCluster().client(node_2).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true)
|
||||
.setWaitForNodes("2").get().isTimedOut(), is(false));
|
||||
|
||||
// now that we started node1 again, a new master should be elected
|
||||
assertThat(clusterService2.state().nodes().getMasterNode(), is(notNullValue()));
|
||||
if (node_2.equals(clusterService2.state().nodes().getMasterNode().getName())) {
|
||||
assertThat(testService1.master(), is(false));
|
||||
assertThat(testService2.master(), is(true));
|
||||
} else {
|
||||
assertThat(testService1.master(), is(true));
|
||||
assertThat(testService2.master(), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestPlugin extends Plugin {
|
||||
|
||||
@Override
|
||||
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
|
||||
List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
|
||||
services.add(MasterAwareService.class);
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
@Singleton
|
||||
public static class MasterAwareService extends AbstractLifecycleComponent implements LocalNodeMasterListener {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private volatile boolean master;
|
||||
|
||||
@Inject
|
||||
public MasterAwareService(Settings settings, ClusterService clusterService) {
|
||||
super(settings);
|
||||
clusterService.add(this);
|
||||
this.clusterService = clusterService;
|
||||
logger.info("initialized test service");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMaster() {
|
||||
logger.info("on master [{}]", clusterService.localNode());
|
||||
master = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void offMaster() {
|
||||
logger.info("off master [{}]", clusterService.localNode());
|
||||
master = false;
|
||||
}
|
||||
|
||||
public boolean master() {
|
||||
return master;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
|||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.LocalNodeMasterListener;
|
||||
import org.elasticsearch.cluster.NodeConnectionsService;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -44,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.BaseFuture;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
@ -1098,6 +1100,48 @@ public class ClusterServiceTests extends ESTestCase {
|
|||
timedClusterService.close();
|
||||
}
|
||||
|
||||
public void testLocalNodeMasterListenerCallbacks() throws Exception {
|
||||
TimedClusterService timedClusterService = createTimedClusterService(false);
|
||||
|
||||
AtomicBoolean isMaster = new AtomicBoolean();
|
||||
timedClusterService.add(new LocalNodeMasterListener() {
|
||||
@Override
|
||||
public void onMaster() {
|
||||
isMaster.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void offMaster() {
|
||||
isMaster.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
});
|
||||
|
||||
ClusterState state = timedClusterService.state();
|
||||
DiscoveryNodes nodes = state.nodes();
|
||||
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
|
||||
state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
|
||||
setState(timedClusterService, state);
|
||||
assertThat(isMaster.get(), is(true));
|
||||
|
||||
nodes = state.nodes();
|
||||
nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(null);
|
||||
state = ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES))
|
||||
.nodes(nodesBuilder).build();
|
||||
setState(timedClusterService, state);
|
||||
assertThat(isMaster.get(), is(false));
|
||||
nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId());
|
||||
state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
|
||||
setState(timedClusterService, state);
|
||||
assertThat(isMaster.get(), is(true));
|
||||
|
||||
timedClusterService.close();
|
||||
}
|
||||
|
||||
private static class SimpleTask {
|
||||
private final int id;
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ import static org.hamcrest.Matchers.is;
|
|||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
|
||||
public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
|
|||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
|
||||
public class ZenUnicastDiscoveryIT extends ESIntegTestCase {
|
||||
|
||||
private ClusterDiscoveryConfiguration discoveryConfig;
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
|
@ -37,7 +36,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
|
||||
@ClusterScope(numDataNodes =0, scope= Scope.TEST)
|
||||
@ClusterScope(numDataNodes = 0, scope = Scope.TEST)
|
||||
public class QuorumGatewayIT extends ESIntegTestCase {
|
||||
@Override
|
||||
protected int numberOfReplicas() {
|
||||
|
@ -47,8 +46,7 @@ public class QuorumGatewayIT extends ESIntegTestCase {
|
|||
public void testQuorumRecovery() throws Exception {
|
||||
logger.info("--> starting 3 nodes");
|
||||
// we are shutting down nodes - make sure we don't have 2 clusters if we test network
|
||||
internalCluster().startNodesAsync(3,
|
||||
Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build()).get();
|
||||
internalCluster().startNodesAsync(3).get();
|
||||
|
||||
|
||||
createIndex("test");
|
||||
|
|
|
@ -34,7 +34,7 @@ import java.util.Set;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
|
||||
public class RecoverAfterNodesIT extends ESIntegTestCase {
|
||||
private static final TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(10);
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -44,6 +43,7 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
|
||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||
import org.elasticsearch.test.store.MockFSIndexStore;
|
||||
|
@ -333,48 +333,43 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
|
|||
String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID();
|
||||
assertThat(metaDataUuid, not(equalTo("_na_")));
|
||||
|
||||
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
|
||||
|
||||
logger.info("--> closing first node, and indexing more data to the second node");
|
||||
internalCluster().fullRestart(new RestartCallback() {
|
||||
internalCluster().stopRandomDataNode();
|
||||
|
||||
@Override
|
||||
public void doAfterNodes(int numNodes, Client client) throws Exception {
|
||||
if (numNodes == 1) {
|
||||
logger.info("--> one node is closed - start indexing data into the second one");
|
||||
client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
|
||||
// TODO: remove once refresh doesn't fail immediately if there is a master block:
|
||||
// https://github.com/elastic/elasticsearch/issues/9997
|
||||
client.admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
|
||||
client.admin().indices().prepareRefresh().execute().actionGet();
|
||||
logger.info("--> one node is closed - start indexing data into the second one");
|
||||
client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
|
||||
// TODO: remove once refresh doesn't fail immediately if there is a master block:
|
||||
// https://github.com/elastic/elasticsearch/issues/9997
|
||||
client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
|
||||
client().admin().indices().prepareRefresh().execute().actionGet();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertHitCount(client.prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3);
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3);
|
||||
}
|
||||
|
||||
logger.info("--> add some metadata, additional type and template");
|
||||
client.admin().indices().preparePutMapping("test").setType("type2")
|
||||
.setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
|
||||
.execute().actionGet();
|
||||
client.admin().indices().preparePutTemplate("template_1")
|
||||
.setPatterns(Collections.singletonList("te*"))
|
||||
.setOrder(0)
|
||||
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
|
||||
.startObject("field1").field("type", "text").field("store", true).endObject()
|
||||
.startObject("field2").field("type", "keyword").field("store", true).endObject()
|
||||
.endObject().endObject().endObject())
|
||||
.execute().actionGet();
|
||||
client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
|
||||
logger.info("--> starting two nodes back, verifying we got the latest version");
|
||||
}
|
||||
logger.info("--> add some metadata, additional type and template");
|
||||
client().admin().indices().preparePutMapping("test").setType("type2")
|
||||
.setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
|
||||
.execute().actionGet();
|
||||
client().admin().indices().preparePutTemplate("template_1")
|
||||
.setTemplate("te*")
|
||||
.setOrder(0)
|
||||
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
|
||||
.startObject("field1").field("type", "text").field("store", true).endObject()
|
||||
.startObject("field2").field("type", "keyword").field("store", true).endObject()
|
||||
.endObject().endObject().endObject())
|
||||
.execute().actionGet();
|
||||
client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
|
||||
|
||||
}
|
||||
logger.info("--> stopping the second node");
|
||||
internalCluster().stopRandomDataNode();
|
||||
|
||||
});
|
||||
logger.info("--> starting the two nodes back");
|
||||
|
||||
internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get();
|
||||
|
||||
logger.info("--> running cluster_health (wait for the shards to startup)");
|
||||
ensureGreen();
|
||||
primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
|
||||
|
||||
assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(), equalTo(metaDataUuid));
|
||||
|
||||
|
@ -502,27 +497,28 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
|
|||
public void testRecoveryDifferentNodeOrderStartup() throws Exception {
|
||||
// we need different data paths so we make sure we start the second node fresh
|
||||
|
||||
final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build());
|
||||
final Path pathNode1 = createTempDir();
|
||||
final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build());
|
||||
|
||||
client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet();
|
||||
|
||||
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build());
|
||||
final Path pathNode2 = createTempDir();
|
||||
final String node_2 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());
|
||||
|
||||
ensureGreen();
|
||||
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
|
||||
|
||||
|
||||
internalCluster().fullRestart(new RestartCallback() {
|
||||
|
||||
@Override
|
||||
public boolean doRestart(String nodeName) {
|
||||
return !node_1.equals(nodeName);
|
||||
}
|
||||
});
|
||||
|
||||
if (randomBoolean()) {
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
|
||||
} else {
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2));
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));
|
||||
}
|
||||
// start the second node again
|
||||
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());
|
||||
ensureYellow();
|
||||
primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
|
||||
|
||||
assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
|
||||
}
|
||||
|
|
|
@ -555,10 +555,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|||
// start a master node
|
||||
internalCluster().startNode(nodeSettings);
|
||||
|
||||
InternalTestCluster.Async<String> blueFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
|
||||
InternalTestCluster.Async<String> redFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
|
||||
final String blueNodeName = blueFuture.get();
|
||||
final String redNodeName = redFuture.get();
|
||||
final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build());
|
||||
final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build());
|
||||
|
||||
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
|
||||
assertThat(response.isTimedOut(), is(false));
|
||||
|
|
|
@ -209,7 +209,10 @@ public class RareClusterStateIT extends ESIntegTestCase {
|
|||
// but the change might not be on the node that performed the indexing
|
||||
// operation yet
|
||||
|
||||
Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0ms").build();
|
||||
Settings settings = Settings.builder()
|
||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
|
||||
.build();
|
||||
final List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get();
|
||||
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
|
||||
|
||||
|
@ -327,7 +330,6 @@ public class RareClusterStateIT extends ESIntegTestCase {
|
|||
// time of indexing it
|
||||
final List<String> nodeNames = internalCluster().startNodesAsync(2,
|
||||
Settings.builder()
|
||||
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)
|
||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
|
||||
.build()).get();
|
||||
|
|
|
@ -72,16 +72,15 @@ public class FullRollingRestartIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
logger.info("--> now start adding nodes");
|
||||
internalCluster().startNodesAsync(2, settings).get();
|
||||
internalCluster().startNode(settings);
|
||||
internalCluster().startNode(settings);
|
||||
|
||||
// make sure the cluster state is green, and all has been recovered
|
||||
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));
|
||||
|
||||
logger.info("--> add two more nodes");
|
||||
internalCluster().startNodesAsync(2, settings).get();
|
||||
|
||||
// We now have 5 nodes
|
||||
setMinimumMasterNodes(3);
|
||||
internalCluster().startNode(settings);
|
||||
internalCluster().startNode(settings);
|
||||
|
||||
// make sure the cluster state is green, and all has been recovered
|
||||
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("5"));
|
||||
|
@ -97,9 +96,6 @@ public class FullRollingRestartIT extends ESIntegTestCase {
|
|||
// make sure the cluster state is green, and all has been recovered
|
||||
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("4"));
|
||||
|
||||
// going down to 3 nodes. note that the min_master_node may not be in effect when we shutdown the 4th
|
||||
// node, but that's OK as it is set to 3 before.
|
||||
setMinimumMasterNodes(2);
|
||||
internalCluster().stopRandomDataNode();
|
||||
// make sure the cluster state is green, and all has been recovered
|
||||
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3"));
|
||||
|
@ -115,8 +111,6 @@ public class FullRollingRestartIT extends ESIntegTestCase {
|
|||
// make sure the cluster state is green, and all has been recovered
|
||||
assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("2"));
|
||||
|
||||
// closing the 2nd node
|
||||
setMinimumMasterNodes(1);
|
||||
internalCluster().stopRandomDataNode();
|
||||
|
||||
// make sure the cluster state is yellow, and all has been recovered
|
||||
|
|
|
@ -60,7 +60,6 @@ import org.elasticsearch.test.BackgroundIndexer;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.MockIndexEventListener;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
@ -399,7 +398,8 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
).get();
|
||||
|
||||
internalCluster().startNodesAsync(2).get();
|
||||
internalCluster().startNode();
|
||||
internalCluster().startNode();
|
||||
|
||||
List<IndexRequestBuilder> requests = new ArrayList<>();
|
||||
int numDocs = scaledRandomIntBetween(25, 250);
|
||||
|
@ -466,14 +466,15 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
|
||||
public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException {
|
||||
int halfNodes = randomIntBetween(1, 3);
|
||||
Settings blueSetting = Settings.builder().put("node.attr.color", "blue").build();
|
||||
InternalTestCluster.Async<List<String>> blueFuture = internalCluster().startNodesAsync(halfNodes, blueSetting);
|
||||
Settings redSetting = Settings.builder().put("node.attr.color", "red").build();
|
||||
InternalTestCluster.Async<java.util.List<String>> redFuture = internalCluster().startNodesAsync(halfNodes, redSetting);
|
||||
blueFuture.get();
|
||||
redFuture.get();
|
||||
logger.info("blue nodes: {}", blueFuture.get());
|
||||
logger.info("red nodes: {}", redFuture.get());
|
||||
Settings[] nodeSettings = Stream.concat(
|
||||
Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
|
||||
Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes)
|
||||
).toArray(Settings[]::new);
|
||||
List<String> nodes = internalCluster().startNodesAsync(nodeSettings).get();
|
||||
String[] blueNodes = nodes.subList(0, halfNodes).stream().toArray(String[]::new);
|
||||
String[] redNodes = nodes.subList(halfNodes, nodes.size()).stream().toArray(String[]::new);
|
||||
logger.info("blue nodes: {}", (Object)blueNodes);
|
||||
logger.info("red nodes: {}", (Object)redNodes);
|
||||
ensureStableCluster(halfNodes * 2);
|
||||
|
||||
assertAcked(prepareCreate("test", Settings.builder()
|
||||
|
@ -481,7 +482,7 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
.put(indexSettings())
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))
|
||||
));
|
||||
assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2]));
|
||||
assertAllShardsOnNodes("test", redNodes);
|
||||
int numDocs = randomIntBetween(100, 150);
|
||||
ArrayList<String> ids = new ArrayList<>();
|
||||
logger.info(" --> indexing [{}] docs", numDocs);
|
||||
|
|
|
@ -19,18 +19,6 @@
|
|||
|
||||
package org.elasticsearch.tribe;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -58,6 +46,18 @@ import org.junit.After;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
@ -121,13 +121,13 @@ public class TribeIT extends ESIntegTestCase {
|
|||
final Collection<Class<? extends Plugin>> plugins = nodePlugins();
|
||||
|
||||
if (cluster1 == null) {
|
||||
cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes,
|
||||
cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
|
||||
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_1",
|
||||
plugins, Function.identity());
|
||||
}
|
||||
|
||||
if (cluster2 == null) {
|
||||
cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes,
|
||||
cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
|
||||
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_2",
|
||||
plugins, Function.identity());
|
||||
}
|
||||
|
|
|
@ -171,6 +171,10 @@ integTest {
|
|||
}
|
||||
configFile 'scripts/my_script.js'
|
||||
configFile 'scripts/my_script.py'
|
||||
configFile 'scripts/my_init_script.painless'
|
||||
configFile 'scripts/my_map_script.painless'
|
||||
configFile 'scripts/my_combine_script.painless'
|
||||
configFile 'scripts/my_reduce_script.painless'
|
||||
configFile 'userdict_ja.txt'
|
||||
configFile 'KeywordTokenizer.rbbi'
|
||||
// Whitelist reindexing from the local node so we can test it.
|
||||
|
@ -249,6 +253,39 @@ buildRestTests.setups['host'] = '''
|
|||
- set: {nodes.$master.http.publish_address: host}
|
||||
'''
|
||||
|
||||
// Used by scripted metric docs
|
||||
buildRestTests.setups['ledger'] = '''
|
||||
- do:
|
||||
indices.create:
|
||||
index: ledger
|
||||
body:
|
||||
settings:
|
||||
number_of_shards: 2
|
||||
number_of_replicas: 1
|
||||
mappings:
|
||||
sale:
|
||||
properties:
|
||||
type:
|
||||
type: keyword
|
||||
amount:
|
||||
type: double
|
||||
- do:
|
||||
bulk:
|
||||
index: ledger
|
||||
type: item
|
||||
refresh: true
|
||||
body: |
|
||||
{"index":{}}
|
||||
{"date": "2015/01/01 00:00:00", "amount": 200, "type": "sale", "description": "something"}
|
||||
{"index":{}}
|
||||
{"date": "2015/01/01 00:00:00", "amount": 10, "type": "expense", "decription": "another thing"}
|
||||
{"index":{}}
|
||||
{"date": "2015/01/01 00:00:00", "amount": 150, "type": "sale", "description": "blah"}
|
||||
{"index":{}}
|
||||
{"date": "2015/01/01 00:00:00", "amount": 50, "type": "expense", "description": "cost of blah"}
|
||||
{"index":{}}
|
||||
{"date": "2015/01/01 00:00:00", "amount": 50, "type": "expense", "description": "advertisement"}'''
|
||||
|
||||
// Used by pipeline aggregation docs
|
||||
buildRestTests.setups['sales'] = '''
|
||||
- do:
|
||||
|
|
|
@ -9,6 +9,7 @@ Example:
|
|||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST ledger/_search?size=0
|
||||
{
|
||||
"query" : {
|
||||
"match_all" : {}
|
||||
|
@ -16,15 +17,17 @@ Example:
|
|||
"aggs": {
|
||||
"profit": {
|
||||
"scripted_metric": {
|
||||
"init_script" : "_agg['transactions'] = []",
|
||||
"map_script" : "if (doc['type'].value == \"sale\") { _agg.transactions.add(doc['amount'].value) } else { _agg.transactions.add(-1 * doc['amount'].value) }", <1>
|
||||
"combine_script" : "profit = 0; for (t in _agg.transactions) { profit += t }; return profit",
|
||||
"reduce_script" : "profit = 0; for (a in _aggs) { profit += a }; return profit"
|
||||
"init_script" : "params._agg.transactions = []",
|
||||
"map_script" : "params._agg.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)", <1>
|
||||
"combine_script" : "double profit = 0; for (t in params._agg.transactions) { profit += t } return profit",
|
||||
"reduce_script" : "double profit = 0; for (a in params._aggs) { profit += a } return profit"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:ledger]
|
||||
|
||||
<1> `map_script` is the only required parameter
|
||||
|
||||
|
@ -35,24 +38,24 @@ The response for the above aggregation:
|
|||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"took": 218,
|
||||
...
|
||||
|
||||
"aggregations": {
|
||||
"profit": {
|
||||
"value": 170
|
||||
"value": 240.0
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"took": 218/"took": $body.took/]
|
||||
// TESTRESPONSE[s/\.\.\./"_shards": $body._shards, "hits": $body.hits, "timed_out": false,/]
|
||||
|
||||
The above example can also be specified using file scripts as follows:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST ledger/_search?size=0
|
||||
{
|
||||
"query" : {
|
||||
"match_all" : {}
|
||||
},
|
||||
"aggs": {
|
||||
"profit": {
|
||||
"scripted_metric": {
|
||||
|
@ -66,18 +69,42 @@ The above example can also be specified using file scripts as follows:
|
|||
"file": "my_combine_script"
|
||||
},
|
||||
"params": {
|
||||
"field": "amount" <1>
|
||||
"field": "amount", <1>
|
||||
"_agg": {} <2>
|
||||
},
|
||||
"reduce_script" : {
|
||||
"file": "my_reduce_script"
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:ledger]
|
||||
|
||||
<1> script parameters for init, map and combine scripts must be specified in a global `params` object so that it can be share between the scripts
<1> script parameters for `init`, `map` and `combine` scripts must be specified
in a global `params` object so that they can be shared between the scripts.
|
||||
<2> if you specify script parameters then you must specify `"_agg": {}`.
|
||||
|
||||
////
|
||||
Verify this response as well but in a hidden block.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"took": 218,
|
||||
...
|
||||
"aggregations": {
|
||||
"profit": {
|
||||
"value": 240.0
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"took": 218/"took": $body.took/]
|
||||
// TESTRESPONSE[s/\.\.\./"_shards": $body._shards, "hits": $body.hits, "timed_out": false,/]
|
||||
////
|
||||
|
||||
For more details on specifying scripts see <<modules-scripting, script documentation>>.
|
||||
|
||||
|
@ -88,7 +115,7 @@ Whilst and valid script object can be used within a single script. the scripts m
|
|||
* primitive types
|
||||
* String
|
||||
* Map (containing only keys and values of the types listed here)
|
||||
* Array (containing elements of only the types listed here)
|
||||
* Array (containing elements of only the types listed here)
|
||||
|
||||
==== Scope of scripts
|
||||
|
||||
|
@ -98,24 +125,24 @@ init_script:: Executed prior to any collection of documents. Allows the ag
|
|||
+
|
||||
In the above example, the `init_script` creates an array `transactions` in the `_agg` object.
|
||||
|
||||
map_script:: Executed once per document collected. This is the only required script. If no combine_script is specified, the resulting state
|
||||
map_script:: Executed once per document collected. This is the only required script. If no combine_script is specified, the resulting state
|
||||
needs to be stored in an object named `_agg`.
|
||||
+
|
||||
In the above example, the `map_script` checks the value of the type field. If the value is 'sale' the value of the amount field
|
||||
is added to the transactions array. If the value of the type field is not 'sale' the negated value of the amount field is added
|
||||
In the above example, the `map_script` checks the value of the type field. If the value is 'sale' the value of the amount field
|
||||
is added to the transactions array. If the value of the type field is not 'sale' the negated value of the amount field is added
|
||||
to transactions.
|
||||
|
||||
combine_script:: Executed once on each shard after document collection is complete. Allows the aggregation to consolidate the state returned from
|
||||
combine_script:: Executed once on each shard after document collection is complete. Allows the aggregation to consolidate the state returned from
|
||||
each shard. If a combine_script is not provided the combine phase will return the aggregation variable.
|
||||
+
|
||||
In the above example, the `combine_script` iterates through all the stored transactions, summing the values in the `profit` variable
|
||||
In the above example, the `combine_script` iterates through all the stored transactions, summing the values in the `profit` variable
|
||||
and finally returns `profit`.
|
||||
|
||||
reduce_script:: Executed once on the coordinating node after all shards have returned their results. The script is provided with access to a
|
||||
variable `_aggs` which is an array of the result of the combine_script on each shard. If a reduce_script is not provided
|
||||
reduce_script:: Executed once on the coordinating node after all shards have returned their results. The script is provided with access to a
|
||||
variable `_aggs` which is an array of the result of the combine_script on each shard. If a reduce_script is not provided
|
||||
the reduce phase will return the `_aggs` variable.
|
||||
+
|
||||
In the above example, the `reduce_script` iterates through the `profit` returned by each shard summing the values before returning the
|
||||
In the above example, the `reduce_script` iterates through the `profit` returned by each shard summing the values before returning the
|
||||
final combined profit which will be returned in the response of the aggregation.
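
To make the phase-by-phase flow concrete, here is a minimal plain-Java sketch (illustrative only; it is not Painless and not part of the aggregation API) that mirrors the map, combine and reduce steps on the five `ledger` documents from the setup above. The split of documents across the two shards is assumed purely for illustration; the final figure matches the `240.0` value shown in the response earlier.

[source,java]
--------------------------------------------------
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the scripted_metric phases in plain Java.
public class ScriptedMetricSketch {

    // map phase: sale amounts count as profit, everything else is negated
    static double map(String type, double amount) {
        return "sale".equals(type) ? amount : -amount;
    }

    // combine phase: each shard sums its own transaction list
    static double combine(List<Double> transactions) {
        return transactions.stream().mapToDouble(Double::doubleValue).sum();
    }

    // reduce phase: the coordinating node sums the per-shard results
    static double reduce(List<Double> shardProfits) {
        return shardProfits.stream().mapToDouble(Double::doubleValue).sum();
    }

    public static void main(String[] args) {
        // hypothetical assignment of the five ledger documents to two shards
        List<Double> shardA = Arrays.asList(map("sale", 200), map("expense", 10));
        List<Double> shardB = Arrays.asList(map("sale", 150), map("expense", 50), map("expense", 50));
        double profit = reduce(Arrays.asList(combine(shardA), combine(shardB)));
        System.out.println(profit); // prints 240.0, matching the documented response
    }
}
--------------------------------------------------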
|
||||
|
||||
==== Worked Example
|
||||
|
@ -124,36 +151,19 @@ Imagine a situation where you index the following documents into and index with
|
|||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
$ curl -XPUT 'http://localhost:9200/transactions/stock/1' -d '
|
||||
{
|
||||
"type": "sale",
|
||||
"amount": 80
|
||||
}
|
||||
'
|
||||
|
||||
$ curl -XPUT 'http://localhost:9200/transactions/stock/2' -d '
|
||||
{
|
||||
"type": "cost",
|
||||
"amount": 10
|
||||
}
|
||||
'
|
||||
|
||||
$ curl -XPUT 'http://localhost:9200/transactions/stock/3' -d '
|
||||
{
|
||||
"type": "cost",
|
||||
"amount": 30
|
||||
}
|
||||
'
|
||||
|
||||
$ curl -XPUT 'http://localhost:9200/transactions/stock/4' -d '
|
||||
{
|
||||
"type": "sale",
|
||||
"amount": 130
|
||||
}
|
||||
'
|
||||
PUT /transactions/stock/_bulk?refresh
|
||||
{"index":{"_id":1}}
|
||||
{"type": "sale","amount": 80}
|
||||
{"index":{"_id":2}}
|
||||
{"type": "cost","amount": 10}
|
||||
{"index":{"_id":2}}
|
||||
{"type": "cost","amount": 30}
|
||||
{"index":{"_id":2}}
|
||||
{"type": "sale","amount": 130}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
Lets say that documents 1 and 3 end up on shard A and documents 2 and 4 end up on shard B. The following is a breakdown of what the aggregation result is
Let's say that documents 1 and 3 end up on shard A and documents 2 and 4 end up on shard B. The following is a breakdown of what the aggregation result is
at each stage of the example above.
|
||||
|
||||
===== Before init_script
|
||||
|
@ -221,7 +231,7 @@ Shard B::
|
|||
|
||||
===== After combine_script
|
||||
|
||||
The combine_script is executed on each shard after document collection is complete and reduces all the transactions down to a single profit figure for each
|
||||
The combine_script is executed on each shard after document collection is complete and reduces all the transactions down to a single profit figure for each
|
||||
shard (by summing the values in the transactions array) which is passed back to the coordinating node:
|
||||
|
||||
Shard A:: 50
|
||||
|
@ -239,7 +249,7 @@ The reduce_script receives an `_aggs` array containing the result of the combine
|
|||
]
|
||||
--------------------------------------------------
|
||||
|
||||
It reduces the responses for the shards down to a final overall profit figure (by summing the values) and returns this as the result of the aggregation to
|
||||
It reduces the responses for the shards down to a final overall profit figure (by summing the values) and returns this as the result of the aggregation to
|
||||
produce the response:
|
||||
|
||||
[source,js]
|
||||
|
@ -258,8 +268,8 @@ produce the response:
|
|||
==== Other Parameters
|
||||
|
||||
[horizontal]
|
||||
params:: Optional. An object whose contents will be passed as variables to the `init_script`, `map_script` and `combine_script`. This can be
|
||||
useful to allow the user to control the behavior of the aggregation and for storing state between the scripts. If this is not specified,
|
||||
params:: Optional. An object whose contents will be passed as variables to the `init_script`, `map_script` and `combine_script`. This can be
|
||||
useful to allow the user to control the behavior of the aggregation and for storing state between the scripts. If this is not specified,
|
||||
the default is the equivalent of providing:
|
||||
+
|
||||
[source,js]
|
||||
|
@ -268,4 +278,3 @@ params:: Optional. An object whose contents will be passed as variable
|
|||
"_agg" : {}
|
||||
}
|
||||
--------------------------------------------------
|
||||
@ -10,12 +10,7 @@ GET /_cat/indices/twi*?v&s=index
|
|||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:huge_twitter]
|
||||
// TEST[s/^/POST _flush\n/]
|
||||
// TEST[s/^/PUT twitter2\n{"settings": {"number_of_replicas": 0}}\n/]
|
||||
// We flush very early here because the index's size is cached and we sort on
|
||||
// size below. So to get a realistic sort on size we need to flush here or else
|
||||
// the size is just whatever portion of the index is pushed out of memory
|
||||
// during test setup which isn't deterministic.
|
||||
|
||||
Might respond with:
|
||||
|
||||
|
@ -64,11 +59,11 @@ yellow open twitter u8FNjxh8Rfy_awN11oDKYQ 1 1 1200 0
|
|||
// TESTRESPONSE[s/\d+(\.\d+)?[tgmk]?b/\\d+(\\.\\d+)?[tgmk]?b/]
|
||||
// TESTRESPONSE[s/u8FNjxh8Rfy_awN11oDKYQ/.+/ _cat]
|
||||
|
||||
What's my largest index by disk usage not including replicas?
|
||||
Which index has the largest number of documents?
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
GET /_cat/indices?v&s=store.size:desc
|
||||
GET /_cat/indices?v&s=docs.count:desc
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[continued]
@ -179,4 +179,4 @@ logs to roll and compress after 1 GB, and to preserve a maximum of five log
|
|||
files (four rolled logs, and the active log).
|
||||
|
||||
You can disable it in the `config/log4j2.properties` file by setting the deprecation
|
||||
log level to `info`.
|
||||
log level to `error`.
@ -0,0 +1,5 @@
|
|||
double profit = 0;
|
||||
for (t in params._agg.transactions) {
|
||||
profit += t
|
||||
}
|
||||
return profit
|
|
@ -0,0 +1 @@
|
|||
params._agg.transactions = []
|
|
@ -0,0 +1 @@
|
|||
params._agg.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)
|
|
@ -0,0 +1,5 @@
|
|||
double profit = 0;
|
||||
for (a in params._aggs) {
|
||||
profit += a
|
||||
}
|
||||
return profit
|
|
@ -69,6 +69,7 @@ import java.util.Collections;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class PercolatorFieldMapper extends FieldMapper {
|
||||
|
||||
|
@ -89,9 +90,9 @@ public class PercolatorFieldMapper extends FieldMapper {
|
|||
|
||||
public static class Builder extends FieldMapper.Builder<Builder, PercolatorFieldMapper> {
|
||||
|
||||
private final QueryShardContext queryShardContext;
|
||||
private final Supplier<QueryShardContext> queryShardContext;
|
||||
|
||||
public Builder(String fieldName, QueryShardContext queryShardContext) {
|
||||
public Builder(String fieldName, Supplier<QueryShardContext> queryShardContext) {
|
||||
super(fieldName, FIELD_TYPE, FIELD_TYPE);
|
||||
this.queryShardContext = queryShardContext;
|
||||
}
|
||||
|
@ -136,7 +137,7 @@ public class PercolatorFieldMapper extends FieldMapper {
|
|||
|
||||
@Override
|
||||
public Builder parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
|
||||
return new Builder(name, parserContext.queryShardContext());
|
||||
return new Builder(name, parserContext.queryShardContextSupplier());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -222,13 +223,14 @@ public class PercolatorFieldMapper extends FieldMapper {
|
|||
}
|
||||
|
||||
private final boolean mapUnmappedFieldAsString;
|
||||
private final QueryShardContext queryShardContext;
|
||||
private final Supplier<QueryShardContext> queryShardContext;
|
||||
private KeywordFieldMapper queryTermsField;
|
||||
private KeywordFieldMapper extractionResultField;
|
||||
private BinaryFieldMapper queryBuilderField;
|
||||
|
||||
public PercolatorFieldMapper(String simpleName, MappedFieldType fieldType, MappedFieldType defaultFieldType,
|
||||
Settings indexSettings, MultiFields multiFields, CopyTo copyTo, QueryShardContext queryShardContext,
|
||||
Settings indexSettings, MultiFields multiFields, CopyTo copyTo,
|
||||
Supplier<QueryShardContext> queryShardContext,
|
||||
KeywordFieldMapper queryTermsField, KeywordFieldMapper extractionResultField,
|
||||
BinaryFieldMapper queryBuilderField) {
|
||||
super(simpleName, fieldType, defaultFieldType, indexSettings, multiFields, copyTo);
|
||||
|
@ -261,7 +263,7 @@ public class PercolatorFieldMapper extends FieldMapper {
|
|||
|
||||
@Override
|
||||
public Mapper parse(ParseContext context) throws IOException {
|
||||
QueryShardContext queryShardContext = new QueryShardContext(this.queryShardContext);
|
||||
QueryShardContext queryShardContext = this.queryShardContext.get();
|
||||
if (context.doc().getField(queryBuilderField.name()) != null) {
|
||||
// If a percolator query has been defined in an array object then multiple percolator queries
|
||||
// could be provided. In order to prevent this we fail if we try to parse more than one query
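
The change above replaces an eagerly captured `QueryShardContext` with a `Supplier<QueryShardContext>` that is only resolved when a document is actually parsed. A minimal, self-contained sketch of that pattern (generic class names, purely illustrative, not the Elasticsearch types):

import java.util.function.Supplier;

// Illustrative only: defer creation of an expensive context behind a Supplier,
// the same shape as the Supplier<QueryShardContext> change above.
public class LazyContextSketch {

    static class Context {
        Context() {
            System.out.println("context created");
        }
    }

    static class Mapper {
        private final Supplier<Context> contextSupplier;

        Mapper(Supplier<Context> contextSupplier) {
            this.contextSupplier = contextSupplier; // nothing is created here
        }

        void parse() {
            Context context = contextSupplier.get(); // a fresh context per parse call
            System.out.println("parsing with " + context);
        }
    }

    public static void main(String[] args) {
        Mapper mapper = new Mapper(Context::new); // cheap to build the mapper
        mapper.parse(); // "context created" is only printed now
    }
}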
@ -39,7 +39,8 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE,
|
||||
numDataNodes = 0,
|
||||
transportClientRatio = 0.0,
|
||||
numClientNodes = 0)
|
||||
numClientNodes = 0,
|
||||
autoMinMasterNodes = false)
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-azure/issues/89")
|
||||
public class AzureMinimumMasterNodesTests extends AbstractAzureComputeServiceTestCase {
@ -33,7 +33,7 @@ import static org.hamcrest.CoreMatchers.is;
|
|||
* starting.
|
||||
* This test requires AWS to run.
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0, autoMinMasterNodes = false)
|
||||
public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase {
|
||||
public void testMinimumMasterNodesStart() {
|
||||
Settings nodeSettings = Settings.builder()
|
||||
|
|
|
@ -88,7 +88,11 @@ setup() {
|
|||
sudo chmod +x $JAVA
|
||||
|
||||
[ "$status" -eq 1 ]
|
||||
[[ "$output" == *"Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME"* ]]
|
||||
local expected="Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME"
|
||||
[[ "$output" == *"$expected"* ]] || {
|
||||
echo "Expected error message [$expected] but found: $output"
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
##################################
@ -176,7 +176,11 @@ fi
|
|||
sudo chmod +x $JAVA
|
||||
|
||||
[ "$status" -eq 1 ]
|
||||
[[ "$output" == *"Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME"* ]]
|
||||
local expected="Could not find any executable java binary. Please install java in your PATH or set JAVA_HOME"
|
||||
[[ "$output" == *"$expected"* ]] || {
|
||||
echo "Expected error message [$expected] but found: $output"
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
# Note that all of the tests from here to the end of the file expect to be run
@ -150,6 +150,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
|
@ -179,6 +180,7 @@ import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgno
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.emptyArray;
|
||||
import static org.hamcrest.Matchers.emptyIterable;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -528,10 +530,15 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
if (cluster() != null) {
|
||||
if (currentClusterScope != Scope.TEST) {
|
||||
MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData();
|
||||
assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData
|
||||
.persistentSettings().getAsMap().size(), equalTo(0));
|
||||
assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData
|
||||
.transientSettings().getAsMap().size(), equalTo(0));
|
||||
final Map<String, String> persistent = metaData.persistentSettings().getAsMap();
|
||||
assertThat("test leaves persistent cluster metadata behind: " + persistent, persistent.size(), equalTo(0));
|
||||
final Map<String, String> transientSettings = new HashMap<>(metaData.transientSettings().getAsMap());
|
||||
if (isInternalCluster() && internalCluster().getAutoManageMinMasterNode()) {
|
||||
// this is set by the test infra
|
||||
transientSettings.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey());
|
||||
}
|
||||
assertThat("test leaves transient cluster metadata behind: " + transientSettings,
|
||||
transientSettings.keySet(), empty());
|
||||
}
|
||||
ensureClusterSizeConsistency();
|
||||
ensureClusterStateConsistency();
|
||||
|
@ -1525,6 +1532,12 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
*/
|
||||
boolean supportsDedicatedMasters() default true;
|
||||
|
||||
/**
|
||||
* The cluster automatically manages the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} by default
|
||||
* as nodes are started and stopped. Set this to false to manage the setting manually.
|
||||
*/
|
||||
boolean autoMinMasterNodes() default true;
|
||||
|
||||
/**
|
||||
* Returns the number of client nodes in the cluster. Default is {@link InternalTestCluster#DEFAULT_NUM_CLIENT_NODES}, a
|
||||
* negative value means that the number of client nodes will be randomized.
|
||||
|
@ -1622,6 +1635,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
return annotation == null ? true : annotation.supportsDedicatedMasters();
|
||||
}
|
||||
|
||||
private boolean getAutoMinMasterNodes() {
|
||||
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
|
||||
return annotation == null ? true : annotation.autoMinMasterNodes();
|
||||
}
|
||||
|
||||
private int getNumDataNodes() {
|
||||
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
|
||||
return annotation == null ? -1 : annotation.numDataNodes();
|
||||
|
@ -1760,7 +1778,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
}
|
||||
mockPlugins = mocks;
|
||||
}
|
||||
return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes,
|
||||
return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, getAutoMinMasterNodes(),
|
||||
minNumDataNodes, maxNumDataNodes,
|
||||
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
|
||||
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper());
|
||||
}
@ -444,7 +444,6 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||
return RandomPicks.randomFrom(random, array);
|
||||
}
|
||||
|
||||
|
||||
/** Pick a random object from the given list. */
|
||||
public static <T> T randomFrom(List<T> list) {
|
||||
return RandomPicks.randomFrom(random(), list);
|
||||
|
@ -452,7 +451,12 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||
|
||||
/** Pick a random object from the given collection. */
|
||||
public static <T> T randomFrom(Collection<T> collection) {
|
||||
return RandomPicks.randomFrom(random(), collection);
|
||||
return randomFrom(random(), collection);
|
||||
}
|
||||
|
||||
/** Pick a random object from the given collection. */
|
||||
public static <T> T randomFrom(Random random, Collection<T> collection) {
|
||||
return RandomPicks.randomFrom(random, collection);
|
||||
}
|
||||
|
||||
public static String randomAsciiOfLengthBetween(int minCodeUnits, int maxCodeUnits) {
@ -24,12 +24,10 @@ import com.carrotsearch.randomizedtesting.SysGlobals;
|
|||
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
|
@ -68,7 +66,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||
|
@ -133,8 +132,10 @@ import java.util.stream.Stream;
|
|||
|
||||
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
|
||||
import static org.apache.lucene.util.LuceneTestCase.rarely;
|
||||
import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.test.ESTestCase.assertBusy;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
||||
import static org.elasticsearch.test.ESTestCase.randomFrom;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
@ -225,6 +226,8 @@ public final class InternalTestCluster extends TestCluster {
|
|||
|
||||
private final ExecutorService executor;
|
||||
|
||||
private final boolean autoManageMinMasterNodes;
|
||||
|
||||
private final Collection<Class<? extends Plugin>> mockPlugins;
|
||||
|
||||
/**
|
||||
|
@ -238,9 +241,10 @@ public final class InternalTestCluster extends TestCluster {
|
|||
|
||||
public InternalTestCluster(long clusterSeed, Path baseDir,
|
||||
boolean randomlyAddDedicatedMasters,
|
||||
int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
|
||||
boolean autoManageMinMasterNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
|
||||
boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins, Function<Client, Client> clientWrapper) {
|
||||
super(clusterSeed);
|
||||
this.autoManageMinMasterNodes = autoManageMinMasterNodes;
|
||||
this.clientWrapper = clientWrapper;
|
||||
this.baseDir = baseDir;
|
||||
this.clusterName = clusterName;
|
||||
|
@ -345,6 +349,11 @@ public final class InternalTestCluster extends TestCluster {
|
|||
return clusterName;
|
||||
}
|
||||
|
||||
/** returns true if the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} setting is auto managed by this cluster */
|
||||
public boolean getAutoManageMinMasterNode() {
|
||||
return autoManageMinMasterNodes;
|
||||
}
|
||||
|
||||
public String[] getNodeNames() {
|
||||
return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
|
||||
}
|
||||
|
@ -466,7 +475,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
if (randomNodeAndClient != null) {
|
||||
return randomNodeAndClient;
|
||||
}
|
||||
NodeAndClient buildNode = buildNode();
|
||||
NodeAndClient buildNode = buildNode(1);
|
||||
buildNode.startNode();
|
||||
publishNode(buildNode);
|
||||
return buildNode;
|
||||
|
@ -496,30 +505,20 @@ public final class InternalTestCluster extends TestCluster {
|
|||
* if more nodes than <code>n</code> are present this method will not
|
||||
* stop any of the running nodes.
|
||||
*/
|
||||
public void ensureAtLeastNumDataNodes(int n) {
|
||||
final List<Async<String>> asyncs = new ArrayList<>();
|
||||
synchronized (this) {
|
||||
int size = numDataNodes();
|
||||
for (int i = size; i < n; i++) {
|
||||
logger.info("increasing cluster size from {} to {}", size, n);
|
||||
if (numSharedDedicatedMasterNodes > 0) {
|
||||
asyncs.add(startDataOnlyNodeAsync());
|
||||
} else {
|
||||
asyncs.add(startNodeAsync());
|
||||
}
|
||||
public synchronized void ensureAtLeastNumDataNodes(int n) {
|
||||
boolean added = false;
|
||||
int size = numDataNodes();
|
||||
for (int i = size; i < n; i++) {
|
||||
logger.info("increasing cluster size from {} to {}", size, n);
|
||||
added = true;
|
||||
if (numSharedDedicatedMasterNodes > 0) {
|
||||
startDataOnlyNode(Settings.EMPTY);
|
||||
} else {
|
||||
startNode(Settings.EMPTY);
|
||||
}
|
||||
}
|
||||
try {
|
||||
for (Async<String> async : asyncs) {
|
||||
async.get();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("failed to start nodes", e);
|
||||
}
|
||||
if (!asyncs.isEmpty()) {
|
||||
synchronized (this) {
|
||||
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get());
|
||||
}
|
||||
if (added) {
|
||||
validateClusterFormed();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -544,28 +543,47 @@ public final class InternalTestCluster extends TestCluster {
|
|||
while (values.hasNext() && numNodesAndClients++ < size - n) {
|
||||
NodeAndClient next = values.next();
|
||||
nodesToRemove.add(next);
|
||||
removeDisruptionSchemeFromNode(next);
|
||||
next.close();
|
||||
}
|
||||
for (NodeAndClient toRemove : nodesToRemove) {
|
||||
nodes.remove(toRemove.name);
|
||||
}
|
||||
|
||||
stopNodesAndClients(nodesToRemove);
|
||||
if (!nodesToRemove.isEmpty() && size() > 0) {
|
||||
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get());
|
||||
validateClusterFormed();
|
||||
}
|
||||
}
|
||||
|
||||
private NodeAndClient buildNode(Settings settings) {
|
||||
/**
|
||||
* builds a new node given the settings.
|
||||
*
|
||||
* @param settings the settings to use
|
||||
* @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
|
||||
*/
|
||||
private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) {
|
||||
int ord = nextNodeId.getAndIncrement();
|
||||
return buildNode(ord, random.nextLong(), settings, false);
|
||||
return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes);
|
||||
}
|
||||
|
||||
private NodeAndClient buildNode() {
|
||||
/**
|
||||
* builds a new node with default settings
|
||||
*
|
||||
* @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
|
||||
*/
|
||||
private NodeAndClient buildNode(int defaultMinMasterNodes) {
|
||||
int ord = nextNodeId.getAndIncrement();
|
||||
return buildNode(ord, random.nextLong(), null, false);
|
||||
return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes);
|
||||
}
|
||||
|
||||
private NodeAndClient buildNode(int nodeId, long seed, Settings settings, boolean reuseExisting) {
|
||||
/**
|
||||
* builds a new node
|
||||
*
|
||||
* @param nodeId the node internal id (see {@link NodeAndClient#nodeAndClientId()}
|
||||
* @param seed the node's random seed
|
||||
* @param settings the settings to use
|
||||
* @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and
|
||||
* the method will return the existing one
|
||||
* @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
|
||||
*/
|
||||
private NodeAndClient buildNode(int nodeId, long seed, Settings settings,
|
||||
boolean reuseExisting, int defaultMinMasterNodes) {
|
||||
assert Thread.holdsLock(this);
|
||||
ensureOpen();
|
||||
settings = getSettings(nodeId, seed, settings);
|
||||
|
@ -577,13 +595,21 @@ public final class InternalTestCluster extends TestCluster {
|
|||
assert reuseExisting == true || nodes.containsKey(name) == false :
|
||||
"node name [" + name + "] already exists but not allowed to use it";
|
||||
}
|
||||
Settings finalSettings = Settings.builder()
|
||||
Settings.Builder finalSettings = Settings.builder()
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home
|
||||
.put(settings)
|
||||
.put("node.name", name)
|
||||
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed)
|
||||
.build();
|
||||
MockNode node = new MockNode(finalSettings, plugins);
|
||||
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed);
|
||||
|
||||
if (autoManageMinMasterNodes) {
|
||||
assert finalSettings.get(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null :
|
||||
"min master nodes may not be set when auto managed";
|
||||
finalSettings
|
||||
// don't wait too long not to slow down tests
|
||||
.put(ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.getKey(), "5s")
|
||||
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), defaultMinMasterNodes);
|
||||
}
|
||||
MockNode node = new MockNode(finalSettings.build(), plugins);
|
||||
return new NodeAndClient(name, node, nodeId);
|
||||
}
|
||||
|
||||
|
@ -684,7 +710,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
|
||||
if (size() == 0) {
|
||||
// if we are the first node - don't wait for a state
|
||||
builder.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
|
||||
builder.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
|
||||
}
|
||||
return startNode(builder);
|
||||
}
|
||||
|
@ -777,6 +803,10 @@ public final class InternalTestCluster extends TestCluster {
|
|||
return nodeAndClientId;
|
||||
}
|
||||
|
||||
public boolean isMasterEligible() {
|
||||
return Node.NODE_MASTER_SETTING.get(node.settings());
|
||||
}
|
||||
|
||||
Client client(Random random) {
|
||||
if (closed.get()) {
|
||||
throw new RuntimeException("already closed");
|
||||
|
@ -844,21 +874,40 @@ public final class InternalTestCluster extends TestCluster {
|
|||
node.close();
|
||||
}
|
||||
|
||||
void restart(RestartCallback callback, boolean clearDataIfNeeded) throws Exception {
|
||||
assert callback != null;
|
||||
resetClient();
|
||||
/**
|
||||
* closes the current node if not already closed, builds a new node object using the current node settings and starts it
|
||||
*/
|
||||
void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
|
||||
if (!node.isClosed()) {
|
||||
closeNode();
|
||||
}
|
||||
Settings newSettings = callback.onNodeStopped(name);
|
||||
if (newSettings == null) {
|
||||
newSettings = Settings.EMPTY;
|
||||
recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes);
|
||||
startNode();
|
||||
}
|
||||
|
||||
/**
|
||||
* rebuilds a new node object using the current node settings and starts it
|
||||
*/
|
||||
void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
|
||||
assert callback != null;
|
||||
Settings callbackSettings = callback.onNodeStopped(name);
|
||||
Settings.Builder newSettings = Settings.builder();
|
||||
if (callbackSettings != null) {
|
||||
newSettings.put(callbackSettings);
|
||||
}
|
||||
if (minMasterNodes >= 0) {
|
||||
assert ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed";
|
||||
newSettings.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build();
|
||||
}
|
||||
|
||||
// validation is (optionally) done in fullRestart/rollingRestart
|
||||
newSettings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s");
|
||||
if (clearDataIfNeeded) {
|
||||
clearDataIfNeeded(callback);
|
||||
}
|
||||
createNewNode(newSettings);
|
||||
startNode();
|
||||
createNewNode(newSettings.build());
|
||||
// make sure cached client points to new node
|
||||
resetClient();
|
||||
}
|
||||
|
||||
private void clearDataIfNeeded(RestartCallback callback) throws IOException {
|
||||
|
@ -948,22 +997,24 @@ public final class InternalTestCluster extends TestCluster {
|
|||
if (wipeData) {
|
||||
wipePendingDataDirectories();
|
||||
}
|
||||
if (nodes.size() > 0 && autoManageMinMasterNodes) {
|
||||
updateMinMasterNodes(getMasterNodesCount());
|
||||
}
|
||||
logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
|
||||
return;
|
||||
}
|
||||
logger.debug("Cluster is NOT consistent - restarting shared nodes - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
|
||||
|
||||
// trash all nodes with id >= sharedNodesSeeds.length - they are non shared
|
||||
|
||||
|
||||
final List<NodeAndClient> toClose = new ArrayList<>();
|
||||
for (Iterator<NodeAndClient> iterator = nodes.values().iterator(); iterator.hasNext();) {
|
||||
NodeAndClient nodeAndClient = iterator.next();
|
||||
if (nodeAndClient.nodeAndClientId() >= sharedNodesSeeds.length) {
|
||||
logger.debug("Close Node [{}] not shared", nodeAndClient.name);
|
||||
nodeAndClient.close();
|
||||
iterator.remove();
|
||||
toClose.add(nodeAndClient);
|
||||
}
|
||||
}
|
||||
stopNodesAndClients(toClose);
|
||||
|
||||
// clean up what the nodes left that is unused
|
||||
if (wipeData) {
|
||||
|
@ -972,13 +1023,19 @@ public final class InternalTestCluster extends TestCluster {
|
|||
|
||||
// start any missing node
|
||||
assert newSize == numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes;
|
||||
final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes;
|
||||
final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1;
|
||||
final List<NodeAndClient> toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes
|
||||
for (int i = 0; i < numSharedDedicatedMasterNodes; i++) {
|
||||
final Settings.Builder settings = Settings.builder();
|
||||
settings.put(Node.NODE_MASTER_SETTING.getKey(), true).build();
|
||||
settings.put(Node.NODE_DATA_SETTING.getKey(), false).build();
|
||||
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
|
||||
nodeAndClient.startNode();
|
||||
publishNode(nodeAndClient);
|
||||
settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
|
||||
settings.put(Node.NODE_DATA_SETTING.getKey(), false);
|
||||
if (autoManageMinMasterNodes) {
|
||||
settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
|
||||
}
|
||||
|
||||
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
|
||||
toStartAndPublish.add(nodeAndClient);
|
||||
}
|
||||
for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) {
|
||||
final Settings.Builder settings = Settings.builder();
|
||||
|
@ -987,32 +1044,43 @@ public final class InternalTestCluster extends TestCluster {
|
|||
settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build();
|
||||
settings.put(Node.NODE_DATA_SETTING.getKey(), true).build();
|
||||
}
|
||||
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
|
||||
nodeAndClient.startNode();
|
||||
publishNode(nodeAndClient);
|
||||
if (autoManageMinMasterNodes) {
|
||||
settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end
|
||||
}
|
||||
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
|
||||
toStartAndPublish.add(nodeAndClient);
|
||||
}
|
||||
for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes;
|
||||
i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) {
|
||||
final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false)
|
||||
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
|
||||
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true);
|
||||
nodeAndClient.startNode();
|
||||
publishNode(nodeAndClient);
|
||||
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes);
|
||||
toStartAndPublish.add(nodeAndClient);
|
||||
}
|
||||
|
||||
startAndPublishNodesAndClients(toStartAndPublish);
|
||||
|
||||
nextNodeId.set(newSize);
|
||||
assert size() == newSize;
|
||||
if (newSize > 0) {
|
||||
ClusterHealthResponse response = client().admin().cluster().prepareHealth()
|
||||
.setWaitForNodes(Integer.toString(newSize)).get();
|
||||
if (response.isTimedOut()) {
|
||||
logger.warn("failed to wait for a cluster of size [{}], got [{}]", newSize, response);
|
||||
throw new IllegalStateException("cluster failed to reach the expected size of [" + newSize + "]");
|
||||
}
|
||||
validateClusterFormed();
|
||||
}
|
||||
logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
|
||||
}
|
||||
|
||||
/** ensure a cluster is formed with {@link #nodes}.size() nodes. */
|
||||
private void validateClusterFormed() {
|
||||
final int size = nodes.size();
|
||||
String name = randomFrom(random, getNodeNames());
|
||||
logger.trace("validating cluster formed via [{}], expecting [{}]", name, size);
|
||||
final Client client = client(name);
|
||||
ClusterHealthResponse response = client.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size)).get();
|
||||
if (response.isTimedOut()) {
|
||||
logger.warn("failed to wait for a cluster of size [{}], got [{}]", size, response);
|
||||
throw new IllegalStateException("cluster failed to reach the expected size of [" + size + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void afterTest() throws IOException {
|
||||
wipePendingDataDirectories();
|
||||
|
@ -1242,9 +1310,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
|
||||
if (nodeAndClient != null) {
|
||||
logger.info("Closing random node [{}] ", nodeAndClient.name);
|
||||
removeDisruptionSchemeFromNode(nodeAndClient);
|
||||
nodes.remove(nodeAndClient.name);
|
||||
nodeAndClient.close();
|
||||
stopNodesAndClient(nodeAndClient);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -1259,9 +1325,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
NodeAndClient nodeAndClient = getRandomNodeAndClient(nc -> filter.test(nc.node.settings()));
|
||||
if (nodeAndClient != null) {
|
||||
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
|
||||
removeDisruptionSchemeFromNode(nodeAndClient);
|
||||
nodes.remove(nodeAndClient.name);
|
||||
nodeAndClient.close();
|
||||
stopNodesAndClient(nodeAndClient);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1274,9 +1338,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
String masterNodeName = getMasterName();
|
||||
assert nodes.containsKey(masterNodeName);
|
||||
logger.info("Closing master node [{}] ", masterNodeName);
|
||||
removeDisruptionSchemeFromNode(nodes.get(masterNodeName));
|
||||
NodeAndClient remove = nodes.remove(masterNodeName);
|
||||
remove.close();
|
||||
stopNodesAndClient(nodes.get(masterNodeName));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1286,8 +1348,47 @@ public final class InternalTestCluster extends TestCluster {
|
|||
NodeAndClient nodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName()).negate());
|
||||
if (nodeAndClient != null) {
|
||||
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
|
||||
stopNodesAndClient(nodeAndClient);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nodeAndClients) {
|
||||
if (nodeAndClients.size() > 0) {
|
||||
final int newMasters = (int) nodeAndClients.stream().filter(NodeAndClient::isMasterEligible)
|
||||
.filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters
|
||||
.count();
|
||||
final int currentMasters = getMasterNodesCount();
|
||||
if (autoManageMinMasterNodes && currentMasters > 1 && newMasters > 0) {
|
||||
// special case for 1 node master - we can't update the min master nodes before we add more nodes.
|
||||
updateMinMasterNodes(currentMasters + newMasters);
|
||||
}
|
||||
for (NodeAndClient nodeAndClient : nodeAndClients) {
|
||||
nodeAndClient.startNode();
|
||||
publishNode(nodeAndClient);
|
||||
}
|
||||
if (autoManageMinMasterNodes && currentMasters == 1 && newMasters > 0) {
|
||||
// update once masters have joined
|
||||
validateClusterFormed();
|
||||
updateMinMasterNodes(currentMasters + newMasters);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException {
|
||||
stopNodesAndClients(Collections.singleton(nodeAndClient));
|
||||
}
|
||||
|
||||
private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
|
||||
if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {
|
||||
int masters = (int)nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
if (masters > 0) {
|
||||
updateMinMasterNodes(getMasterNodesCount() - masters);
|
||||
}
|
||||
}
|
||||
for (NodeAndClient nodeAndClient: nodeAndClients) {
|
||||
removeDisruptionSchemeFromNode(nodeAndClient);
|
||||
nodes.remove(nodeAndClient.name);
|
||||
NodeAndClient previous = nodes.remove(nodeAndClient.name);
|
||||
assert previous == nodeAndClient;
|
||||
nodeAndClient.close();
|
||||
}
|
||||
}
|
||||
|
@ -1327,8 +1428,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
ensureOpen();
|
||||
NodeAndClient nodeAndClient = getRandomNodeAndClient(predicate);
|
||||
if (nodeAndClient != null) {
|
||||
logger.info("Restarting random node [{}] ", nodeAndClient.name);
|
||||
nodeAndClient.restart(callback, true);
|
||||
restartNode(nodeAndClient, callback);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1339,93 +1439,10 @@ public final class InternalTestCluster extends TestCluster {
|
|||
ensureOpen();
|
||||
NodeAndClient nodeAndClient = nodes.get(nodeName);
|
||||
if (nodeAndClient != null) {
|
||||
logger.info("Restarting node [{}] ", nodeAndClient.name);
|
||||
nodeAndClient.restart(callback, true);
|
||||
restartNode(nodeAndClient, callback);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void restartAllNodes(boolean rollingRestart, RestartCallback callback) throws Exception {
|
||||
ensureOpen();
|
||||
List<NodeAndClient> toRemove = new ArrayList<>();
|
||||
try {
|
||||
for (NodeAndClient nodeAndClient : nodes.values()) {
|
||||
if (!callback.doRestart(nodeAndClient.name)) {
|
||||
logger.info("Closing node [{}] during restart", nodeAndClient.name);
|
||||
toRemove.add(nodeAndClient);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
|
||||
}
|
||||
nodeAndClient.close();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
for (NodeAndClient nodeAndClient : toRemove) {
|
||||
nodes.remove(nodeAndClient.name);
|
||||
}
|
||||
}
|
||||
logger.info("Restarting remaining nodes rollingRestart [{}]", rollingRestart);
|
||||
if (rollingRestart) {
|
||||
int numNodesRestarted = 0;
|
||||
for (NodeAndClient nodeAndClient : nodes.values()) {
|
||||
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
|
||||
logger.info("Restarting node [{}] ", nodeAndClient.name);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
|
||||
}
|
||||
nodeAndClient.restart(callback, true);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int numNodesRestarted = 0;
|
||||
Set[] nodesRoleOrder = new Set[nextNodeId.get()];
|
||||
Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
|
||||
for (NodeAndClient nodeAndClient : nodes.values()) {
|
||||
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
|
||||
logger.info("Stopping node [{}] ", nodeAndClient.name);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
|
||||
}
|
||||
nodeAndClient.closeNode();
|
||||
// delete data folders now, before we start other nodes that may claim it
|
||||
nodeAndClient.clearDataIfNeeded(callback);
|
||||
|
||||
DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
|
||||
nodesRoleOrder[nodeAndClient.nodeAndClientId()] = discoveryNode.getRoles();
|
||||
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
|
||||
}
|
||||
|
||||
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size();
|
||||
|
||||
// randomize start up order, but making sure that:
|
||||
// 1) A data folder that was assigned to a data node will stay so
|
||||
// 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used)
|
||||
// will still belong to data nodes
|
||||
for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) {
|
||||
Collections.shuffle(sameRoleNodes, random);
|
||||
}
|
||||
|
||||
for (Set roles : nodesRoleOrder) {
|
||||
if (roles == null) {
|
||||
// if some nodes were stopped, we want have a role for them
|
||||
continue;
|
||||
}
|
||||
NodeAndClient nodeAndClient = nodesByRoles.get(roles).remove(0);
|
||||
logger.info("Starting node [{}] ", nodeAndClient.name);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
|
||||
}
|
||||
// we already cleared data folders, before starting nodes up
|
||||
nodeAndClient.restart(callback, false);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static final RestartCallback EMPTY_CALLBACK = new RestartCallback() {
|
||||
@Override
|
||||
public Settings onNodeStopped(String node) {
|
||||
|
@ -1450,15 +1467,98 @@ public final class InternalTestCluster extends TestCluster {
|
|||
/**
|
||||
* Restarts all nodes in a rolling restart fashion, i.e. only restarts one node at a time.
|
||||
*/
|
||||
public void rollingRestart(RestartCallback function) throws Exception {
|
||||
restartAllNodes(true, function);
|
||||
public synchronized void rollingRestart(RestartCallback callback) throws Exception {
|
||||
int numNodesRestarted = 0;
|
||||
for (NodeAndClient nodeAndClient : nodes.values()) {
|
||||
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
|
||||
restartNode(nodeAndClient, callback);
|
||||
}
|
||||
}
|
||||
|
||||
private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
|
||||
logger.info("Restarting node [{}] ", nodeAndClient.name);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
|
||||
}
|
||||
final int masterNodesCount = getMasterNodesCount();
|
||||
// special case to allow stopping one node in a two node cluster and keep it functional
|
||||
final boolean updateMinMaster = nodeAndClient.isMasterEligible() && masterNodesCount == 2 && autoManageMinMasterNodes;
|
||||
if (updateMinMaster) {
|
||||
updateMinMasterNodes(masterNodesCount - 1);
|
||||
}
|
||||
nodeAndClient.restart(callback, true, autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
|
||||
}
|
||||
if (callback.validateClusterForming() || updateMinMaster) {
|
||||
// we have to validate cluster size if updateMinMaster == true, because we need the
|
||||
// second node to join in order to increment min_master_nodes back to 2.
|
||||
validateClusterFormed();
|
||||
}
|
||||
if (updateMinMaster) {
|
||||
updateMinMasterNodes(masterNodesCount);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Restarts all nodes in the cluster. It first stops all nodes and then restarts all the nodes again.
|
||||
*/
|
||||
public void fullRestart(RestartCallback function) throws Exception {
|
||||
restartAllNodes(false, function);
|
||||
public synchronized void fullRestart(RestartCallback callback) throws Exception {
|
||||
int numNodesRestarted = 0;
|
||||
Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
|
||||
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
|
||||
for (NodeAndClient nodeAndClient : nodes.values()) {
|
||||
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
|
||||
logger.info("Stopping node [{}] ", nodeAndClient.name);
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
|
||||
}
|
||||
nodeAndClient.closeNode();
|
||||
// delete data folders now, before we start other nodes that may claim it
|
||||
nodeAndClient.clearDataIfNeeded(callback);
|
||||
DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
|
||||
rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId] = discoveryNode.getRoles();
|
||||
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
|
||||
}
|
||||
|
||||
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size();
|
||||
|
||||
// randomize start up order, but making sure that:
|
||||
// 1) A data folder that was assigned to a data node will stay so
|
||||
// 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used)
|
||||
// will still belong to data nodes
|
||||
for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) {
|
||||
Collections.shuffle(sameRoleNodes, random);
|
||||
}
|
||||
List<NodeAndClient> startUpOrder = new ArrayList<>();
|
||||
for (Set roles : rolesOrderedByOriginalStartupOrder) {
|
||||
if (roles == null) {
|
||||
// if some nodes were stopped, we won't have a role for that ordinal
|
||||
continue;
|
||||
}
|
||||
final List<NodeAndClient> nodesByRole = nodesByRoles.get(roles);
|
||||
startUpOrder.add(nodesByRole.remove(0));
|
||||
}
|
||||
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0;
|
||||
|
||||
// do two rounds to minimize pinging (mock zen pings ping with no delay and can create a lot of logs)
|
||||
for (NodeAndClient nodeAndClient : startUpOrder) {
|
||||
logger.info("resetting node [{}] ", nodeAndClient.name);
|
||||
// we already cleared data folders, before starting nodes up
|
||||
nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1);
|
||||
}
|
||||
|
||||
for (NodeAndClient nodeAndClient : startUpOrder) {
|
||||
logger.info("starting node [{}] ", nodeAndClient.name);
|
||||
nodeAndClient.startNode();
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
|
||||
}
|
||||
}
|
||||
|
||||
if (callback.validateClusterForming()) {
|
||||
validateClusterFormed();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1542,19 +1642,51 @@ public final class InternalTestCluster extends TestCluster {
|
|||
* Starts a node with the given settings and returns its name.
|
||||
*/
|
||||
public synchronized String startNode(Settings settings) {
|
||||
NodeAndClient buildNode = buildNode(settings);
|
||||
buildNode.startNode();
|
||||
publishNode(buildNode);
|
||||
final int defaultMinMasterNodes = getMinMasterNodes(getMasterNodesCount() + (Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0));
|
||||
NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
|
||||
startAndPublishNodesAndClients(Collections.singletonList(buildNode));
|
||||
return buildNode.name;
|
||||
}
|
||||
|
||||
/**
|
||||
* updates the min master nodes setting in the currently running cluster.
|
||||
*
|
||||
* @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting
|
||||
*/
|
||||
private int updateMinMasterNodes(int eligibleMasterNodeCount) {
|
||||
assert autoManageMinMasterNodes;
|
||||
final int minMasterNodes = getMinMasterNodes(eligibleMasterNodeCount);
|
||||
if (getMasterNodesCount() > 0) {
|
||||
// there should be at least one master to update
|
||||
logger.debug("updating min_master_nodes to [{}]", minMasterNodes);
|
||||
try {
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(
|
||||
Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes)
|
||||
));
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("failed to update minimum master node to [{}] (current masters [{}])", e,
|
||||
minMasterNodes, getMasterNodesCount());
|
||||
}
|
||||
}
|
||||
return minMasterNodes;
|
||||
}
|
||||
|
||||
/** calculates a min master nodes value based on the given number of master nodes */
|
||||
private int getMinMasterNodes(int eligibleMasterNodes) {
|
||||
return eligibleMasterNodes / 2 + 1;
|
||||
}
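
For reference, a small standalone sketch (illustrative only, not part of the test framework) showing what this quorum formula yields for small clusters, and why the two-node case gets special handling in restartNode above:

// Illustrative only: the same quorum rule as getMinMasterNodes above.
public class MinMasterNodesQuorumSketch {

    static int minMasterNodes(int eligibleMasterNodes) {
        return eligibleMasterNodes / 2 + 1;
    }

    public static void main(String[] args) {
        // 1 -> 1, 2 -> 2, 3 -> 2, 4 -> 3, 5 -> 3
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " master-eligible nodes -> min_master_nodes = " + minMasterNodes(n));
        }
        // With 2 eligible masters the quorum is also 2, so stopping one of them
        // would break it; that is the special case handled in restartNode above.
    }
}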
|
||||
|
||||
private int getMasterNodesCount() {
|
||||
return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count();
|
||||
}
|
||||
|
||||
public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes) {
|
||||
return startMasterOnlyNodesAsync(numNodes, Settings.EMPTY);
|
||||
}
|
||||
|
||||
public synchronized Async<List<String>> startMasterOnlyNodesAsync(int numNodes, Settings settings) {
|
||||
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
|
||||
return startNodesAsync(numNodes, settings1, Version.CURRENT);
|
||||
return startNodesAsync(numNodes, settings1);
|
||||
}
|
||||
|
||||
public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes) {
|
||||
|
@ -1563,7 +1695,7 @@ public final class InternalTestCluster extends TestCluster {

public synchronized Async<List<String>> startDataOnlyNodesAsync(int numNodes, Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
return startNodesAsync(numNodes, settings1, Version.CURRENT);
return startNodesAsync(numNodes, settings1);
}

public synchronized Async<String> startMasterOnlyNodeAsync() {

@ -1572,7 +1704,7 @@ public final class InternalTestCluster extends TestCluster {

public synchronized Async<String> startMasterOnlyNodeAsync(Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build();
return startNodeAsync(settings1, Version.CURRENT);
return startNodeAsync(settings1);
}

public synchronized String startMasterOnlyNode(Settings settings) {

@ -1586,7 +1718,7 @@ public final class InternalTestCluster extends TestCluster {

public synchronized Async<String> startDataOnlyNodeAsync(Settings settings) {
Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build();
return startNodeAsync(settings1, Version.CURRENT);
return startNodeAsync(settings1);
}

public synchronized String startDataOnlyNode(Settings settings) {

@ -1598,21 +1730,25 @@ public final class InternalTestCluster extends TestCluster {
* Starts a node in an async manner with the given settings and returns a future with its name.
*/
public synchronized Async<String> startNodeAsync() {
return startNodeAsync(Settings.EMPTY, Version.CURRENT);
return startNodeAsync(Settings.EMPTY);
}

/**
* Starts a node in an async manner with the given settings and returns a future with its name.
*/
public synchronized Async<String> startNodeAsync(final Settings settings) {
return startNodeAsync(settings, Version.CURRENT);
final int defaultMinMasterNodes;
if (autoManageMinMasterNodes) {
int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0;
defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
} else {
defaultMinMasterNodes = -1;
}
return startNodeAsync(settings, defaultMinMasterNodes);
}

/**
* Starts a node in an async manner with the given settings and version and returns a future with its name.
*/
public synchronized Async<String> startNodeAsync(final Settings settings, final Version version) {
final NodeAndClient buildNode = buildNode(settings);
private synchronized Async<String> startNodeAsync(final Settings settings, int defaultMinMasterNodes) {
final NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
final Future<String> submit = executor.submit(() -> {
buildNode.startNode();
publishNode(buildNode);

@ -1621,27 +1757,28 @@ public final class InternalTestCluster extends TestCluster {
return () -> submit.get();
}
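
The Async<T> return value above is just a deferred handle over a Future: startNodeAsync submits the node start to the executor right away and wraps the Future in a lambda. A rough, self-contained sketch of that pattern (hypothetical names, not part of this commit):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the submit-then-wrap pattern used by startNodeAsync(): the work runs on the
// executor immediately, while blocking is deferred until the caller invokes get() on the Async handle.
public class AsyncSketch {
    interface Async<T> {
        T get() throws Exception;
    }

    static <T> Async<T> submit(ExecutorService executor, Callable<T> task) {
        Future<T> future = executor.submit(task);
        return () -> future.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Async<String> nodeName = submit(executor, () -> "node_t0");
        System.out.println(nodeName.get());
        executor.shutdown();
    }
}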

/**
* Starts multiple nodes in an async manner and returns a future with their names.
*/
public synchronized Async<List<String>> startNodesAsync(final int numNodes) {
return startNodesAsync(numNodes, Settings.EMPTY, Version.CURRENT);
return startNodesAsync(numNodes, Settings.EMPTY);
}

/**
* Starts multiple nodes in an async manner with the given settings and returns a future with their names.
*/
public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings) {
return startNodesAsync(numNodes, settings, Version.CURRENT);
}

/**
* Starts multiple nodes in an async manner with the given settings and version and returns a future with their names.
*/
public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings, final Version version) {
public synchronized Async<List<String>> startNodesAsync(final int numNodes, final Settings settings) {
final int defaultMinMasterNodes;
if (autoManageMinMasterNodes) {
int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? numNodes : 0;
defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
} else {
defaultMinMasterNodes = -1;
}
final List<Async<String>> asyncs = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
asyncs.add(startNodeAsync(settings, version));
asyncs.add(startNodeAsync(settings, defaultMinMasterNodes));
}

return () -> {

@ -1658,9 +1795,16 @@ public final class InternalTestCluster extends TestCluster {
* The order of the node names returned matches the order of the settings provided.
*/
public synchronized Async<List<String>> startNodesAsync(final Settings... settings) {
final int defaultMinMasterNodes;
if (autoManageMinMasterNodes) {
int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count();
defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta);
} else {
defaultMinMasterNodes = -1;
}
List<Async<String>> asyncs = new ArrayList<>();
for (Settings setting : settings) {
asyncs.add(startNodeAsync(setting, Version.CURRENT));
asyncs.add(startNodeAsync(setting, defaultMinMasterNodes));
}
return () -> {
List<String> ids = new ArrayList<>();

@ -1691,6 +1835,11 @@ public final class InternalTestCluster extends TestCluster {
return dataAndMasterNodes().size();
}

public synchronized int numMasterNodes() {
return filterNodes(nodes, NodeAndClient::isMasterEligible).size();
}

public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
clearDisruptionScheme();
scheme.applyToCluster(this);

@ -1895,14 +2044,8 @@ public final class InternalTestCluster extends TestCluster {
return false;
}

/**
* If this returns <code>false</code> the node with the given node name will not be restarted. It will be
* closed and removed from the cluster. Returns <code>true</code> by default.
*/
public boolean doRestart(String nodeName) {
return true;
}
/** returns true if the restart should also validate the cluster has reformed */
public boolean validateClusterForming() { return true; }
}
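
A rough usage sketch (hypothetical, not part of this commit) of the new validateClusterForming() hook: a restart callback can opt out of waiting for the cluster to reform, which is what the validateClusterFormed() call at the top of this diff is guarded by.

import org.elasticsearch.test.InternalTestCluster;

// Hypothetical example: a RestartCallback that tells the restart methods not to wait for the cluster
// to reform after each node comes back, by overriding the new hook to return false.
public class NoClusterFormingValidation {
    static InternalTestCluster.RestartCallback callback() {
        return new InternalTestCluster.RestartCallback() {
            @Override
            public boolean validateClusterForming() {
                return false; // skip the validateClusterFormed() check shown earlier in this diff
            }
        };
    }
}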

public Settings getDefaultSettings() {

@ -18,11 +18,6 @@
*/
package org.elasticsearch.test.discovery;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;

@ -32,13 +27,22 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.PingContextProvider;
import org.elasticsearch.discovery.zen.ZenPing;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link ZenPing} implementation which returns results based on a static in-memory map. This allows pinging
* to be immediate and can be used to speed up tests.
*/
public final class MockZenPing extends AbstractComponent implements ZenPing {

static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap();
static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = new HashMap<>();

/** A set of the last discovered pings, used to throttle busy spinning where MockZenPing would keep returning the same results. */
private Set<MockZenPing> lastDiscoveredPings = null;

private volatile PingContextProvider contextProvider;

@ -50,18 +54,34 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
public void start(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
assert contextProvider != null;
boolean added = getActiveNodesForCurrentCluster().add(this);
assert added;
synchronized (activeNodesPerCluster) {
boolean added = getActiveNodesForCurrentCluster().add(this);
assert added;
activeNodesPerCluster.notifyAll();
}
}

@Override
public void ping(PingListener listener, TimeValue timeout) {
logger.info("pinging using mock zen ping");
List<PingResponse> responseList = getActiveNodesForCurrentCluster().stream()
.filter(p -> p != this) // remove this as pings are not expected to return the local node
.map(MockZenPing::getPingResponse)
.collect(Collectors.toList());
listener.onPing(responseList);
synchronized (activeNodesPerCluster) {
Set<MockZenPing> activeNodes = getActiveNodesForCurrentCluster();
if (activeNodes.equals(lastDiscoveredPings)) {
try {
logger.trace("nothing has changed since the last ping. waiting for a change");
activeNodesPerCluster.wait(timeout.millis());
} catch (InterruptedException e) {

}
activeNodes = getActiveNodesForCurrentCluster();
}
lastDiscoveredPings = activeNodes;
List<PingResponse> responseList = activeNodes.stream()
.filter(p -> p != this) // remove this as pings are not expected to return the local node
.map(MockZenPing::getPingResponse)
.collect(Collectors.toList());
listener.onPing(responseList);
}
}

private ClusterName getClusterName() {

@ -74,13 +94,17 @@ public final class MockZenPing extends AbstractComponent implements ZenPing {
}

private Set<MockZenPing> getActiveNodesForCurrentCluster() {
assert Thread.holdsLock(activeNodesPerCluster);
return activeNodesPerCluster.computeIfAbsent(getClusterName(),
clusterName -> ConcurrentCollections.newConcurrentSet());
}

@Override
public void close() {
boolean found = getActiveNodesForCurrentCluster().remove(this);
assert found;
synchronized (activeNodesPerCluster) {
boolean found = getActiveNodesForCurrentCluster().remove(this);
assert found;
activeNodesPerCluster.notifyAll();
}
}
}
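
The ping() rewrite above replaces busy spinning with a wait/notifyAll handshake on the shared activeNodesPerCluster map: start() and close() notify, while ping() waits (bounded by the ping timeout) whenever membership has not changed since the last call. A stripped-down, self-contained sketch of that throttling pattern (hypothetical class, not part of this commit):

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the wait/notifyAll throttling used in MockZenPing.ping(): a ping that sees
// the same membership as last time blocks (up to a timeout) until join()/leave() signals a change.
public class MembershipWatcher {
    private final Set<String> members = new HashSet<>();
    private Set<String> lastSeen = null;

    public synchronized void join(String node) {
        members.add(node);
        notifyAll(); // wake any ping() waiting for a membership change
    }

    public synchronized void leave(String node) {
        members.remove(node);
        notifyAll();
    }

    public synchronized Set<String> ping(long timeoutMillis) throws InterruptedException {
        if (members.equals(lastSeen)) {
            wait(timeoutMillis); // nothing changed since the last ping; wait for a change or time out
        }
        lastSeen = new HashSet<>(members);
        return lastSeen;
    }
}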

@ -636,7 +636,7 @@ public class ElasticsearchAssertions {
* a way that sucks less.
*/
NamedWriteableRegistry registry;
if (ESIntegTestCase.isInternalCluster()) {
if (ESIntegTestCase.isInternalCluster() && ESIntegTestCase.internalCluster().size() > 0) {
registry = ESIntegTestCase.internalCluster().getInstance(NamedWriteableRegistry.class);
} else {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());

@ -19,21 +19,6 @@
*/
package org.elasticsearch.test.test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Client;

@ -51,14 +36,32 @@ import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportSettings;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.node.DiscoveryNode.Role.DATA;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.INGEST;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.MASTER;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileNotExists;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;

/**

@ -81,10 +84,10 @@ public class InternalTestClusterTests extends ESTestCase {

Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
// TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way
assertClusters(cluster0, cluster1, false);

@ -116,7 +119,8 @@ public class InternalTestClusterTests extends ESTestCase {
public static void assertSettings(Settings left, Settings right, boolean checkClusterUniqueSettings) {
Set<Map.Entry<String, String>> entries0 = left.getAsMap().entrySet();
Map<String, String> entries1 = right.getAsMap();
assertThat(entries0.size(), equalTo(entries1.size()));
assertThat("--> left:\n" + left.toDelimitedString('\n') + "\n-->right:\n" + right.toDelimitedString('\n'),
entries0.size(), equalTo(entries1.size()));
for (Map.Entry<String, String> entry : entries0) {
if (clusterUniqueSettings.contains(entry.getKey()) && checkClusterUniqueSettings == false) {
continue;

@ -125,6 +129,41 @@ public class InternalTestClusterTests extends ESTestCase {
}
}

private void assertMMNinNodeSetting(InternalTestCluster cluster, int masterNodes) {
for (final String node : cluster.getNodeNames()) {
assertMMNinNodeSetting(node, cluster, masterNodes);
}
}

private void assertMMNinNodeSetting(String node, InternalTestCluster cluster, int masterNodes) {
final int minMasterNodes = masterNodes / 2 + 1;
final Matcher<Map<? extends String, ? extends String>> minMasterMatcher =
hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes));
final Matcher<Map<? extends String, ?>> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()));
Settings nodeSettings = cluster.client(node).admin().cluster().prepareNodesInfo(node).get().getNodes().get(0).getSettings();
assertThat("node setting of node [" + node + "] has the wrong min_master_node setting: ["
+ nodeSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
nodeSettings.getAsMap(),
cluster.getAutoManageMinMasterNode() ? minMasterMatcher : noMinMasterNodesMatcher);
}

private void assertMMNinClusterSetting(InternalTestCluster cluster, int masterNodes) {
final int minMasterNodes = masterNodes / 2 + 1;
Matcher<Map<? extends String, ? extends String>> minMasterMatcher =
hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes));
Matcher<Map<? extends String, ?>> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()));

for (final String node : cluster.getNodeNames()) {
Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true)
.get().getState().getMetaData().settings();

assertThat("dynamic setting for node [" + node + "] has the wrong min_master_node setting: ["
+ stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
stateSettings.getAsMap(),
cluster.getAutoManageMinMasterNode() ? minMasterMatcher : noMinMasterNodesMatcher);
}
}
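
The two helpers above assert against the settings map with Hamcrest: hasEntry(...) when min_master_nodes is auto-managed, not(hasKey(...)) otherwise. A small self-contained illustration of that matcher pattern on plain maps (hypothetical class, not part of this commit):

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the matcher pattern used by the assertMMN* helpers above.
public class MinMasterNodesMatcherExample {
    public static void main(String[] args) {
        String key = "discovery.zen.minimum_master_nodes";

        Map<String, String> managedSettings = new HashMap<>();
        managedSettings.put(key, "2"); // auto-managed cluster: the setting must carry the expected quorum
        assertThat(managedSettings, hasEntry(key, "2"));

        Map<String, String> unmanagedSettings = new HashMap<>();
        assertThat(unmanagedSettings, not(hasKey(key))); // unmanaged cluster: the key must be absent
    }
}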

public void testBeforeTest() throws Exception {
long clusterSeed = randomLong();
boolean masterNodes = randomBoolean();

@ -156,11 +195,12 @@ public class InternalTestClusterTests extends ESTestCase {

Path baseDir = createTempDir();
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class);
final boolean autoManageMinMasterNodes = randomBoolean();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, mockPlugins, Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes,
autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, mockPlugins, Function.identity());

assertClusters(cluster0, cluster1, false);

@ -182,6 +222,8 @@ public class InternalTestClusterTests extends ESTestCase {
assertSettings(client.settings(), other.settings(), false);
}
assertArrayEquals(cluster0.getNodeNames(), cluster1.getNodeNames());
assertMMNinNodeSetting(cluster0, cluster0.numMasterNodes());
assertMMNinNodeSetting(cluster1, cluster0.numMasterNodes());
cluster0.afterTest();
cluster1.afterTest();
} finally {

@ -216,12 +258,15 @@ public class InternalTestClusterTests extends ESTestCase {
boolean enableHttpPipelining = randomBoolean();
String nodePrefix = "test";
Path baseDir = createTempDir();
final boolean autoManageMinMasterNodes = randomBoolean();
InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class),
Function.identity());
try {
cluster.beforeTest(random(), 0.0);
final int originalMasterCount = cluster.numMasterNodes();
assertMMNinNodeSetting(cluster, originalMasterCount);
final Map<String, Path[]> shardNodePaths = new HashMap<>();
for (String name : cluster.getNodeNames()) {
shardNodePaths.put(name, getNodePaths(cluster, name));

@ -230,7 +275,15 @@ public class InternalTestClusterTests extends ESTestCase {
Path dataPath = getNodePaths(cluster, poorNode)[0];
final Path testMarker = dataPath.resolve("testMarker");
Files.createDirectories(testMarker);
int expectedMasterCount = originalMasterCount;
if (cluster.getInstance(ClusterService.class, poorNode).localNode().isMasterNode()) {
expectedMasterCount--;
}
cluster.stopRandomNode(InternalTestCluster.nameFilter(poorNode));
if (expectedMasterCount != originalMasterCount) {
// check that the cluster-wide setting was updated
assertMMNinClusterSetting(cluster, expectedMasterCount);
}
assertFileExists(testMarker); // stopping a node half way shouldn't clean data

final String stableNode = randomFrom(cluster.getNodeNames());

@ -240,10 +293,17 @@ public class InternalTestClusterTests extends ESTestCase {
Files.createDirectories(stableTestMarker);

final String newNode1 = cluster.startNode();
expectedMasterCount++;
assertThat(getNodePaths(cluster, newNode1)[0], equalTo(dataPath));
assertFileExists(testMarker); // starting a node should re-use data folders and not clean it
if (expectedMasterCount > 1) { // if this was the first master, the setting in the cluster state won't have been updated
assertMMNinClusterSetting(cluster, expectedMasterCount);
}
assertMMNinNodeSetting(newNode1, cluster, expectedMasterCount);

final String newNode2 = cluster.startNode();
expectedMasterCount++;
assertMMNinClusterSetting(cluster, expectedMasterCount);
final Path newDataPath = getNodePaths(cluster, newNode2)[0];
final Path newTestMarker = newDataPath.resolve("newTestMarker");
assertThat(newDataPath, not(dataPath));

@ -262,6 +322,7 @@ public class InternalTestClusterTests extends ESTestCase {
assertThat("data paths for " + name + " changed", getNodePaths(cluster, name),
equalTo(shardNodePaths.get(name)));
}
assertMMNinNodeSetting(cluster, originalMasterCount);

} finally {
cluster.close();

@ -280,7 +341,7 @@ public class InternalTestClusterTests extends ESTestCase {
public void testDifferentRolesMaintainPathOnRestart() throws Exception {
final Path baseDir = createTempDir();
final int numNodes = 5;
InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, 0, 0, "test",
InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, true, 0, 0, "test",
new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {

@ -301,7 +362,9 @@ public class InternalTestClusterTests extends ESTestCase {
try {
Map<DiscoveryNode.Role, Set<String>> pathsPerRole = new HashMap<>();
for (int i = 0; i < numNodes; i++) {
final DiscoveryNode.Role role = randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST);
final DiscoveryNode.Role role = i == numNodes - 1 && pathsPerRole.containsKey(MASTER) == false ?
MASTER : // last node and still no master for the cluster
randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST);
final String node;
switch (role) {
case MASTER:

@ -343,6 +406,59 @@ public class InternalTestClusterTests extends ESTestCase {
} finally {
cluster.close();
}
}

public void testTwoNodeCluster() throws Exception {
final boolean autoManageMinMasterNodes = randomBoolean();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false)
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2)
.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME)
.build();
}

@Override
public Settings transportClientSettings() {
return Settings.builder()
.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build();
}
};
boolean enableHttpPipelining = randomBoolean();
String nodePrefix = "test";
Path baseDir = createTempDir();
InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false, autoManageMinMasterNodes, 2, 2,
"test", nodeConfigurationSource, 0, enableHttpPipelining, nodePrefix,
Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity());
try {
cluster.beforeTest(random(), 0.0);
assertMMNinNodeSetting(cluster, 2);
switch (randomInt(2)) {
case 0:
cluster.stopRandomDataNode();
assertMMNinClusterSetting(cluster, 1);
cluster.startNode();
assertMMNinClusterSetting(cluster, 2);
assertMMNinNodeSetting(cluster, 2);
break;
case 1:
cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
assertMMNinClusterSetting(cluster, 1);
return super.onNodeStopped(nodeName);
}
});
assertMMNinClusterSetting(cluster, 2);
break;
case 2:
cluster.fullRestart();
break;
}
assertMMNinNodeSetting(cluster, 2);
} finally {
cluster.close();
}
}
}