Remove halt() call in TestSimpleServer (part of TestStressNRTReplication) (#13177)

This commit is contained in:
Dawid Weiss 2024-03-13 09:03:34 +01:00 committed by GitHub
parent b527e101e7
commit d5ade0d30f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 80 additions and 16 deletions

View File

@ -19,6 +19,7 @@ package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.document.Document;
@ -48,6 +49,8 @@ class NodeProcess implements Closeable {
// SegmentInfos.version, which can be higher than the initCommitVersion
final long initInfosVersion;
private final Optional<Thread> subprocessKiller;
volatile boolean isOpen = true;
final AtomicBoolean nodeIsClosing;
@ -60,7 +63,8 @@ class NodeProcess implements Closeable {
boolean isPrimary,
long initCommitVersion,
long initInfosVersion,
AtomicBoolean nodeIsClosing) {
AtomicBoolean nodeIsClosing,
Optional<Thread> subprocessKiller) {
this.p = p;
this.id = id;
this.tcpPort = tcpPort;
@ -69,6 +73,7 @@ class NodeProcess implements Closeable {
this.initCommitVersion = initCommitVersion;
this.initInfosVersion = initInfosVersion;
this.nodeIsClosing = nodeIsClosing;
this.subprocessKiller = subprocessKiller;
assert initInfosVersion >= initCommitVersion
: "initInfosVersion=" + initInfosVersion + " initCommitVersion=" + initCommitVersion;
lock = new ReentrantLock();
@ -165,6 +170,11 @@ class NodeProcess implements Closeable {
t.printStackTrace(System.out);
}
try {
if (subprocessKiller.isPresent()) {
var t = subprocessKiller.get();
t.interrupt();
t.join();
}
p.waitFor();
pumper.join();
} catch (InterruptedException ie) {

View File

@ -27,6 +27,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -208,7 +209,8 @@ public class TestNRTReplication extends LuceneTestCase {
primaryTCPPort == -1,
initCommitVersion,
initInfosVersion,
nodeClosing);
nodeClosing,
Optional.empty());
}
@Override

View File

@ -32,8 +32,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
@ -132,14 +134,6 @@ public class TestSimpleServer extends LuceneTestCase {
}
}
/**
 * Simulates a JVM crash by forcibly halting the current virtual machine with exit status 1.
 *
 * <p>Borrowed from TestIndexWriterOnJRECrash. It is deliberately kept private here rather than
 * factored out into TestUtil, since giving it that much "publicity" seems dangerous.
 */
private static void crashJRE() {
final Runtime jvm = Runtime.getRuntime();
// halt() terminates the JVM immediately; control never returns from this call.
jvm.halt(1);
}
static void writeFilesMetaData(DataOutput out, Map<String, FileMetaData> files)
throws IOException {
out.writeVInt(files.size());
@ -293,7 +287,9 @@ public class TestSimpleServer extends LuceneTestCase {
if (doClose) {
node.message("top: will close after " + (waitForMS / 1000.0) + " seconds");
} else {
node.message("top: will crash after " + (waitForMS / 1000.0) + " seconds");
// This message is pattern-parsed in TestStressNRTReplication, don't change it.
node.message(
"top: " + TestStressNRTReplication.CRASH_MSG_PREFIX + waitForMS + " milliseconds");
}
Thread t =
@ -327,8 +323,21 @@ public class TestSimpleServer extends LuceneTestCase {
throw new RuntimeException(ioe);
}
} else {
node.message("top: now crash JVM after " + (waitForMS / 1000.0) + " seconds");
crashJRE();
node.message(
String.format(
Locale.ROOT,
"Expecting this JVM (PID: %s) to crash about now (after %.2f seconds)",
ProcessHandle.current().pid(),
(waitForMS / 1000.0)));
while (true) {
try {
new CountDownLatch(1).await();
} catch (
@SuppressWarnings("unused")
InterruptedException e) {
// We'll eventually be killed by an outside process, retry.
}
}
}
}
}

View File

@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -119,6 +120,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
@SuppressCodecs({"MockRandom", "Direct", "SimpleText"})
@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
public class TestStressNRTReplication extends LuceneTestCase {
public static final String CRASH_MSG_PREFIX = "will crash after ";
// Test evilness controls:
@ -256,7 +258,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
if (TEST_NIGHTLY) {
runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 120, 240);
} else {
runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 45, 120);
runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 20, 60);
}
System.out.println("TEST: will run for " + runTimeSec + " sec");
@ -642,6 +644,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
long initInfosVersion = -1;
Pattern logTimeStart = Pattern.compile("^[0-9.]+s .*");
boolean willCrash = false;
Optional<Thread> subprocessKiller = Optional.empty();
while (true) {
String l = r.readLine();
@ -701,8 +704,40 @@ public class TestStressNRTReplication extends LuceneTestCase {
initCommitVersion = Integer.parseInt(l.substring(16).trim());
} else if (l.startsWith("INFOS VERSION: ")) {
initInfosVersion = Integer.parseInt(l.substring(15).trim());
} else if (l.contains("will crash after")) {
} else if (l.contains(CRASH_MSG_PREFIX)) {
willCrash = true;
Pattern crashMsg =
Pattern.compile(Pattern.quote(CRASH_MSG_PREFIX) + "\\s*(?<millis>[0-9]+)");
var m = crashMsg.matcher(l);
if (!m.find()) {
throw new AssertionError("Expected the crash message to include the timeout: " + l);
}
final long deadline =
System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(Integer.parseInt(m.group("millis")));
// Fork a new thread that will attempt to terminate the subprocess after a certain delay.
// The "killer" thread is handed over to the NodeProcess, which interrupts and joins it; the
// thread also ends on its own if the subprocess is terminated prior to the timeout.
subprocessKiller =
Optional.of(
new Thread(
() -> {
while (System.nanoTime() < deadline && p.isAlive()) {
try {
Thread.sleep(250);
} catch (
@SuppressWarnings("unused")
InterruptedException e) {
// If we do get interrupted, it's likely we're being cleaned up. Do
// proceed immediately then.
break;
}
}
if (p.isAlive()) {
message("now killing process " + p);
p.destroyForcibly();
}
}));
subprocessKiller.get().start();
} else if (l.startsWith("NODE STARTED")) {
break;
}
@ -769,7 +804,15 @@ public class TestStressNRTReplication extends LuceneTestCase {
+ " initInfosVersion="
+ initInfosVersion);
return new NodeProcess(
p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion, nodeIsClosing);
p,
id,
tcpPort,
pumper,
isPrimary,
initCommitVersion,
initInfosVersion,
nodeIsClosing,
subprocessKiller);
}
private void nodeClosed(int id) {