diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java index fe142344569..d3415ad4441 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java @@ -163,7 +163,7 @@ class SimplePrimaryNode extends PrimaryNode { return; } - message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas: files=" + files.keySet()); + message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas; tcpPort=" + tcpPort + ": files=" + files.keySet()); MergePreCopy preCopy = new MergePreCopy(files); warmingSegments.add(preCopy); @@ -289,8 +289,8 @@ class SimplePrimaryNode extends PrimaryNode { // Something did get flushed (there were indexing ops since the last flush): verifyAtLeastMarkerCount(atLeastMarkerCount, null); - - // Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we + + // Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we // (possibly) pushed to some replicas. Alternatively we could make this 2 separate ops? long version = getCopyStateVersion(); message("send flushed version=" + version); @@ -432,10 +432,19 @@ class SimplePrimaryNode extends PrimaryNode { tokenizedWithTermVectors.setStoreTermVectorPositions(true); } - private void handleIndexing(Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException { + private void handleIndexing(Socket socket, AtomicBoolean stop, InputStream is, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException { Thread.currentThread().setName("indexing"); message("start handling indexing socket=" + socket); while (true) { + while (true) { + if (is.available() > 0) { + break; + } + if (stop.get()) { + return; + } + Thread.sleep(10); + } byte cmd; try { cmd = in.readByte(); @@ -587,7 +596,7 @@ class SimplePrimaryNode extends PrimaryNode { break; case CMD_INDEXING: - handleIndexing(socket, in, out, bos); + handleIndexing(socket, stop, is, in, out, bos); break; case CMD_GET_SEARCHING_VERSION: diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java index 72e33d7af8e..3bad39b621b 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java @@ -106,7 +106,8 @@ public class SimpleServer extends LuceneTestCase { // Test should fail with this: throw new RuntimeException(t); } else { - node.message("exception " + t + " handling client connection; ignoring"); + node.message("exception handling client connection; ignoring:"); + t.printStackTrace(System.out); } } finally { if (success) { diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java index f2595802bb4..fff59fa68d1 100644 --- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java +++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java @@ -321,7 +321,7 @@ public class TestStressNRTReplication extends LuceneTestCase { // Commit a random node, primary or replica - { + if (random().nextInt(10) == 1) { NodeProcess node = nodes[random().nextInt(nodes.length)]; if (node != null && node.nodeIsClosing.get() == false) { // TODO: if this node is primary, it means we committed an unpublished version (not exposed as an NRT point)... not sure it matters. @@ -751,7 +751,8 @@ public class TestStressNRTReplication extends LuceneTestCase { c.flush(); c.in.readByte(); } catch (Throwable t) { - message("top: ignore exc sending replicas to primary: " + t); + message("top: ignore exc sending replicas to primary P" + curPrimary.id + " at tcpPort=" + curPrimary.tcpPort); + t.printStackTrace(System.out); } } }