diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 7985799a696..fbbf0bae5ca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; public class NativeNormalizerProcessFactory implements NormalizerProcessFactory { @@ -31,11 +32,13 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory private final Environment env; private final NativeController nativeController; + private final AtomicLong counter; private volatile Duration processConnectTimeout; public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) { this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); + this.counter = new AtomicLong(0); setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings())); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT, this::setProcessConnectTimeout); @@ -48,8 +51,11 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory @Override public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, ExecutorService executorService) { - ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE, jobId, - true, false, true, true, false, false); + // The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times + // in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names + // are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls. + ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE, + jobId + "_" + counter.incrementAndGet(), true, false, true, true, false, false); createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),