fix more nocommits; add separate test that deleteAll can replicate

Mike McCandless 2016-01-27 06:00:12 -05:00
parent 33890681a0
commit 022540e8c2
9 changed files with 1363 additions and 1095 deletions
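The headline addition is a separate test verifying that deleteAll replicates from the primary to replicas. Roughly, the flow such a test drives looks like this (a minimal sketch; Connection and NodeProcess are the test helpers shown below, while the flush and search-all helpers are assumed names):

// Sketch of the deleteAll replication round trip; helper names are assumed:
try (Connection c = new Connection(primary.tcpPort)) {
  primary.deleteAllDocuments(c);   // new CMD_DELETE_ALL_DOCS command, see NodeProcess below
}
primary.flush();                   // publish a new NRT point (assumed helper)
// once the replica syncs that point, CMD_SEARCH_ALL against it should report 0 hits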

View File

@@ -23,30 +23,14 @@ package org.apache.lucene.replicator.nrt;
* flushed segment sizes, not merged segments. */
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.replicator.nrt.CopyJob.OnceDone;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
// TODO: or ... replica node can do merging locally? tricky to keep things in sync, when one node merges more slowly than others...
@@ -71,8 +55,6 @@ class PreCopyMergedSegmentWarmer extends IndexReaderWarmer {
filesMetaData.put(fileName, metaData);
}
// nocommit if one replica is very slow then it dos's all other replicas?
primary.preCopyMergedSegmentFiles(info, filesMetaData);
primary.message(String.format(Locale.ROOT, "top: done warm merge " + info + ": took %.3f sec, %.1f MB", (System.nanoTime()-startNS)/1000000000., info.sizeInBytes()/1024/1024.));
primary.finishedMergedFiles.addAll(filesMetaData.keySet());
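For context, a merged-segment warmer like this one is installed on the writer config, and IndexWriter then calls it for every merged segment before the merge is committed; a hedged wiring sketch (the constructor argument is an assumption):

// Sketch: IndexWriter invokes the warmer before cutting over to a merged
// segment, which is what lets replicas pre-copy the merged files:
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
iwc.setMergedSegmentWarmer(new PreCopyMergedSegmentWarmer(primary));
IndexWriter writer = new IndexWriter(dir, iwc);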

View File

@@ -17,43 +17,26 @@ package org.apache.lucene.replicator.nrt;
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.PrintStreamInfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
/*
@@ -178,7 +161,6 @@ public abstract class PrimaryNode extends Node {
// on xlog replay we are replaying more ops than necessary.
commitData.put(VERSION_KEY, Long.toString(copyState.version));
message("top: commit commitData=" + commitData);
// nocommit this is now an NRT-visible change! make test where nothing is indexing and confirm we don't do silly commit + refresh loop forever!
writer.setCommitData(commitData, false);
writer.commit();
}
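Because the version lands in the commit user data, a node can recover it on startup straight from the latest commit point; a minimal sketch assuming the VERSION_KEY convention above:

// Sketch: commit data is stored as SegmentInfos user data, so the last
// committed version can be read back without opening an IndexWriter:
SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
long lastCommitVersion = Long.parseLong(infos.getUserData().get(VERSION_KEY));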

View File

@@ -17,14 +17,7 @@ package org.apache.lucene.replicator.nrt;
* limitations under the License.
*/
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -34,15 +27,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
@@ -52,24 +41,12 @@ import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/** Replica node that pulls index changes from the primary node by copying newly flushed or merged index files. */
@@ -544,7 +521,7 @@ abstract class ReplicaNode extends Node {
synchronized (mergeCopyJobs) {
for (CopyJob mergeJob : mergeCopyJobs) {
if (mergeJob.getFileNames().contains(fileName)) {
// nocommit can't we .transferAndCancel?
// TODO: we could maybe transferAndCancel here? except CopyJob can't transferAndCancel more than one currently
message("top: now cancel merge copy job=" + mergeJob + ": file " + fileName + " is now being copied via NRT point");
mergeJob.cancel("newNRTPoint is copying over the same file", null);
}

View File

@@ -19,18 +19,9 @@ package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
/** Parent JVM holds this "wrapper" to refer to each child JVM. This is roughly equivalent to a client-side "sugar" API. */
class NodeProcess implements Closeable {
@@ -234,5 +225,14 @@ class NodeProcess implements Closeable {
c.flush();
c.in.readByte();
}
public void deleteAllDocuments(Connection c) throws IOException {
if (isPrimary == false) {
throw new IllegalStateException("only primary can delete all documents");
}
c.out.writeByte(SimplePrimaryNode.CMD_DELETE_ALL_DOCS);
c.flush();
c.in.readByte();
}
}
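A hypothetical call site in the parent test process, mirroring how the other commands on this wrapper are driven (the Connection setup is elided in this diff):

// Sketch: same one-byte ack protocol as addDocument/deleteDocument above:
try (Connection c = new Connection(primaryTcpPort)) {
  primaryProcess.deleteAllDocuments(c);   // blocks until the primary acks
}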

View File

@@ -43,17 +43,16 @@ import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.DataInput;
@@ -61,8 +60,6 @@ import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
@@ -74,6 +71,8 @@ class SimplePrimaryNode extends PrimaryNode {
final int tcpPort;
final Random random;
// These are updated by parent test process whenever replicas change:
int[] replicaTCPPorts;
int[] replicaIDs;
@@ -81,6 +80,10 @@ class SimplePrimaryNode extends PrimaryNode {
// So we only flip a bit once per file name:
final Set<String> bitFlipped = Collections.synchronizedSet(new HashSet<>());
final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
final boolean doFlipBitsDuringCopy;
static class MergePreCopy {
final List<Connection> connections = Collections.synchronizedList(new ArrayList<>());
final Map<String,FileMetaData> files;
@@ -109,11 +112,12 @@ class SimplePrimaryNode extends PrimaryNode {
}
}
final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory,
boolean doFlipBitsDuringCopy) throws IOException {
super(initWriter(id, random, indexPath), id, primaryGen, forcePrimaryVersion, searcherFactory);
this.tcpPort = tcpPort;
this.random = new Random(random.nextLong());
this.doFlipBitsDuringCopy = doFlipBitsDuringCopy;
}
/** Records currently alive replicas. */
@@ -187,7 +191,7 @@ class SimplePrimaryNode extends PrimaryNode {
long startNS = System.nanoTime();
long lastWarnNS = startNS;
// TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replicas to finish copying?
// TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replica(s) to finish copying?
while (preCopy.finished() == false) {
try {
Thread.sleep(10);
@@ -241,7 +245,16 @@ class SimplePrimaryNode extends PrimaryNode {
message("top: warning: replica socket=" + c.s + " for segment=" + info + " seems to be dead; closing files=" + files.keySet());
IOUtils.closeWhileHandlingException(c);
it.remove();
done = true;
}
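// Fault injection: about 1 in 1000 polls, randomly drop a live replica from
// merge warming so the primary's cleanup path gets exercised: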
if (done == false && random.nextInt(1000) == 17) {
message("top: warning: now randomly dropping replica from merge warming; files=" + files.keySet());
IOUtils.closeWhileHandlingException(c);
it.remove();
done = true;
}
} catch (Throwable t) {
message("top: ignore exception trying to read byte during warm for segment=" + info + " to replica socket=" + c.s + ": " + t + " files=" + files.keySet());
IOUtils.closeWhileHandlingException(c);
@@ -368,7 +381,7 @@ class SimplePrimaryNode extends PrimaryNode {
while (upto < len) {
int chunk = (int) Math.min(buffer.length, (len-upto));
in.readBytes(buffer, 0, chunk);
if (TestNRTReplication.DO_BIT_FLIPS_DURING_COPY) {
if (doFlipBitsDuringCopy) {
if (random.nextInt(3000) == 17 && bitFlipped.contains(fileName) == false) {
bitFlipped.add(fileName);
message("file " + fileName + " to R" + replicaID + ": now randomly flipping a bit at byte=" + upto);
@@ -435,6 +448,10 @@ class SimplePrimaryNode extends PrimaryNode {
handleDeleteDocument(in, out);
out.writeByte((byte) 1);
bos.flush();
} else if (cmd == CMD_DELETE_ALL_DOCS) {
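// Delete-all drops every indexed document in one operation; like any other
// update it becomes visible to replicas at the next NRT point: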
writer.deleteAll();
out.writeByte((byte) 1);
bos.flush();
} else if (cmd == CMD_INDEXING_DONE) {
out.writeByte((byte) 1);
bos.flush();
@@ -508,6 +525,7 @@ class SimplePrimaryNode extends PrimaryNode {
static final byte CMD_MARKER_SEARCH = 3;
static final byte CMD_COMMIT = 4;
static final byte CMD_CLOSE = 5;
static final byte CMD_SEARCH_ALL = 21;
// Send (to primary) the list of currently running replicas:
static final byte CMD_SET_REPLICAS = 16;
@@ -518,6 +536,7 @@ class SimplePrimaryNode extends PrimaryNode {
static final byte CMD_UPDATE_DOC = 7;
static final byte CMD_DELETE_DOC = 8;
static final byte CMD_INDEXING_DONE = 19;
static final byte CMD_DELETE_ALL_DOCS = 22;
// Sent by replica to primary when replica first starts up, so primary can add it to any warming merges:
static final byte CMD_NEW_REPLICA = 20;
@@ -579,6 +598,22 @@ class SimplePrimaryNode extends PrimaryNode {
}
continue outer;
case CMD_SEARCH_ALL:
{
Thread.currentThread().setName("search all");
IndexSearcher searcher = mgr.acquire();
try {
long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
int hitCount = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
//message("version=" + version + " searcher=" + searcher);
out.writeVLong(version);
out.writeVInt(hitCount);
} finally {
mgr.release(searcher);
}
}
continue outer;
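// Client side of CMD_SEARCH_ALL (sketch; not part of this commit): write the
// command byte, flush, then read back the VLong version and VInt hit count:
//
//   c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
//   c.flush();
//   long version = c.in.readVLong();
//   int hitCount = c.in.readVInt();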
case CMD_MARKER_SEARCH:
{
Thread.currentThread().setName("msearch");

View File

@@ -24,19 +24,16 @@ import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.DataInput;
@@ -45,7 +42,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.LuceneTestCase;
@@ -182,7 +178,7 @@ class SimpleReplicaNode extends ReplicaNode {
break;
case SimplePrimaryNode.CMD_GET_SEARCHING_VERSION:
// nocommit this is hacky:
// This is called when primary has crashed and we need to elect a new primary from all the still running replicas:
// Tricky: if a sync is just finishing up, i.e. managed to finish copying all files just before we crashed primary, and is now
// in the process of opening a new reader, we need to wait for it, to be sure we really pick the most current replica:
@@ -190,7 +186,7 @@ class SimpleReplicaNode extends ReplicaNode {
message("top: getSearchingVersion: now wait for finish sync");
// TODO: use immediate concurrency instead of polling:
while (isCopying() && stop.get() == false) {
Thread.sleep(50);
Thread.sleep(10);
message("top: curNRTCopy=" + curNRTCopy);
}
message("top: getSearchingVersion: done wait for finish sync");
@@ -212,6 +208,24 @@ class SimpleReplicaNode extends ReplicaNode {
//node.message("version=" + version + " searcher=" + searcher);
out.writeVLong(version);
out.writeVInt(hitCount);
bos.flush();
} finally {
mgr.release(searcher);
}
}
continue outer;
case SimplePrimaryNode.CMD_SEARCH_ALL:
{
Thread.currentThread().setName("search all");
IndexSearcher searcher = mgr.acquire();
try {
long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
int hitCount = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
//node.message("version=" + version + " searcher=" + searcher);
out.writeVLong(version);
out.writeVInt(hitCount);
bos.flush();
} finally {
mgr.release(searcher);
}
@@ -227,6 +241,7 @@ class SimpleReplicaNode extends ReplicaNode {
int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
out.writeVLong(version);
out.writeVInt(hitCount);
bos.flush();
} finally {
mgr.release(searcher);
}
@@ -290,6 +305,7 @@ class SimpleReplicaNode extends ReplicaNode {
default:
throw new IllegalArgumentException("unrecognized cmd=" + cmd);
}
System.out.println("NOW FLUSH");
bos.flush();
break;
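One way to act on the earlier TODO in CMD_GET_SEARCHING_VERSION ("use immediate concurrency instead of polling") would be a latch that the copy-finish path counts down; a rough sketch (the field and the countDown() call are assumptions, not in this commit):

// Sketch: event-driven wait instead of sleep-polling:
final CountDownLatch nrtCopyDone = new CountDownLatch(1);
// ... the code that finishes the NRT copy would call nrtCopyDone.countDown() ...
while (isCopying() && stop.get() == false) {
  nrtCopyDone.await(10, TimeUnit.MILLISECONDS);   // returns immediately once counted down
}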

View File

@@ -19,61 +19,32 @@ package org.apache.lucene.replicator.nrt;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
import org.apache.lucene.util.TestUtil;
/** Child process with silly naive TCP socket server to handle
@@ -262,8 +233,9 @@ public class SimpleServer extends LuceneTestCase {
long primaryGen = Long.parseLong(System.getProperty("tests.nrtreplication.primaryGen"));
Node.globalStartNS = Long.parseLong(System.getProperty("tests.nrtreplication.startNS"));
boolean doRandomCrash = isPrimary ? TestNRTReplication.DO_CRASH_PRIMARY : TestNRTReplication.DO_CRASH_REPLICA;
boolean doRandomClose = isPrimary ? false : TestNRTReplication.DO_CLOSE_REPLICA;
boolean doRandomCrash = "true".equals(System.getProperty("tests.nrtreplication.doRandomCrash"));
boolean doRandomClose = "true".equals(System.getProperty("tests.nrtreplication.doRandomClose"));
boolean doFlipBitsDuringCopy = "true".equals(System.getProperty("tests.nrtreplication.doFlipBitsDuringCopy"));
// Create server socket that we listen for incoming requests on:
try (final ServerSocket ss = new ServerSocket(0)) {
@@ -272,7 +244,7 @@ public class SimpleServer extends LuceneTestCase {
System.out.println("\nPORT: " + tcpPort);
final Node node;
if (isPrimary) {
node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null);
node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null, doFlipBitsDuringCopy);
System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
} else {
node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
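The tests.nrtreplication.* switches above are plain -D system properties, so the parent test process presumably sets them when forking each child JVM; a hypothetical sketch (the real suite builds a fuller command line):

// Hypothetical fork of a child node JVM; the flag names match the
// System.getProperty calls above:
List<String> cmd = new ArrayList<>();
cmd.add(Paths.get(System.getProperty("java.home"), "bin", "java").toString());
cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
cmd.add("-Dtests.nrtreplication.doRandomClose=true");
cmd.add("-Dtests.nrtreplication.doFlipBitsDuringCopy=false");
cmd.add("-cp");
cmd.add(System.getProperty("java.class.path"));
cmd.add("org.apache.lucene.replicator.nrt.SimpleServer");
Process p = new ProcessBuilder(cmd).redirectErrorStream(true).start();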