[ML] Fix bwc streaming close job requests to v5.4 nodes (elastic/x-pack-elasticsearch#1586)

Original commit: elastic/x-pack-elasticsearch@0f02c8ddde
This commit is contained in:
David Kyle 2017-05-31 10:28:16 +01:00 committed by GitHub
parent 39cabad4d0
commit 1759f70ceb
2 changed files with 37 additions and 42 deletions

View File

@ -115,13 +115,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private TimeValue timeout = MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT;
private String[] openJobIds;
private String[] closingJobIds;
private boolean local;
Request() {
openJobIds = new String[] {};
closingJobIds = new String[] {};
}
public Request(String jobId) {
@ -161,10 +159,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
this.openJobIds = openJobIds;
}
public void setClosingJobIds(String [] closingJobIds) {
this.closingJobIds = closingJobIds;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -172,7 +166,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
timeout = new TimeValue(in);
force = in.readBoolean();
openJobIds = in.readStringArray();
closingJobIds = in.readStringArray();
local = in.readBoolean();
}
@ -183,7 +176,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
timeout.writeTo(out);
out.writeBoolean(force);
out.writeStringArray(openJobIds);
out.writeStringArray(closingJobIds);
out.writeBoolean(local);
}
@ -199,7 +191,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// openJobIds and closingJobIds excluded
// openJobIds are excluded
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
@ -210,7 +202,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
public int hashCode() {
// openJobIds and closingJobIds excluded
// openJobIds are excluded
return Objects.hash(jobId, timeout, force);
}
@ -223,7 +215,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return false;
}
Request other = (Request) obj;
// openJobIds and closingJobIds excluded
// openJobIds are excluded
return Objects.equals(jobId, other.jobId) &&
Objects.equals(timeout, other.timeout) &&
Objects.equals(force, other.force);
@ -342,12 +334,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
* result/failure
*/
List<String> openJobs = new ArrayList<>();
List<String> closingJobs = new ArrayList<>();
resolveAndValidateJobId(request.getJobId(), state, openJobs, closingJobs, request.isForce());
request.setOpenJobIds(openJobs.toArray(new String[0]));
request.setClosingJobIds(closingJobs.toArray(new String[0]));
if (request.openJobIds.length == 0 && request.closingJobIds.length == 0) {
List<String> openJobIds = new ArrayList<>();
List<String> closingJobIds = new ArrayList<>();
resolveAndValidateJobId(request.getJobId(), state, openJobIds, closingJobIds, request.isForce());
request.setOpenJobIds(openJobIds.toArray(new String[0]));
if (openJobIds.isEmpty() && closingJobIds.isEmpty()) {
listener.onResponse(new Response(true));
return;
}
@ -368,9 +359,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
if (request.isForce()) {
forceCloseJob(state, request, listener);
List<String> jobIdsToForceClose = new ArrayList<>(openJobIds);
jobIdsToForceClose.addAll(closingJobIds);
forceCloseJob(state, request, jobIdsToForceClose, listener);
} else {
normalCloseJob(state, task, request, listener);
normalCloseJob(state, task, request, openJobIds, closingJobIds, listener);
}
}
}
@ -429,14 +422,15 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return new Response(in);
}
private void forceCloseJob(ClusterState currentState, Request request, ActionListener<Response> listener) {
private void forceCloseJob(ClusterState currentState, Request request, List<String> jobIdsToForceClose,
ActionListener<Response> listener) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
final int numberOfJobs = request.openJobIds.length + request.closingJobIds.length;
final int numberOfJobs = jobIdsToForceClose.size();
final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(numberOfJobs);
for (String jobId : ArrayUtils.concat(request.openJobIds, request.closingJobIds)) {
for (String jobId : jobIdsToForceClose) {
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask != null) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
@ -483,10 +477,12 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
private void normalCloseJob(ClusterState currentState, Task task, Request request, ActionListener<Response> listener) {
private void normalCloseJob(ClusterState currentState, Task task, Request request,
List<String> openJobIds, List<String> closingJobIds,
ActionListener<Response> listener) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
WaitForCloseRequest waitForCloseRequest = buildWaitForCloseRequest(request, tasks, auditor);
WaitForCloseRequest waitForCloseRequest = buildWaitForCloseRequest(openJobIds, closingJobIds, tasks, auditor);
// If there are no open or closing jobs in the request return
if (waitForCloseRequest.hasJobsToWaitFor() == false) {
@ -494,7 +490,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return;
}
boolean noOpenJobsToClose = request.openJobIds.length == 0;
boolean noOpenJobsToClose = openJobIds.isEmpty();
if (noOpenJobsToClose) {
// No jobs to close but we still want to wait on closing jobs in the request
waitForJobClosed(request, waitForCloseRequest, new Response(true), listener);
@ -561,15 +557,15 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
* Expand the {@code jobId} parameter and add the job Id to one of the list arguments
* depending on job state.
*
* Opened jobs are added to {@code openJobs} and closing jobs added to {@code closingJobs}. Failed jobs are added
* to {@code openJobs} if allowFailed is set otherwise an exception is thrown.
* Opened jobs are added to {@code openJobIds} and closing jobs added to {@code closingJobIds}. Failed jobs are added
* to {@code openJobIds} if allowFailed is set otherwise an exception is thrown.
* @param jobId The job Id. If jobId == {@link Job#ALL} then expand the job list.
* @param state Cluster state
* @param openJobs Opened or failed jobs are added to this list
* @param closingJobs Closing jobs are added to this list
* @param allowFailed Whether failed jobs are allowed, if yes, they are added to {@code openJobs}
* @param openJobIds Opened or failed jobs are added to this list
* @param closingJobIds Closing jobs are added to this list
* @param allowFailed Whether failed jobs are allowed, if yes, they are added to {@code openJobIds}
*/
static void resolveAndValidateJobId(String jobId, ClusterState state, List<String> openJobs, List<String> closingJobs,
static void resolveAndValidateJobId(String jobId, ClusterState state, List<String> openJobIds, List<String> closingJobIds,
boolean allowFailed) {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
@ -586,7 +582,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
if (job.isDeleted()) {
return;
}
addJobAccordingToState(id, tasksMetaData, openJobs, closingJobs, failedJobs);
addJobAccordingToState(id, tasksMetaData, openJobIds, closingJobIds, failedJobs);
};
if (!Job.ALL.equals(jobId)) {
@ -607,7 +603,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
// allowFailed == true
openJobs.addAll(failedJobs);
openJobIds.addAll(failedJobs);
}
private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData,
@ -629,11 +625,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
static TransportAction.WaitForCloseRequest buildWaitForCloseRequest(Request request,
static TransportAction.WaitForCloseRequest buildWaitForCloseRequest(List<String> openJobIds, List<String> closingJobIds,
PersistentTasksCustomMetaData tasks, Auditor auditor) {
TransportAction.WaitForCloseRequest waitForCloseRequest = new TransportAction.WaitForCloseRequest();
for (String jobId : request.openJobIds) {
for (String jobId : openJobIds) {
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask != null) {
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
@ -641,7 +637,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
waitForCloseRequest.jobsToFinalize.add(jobId);
}
}
for (String jobId : request.closingJobIds) {
for (String jobId : closingJobIds) {
PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask != null) {
waitForCloseRequest.persistentTaskIds.add(jobTask.getId());

View File

@ -280,9 +280,8 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
}
public void testBuildWaitForCloseRequest() {
CloseJobAction.Request request = new Request();
request.setOpenJobIds(new String[] {"openjob1", "openjob2"});
request.setClosingJobIds(new String[] {"closingjob1"});
List<String> openJobIds = Arrays.asList(new String[] {"openjob1", "openjob2"});
List<String> closingJobIds = Arrays.asList(new String[] {"closingjob1"});
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("openjob1", null, JobState.OPENED, tasksBuilder);
@ -290,14 +289,14 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
addJobTask("closingjob1", null, JobState.CLOSING, tasksBuilder);
CloseJobAction.TransportAction.WaitForCloseRequest waitForCloseRequest =
CloseJobAction.buildWaitForCloseRequest(request, tasksBuilder.build(), mock(Auditor.class));
CloseJobAction.buildWaitForCloseRequest(openJobIds, closingJobIds, tasksBuilder.build(), mock(Auditor.class));
assertEquals(waitForCloseRequest.jobsToFinalize, Arrays.asList("openjob1", "openjob2"));
assertEquals(waitForCloseRequest.persistentTaskIds,
Arrays.asList("job-openjob1", "job-openjob2", "job-closingjob1"));
assertTrue(waitForCloseRequest.hasJobsToWaitFor());
request = new Request();
waitForCloseRequest = CloseJobAction.buildWaitForCloseRequest(request, tasksBuilder.build(), mock(Auditor.class));
waitForCloseRequest = CloseJobAction.buildWaitForCloseRequest(Collections.emptyList(), Collections.emptyList(),
tasksBuilder.build(), mock(Auditor.class));
assertFalse(waitForCloseRequest.hasJobsToWaitFor());
}