diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java index e191caf4964..62827e857ec 100644 --- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java +++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java @@ -462,12 +462,19 @@ abstract class ReplicaNode extends Node { /** Call this to notify this replica node that a new NRT infos is available on the primary. * We kick off a job (runs in the background) to copy files across, and open a new reader once that's done. */ - public synchronized CopyJob newNRTPoint(long version) throws IOException { + public synchronized CopyJob newNRTPoint(long newPrimaryGen, long version) throws IOException { if (isClosed()) { throw new AlreadyClosedException("this replica is closed: state=" + state); } + // Cutover (possibly) to new primary first, so we discard any pre-copied merged segments up front, before checking for which files need + // copying. While it's possible the pre-copied merged segments could still be useful to us, in the case that the new primary is either + // the same primary (just e.g. rebooted), or a promoted replica that had a newer NRT point than we did that included the pre-copied + // merged segments, it's still a bit risky to rely solely on checksum/file length to catch the difference, so we defensively discard + // here and re-copy in that case: + maybeNewPrimary(newPrimaryGen); + // Caller should not "publish" us until we have finished .start(): assert mgr != null; @@ -520,9 +527,9 @@ abstract class ReplicaNode extends Node { return null; } + assert newPrimaryGen == job.getCopyState().primaryGen; + Collection newNRTFiles = job.getFileNames(); - long newPrimaryGen = job.getCopyState().primaryGen; - maybeNewPrimary(newPrimaryGen); message("top: newNRTPoint: job files=" + newNRTFiles); @@ -608,9 +615,15 @@ abstract class ReplicaNode extends Node { } /** Called when the primary changed */ - protected synchronized void maybeNewPrimary(long newPrimaryGen) { + protected synchronized void maybeNewPrimary(long newPrimaryGen) throws IOException { if (newPrimaryGen != lastPrimaryGen) { message("top: now change lastPrimaryGen from " + lastPrimaryGen + " to " + newPrimaryGen + " pendingMergeFiles=" + pendingMergeFiles); + + message("top: delete if no ref pendingMergeFiles=" + pendingMergeFiles); + for(String fileName : pendingMergeFiles) { + deleter.deleteIfNoRef(fileName); + } + assert newPrimaryGen > lastPrimaryGen: "newPrimaryGen=" + newPrimaryGen + " vs lastPrimaryGen=" + lastPrimaryGen; lastPrimaryGen = newPrimaryGen; pendingMergeFiles.clear(); diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java index a0b9535acd4..f75a027c68d 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java @@ -80,7 +80,12 @@ class Jobs extends Thread implements Closeable { } else { node.message("AlreadyClosedException during job.visit job=" + topJob + "; now cancel"); } - topJob.cancel("unexpected exception in visit", t); + try { + topJob.cancel("unexpected exception in visit", t); + } catch (Throwable t2) { + node.message("ignore exception calling cancel: " + t2); + t2.printStackTrace(System.out); + } try { topJob.onceDone.run(topJob); } catch (Throwable t2) { @@ -101,6 +106,7 @@ class Jobs extends Thread implements Closeable { topJob.onceDone.run(topJob); } catch (Throwable t) { node.message("ignore exception calling OnceDone: " + t); + t.printStackTrace(System.out); } } } @@ -112,11 +118,17 @@ class Jobs extends Thread implements Closeable { while (queue.isEmpty() == false) { SimpleCopyJob job = (SimpleCopyJob) queue.poll(); node.message("top: Jobs: now cancel job=" + job); - job.cancel("jobs closing", null); + try { + job.cancel("jobs closing", null); + } catch (Throwable t) { + node.message("ignore exception calling cancel"); + t.printStackTrace(System.out); + } try { job.onceDone.run(job); } catch (Throwable t) { - node.message("ignore exception calling OnceDone: " + t); + node.message("ignore exception calling OnceDone"); + t.printStackTrace(System.out); } } } diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java index 9d8b7641294..a0bfb78f399 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java @@ -86,7 +86,7 @@ class NodeProcess implements Closeable { } } - public boolean commit() { + public boolean commit() throws IOException { try (Connection c = new Connection(tcpPort)) { c.out.writeByte(SimplePrimaryNode.CMD_COMMIT); c.flush(); @@ -95,36 +95,22 @@ class NodeProcess implements Closeable { throw new RuntimeException("commit failed"); } return true; - } catch (Throwable t) { - // nocommit throw this - // Something wrong with this replica; skip it: - System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping"); - return false; } } - public void commitAsync() { + public void commitAsync() throws IOException { try (Connection c = new Connection(tcpPort)) { c.out.writeByte(SimplePrimaryNode.CMD_COMMIT); c.flush(); - } catch (Throwable t) { - // nocommit throw this - // Something wrong with this replica; skip it: - System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping"); } } - public long getSearchingVersion() { + public long getSearchingVersion() throws IOException { try (Connection c = new Connection(tcpPort)) { c.out.writeByte(SimplePrimaryNode.CMD_GET_SEARCHING_VERSION); c.flush(); c.s.shutdownOutput(); return c.in.readVLong(); - } catch (Throwable t) { - // nocommit throw this - // Something wrong with this replica; skip it: - System.out.println("PARENT: top: hit SocketException during getSearchingVersion with R" + id + "; skipping"); - return -1L; } } @@ -162,6 +148,7 @@ class NodeProcess implements Closeable { } } catch (Throwable t) { System.out.println("top: shutdown failed; ignoring"); + t.printStackTrace(System.out); } try { p.waitFor(); @@ -178,10 +165,11 @@ class NodeProcess implements Closeable { } } - public void newNRTPoint(long version, int primaryTCPPort) throws IOException { + public void newNRTPoint(long version, long primaryGen, int primaryTCPPort) throws IOException { try (Connection c = new Connection(tcpPort)) { c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT); c.out.writeVLong(version); + c.out.writeVLong(primaryGen); c.out.writeInt(primaryTCPPort); c.flush(); } diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java index 7f5634cde63..fe142344569 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java @@ -304,6 +304,7 @@ class SimplePrimaryNode extends PrimaryNode { message("send NEW_NRT_POINT to R" + replicaID + " at tcpPort=" + replicaTCPPorts[i]); c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT); c.out.writeVLong(version); + c.out.writeVLong(primaryGen); c.out.writeInt(tcpPort); c.flush(); // TODO: we should use multicast to broadcast files out to replicas diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java index 4868338476b..9658ad17775 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java @@ -176,10 +176,11 @@ class SimpleReplicaNode extends ReplicaNode { case CMD_NEW_NRT_POINT: { long version = in.readVLong(); + long newPrimaryGen = in.readVLong(); Thread.currentThread().setName("recv-" + version); curPrimaryTCPPort = in.readInt(); - message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort); - newNRTPoint(version); + message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort + " version=" + version + " newPrimaryGen=" + newPrimaryGen); + newNRTPoint(newPrimaryGen, version); } break; diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java index 2c66994c27d..262e68e01d6 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java @@ -356,7 +356,7 @@ public class TestNRTReplication extends LuceneTestCase { assertVersionAndHits(replica, 0, 0); // Ask replica to sync: - replica.newNRTPoint(primaryVersion1, primary.tcpPort); + replica.newNRTPoint(primaryVersion1, 0, primary.tcpPort); waitForVersionAndHits(replica, primaryVersion1, 10); replica.close(); @@ -461,7 +461,7 @@ public class TestNRTReplication extends LuceneTestCase { assertVersionAndHits(replica, primaryVersion1, 10); // Now ask replica to sync: - replica.newNRTPoint(primaryVersion2, primary.tcpPort); + replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort); waitForVersionAndHits(replica, primaryVersion2, 20); @@ -736,7 +736,7 @@ public class TestNRTReplication extends LuceneTestCase { sendReplicasToPrimary(primary, replica); // Now ask replica to sync: - replica.newNRTPoint(primaryVersion2, primary.tcpPort); + replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort); // Make sure it sees all docs that were indexed while it was down: assertVersionAndHits(primary, primaryVersion2, 110); diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java index a765f11b436..28b15f88922 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java @@ -60,14 +60,8 @@ import com.carrotsearch.randomizedtesting.SeedUtils; // nocommit why so many "hit SocketException during commit with R0"? -// nocommit why so much time when so many nodes are down - -// nocommit indexing is too fast? (xlog replay fails to finish before primary crashes itself) - // nocommit why all these NodeCommunicationExcs? -// nocommit the sockets are a pita on jvm crashing ... - /* TODO - fangs @@ -161,8 +155,7 @@ public class TestStressNRTReplication extends LuceneTestCase { static final boolean DO_BIT_FLIPS_DURING_COPY = true; /** Set to a non-null value to force exactly that many nodes; else, it's random. */ - // nocommit - static final Integer NUM_NODES = 2; + static final Integer NUM_NODES = null; final AtomicBoolean failed = new AtomicBoolean(); @@ -214,9 +207,6 @@ public class TestStressNRTReplication extends LuceneTestCase { // Silly bootstrapping: versionToTransLogLocation.put(0L, 0L); - // nocommit why also 1? - //versionToTransLogLocation.put(1L, 0L); - versionToMarker.put(0L, 0); int numNodes; @@ -334,10 +324,15 @@ public class TestStressNRTReplication extends LuceneTestCase { { NodeProcess node = nodes[random().nextInt(nodes.length)]; if (node != null && node.nodeIsClosing.get() == false) { - // TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters. + // TODO: if this node is primary, it means we committed an unpublished version (not exposed as an NRT point)... not sure it matters. // maybe we somehow allow IW to commit a specific sis (the one we just flushed)? message("top: now commit node=" + node); - node.commitAsync(); + try { + node.commitAsync(); + } catch (Throwable t) { + message("top: hit exception during commit with R" + node.id + "; skipping"); + t.printStackTrace(System.out); + } } } } @@ -400,7 +395,14 @@ public class TestStressNRTReplication extends LuceneTestCase { for (NodeProcess node : nodes) { if (node != null) { message("ask " + node + " for its current searching version"); - long searchingVersion = node.getSearchingVersion(); + long searchingVersion; + try { + searchingVersion = node.getSearchingVersion(); + } catch (Throwable t) { + message("top: hit SocketException during getSearchingVersion with R" + node.id + "; skipping"); + t.printStackTrace(System.out); + continue; + } message(node + " has searchingVersion=" + searchingVersion); if (searchingVersion > maxSearchingVersion) { maxSearchingVersion = searchingVersion; @@ -415,8 +417,12 @@ public class TestStressNRTReplication extends LuceneTestCase { } message("top: promote " + replicaToPromote + " version=" + maxSearchingVersion + "; now commit"); - if (replicaToPromote.commit() == false) { - message("top: commit failed; skipping primary promotion"); + try { + replicaToPromote.commit(); + } catch (Throwable t) { + // Something wrong with this replica; skip it: + message("top: hit exception during commit with R" + replicaToPromote.id + "; skipping"); + t.printStackTrace(System.out); return; } @@ -478,8 +484,9 @@ public class TestStressNRTReplication extends LuceneTestCase { try { transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc); } catch (IOException ioe) { - // nocommit what if primary node is still running here, and we failed for some other reason? - message("top: replay xlog failed; abort"); + message("top: replay xlog failed; shutdown new primary"); + ioe.printStackTrace(System.out); + newPrimary.shutdown(); return; } @@ -1182,4 +1189,14 @@ public class TestStressNRTReplication extends LuceneTestCase { Thread.currentThread().getName(), message)); } + + static void message(String message, long localStartNS) { + long now = System.nanoTime(); + System.out.println(String.format(Locale.ROOT, + "%5.3fs %5.1fs: parent [%11s] %s", + (now-Node.globalStartNS)/1000000000., + (now-localStartNS)/1000000000., + Thread.currentThread().getName(), + message)); + } } diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java index d74e1703676..73f3908eee1 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java @@ -31,6 +31,7 @@ class ThreadPumper { @Override public void run() { try { + long startTimeNS = System.nanoTime(); Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*"); String line; while ((line = from.readLine()) != null) { @@ -42,7 +43,7 @@ class ThreadPumper { // Already a well-formed log output: System.out.println(line); } else { - TestNRTReplication.message(line); + TestStressNRTReplication.message(line, startTimeNS); } if (line.contains("now force close server socket after")) { nodeClosing.set(true); @@ -60,4 +61,3 @@ class ThreadPumper { return t; } } - diff --git a/lucene/replicator/test.cmd b/lucene/replicator/test.cmd index f636a61cf43..18045ce0579 100644 --- a/lucene/replicator/test.cmd +++ b/lucene/replicator/test.cmd @@ -1,3 +1 @@ -python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 3 - -# -mult 4 -nightly +python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 3 -verbose -mult 4 -nightly diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java index a36d6d4a733..aa892096a92 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java @@ -852,6 +852,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper { // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles if (assertNoUnreferencedFilesOnClose) { + System.out.println("MDW: now assert no unref'd files at close"); // now look for unreferenced files: discount ones that we tried to delete but could not Set allFiles = new HashSet<>(Arrays.asList(listAll()));