diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index f5919be8701..976cc47e751 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -13,19 +13,18 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xpack.ml.action.FlushJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.ml.job.config.DataDescription; -import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -145,6 +144,7 @@ class DatafeedJob { try { extractedData = dataExtractor.next(); } catch (Exception e) { + LOGGER.debug("[" + jobId + "] error while extracting data", e); error = new ExtractionProblemException(e); break; } @@ -156,6 +156,7 @@ class DatafeedJob { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } + LOGGER.debug("[" + jobId + "] error while posting data", e); error = new AnalysisProblemException(e); break; } @@ -178,23 +179,16 @@ class DatafeedJob { throw new EmptyDataCountException(); } - try { - client.execute(FlushJobAction.INSTANCE, flushRequest).get(); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException(e); - } + client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet(); } - private DataCounts postData(InputStream inputStream) throws IOException, ExecutionException, InterruptedException { + private DataCounts postData(InputStream inputStream) throws IOException { PostDataAction.Request request = new PostDataAction.Request(jobId); request.setDataDescription(dataDescription); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); Streams.copy(inputStream, outputStream); request.setContent(new BytesArray(outputStream.toByteArray())); - PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).get(); + PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).actionGet(); return response.getDataCounts(); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java index fb73f102410..c05ca10fd55 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java @@ -15,7 +15,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; -import org.junit.After; import org.junit.Before; import java.util.Collections; @@ -30,11 +29,6 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase { internalCluster().ensureAtLeastNumDataNodes(1); } - @After - public void stopNode() throws Exception { - cleanupWorkaround(1); - } - public void testLookbackOnly() throws Exception { client().admin().indices().prepareCreate("data-1") .addMapping("type", "time", "type=date") diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 70039551540..37734179e98 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -27,7 +27,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTa import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,7 +71,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); }); - cleanupWorkaround(2); } public void testFailOverBasics_withDataFeeder() throws Exception { @@ -134,7 +132,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(1, statsResponse.getResponse().results().size()); assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState()); }); - cleanupWorkaround(2); } @TestLogging("org.elasticsearch.xpack.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG") @@ -201,7 +198,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(JobState.OPENED, task.getStatus()); }); - cleanupWorkaround(3); } public void testMaxConcurrentJobAllocations() throws Exception { @@ -307,7 +303,6 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { }, 30, TimeUnit.SECONDS); assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size()); - cleanupWorkaround(numMlNodes + 1); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java index 233e699f335..d6af820aa94 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlFullClusterRestartIT.java @@ -95,7 +95,6 @@ public class MlFullClusterRestartIT extends BaseMlIntegTestCase { assertEquals(numDocs1 + numDocs2, dataCounts.getProcessedRecordCount()); assertEquals(0L, dataCounts.getOutOfOrderTimeStampCount()); }, 30, TimeUnit.SECONDS); - cleanupWorkaround(3); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 9183ba2c524..6084543b4f5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -58,7 +58,6 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { assertEquals(JobState.OPENED, task.getStatus()); OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest(); assertEquals("1", openJobRequest.getJobId()); - cleanupWorkaround(1); } public void testSingleNode() throws Exception { @@ -104,11 +103,9 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet(); assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED); }); - cleanupWorkaround(numNodes); return; } } - cleanupWorkaround(numNodes); fail("shouldn't be able to add more than [" + clusterWideMaxNumberOfJobs + "] jobs"); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index e1862e07ae5..38c9f7751d3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.junit.After; import java.util.Collections; import java.util.Date; @@ -121,9 +122,11 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase { // Due to the fact that ml plugin creates the state, notifications and meta indices automatically // when the test framework removes all indices then ml plugin adds them back. Causing validation to fail // we should move to templates instead as that will fix the test problem - protected void cleanupWorkaround(int numNodes) throws Exception { + @After + public void cleanupWorkaround() throws Exception { deleteAllDatafeeds(client()); deleteAllJobs(client()); + int numNodes = internalCluster().size(); for (int i = 0; i < numNodes; i++) { internalCluster().stopRandomNode(settings -> true); }