[7.x] Synchronize processInStream.close() call (#50581)

This commit is contained in:
Przemysław Witek 2020-01-03 10:23:51 +01:00 committed by GitHub
parent 0d78aa2708
commit 8917c05df8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 3 deletions

View File

@ -183,7 +183,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
"Finished analysis");
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/49680")
public void testStopAndRestart() throws Exception {
initialize("regression_stop_and_restart");
indexData(sourceIndex, 350, 0);

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
@ -44,6 +45,9 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private final String jobId;
private final CppLogMessageHandler cppLogHandler;
private final OutputStream processInStream;
// We need this as in Java 8 closing {@link FilterOutputStream} is not idempotent (i.e. cannot be performed twice).
// For more details regarding the underlying issue see https://bugs.openjdk.java.net/browse/JDK-8054565
private final AtomicBoolean processInStreamClosed = new AtomicBoolean();
private final InputStream processOutStream;
private final OutputStream processRestoreStream;
private final LengthEncodedWriter recordWriter;
@ -163,7 +167,10 @@ public abstract class AbstractNativeProcess implements NativeProcess {
processCloseInitiated = true;
// closing its input causes the process to exit
if (processInStream != null) {
processInStream.close();
// Make sure {@code processInStream.close()} is called at most once.
if (processInStreamClosed.compareAndSet(false, true)) {
processInStream.close();
}
}
// wait for the process to exit by waiting for end-of-file on the named pipe connected
// to the state processor - it may take a long time for all the model state to be
@ -209,7 +216,10 @@ public abstract class AbstractNativeProcess implements NativeProcess {
} finally {
try {
if (processInStream != null) {
processInStream.close();
// Make sure {@code processInStream.close()} is called at most once.
if (processInStreamClosed.compareAndSet(false, true)) {
processInStream.close();
}
}
} catch (IOException e) {
// Ignore it - we're shutting down and the method itself has logged a warning