[ML] Fix race condition between multiple job deletion and immediate recreation (elastic/x-pack-elasticsearch#1793)

If multiple job deletion requests were sent in quick succession, a race
condition meant that more than one of them could get through the check intended
to enforce a single active deletion request at a time.  Then, if the job was
immediately recreated after the first deletion request returned, the second,
still-running deletion request could interfere with the new job and delete the
aliases that the put job request had created.

This problem can be avoided by using the "ask forgiveness, not permission"
idiom when checking if the job is already being deleted at the beginning of
each deletion request.
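
In sketch form the idiom works like this (a simplified, hypothetical stand-in:
the in-memory set below replaces the real cluster state update, and only the
exception name mirrors the production code in the diffs further down):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class DeletionIdiomSketch {

        static class JobAlreadyMarkedAsDeletedException extends RuntimeException {
        }

        // Stand-in for the cluster state update that flips the job's "deleted" flag;
        // only one caller can win the add() for a given job id.
        private final Set<String> deletingJobs = ConcurrentHashMap.newKeySet();

        void deleteJob(String jobId) {
            try {
                markJobAsDeleting(jobId);   // throws if a concurrent request won the race
                // ... proceed with the actual deletion work ...
            } catch (JobAlreadyMarkedAsDeletedException e) {
                // Another delete request is already in progress; wait for it to finish
                // instead of starting a second, interfering deletion.
            }
        }

        private void markJobAsDeleting(String jobId) {
            if (deletingJobs.add(jobId) == false) {
                throw new JobAlreadyMarkedAsDeletedException();
            }
        }
    }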

Additionally, now even force delete requests will wait for a certain amount
of time for a prior delete request to complete.  This is to avoid the same
race conditions.  However, force delete requests will eventually start an
(unsafe) parallel delete to provide a get-out in case a delete request
completely dies.
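
A rough sketch of that decision, with hypothetical names (the real
listener-based version is in the DeleteJobAction diff below):

    import java.util.concurrent.TimeoutException;

    class ForceDeleteFallbackSketch {

        enum NextStep { DONE, START_UNSAFE_PARALLEL_DELETE, FAIL }

        // Called once the wait for the in-progress delete has completed or failed.
        static NextStep afterWaitingForExistingDelete(boolean isForceDelete, Exception waitError) {
            if (waitError == null) {
                return NextStep.DONE;                           // the prior delete finished normally
            }
            if (isForceDelete && waitError instanceof TimeoutException) {
                return NextStep.START_UNSAFE_PARALLEL_DELETE;   // get-out in case the original delete died
            }
            return NextStep.FAIL;                               // any other error is surfaced to the caller
        }
    }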

relates elastic/x-pack-elasticsearch#1765

Original commit: elastic/x-pack-elasticsearch@b5c8f26a0e
David Roberts 2017-06-21 09:14:51 +01:00 committed by GitHub
parent 410b210736
commit a8e394c3b5
3 changed files with 65 additions and 20 deletions


@@ -167,9 +167,9 @@ public class MlMetadata implements MetaData.Custom {
}
public MlMetadataDiff(StreamInput in) throws IOException {
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
MlMetadataDiff::readJobDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readSchedulerDiffFrom);
}
@@ -356,8 +356,8 @@ public class MlMetadata implements MetaData.Custom {
throw ExceptionsHelper.missingJobException(jobId);
}
if (job.isDeleted()) {
-// Job still exists
-return;
+// Job still exists but is already being deleted
+throw new JobAlreadyMarkedAsDeletedException();
}
checkJobHasNoDatafeed(jobId);
@@ -441,4 +441,6 @@ public class MlMetadata implements MetaData.Custom {
}
}
+public static class JobAlreadyMarkedAsDeletedException extends RuntimeException {
+}
}


@@ -48,6 +48,7 @@ import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Objects;
+import java.util.concurrent.TimeoutException;
public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAction.Response, DeleteJobAction.RequestBuilder> {
@@ -199,19 +200,6 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
-// For a normal delete check if the job is already being deleted.
-if (request.isForce() == false) {
-MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE);
-if (currentMlMetadata != null) {
-Job job = currentMlMetadata.getJobs().get(request.getJobId());
-if (job != null && job.isDeleted()) {
-// This is a generous timeout value but it's unlikely to ever take this long
-waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, listener);
-return;
-}
-}
-}
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
response -> {
if (request.isForce()) {
@@ -220,7 +208,28 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
normalDeleteJob(request, (JobStorageDeletionTask) task, listener);
}
},
-listener::onFailure);
+e -> {
+if (e instanceof MlMetadata.JobAlreadyMarkedAsDeletedException) {
+// Don't kick off a parallel deletion task, but just wait for
+// the in-progress request to finish. This is much safer in the
+// case where the job with the same name might be immediately
+// recreated after the delete returns. However, if a force
+// delete times out then eventually kick off a parallel delete
+// in case the original completely failed for some reason.
+waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, ActionListener.wrap(
+listener::onResponse,
+e2 -> {
+if (request.isForce() && e2 instanceof TimeoutException) {
+forceDeleteJob(request, (JobStorageDeletionTask) task, listener);
+} else {
+listener.onFailure(e2);
+}
+}
+));
+} else {
+listener.onFailure(e);
+}
+});
markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
}
@@ -354,7 +363,7 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
@Override
public void onTimeout(TimeValue timeout) {
-listener.onFailure(new IllegalStateException("timed out after " + timeout));
+listener.onFailure(new TimeoutException("timed out after " + timeout));
}
}, newClusterState -> jobIsDeletedFromState(jobId, newClusterState), timeout);
}


@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Locale;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -122,7 +123,7 @@ public class MlJobIT extends ESRestTestCase {
assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-3\""));
}
-private Response createFarequoteJob(String jobId) throws Exception {
+private Response createFarequoteJob(String jobId) throws IOException {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\": \"3600s\",\n"
+ " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
@@ -544,6 +545,9 @@ public class MlJobIT extends ESRestTestCase {
ConcurrentMapLong<Response> responses = ConcurrentCollections.newConcurrentMapLong();
ConcurrentMapLong<ResponseException> responseExceptions = ConcurrentCollections.newConcurrentMapLong();
AtomicReference<IOException> ioe = new AtomicReference<>();
+AtomicInteger recreationGuard = new AtomicInteger(0);
+AtomicReference<Response> recreationResponse = new AtomicReference<>();
+AtomicReference<ResponseException> recreationException = new AtomicReference<>();
Runnable deleteJob = () -> {
try {
@@ -560,6 +564,17 @@ public class MlJobIT extends ESRestTestCase {
ioe.set(e);
}
+// Immediately after the first deletion finishes, recreate the job. This should pick up
+// race conditions where another delete request deletes part of the newly created job.
+if (recreationGuard.getAndIncrement() == 0) {
+try {
+recreationResponse.set(createFarequoteJob(jobId));
+} catch (ResponseException re) {
+recreationException.set(re);
+} catch (IOException e) {
+ioe.set(e);
+}
+}
};
// The idea is to hit the situation where one request waits for
@@ -593,6 +608,25 @@ public class MlJobIT extends ESRestTestCase {
for (Response response : responses.values()) {
assertEquals(responseEntityToString(response), 200, response.getStatusLine().getStatusCode());
}
+assertNotNull(recreationResponse.get());
+assertEquals(responseEntityToString(recreationResponse.get()), 200, recreationResponse.get().getStatusLine().getStatusCode());
+if (recreationException.get() != null) {
+assertNull(recreationException.get().getMessage(), recreationException.get());
+}
+// Check that the job aliases exist. These are the last thing to be deleted when a job is deleted, so
+// if there's been a race between deletion and recreation these are what will be missing.
+Response response = client().performRequest("get", "_aliases");
+assertEquals(200, response.getStatusLine().getStatusCode());
+String responseAsString = responseEntityToString(response);
+assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId)
++ "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId + "\",\"boost\":1.0}}}}"));
+assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.resultsWriteAlias(jobId) + "\":{}"));
+assertEquals(numThreads, recreationGuard.get());
}
private static String responseEntityToString(Response response) throws Exception {