This commit is contained in:
Michael McCandless 2016-02-08 14:07:56 -05:00 committed by Mike McCandless
commit 30613c7403
10 changed files with 74 additions and 52 deletions

View File

@ -462,12 +462,19 @@ abstract class ReplicaNode extends Node {
/** Call this to notify this replica node that a new NRT infos is available on the primary.
* We kick off a job (runs in the background) to copy files across, and open a new reader once that's done. */
public synchronized CopyJob newNRTPoint(long version) throws IOException {
public synchronized CopyJob newNRTPoint(long newPrimaryGen, long version) throws IOException {
if (isClosed()) {
throw new AlreadyClosedException("this replica is closed: state=" + state);
}
// Cutover (possibly) to new primary first, so we discard any pre-copied merged segments up front, before checking for which files need
// copying. While it's possible the pre-copied merged segments could still be useful to us, in the case that the new primary is either
// the same primary (just e.g. rebooted), or a promoted replica that had a newer NRT point than we did that included the pre-copied
// merged segments, it's still a bit risky to rely solely on checksum/file length to catch the difference, so we defensively discard
// here and re-copy in that case:
maybeNewPrimary(newPrimaryGen);
// Caller should not "publish" us until we have finished .start():
assert mgr != null;
@ -520,9 +527,9 @@ abstract class ReplicaNode extends Node {
return null;
}
assert newPrimaryGen == job.getCopyState().primaryGen;
Collection<String> newNRTFiles = job.getFileNames();
long newPrimaryGen = job.getCopyState().primaryGen;
maybeNewPrimary(newPrimaryGen);
message("top: newNRTPoint: job files=" + newNRTFiles);
@ -608,9 +615,15 @@ abstract class ReplicaNode extends Node {
}
/** Called when the primary changed */
protected synchronized void maybeNewPrimary(long newPrimaryGen) {
protected synchronized void maybeNewPrimary(long newPrimaryGen) throws IOException {
if (newPrimaryGen != lastPrimaryGen) {
message("top: now change lastPrimaryGen from " + lastPrimaryGen + " to " + newPrimaryGen + " pendingMergeFiles=" + pendingMergeFiles);
message("top: delete if no ref pendingMergeFiles=" + pendingMergeFiles);
for(String fileName : pendingMergeFiles) {
deleter.deleteIfNoRef(fileName);
}
assert newPrimaryGen > lastPrimaryGen: "newPrimaryGen=" + newPrimaryGen + " vs lastPrimaryGen=" + lastPrimaryGen;
lastPrimaryGen = newPrimaryGen;
pendingMergeFiles.clear();

View File

@ -106,6 +106,7 @@ class Jobs extends Thread implements Closeable {
topJob.onceDone.run(topJob);
} catch (Throwable t) {
node.message("ignore exception calling OnceDone: " + t);
t.printStackTrace(System.out);
}
}
}
@ -120,12 +121,14 @@ class Jobs extends Thread implements Closeable {
try {
job.cancel("jobs closing", null);
} catch (Throwable t) {
node.message("ignore exception calling cancel: " + t);
node.message("ignore exception calling cancel");
t.printStackTrace(System.out);
}
try {
job.onceDone.run(job);
} catch (Throwable t) {
node.message("ignore exception calling OnceDone: " + t);
node.message("ignore exception calling OnceDone");
t.printStackTrace(System.out);
}
}
}

View File

@ -86,7 +86,7 @@ class NodeProcess implements Closeable {
}
}
public boolean commit() {
public boolean commit() throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
c.flush();
@ -95,36 +95,22 @@ class NodeProcess implements Closeable {
throw new RuntimeException("commit failed");
}
return true;
} catch (Throwable t) {
// nocommit throw this
// Something wrong with this replica; skip it:
System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
return false;
}
}
public void commitAsync() {
public void commitAsync() throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
c.flush();
} catch (Throwable t) {
// nocommit throw this
// Something wrong with this replica; skip it:
System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
}
}
public long getSearchingVersion() {
public long getSearchingVersion() throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_GET_SEARCHING_VERSION);
c.flush();
c.s.shutdownOutput();
return c.in.readVLong();
} catch (Throwable t) {
// nocommit throw this
// Something wrong with this replica; skip it:
System.out.println("PARENT: top: hit SocketException during getSearchingVersion with R" + id + "; skipping");
return -1L;
}
}
@ -162,6 +148,7 @@ class NodeProcess implements Closeable {
}
} catch (Throwable t) {
System.out.println("top: shutdown failed; ignoring");
t.printStackTrace(System.out);
}
try {
p.waitFor();
@ -178,10 +165,11 @@ class NodeProcess implements Closeable {
}
}
public void newNRTPoint(long version, int primaryTCPPort) throws IOException {
public void newNRTPoint(long version, long primaryGen, int primaryTCPPort) throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
c.out.writeVLong(version);
c.out.writeVLong(primaryGen);
c.out.writeInt(primaryTCPPort);
c.flush();
}

View File

@ -304,6 +304,7 @@ class SimplePrimaryNode extends PrimaryNode {
message("send NEW_NRT_POINT to R" + replicaID + " at tcpPort=" + replicaTCPPorts[i]);
c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
c.out.writeVLong(version);
c.out.writeVLong(primaryGen);
c.out.writeInt(tcpPort);
c.flush();
// TODO: we should use multicast to broadcast files out to replicas

View File

@ -176,10 +176,11 @@ class SimpleReplicaNode extends ReplicaNode {
case CMD_NEW_NRT_POINT:
{
long version = in.readVLong();
long newPrimaryGen = in.readVLong();
Thread.currentThread().setName("recv-" + version);
curPrimaryTCPPort = in.readInt();
message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort);
newNRTPoint(version);
message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort + " version=" + version + " newPrimaryGen=" + newPrimaryGen);
newNRTPoint(newPrimaryGen, version);
}
break;

View File

@ -356,7 +356,7 @@ public class TestNRTReplication extends LuceneTestCase {
assertVersionAndHits(replica, 0, 0);
// Ask replica to sync:
replica.newNRTPoint(primaryVersion1, primary.tcpPort);
replica.newNRTPoint(primaryVersion1, 0, primary.tcpPort);
waitForVersionAndHits(replica, primaryVersion1, 10);
replica.close();
@ -461,7 +461,7 @@ public class TestNRTReplication extends LuceneTestCase {
assertVersionAndHits(replica, primaryVersion1, 10);
// Now ask replica to sync:
replica.newNRTPoint(primaryVersion2, primary.tcpPort);
replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort);
waitForVersionAndHits(replica, primaryVersion2, 20);
@ -736,7 +736,7 @@ public class TestNRTReplication extends LuceneTestCase {
sendReplicasToPrimary(primary, replica);
// Now ask replica to sync:
replica.newNRTPoint(primaryVersion2, primary.tcpPort);
replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort);
// Make sure it sees all docs that were indexed while it was down:
assertVersionAndHits(primary, primaryVersion2, 110);

View File

@ -60,14 +60,8 @@ import com.carrotsearch.randomizedtesting.SeedUtils;
// nocommit why so many "hit SocketException during commit with R0"?
// nocommit why so much time when so many nodes are down
// nocommit indexing is too fast? (xlog replay fails to finish before primary crashes itself)
// nocommit why all these NodeCommunicationExcs?
// nocommit the sockets are a pita on jvm crashing ...
/*
TODO
- fangs
@ -161,8 +155,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
static final boolean DO_BIT_FLIPS_DURING_COPY = true;
/** Set to a non-null value to force exactly that many nodes; else, it's random. */
// nocommit
static final Integer NUM_NODES = 2;
static final Integer NUM_NODES = null;
final AtomicBoolean failed = new AtomicBoolean();
@ -214,9 +207,6 @@ public class TestStressNRTReplication extends LuceneTestCase {
// Silly bootstrapping:
versionToTransLogLocation.put(0L, 0L);
// nocommit why also 1?
//versionToTransLogLocation.put(1L, 0L);
versionToMarker.put(0L, 0);
int numNodes;
@ -334,10 +324,15 @@ public class TestStressNRTReplication extends LuceneTestCase {
{
NodeProcess node = nodes[random().nextInt(nodes.length)];
if (node != null && node.nodeIsClosing.get() == false) {
// TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters.
// TODO: if this node is primary, it means we committed an unpublished version (not exposed as an NRT point)... not sure it matters.
// maybe we somehow allow IW to commit a specific sis (the one we just flushed)?
message("top: now commit node=" + node);
try {
node.commitAsync();
} catch (Throwable t) {
message("top: hit exception during commit with R" + node.id + "; skipping");
t.printStackTrace(System.out);
}
}
}
}
@ -400,7 +395,14 @@ public class TestStressNRTReplication extends LuceneTestCase {
for (NodeProcess node : nodes) {
if (node != null) {
message("ask " + node + " for its current searching version");
long searchingVersion = node.getSearchingVersion();
long searchingVersion;
try {
searchingVersion = node.getSearchingVersion();
} catch (Throwable t) {
message("top: hit SocketException during getSearchingVersion with R" + node.id + "; skipping");
t.printStackTrace(System.out);
continue;
}
message(node + " has searchingVersion=" + searchingVersion);
if (searchingVersion > maxSearchingVersion) {
maxSearchingVersion = searchingVersion;
@ -415,8 +417,12 @@ public class TestStressNRTReplication extends LuceneTestCase {
}
message("top: promote " + replicaToPromote + " version=" + maxSearchingVersion + "; now commit");
if (replicaToPromote.commit() == false) {
message("top: commit failed; skipping primary promotion");
try {
replicaToPromote.commit();
} catch (Throwable t) {
// Something wrong with this replica; skip it:
message("top: hit exception during commit with R" + replicaToPromote.id + "; skipping");
t.printStackTrace(System.out);
return;
}
@ -478,8 +484,9 @@ public class TestStressNRTReplication extends LuceneTestCase {
try {
transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc);
} catch (IOException ioe) {
// nocommit what if primary node is still running here, and we failed for some other reason?
message("top: replay xlog failed; abort");
message("top: replay xlog failed; shutdown new primary");
ioe.printStackTrace(System.out);
newPrimary.shutdown();
return;
}
@ -1182,4 +1189,14 @@ public class TestStressNRTReplication extends LuceneTestCase {
Thread.currentThread().getName(),
message));
}
static void message(String message, long localStartNS) {
long now = System.nanoTime();
System.out.println(String.format(Locale.ROOT,
"%5.3fs %5.1fs: parent [%11s] %s",
(now-Node.globalStartNS)/1000000000.,
(now-localStartNS)/1000000000.,
Thread.currentThread().getName(),
message));
}
}

View File

@ -31,6 +31,7 @@ class ThreadPumper {
@Override
public void run() {
try {
long startTimeNS = System.nanoTime();
Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*");
String line;
while ((line = from.readLine()) != null) {
@ -42,7 +43,7 @@ class ThreadPumper {
// Already a well-formed log output:
System.out.println(line);
} else {
TestNRTReplication.message(line);
TestStressNRTReplication.message(line, startTimeNS);
}
if (line.contains("now force close server socket after")) {
nodeClosing.set(true);
@ -60,4 +61,3 @@ class ThreadPumper {
return t;
}
}

View File

@ -1,3 +1 @@
python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 3
# -mult 4 -nightly
python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 3 -verbose -mult 4 -nightly

View File

@ -852,6 +852,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
// TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles
if (assertNoUnreferencedFilesOnClose) {
System.out.println("MDW: now assert no unref'd files at close");
// now look for unreferenced files: discount ones that we tried to delete but could not
Set<String> allFiles = new HashSet<>(Arrays.asList(listAll()));