indexer threads should stop on shutdown; don't suppress exceptions; decrease random commit frequency

This commit is contained in:
Mike McCandless 2016-02-08 19:15:17 -05:00
parent 30613c7403
commit ae2b58c03c
3 changed files with 20 additions and 9 deletions

View File

@ -163,7 +163,7 @@ class SimplePrimaryNode extends PrimaryNode {
return;
}
message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas: files=" + files.keySet());
message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas; tcpPort=" + tcpPort + ": files=" + files.keySet());
MergePreCopy preCopy = new MergePreCopy(files);
warmingSegments.add(preCopy);
@ -432,10 +432,19 @@ class SimplePrimaryNode extends PrimaryNode {
tokenizedWithTermVectors.setStoreTermVectorPositions(true);
}
private void handleIndexing(Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException {
private void handleIndexing(Socket socket, AtomicBoolean stop, InputStream is, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
Thread.currentThread().setName("indexing");
message("start handling indexing socket=" + socket);
while (true) {
while (true) {
if (is.available() > 0) {
break;
}
if (stop.get()) {
return;
}
Thread.sleep(10);
}
byte cmd;
try {
cmd = in.readByte();
@ -587,7 +596,7 @@ class SimplePrimaryNode extends PrimaryNode {
break;
case CMD_INDEXING:
handleIndexing(socket, in, out, bos);
handleIndexing(socket, stop, is, in, out, bos);
break;
case CMD_GET_SEARCHING_VERSION:

View File

@ -106,7 +106,8 @@ public class SimpleServer extends LuceneTestCase {
// Test should fail with this:
throw new RuntimeException(t);
} else {
node.message("exception " + t + " handling client connection; ignoring");
node.message("exception handling client connection; ignoring:");
t.printStackTrace(System.out);
}
} finally {
if (success) {

View File

@ -155,7 +155,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
static final boolean DO_BIT_FLIPS_DURING_COPY = true;
/** Set to a non-null value to force exactly that many nodes; else, it's random. */
static final Integer NUM_NODES = null;
static final Integer NUM_NODES;
final AtomicBoolean failed = new AtomicBoolean();
@ -321,7 +321,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
// Commit a random node, primary or replica
{
if (random().nextInt(10) == 1) {
NodeProcess node = nodes[random().nextInt(nodes.length)];
if (node != null && node.nodeIsClosing.get() == false) {
// TODO: if this node is primary, it means we committed an unpublished version (not exposed as an NRT point)... not sure it matters.
@ -751,7 +751,8 @@ public class TestStressNRTReplication extends LuceneTestCase {
c.flush();
c.in.readByte();
} catch (Throwable t) {
message("top: ignore exc sending replicas to primary: " + t);
message("top: ignore exc sending replicas to primary P" + curPrimary.id + " at tcpPort=" + curPrimary.tcpPort);
t.printStackTrace(System.out);
}
}
}