mirror of https://github.com/apache/lucene.git
Merge branch 'jira/lucene-5438-nrt-replication' of https://git-wip-us.apache.org/repos/asf/lucene-solr into jira/lucene-5438-nrt-replication
This commit is contained in:
commit
c03bb2ecf9
|
@ -163,7 +163,7 @@ class SimplePrimaryNode extends PrimaryNode {
|
|||
return;
|
||||
}
|
||||
|
||||
message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas: files=" + files.keySet());
|
||||
message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas; tcpPort=" + tcpPort + ": files=" + files.keySet());
|
||||
|
||||
MergePreCopy preCopy = new MergePreCopy(files);
|
||||
warmingSegments.add(preCopy);
|
||||
|
@ -289,8 +289,8 @@ class SimplePrimaryNode extends PrimaryNode {
|
|||
// Something did get flushed (there were indexing ops since the last flush):
|
||||
|
||||
verifyAtLeastMarkerCount(atLeastMarkerCount, null);
|
||||
|
||||
// Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we
|
||||
|
||||
// Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we
|
||||
// (possibly) pushed to some replicas. Alternatively we could make this 2 separate ops?
|
||||
long version = getCopyStateVersion();
|
||||
message("send flushed version=" + version);
|
||||
|
@ -432,10 +432,19 @@ class SimplePrimaryNode extends PrimaryNode {
|
|||
tokenizedWithTermVectors.setStoreTermVectorPositions(true);
|
||||
}
|
||||
|
||||
private void handleIndexing(Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException {
|
||||
private void handleIndexing(Socket socket, AtomicBoolean stop, InputStream is, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
|
||||
Thread.currentThread().setName("indexing");
|
||||
message("start handling indexing socket=" + socket);
|
||||
while (true) {
|
||||
while (true) {
|
||||
if (is.available() > 0) {
|
||||
break;
|
||||
}
|
||||
if (stop.get()) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
}
|
||||
byte cmd;
|
||||
try {
|
||||
cmd = in.readByte();
|
||||
|
@ -587,7 +596,7 @@ class SimplePrimaryNode extends PrimaryNode {
|
|||
break;
|
||||
|
||||
case CMD_INDEXING:
|
||||
handleIndexing(socket, in, out, bos);
|
||||
handleIndexing(socket, stop, is, in, out, bos);
|
||||
break;
|
||||
|
||||
case CMD_GET_SEARCHING_VERSION:
|
||||
|
|
|
@ -106,7 +106,8 @@ public class SimpleServer extends LuceneTestCase {
|
|||
// Test should fail with this:
|
||||
throw new RuntimeException(t);
|
||||
} else {
|
||||
node.message("exception " + t + " handling client connection; ignoring");
|
||||
node.message("exception handling client connection; ignoring:");
|
||||
t.printStackTrace(System.out);
|
||||
}
|
||||
} finally {
|
||||
if (success) {
|
||||
|
|
|
@ -321,7 +321,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
|
|||
|
||||
// Commit a random node, primary or replica
|
||||
|
||||
{
|
||||
if (random().nextInt(10) == 1) {
|
||||
NodeProcess node = nodes[random().nextInt(nodes.length)];
|
||||
if (node != null && node.nodeIsClosing.get() == false) {
|
||||
// TODO: if this node is primary, it means we committed an unpublished version (not exposed as an NRT point)... not sure it matters.
|
||||
|
@ -751,7 +751,8 @@ public class TestStressNRTReplication extends LuceneTestCase {
|
|||
c.flush();
|
||||
c.in.readByte();
|
||||
} catch (Throwable t) {
|
||||
message("top: ignore exc sending replicas to primary: " + t);
|
||||
message("top: ignore exc sending replicas to primary P" + curPrimary.id + " at tcpPort=" + curPrimary.tcpPort);
|
||||
t.printStackTrace(System.out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue