From c14fa123da435a34c4f40e6685c3184ecd9a60e0 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Mon, 8 Feb 2016 14:07:14 -0500 Subject: [PATCH 1/3] fix compilation errors --- .../test/org/apache/lucene/replicator/nrt/Jobs.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java index a0b9535acd4..de8c0c38e6d 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java @@ -80,7 +80,12 @@ class Jobs extends Thread implements Closeable { } else { node.message("AlreadyClosedException during job.visit job=" + topJob + "; now cancel"); } - topJob.cancel("unexpected exception in visit", t); + try { + topJob.cancel("unexpected exception in visit", t); + } catch (Throwable t2) { + node.message("ignore exception calling cancel: " + t2); + t2.printStackTrace(System.out); + } try { topJob.onceDone.run(topJob); } catch (Throwable t2) { @@ -112,7 +117,11 @@ class Jobs extends Thread implements Closeable { while (queue.isEmpty() == false) { SimpleCopyJob job = (SimpleCopyJob) queue.poll(); node.message("top: Jobs: now cancel job=" + job); - job.cancel("jobs closing", null); + try { + job.cancel("jobs closing", null); + } catch (Throwable t) { + node.message("ignore exception calling cancel: " + t); + } try { job.onceDone.run(job); } catch (Throwable t) { From ae2b58c03c82b6faedc35698542f4898892683b2 Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Mon, 8 Feb 2016 19:15:17 -0500 Subject: [PATCH 2/3] indexer threads should stop on shutdown; don't suppress exceptions; decrease random commit frequency --- .../replicator/nrt/SimplePrimaryNode.java | 19 ++++++++++++++----- .../lucene/replicator/nrt/SimpleServer.java | 3 ++- .../nrt/TestStressNRTReplication.java | 7 ++++--- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java index fe142344569..d3415ad4441 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java @@ -163,7 +163,7 @@ class SimplePrimaryNode extends PrimaryNode { return; } - message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas: files=" + files.keySet()); + message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas; tcpPort=" + tcpPort + ": files=" + files.keySet()); MergePreCopy preCopy = new MergePreCopy(files); warmingSegments.add(preCopy); @@ -289,8 +289,8 @@ class SimplePrimaryNode extends PrimaryNode { // Something did get flushed (there were indexing ops since the last flush): verifyAtLeastMarkerCount(atLeastMarkerCount, null); - - // Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we + + // Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we // (possibly) pushed to some replicas. Alternatively we could make this 2 separate ops? long version = getCopyStateVersion(); message("send flushed version=" + version); @@ -432,10 +432,19 @@ class SimplePrimaryNode extends PrimaryNode { tokenizedWithTermVectors.setStoreTermVectorPositions(true); } - private void handleIndexing(Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException { + private void handleIndexing(Socket socket, AtomicBoolean stop, InputStream is, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException { Thread.currentThread().setName("indexing"); message("start handling indexing socket=" + socket); while (true) { + while (true) { + if (is.available() > 0) { + break; + } + if (stop.get()) { + return; + } + Thread.sleep(10); + } byte cmd; try { cmd = in.readByte(); @@ -587,7 +596,7 @@ class SimplePrimaryNode extends PrimaryNode { break; case CMD_INDEXING: - handleIndexing(socket, in, out, bos); + handleIndexing(socket, stop, is, in, out, bos); break; case CMD_GET_SEARCHING_VERSION: diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java index 72e33d7af8e..3bad39b621b 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java @@ -106,7 +106,8 @@ public class SimpleServer extends LuceneTestCase { // Test should fail with this: throw new RuntimeException(t); } else { - node.message("exception " + t + " handling client connection; ignoring"); + node.message("exception handling client connection; ignoring:"); + t.printStackTrace(System.out); } } finally { if (success) { diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java index 28b15f88922..86550c56bf2 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java @@ -155,7 +155,7 @@ public class TestStressNRTReplication extends LuceneTestCase { static final boolean DO_BIT_FLIPS_DURING_COPY = true; /** Set to a non-null value to force exactly that many nodes; else, it's random. */ - static final Integer NUM_NODES = null; + static final Integer NUM_NODES; final AtomicBoolean failed = new AtomicBoolean(); @@ -321,7 +321,7 @@ public class TestStressNRTReplication extends LuceneTestCase { // Commit a random node, primary or replica - { + if (random().nextInt(10) == 1) { NodeProcess node = nodes[random().nextInt(nodes.length)]; if (node != null && node.nodeIsClosing.get() == false) { // TODO: if this node is primary, it means we committed an unpublished version (not exposed as an NRT point)... not sure it matters. @@ -751,7 +751,8 @@ public class TestStressNRTReplication extends LuceneTestCase { c.flush(); c.in.readByte(); } catch (Throwable t) { - message("top: ignore exc sending replicas to primary: " + t); + message("top: ignore exc sending replicas to primary P" + curPrimary.id + " at tcpPort=" + curPrimary.tcpPort); + t.printStackTrace(System.out); } } } From f7a56ed6bca6d47de1b14e1aa6c13941076eca2d Mon Sep 17 00:00:00 2001 From: Mike McCandless Date: Mon, 8 Feb 2016 19:16:33 -0500 Subject: [PATCH 3/3] fix compilation --- .../apache/lucene/replicator/nrt/TestStressNRTReplication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java index 86550c56bf2..f31376b5909 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java @@ -155,7 +155,7 @@ public class TestStressNRTReplication extends LuceneTestCase { static final boolean DO_BIT_FLIPS_DURING_COPY = true; /** Set to a non-null value to force exactly that many nodes; else, it's random. */ - static final Integer NUM_NODES; + static final Integer NUM_NODES = null; final AtomicBoolean failed = new AtomicBoolean();