remove some core changes; add missing sync that caused stress test failure

This commit is contained in:
Mike McCandless 2016-02-10 09:09:49 -05:00
parent 027bc0e4d6
commit 3a47dd29bc
3 changed files with 33 additions and 36 deletions

View File

@ -175,10 +175,8 @@ public class NRTCachingDirectory extends FilterDirectory implements Accountable
@Override
public void renameFile(String source, String dest) throws IOException {
unCache(source);
try {
cache.deleteFile(dest);
} catch (FileNotFoundException fnfe) {
// OK -- it may not exist
if (cache.fileNameExists(dest)) {
throw new IllegalArgumentException("target file " + dest + " already exists");
}
in.renameFile(source, dest);
}

View File

@ -107,9 +107,6 @@ public final class IOUtils {
* objects to call <tt>close()</tt> on
*/
public static void closeWhileHandlingException(Closeable... objects) {
if (objects.length == 0) {
throw new IllegalArgumentException("pass at least one Closeable");
}
closeWhileHandlingException(Arrays.asList(objects));
}

View File

@ -679,41 +679,43 @@ class SimplePrimaryNode extends PrimaryNode {
int replicaTCPPort = in.readVInt();
message("new replica: " + warmingSegments.size() + " current warming merges");
// Step through all currently warming segments and try to add this replica if it isn't there already:
for(MergePreCopy preCopy : warmingSegments) {
message("warming segment " + preCopy.files.keySet());
boolean found = false;
synchronized (preCopy.connections) {
for(Connection c : preCopy.connections) {
if (c.destTCPPort == replicaTCPPort) {
found = true;
break;
synchronized(warmingSegments) {
for(MergePreCopy preCopy : warmingSegments) {
message("warming segment " + preCopy.files.keySet());
boolean found = false;
synchronized (preCopy.connections) {
for(Connection c : preCopy.connections) {
if (c.destTCPPort == replicaTCPPort) {
found = true;
break;
}
}
}
}
if (found) {
message("this replica is already warming this segment; skipping");
// It's possible (maybe) that the replica started up, then a merge kicked off, and it warmed to this new replica, all before the
// replica sent us this command:
continue;
}
if (found) {
message("this replica is already warming this segment; skipping");
// It's possible (maybe) that the replica started up, then a merge kicked off, and it warmed to this new replica, all before the
// replica sent us this command:
continue;
}
// OK, this new replica is not already warming this segment, so attempt (could fail) to start warming now:
// OK, this new replica is not already warming this segment, so attempt (could fail) to start warming now:
Connection c = new Connection(replicaTCPPort);
if (preCopy.tryAddConnection(c) == false) {
// This can happen, if all other replicas just now finished warming this segment, and so we were just a bit too late. In this
// case the segment will be copied over in the next nrt point sent to this replica
message("failed to add connection to segment warmer (too late); closing");
c.close();
Connection c = new Connection(replicaTCPPort);
if (preCopy.tryAddConnection(c) == false) {
// This can happen, if all other replicas just now finished warming this segment, and so we were just a bit too late. In this
// case the segment will be copied over in the next nrt point sent to this replica
message("failed to add connection to segment warmer (too late); closing");
c.close();
}
c.out.writeByte(SimpleReplicaNode.CMD_PRE_COPY_MERGE);
c.out.writeVLong(primaryGen);
c.out.writeVInt(tcpPort);
SimpleServer.writeFilesMetaData(c.out, preCopy.files);
c.flush();
c.s.shutdownOutput();
message("successfully started warming");
}
c.out.writeByte(SimpleReplicaNode.CMD_PRE_COPY_MERGE);
c.out.writeVLong(primaryGen);
c.out.writeVInt(tcpPort);
SimpleServer.writeFilesMetaData(c.out, preCopy.files);
c.flush();
c.s.shutdownOutput();
message("successfully started warming");
}
break;