[TEST] Change ml cleanup workaround.

Original commit: elastic/x-pack-elasticsearch@793d6cfb78
This commit is contained in:
Martijn van Groningen 2017-02-23 09:08:10 +01:00
parent 9f6285f362
commit 6195000b8a
6 changed files with 13 additions and 31 deletions

View File

@ -13,19 +13,18 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@ -145,6 +144,7 @@ class DatafeedJob {
try {
extractedData = dataExtractor.next();
} catch (Exception e) {
LOGGER.debug("[" + jobId + "] error while extracting data", e);
error = new ExtractionProblemException(e);
break;
}
@ -156,6 +156,7 @@ class DatafeedJob {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
LOGGER.debug("[" + jobId + "] error while posting data", e);
error = new AnalysisProblemException(e);
break;
}
@ -178,23 +179,16 @@ class DatafeedJob {
throw new EmptyDataCountException();
}
try {
client.execute(FlushJobAction.INSTANCE, flushRequest).get();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(e);
}
client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
}
private DataCounts postData(InputStream inputStream) throws IOException, ExecutionException, InterruptedException {
private DataCounts postData(InputStream inputStream) throws IOException {
PostDataAction.Request request = new PostDataAction.Request(jobId);
request.setDataDescription(dataDescription);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Streams.copy(inputStream, outputStream);
request.setContent(new BytesArray(outputStream.toByteArray()));
PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).get();
PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).actionGet();
return response.getDataCounts();
}

View File

@ -15,7 +15,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.junit.After;
import org.junit.Before;
import java.util.Collections;
@ -30,11 +29,6 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase {
internalCluster().ensureAtLeastNumDataNodes(1);
}
@After
public void stopNode() throws Exception {
cleanupWorkaround(1);
}
public void testLookbackOnly() throws Exception {
client().admin().indices().prepareCreate("data-1")
.addMapping("type", "time", "type=date")

View File

@ -27,7 +27,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -72,7 +71,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
cleanupWorkaround(2);
}
public void testFailOverBasics_withDataFeeder() throws Exception {
@ -134,7 +132,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(1, statsResponse.getResponse().results().size());
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
});
cleanupWorkaround(2);
}
@TestLogging("org.elasticsearch.xpack.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG")
@ -201,7 +198,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus());
});
cleanupWorkaround(3);
}
public void testMaxConcurrentJobAllocations() throws Exception {
@ -307,7 +303,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
}, 30, TimeUnit.SECONDS);
assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size());
cleanupWorkaround(numMlNodes + 1);
}

View File

@ -95,7 +95,6 @@ public class MlFullClusterRestartIT extends BaseMlIntegTestCase {
assertEquals(numDocs1 + numDocs2, dataCounts.getProcessedRecordCount());
assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount());
}, 30, TimeUnit.SECONDS);
cleanupWorkaround(3);
}
}

View File

@ -58,7 +58,6 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
assertEquals(JobState.OPENED, task.getStatus());
OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest();
assertEquals("1", openJobRequest.getJobId());
cleanupWorkaround(1);
}
public void testSingleNode() throws Exception {
@ -104,11 +103,9 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
cleanupWorkaround(numNodes);
return;
}
}
cleanupWorkaround(numNodes);
fail("shouldn't be able to add more than [" + clusterWideMaxNumberOfJobs + "] jobs");
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.junit.After;
import java.util.Collections;
import java.util.Date;
@ -121,9 +122,11 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
// Due to the fact that ml plugin creates the state, notifications and meta indices automatically
// when the test framework removes all indices then ml plugin adds them back. Causing validation to fail
// we should move to templates instead as that will fix the test problem
protected void cleanupWorkaround(int numNodes) throws Exception {
@After
public void cleanupWorkaround() throws Exception {
deleteAllDatafeeds(client());
deleteAllJobs(client());
int numNodes = internalCluster().size();
for (int i = 0; i < numNodes; i++) {
internalCluster().stopRandomNode(settings -> true);
}