[TEST] Change `DatafeedJobsIT` test to use the external cluster as it doesn't start/stop nodes.

Only distributed ml tests, only enable ml as security, watcher, monitoring etc. only add noise to the tests.
Disable mock big arrays for distributed ml tests, as nodes starting/stopping in tests somehow leaves allocations around,
which only is a test problem.

Original commit: elastic/x-pack-elasticsearch@5ff1e69036
This commit is contained in:
Martijn van Groningen 2017-03-01 22:00:50 +01:00
parent dcec4bbf4b
commit b5db0d9f44
4 changed files with 149 additions and 29 deletions

View File

@ -8,12 +8,14 @@ package org.elasticsearch.license;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.License.OperationMode;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.TestXPackTransportClient;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
@ -35,6 +37,20 @@ import static org.hamcrest.Matchers.is;
public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), true);
return settings.build();
}
@Override
protected Settings transportClientSettings() {
Settings.Builder settings = Settings.builder().put(super.transportClientSettings());
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), true);
return settings.build();
}
@Before
public void resetLicensing() {
enableLicensing();

View File

@ -7,25 +7,107 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionCoordinator;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.junit.Before;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.security.Security;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0, numClientNodes = 0,
transportClientRatio = 0, supportsDedicatedMasters = false)
public class DatafeedJobsIT extends BaseMlIntegTestCase {
@Before
public void startNode() {
internalCluster().ensureAtLeastNumDataNodes(1);
@Override
protected boolean ignoreExternalCluster() {
return false;
}
@Override
protected Settings externalClusterClientSettings() {
Settings.Builder settings = Settings.builder()
.put(Security.USER_SETTING.getKey(), "elastic:changeme");
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
settings.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME4);
return settings.build();
}
public void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME,
StartDatafeedAction.Request::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentActionRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentActionCoordinator.Status.NAME,
PersistentActionCoordinator.Status::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().getMasterNodeId();
for (Client client : cluster().getClients()) {
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
// remove local node reference
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version() &&
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull("clusterstate JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} catch (AssertionError error) {
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(), localClusterState.toString());
throw error;
}
}
}
}
}
public void testLookbackOnly() throws Exception {

View File

@ -47,11 +47,11 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Job.Builder job = createJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
@ -83,18 +83,18 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Job.Builder job = createScheduledJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
configBuilder.setFrequency(120);
DatafeedConfig config = configBuilder.build();
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
assertTrue(putDatadeedResponse.isAcknowledged());
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
@ -157,11 +157,11 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Job.Builder job = createJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
@ -237,11 +237,11 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
for (int i = 0; i < numJobs; i++) {
Job.Builder job = createJob(Integer.toString(i));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
}
assertBusy(() -> {
@ -301,23 +301,23 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
internalCluster().ensureAtMostNumDataNodes(0);
// start non ml node, but that will hold the indices
logger.info("Start non ml node:");
internalCluster().startNode(Settings.builder()
String nonMlNode = internalCluster().startNode(Settings.builder()
.put("node.data", true)
.put(MachineLearning.ML_ENABLED.getKey(), false));
ensureStableCluster(1);
logger.info("Starting ml node");
internalCluster().startNode(Settings.builder()
String mlNode = internalCluster().startNode(Settings.builder()
.put("node.data", false)
.put(MachineLearning.ML_ENABLED.getKey(), true));
ensureStableCluster(2);
Job.Builder job = createFareQuoteJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
PostDataAction.Request postDataRequest = new PostDataAction.Request("job_id");
postDataRequest.setContent(new BytesArray(
@ -344,11 +344,12 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertTrue(e.getMessage().endsWith("because not all primary shards are active for the following indices [.ml-anomalies-shared]]"));
logger.info("Start data node");
internalCluster().startNode(Settings.builder()
nonMlNode = internalCluster().startNode(Settings.builder()
.put("node.data", true)
.put(MachineLearning.ML_ENABLED.getKey(), false));
ensureStableCluster(2);
client().execute(OpenJobAction.INSTANCE, openJobRequest).get();
ensureStableCluster(2, mlNode);
ensureStableCluster(2, nonMlNode);
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> assertJobTask("job_id", JobState.OPENED, true));
}

View File

@ -16,6 +16,8 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.node.NodeMocksPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecurityIntegTestCase;
@ -38,10 +40,13 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.equalTo;
@ -67,6 +72,10 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false);
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
return settings.build();
}
@ -74,9 +83,20 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
protected Settings transportClientSettings() {
Settings.Builder settings = Settings.builder().put(super.transportClientSettings());
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
return settings.build();
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
Set<Class<? extends Plugin>> mocks = new HashSet<>(super.getMockPlugins());
mocks.remove(NodeMocksPlugin.class);
return mocks;
}
protected Job.Builder createJob(String id) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
@ -148,15 +168,16 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
public void cleanupWorkaround() throws Exception {
deleteAllDatafeeds(client());
deleteAllJobs(client());
internalCluster().wipe(Collections.emptySet());
assertBusy(() -> {
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries()
.setActiveOnly(true)
.get();
for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) {
assertThat(recoveryStates.size(), equalTo(0));
}
});
if (ignoreExternalCluster()) {
assertBusy(() -> {
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries()
.setActiveOnly(true)
.get();
for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) {
assertThat(recoveryStates.size(), equalTo(0));
}
});
}
}
protected void indexDocs(String index, long numDocs, long start, long end) {