From 1759f70cebfe907bce6e49f84fa47b906428502a Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 31 May 2017 10:28:16 +0100 Subject: [PATCH] [ML] Fix bwc streaming close job requests to v5.4 nodes (elastic/x-pack-elasticsearch#1586) Original commit: elastic/x-pack-elasticsearch@0f02c8ddde64ae8ade22f2c6d446625d6cd48d85 --- .../xpack/ml/action/CloseJobAction.java | 68 +++++++++---------- .../ml/action/CloseJobActionRequestTests.java | 11 ++- 2 files changed, 37 insertions(+), 42 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index f62a4573beb..3dced25810c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -115,13 +115,11 @@ public class CloseJobAction extends Action openJobs = new ArrayList<>(); - List closingJobs = new ArrayList<>(); - resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs, request.isForce()); - request.setOpenJobIds(openJobs.toArray(new String[0])); - request.setClosingJobIds(closingJobs.toArray(new String[0])); - if (request.openJobIds.length == 0 && request.closingJobIds.length == 0) { + List openJobIds = new ArrayList<>(); + List closingJobIds = new ArrayList<>(); + resolveAndValidateJobId(request.getJobId(), state, openJobIds, closingJobIds, request.isForce()); + request.setOpenJobIds(openJobIds.toArray(new String[0])); + if (openJobIds.isEmpty() && closingJobIds.isEmpty()) { listener.onResponse(new Response(true)); return; } @@ -368,9 +359,11 @@ public class CloseJobAction extends Action jobIdsToForceClose = new ArrayList<>(openJobIds); + jobIdsToForceClose.addAll(closingJobIds); + forceCloseJob(state, request, jobIdsToForceClose, listener); } else { - normalCloseJob(state, task, request, listener); + normalCloseJob(state, task, request, openJobIds, closingJobIds, listener); } } } @@ -429,14 +422,15 @@ public class CloseJobAction extends Action listener) { + private void forceCloseJob(ClusterState currentState, Request request, List jobIdsToForceClose, + ActionListener listener) { PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - final int numberOfJobs = request.openJobIds.length + request.closingJobIds.length; + final int numberOfJobs = jobIdsToForceClose.size(); final AtomicInteger counter = new AtomicInteger(); final AtomicArray failures = new AtomicArray<>(numberOfJobs); - for (String jobId : ArrayUtils.concat(request.openJobIds, request.closingJobIds)) { + for (String jobId : jobIdsToForceClose) { PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask != null) { auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); @@ -483,10 +477,12 @@ public class CloseJobAction extends Action listener) { + private void normalCloseJob(ClusterState currentState, Task task, Request request, + List openJobIds, List closingJobIds, + ActionListener listener) { PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - WaitForCloseRequest waitForCloseRequest = buildWaitForCloseRequest(request, tasks, auditor); + WaitForCloseRequest waitForCloseRequest = buildWaitForCloseRequest(openJobIds, closingJobIds, tasks, auditor); // If there are no open or closing jobs in the request return if (waitForCloseRequest.hasJobsToWaitFor() == false) { @@ -494,7 +490,7 @@ public class CloseJobAction extends Action openJobs, List closingJobs, + static void resolveAndValidateJobId(String jobId, ClusterState state, List openJobIds, List closingJobIds, boolean allowFailed) { MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); @@ -586,7 +582,7 @@ public class CloseJobAction extends Action openJobIds, List closingJobIds, PersistentTasksCustomMetaData tasks, Auditor auditor) { TransportAction.WaitForCloseRequest waitForCloseRequest = new TransportAction.WaitForCloseRequest(); - for (String jobId : request.openJobIds) { + for (String jobId : openJobIds) { PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask != null) { auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); @@ -641,7 +637,7 @@ public class CloseJobAction extends Action jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask != null) { waitForCloseRequest.persistentTaskIds.add(jobTask.getId()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java index ccc7aa0826b..26f752ef958 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java @@ -280,9 +280,8 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa } public void testBuildWaitForCloseRequest() { - CloseJobAction.Request request = new Request(); - request.setOpenJobIds(new String[] {"openjob1", "openjob2"}); - request.setClosingJobIds(new String[] {"closingjob1"}); + List openJobIds = Arrays.asList(new String[] {"openjob1", "openjob2"}); + List closingJobIds = Arrays.asList(new String[] {"closingjob1"}); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("openjob1", null, JobState.OPENED, tasksBuilder); @@ -290,14 +289,14 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa addJobTask("closingjob1", null, JobState.CLOSING, tasksBuilder); CloseJobAction.TransportAction.WaitForCloseRequest waitForCloseRequest = - CloseJobAction.buildWaitForCloseRequest(request, tasksBuilder.build(), mock(Auditor.class)); + CloseJobAction.buildWaitForCloseRequest(openJobIds, closingJobIds, tasksBuilder.build(), mock(Auditor.class)); assertEquals(waitForCloseRequest.jobsToFinalize, Arrays.asList("openjob1", "openjob2")); assertEquals(waitForCloseRequest.persistentTaskIds, Arrays.asList("job-openjob1", "job-openjob2", "job-closingjob1")); assertTrue(waitForCloseRequest.hasJobsToWaitFor()); - request = new Request(); - waitForCloseRequest = CloseJobAction.buildWaitForCloseRequest(request, tasksBuilder.build(), mock(Auditor.class)); + waitForCloseRequest = CloseJobAction.buildWaitForCloseRequest(Collections.emptyList(), Collections.emptyList(), + tasksBuilder.build(), mock(Auditor.class)); assertFalse(waitForCloseRequest.hasJobsToWaitFor()); }