[ML] Take more care that normalize processes use unique named pipes (#54641)

When one of ML's normalize processes fails to connect to the JVM
quickly enough and another normalize process for the same job
starts shortly afterwards it is possible that their named pipes
can get mixed up.

This change avoids the risk of that by adding an incrementing
counter value into the named pipe names used for normalize
processes.

Backport of #54636
This commit is contained in:
David Roberts 2020-04-02 14:25:31 +01:00 committed by GitHub
parent de8e0200fe
commit 4b4800e096
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
public class NativeNormalizerProcessFactory implements NormalizerProcessFactory {
@ -31,11 +32,13 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
private final Environment env;
private final NativeController nativeController;
private final AtomicLong counter;
private volatile Duration processConnectTimeout;
public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.nativeController = Objects.requireNonNull(nativeController);
this.counter = new AtomicLong(0);
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
this::setProcessConnectTimeout);
@ -48,8 +51,11 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
@Override
public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
ExecutorService executorService) {
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE, jobId,
true, false, true, true, false, false);
// The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times
// in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
// are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE,
jobId + "_" + counter.incrementAndGet(), true, false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),