diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java index b21456b0e31..066d11aa2c0 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java @@ -19,6 +19,7 @@ package org.apache.lucene.replicator.nrt; import java.io.Closeable; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.document.Document; @@ -48,6 +49,8 @@ class NodeProcess implements Closeable { // SegmentInfos.version, which can be higher than the initCommitVersion final long initInfosVersion; + private final Optional subprocessKiller; + volatile boolean isOpen = true; final AtomicBoolean nodeIsClosing; @@ -60,7 +63,8 @@ class NodeProcess implements Closeable { boolean isPrimary, long initCommitVersion, long initInfosVersion, - AtomicBoolean nodeIsClosing) { + AtomicBoolean nodeIsClosing, + Optional subprocessKiller) { this.p = p; this.id = id; this.tcpPort = tcpPort; @@ -69,6 +73,7 @@ class NodeProcess implements Closeable { this.initCommitVersion = initCommitVersion; this.initInfosVersion = initInfosVersion; this.nodeIsClosing = nodeIsClosing; + this.subprocessKiller = subprocessKiller; assert initInfosVersion >= initCommitVersion : "initInfosVersion=" + initInfosVersion + " initCommitVersion=" + initCommitVersion; lock = new ReentrantLock(); @@ -165,6 +170,11 @@ class NodeProcess implements Closeable { t.printStackTrace(System.out); } try { + if (subprocessKiller.isPresent()) { + var t = subprocessKiller.get(); + t.interrupt(); + t.join(); + } p.waitFor(); pumper.join(); } catch (InterruptedException ie) { diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java index 9e16af78ca5..ba1363798aa 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java @@ -27,6 +27,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -208,7 +209,8 @@ public class TestNRTReplication extends LuceneTestCase { primaryTCPPort == -1, initCommitVersion, initInfosVersion, - nodeClosing); + nodeClosing, + Optional.empty()); } @Override diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestSimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestSimpleServer.java index f21ac11b884..edcb09a01b7 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestSimpleServer.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestSimpleServer.java @@ -32,8 +32,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.store.DataInput; import org.apache.lucene.store.DataOutput; @@ -132,14 +134,6 @@ public class TestSimpleServer extends LuceneTestCase { } } - /** forcibly halt the JVM: similar to crashing */ - - // poached from TestIndexWriterOnJRECrash ... should we factor out to TestUtil? seems dangerous - // to give it such "publicity"? - private static void crashJRE() { - Runtime.getRuntime().halt(1); - } - static void writeFilesMetaData(DataOutput out, Map files) throws IOException { out.writeVInt(files.size()); @@ -293,7 +287,9 @@ public class TestSimpleServer extends LuceneTestCase { if (doClose) { node.message("top: will close after " + (waitForMS / 1000.0) + " seconds"); } else { - node.message("top: will crash after " + (waitForMS / 1000.0) + " seconds"); + // This message is pattern-parsed in TestStressNRTReplication, don't change it. + node.message( + "top: " + TestStressNRTReplication.CRASH_MSG_PREFIX + waitForMS + " milliseconds"); } Thread t = @@ -327,8 +323,21 @@ public class TestSimpleServer extends LuceneTestCase { throw new RuntimeException(ioe); } } else { - node.message("top: now crash JVM after " + (waitForMS / 1000.0) + " seconds"); - crashJRE(); + node.message( + String.format( + Locale.ROOT, + "Expecting this JVM (PID: %s) to crash about now (after %.2f seconds)", + ProcessHandle.current().pid(), + (waitForMS / 1000.0))); + while (true) { + try { + new CountDownLatch(1).await(); + } catch ( + @SuppressWarnings("unused") + InterruptedException e) { + // We'll eventually be killed by an outside process, retry. + } + } } } } diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java index d0e2847cb7c..e56d71d3bc7 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -119,6 +120,7 @@ import org.apache.lucene.util.ThreadInterruptedException; @SuppressCodecs({"MockRandom", "Direct", "SimpleText"}) @SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure") public class TestStressNRTReplication extends LuceneTestCase { + public static final String CRASH_MSG_PREFIX = "will crash after "; // Test evilness controls: @@ -256,7 +258,7 @@ public class TestStressNRTReplication extends LuceneTestCase { if (TEST_NIGHTLY) { runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 120, 240); } else { - runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 45, 120); + runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 20, 60); } System.out.println("TEST: will run for " + runTimeSec + " sec"); @@ -642,6 +644,7 @@ public class TestStressNRTReplication extends LuceneTestCase { long initInfosVersion = -1; Pattern logTimeStart = Pattern.compile("^[0-9.]+s .*"); boolean willCrash = false; + Optional subprocessKiller = Optional.empty(); while (true) { String l = r.readLine(); @@ -701,8 +704,40 @@ public class TestStressNRTReplication extends LuceneTestCase { initCommitVersion = Integer.parseInt(l.substring(16).trim()); } else if (l.startsWith("INFOS VERSION: ")) { initInfosVersion = Integer.parseInt(l.substring(15).trim()); - } else if (l.contains("will crash after")) { + } else if (l.contains(CRASH_MSG_PREFIX)) { willCrash = true; + Pattern crashMsg = + Pattern.compile(Pattern.quote(CRASH_MSG_PREFIX) + "\\s*(?[0-9]+)"); + var m = crashMsg.matcher(l); + if (!m.find()) { + throw new AssertionError("Expected the crash message to include the timeout: " + l); + } + final long deadline = + System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(Integer.parseInt(m.group("millis"))); + // Fork a new thread that will attempt to terminate the subprocess after a certain delay. + // We don't keep track of these "killer" subprocesses; they will end gracefully if the + // subprocess is terminated prior to the timeout. + subprocessKiller = + Optional.of( + new Thread( + () -> { + while (System.nanoTime() < deadline && p.isAlive()) { + try { + Thread.sleep(250); + } catch ( + @SuppressWarnings("unused") + InterruptedException e) { + // If we do get interrupted, it's likely we're being cleaned up. Do + // proceed immediately then. + break; + } + } + if (p.isAlive()) { + message("now killing process " + p); + p.destroyForcibly(); + } + })); + subprocessKiller.get().start(); } else if (l.startsWith("NODE STARTED")) { break; } @@ -769,7 +804,15 @@ public class TestStressNRTReplication extends LuceneTestCase { + " initInfosVersion=" + initInfosVersion); return new NodeProcess( - p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion, nodeIsClosing); + p, + id, + tcpPort, + pumper, + isPrimary, + initCommitVersion, + initInfosVersion, + nodeIsClosing, + subprocessKiller); } private void nodeClosed(int id) {