cleanup stale TODOs; fix precommit

This commit is contained in:
Mike McCandless 2016-02-09 13:59:05 -05:00
parent 5640470593
commit 027bc0e4d6
14 changed files with 105 additions and 67 deletions

View File

@ -4738,7 +4738,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
/** @lucene.internal */
/** Record that the files referenced by this {@link SegmentInfos} are still in use.
*
* @lucene.internal */
public synchronized void incRefDeleter(SegmentInfos segmentInfos) throws IOException {
ensureOpen();
deleter.incRef(segmentInfos, false);
@ -4747,7 +4749,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
/** @lucene.internal */
/** Record that the files referenced by this {@link SegmentInfos} are no longer in use. Only call this if you are sure you previously
* called {@link #incRefDeleter}.
*
* @lucene.internal */
public synchronized void decRefDeleter(SegmentInfos segmentInfos) throws IOException {
ensureOpen();
deleter.decRef(segmentInfos);

View File

@ -285,6 +285,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
}
}
/** Read the commit from the provided {@link ChecksumIndexInput}. */
public static final SegmentInfos readCommit(Directory directory, ChecksumIndexInput input, long generation) throws IOException {
// NOTE: as long as we want to throw indexformattooold (vs corruptindexexception), we need
@ -479,6 +480,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
}
}
/** Write ourselves to the provided {@link IndexOutput} */
public void write(Directory directory, IndexOutput out) throws IOException {
CodecUtil.writeIndexHeader(out, "segments", VERSION_CURRENT,
StringHelper.randomId(), Long.toString(generation, Character.MAX_RADIX));
@ -725,8 +727,11 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
this.counter = other.counter;
}
/** Set the generation to be used for the next commit */
public void setNextWriteGeneration(long generation) {
assert generation >= this.generation;
if (generation < this.generation) {
throw new IllegalStateException("cannot decrease generation to " + generation + " from current generation " + this.generation);
}
this.generation = generation;
}
@ -843,6 +848,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
return userData;
}
/** Sets the commit data. */
public void setUserData(Map<String,String> data, boolean doIncrementVersion) {
if (data == null) {
userData = Collections.<String,String>emptyMap();

View File

@ -30,6 +30,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOUtils;
/** Default implementation of {@link DirectoryReader}. */
public final class StandardDirectoryReader extends DirectoryReader {
final IndexWriter writer;
@ -336,7 +337,9 @@ public final class StandardDirectoryReader extends DirectoryReader {
return segmentInfos.getVersion();
}
/** @lucene.internal */
/** Return the {@link SegmentInfos} for this reader.
*
* @lucene.internal */
public SegmentInfos getSegmentInfos() {
return segmentInfos;
}

View File

@ -54,7 +54,6 @@ import org.junit.Test;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
@SuppressWarnings("resource")
public class TestNumericDocValuesUpdates extends LuceneTestCase {

View File

@ -29,7 +29,7 @@ import org.apache.lucene.util.IOUtils;
/** Copies one file from an incoming DataInput to a dest filename in a local Directory */
class CopyOneFile implements Closeable {
public class CopyOneFile implements Closeable {
private final DataInput in;
private final IndexOutput out;
private final ReplicaNode dest;

View File

@ -21,7 +21,7 @@ package org.apache.lucene.replicator.nrt;
*
* @lucene.experimental */
class FileMetaData {
public class FileMetaData {
// Header and footer of the file must be identical between primary and replica to consider the files equal:
public final byte[] header;

View File

@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
@ -78,6 +79,9 @@ abstract class Node implements Closeable {
/** When this node was started */
public static final long localStartNS = System.nanoTime();
/** For debug logging */
protected final PrintStream printStream;
// public static final long globalStartNS;
// For debugging:
@ -86,10 +90,11 @@ abstract class Node implements Closeable {
/** File metadata for last sync that succeeded; we use this as a cache */
protected volatile Map<String,FileMetaData> lastFileMetaData;
public Node(int id, Directory dir, SearcherFactory searcherFactory) {
public Node(int id, Directory dir, SearcherFactory searcherFactory, PrintStream printStream) {
this.id = id;
this.dir = dir;
this.searcherFactory = searcherFactory;
this.printStream = printStream;
}
@Override
@ -99,38 +104,42 @@ abstract class Node implements Closeable {
public abstract void commit() throws IOException;
public static void nodeMessage(String message) {
public static void nodeMessage(PrintStream printStream, String message) {
if (printStream != null) {
long now = System.nanoTime();
System.out.println(String.format(Locale.ROOT,
printStream.println(String.format(Locale.ROOT,
"%5.3fs %5.1fs: [%11s] %s",
(now-globalStartNS)/1000000000.,
(now-localStartNS)/1000000000.,
Thread.currentThread().getName(),
message));
}
}
public static void nodeMessage(int id, String message) {
public static void nodeMessage(PrintStream printStream, int id, String message) {
if (printStream != null) {
long now = System.nanoTime();
System.out.println(String.format(Locale.ROOT,
printStream.println(String.format(Locale.ROOT,
"%5.3fs %5.1fs: N%d [%11s] %s",
(now-globalStartNS)/1000000000.,
(now-localStartNS)/1000000000.,
id,
Thread.currentThread().getName(),
message));
}
}
protected void message(String message) {
if (printStream != null) {
long now = System.nanoTime();
System.out.println(String.format(Locale.ROOT,
printStream.println(String.format(Locale.ROOT,
"%5.3fs %5.1fs: %7s %2s [%11s] %s",
(now-globalStartNS)/1000000000.,
(now-localStartNS)/1000000000.,
state, name(),
Thread.currentThread().getName(), message));
}
}
public String name() {
char mode = this instanceof PrimaryNode ? 'P' : 'R';

View File

@ -18,6 +18,7 @@ package org.apache.lucene.replicator.nrt;
*/
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -67,8 +68,9 @@ public abstract class PrimaryNode extends Node {
private final AtomicInteger copyingCount = new AtomicInteger();
public PrimaryNode(IndexWriter writer, int id, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
super(id, writer.getDirectory(), searcherFactory);
public PrimaryNode(IndexWriter writer, int id, long primaryGen, long forcePrimaryVersion,
SearcherFactory searcherFactory, PrintStream printStream) throws IOException {
super(id, writer.getDirectory(), searcherFactory, printStream);
message("top: now init primary");
this.writer = writer;
this.primaryGen = primaryGen;
@ -115,7 +117,7 @@ public abstract class PrimaryNode extends Node {
} catch (Throwable t) {
message("init: exception");
t.printStackTrace(System.out);
t.printStackTrace(printStream);
throw new RuntimeException(t);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.lucene.replicator.nrt;
*/
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -54,7 +55,7 @@ import org.apache.lucene.util.IOUtils;
*
* @lucene.experimental */
abstract class ReplicaNode extends Node {
public abstract class ReplicaNode extends Node {
ReplicaFileDeleter deleter;
@ -79,8 +80,8 @@ abstract class ReplicaNode extends Node {
/** Primary gen last time we successfully replicated: */
protected long lastPrimaryGen;
public ReplicaNode(int id, Directory dir, SearcherFactory searcherFactory) throws IOException {
super(id, dir, searcherFactory);
public ReplicaNode(int id, Directory dir, SearcherFactory searcherFactory, PrintStream printStream) throws IOException {
super(id, dir, searcherFactory, printStream);
if (dir instanceof FSDirectory && ((FSDirectory) dir).checkPendingDeletions()) {
throw new IllegalArgumentException("Directory " + dir + " still has pending deleted files; cannot initialize IndexWriter");
@ -98,7 +99,7 @@ abstract class ReplicaNode extends Node {
deleter = new ReplicaFileDeleter(this, dir);
} catch (Throwable t) {
message("exc on init:");
t.printStackTrace(System.out);
t.printStackTrace(printStream);
throw t;
} finally {
if (success == false) {
@ -307,7 +308,7 @@ abstract class ReplicaNode extends Node {
} catch (Throwable t) {
if (t.getMessage().startsWith("replica cannot start") == false) {
message("exc on start:");
t.printStackTrace(System.out);
t.printStackTrace(printStream);
} else {
dir.close();
}
@ -522,7 +523,7 @@ abstract class ReplicaNode extends Node {
} catch (NodeCommunicationException nce) {
// E.g. primary could crash/close when we are asking it for the copy state:
message("top: ignoring communication exception creating CopyJob: " + nce);
//nce.printStackTrace(System.out);
//nce.printStackTrace(printStream);
if (state.equals("syncing")) {
state = "idle";
}
@ -560,7 +561,7 @@ abstract class ReplicaNode extends Node {
} catch (NodeCommunicationException nce) {
// E.g. primary could crash/close when we are asking it for the copy state:
message("top: ignoring exception starting CopyJob: " + nce);
nce.printStackTrace(System.out);
nce.printStackTrace(printStream);
if (state.equals("syncing")) {
state = "idle";
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentInfos;

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* <h1>Near-real-time replication framework</h1>
*/
package org.apache.lucene.replicator.nrt;

View File

@ -116,7 +116,7 @@ class SimplePrimaryNode extends PrimaryNode {
public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory,
boolean doFlipBitsDuringCopy, boolean doCheckIndexOnClose) throws IOException {
super(initWriter(id, random, indexPath, doCheckIndexOnClose), id, primaryGen, forcePrimaryVersion, searcherFactory);
super(initWriter(id, random, indexPath, doCheckIndexOnClose), id, primaryGen, forcePrimaryVersion, searcherFactory, System.out);
this.tcpPort = tcpPort;
this.random = new Random(random.nextLong());
this.doFlipBitsDuringCopy = doFlipBitsDuringCopy;

View File

@ -64,8 +64,9 @@ class SimpleReplicaNode extends ReplicaNode {
/** Changes over time, as primary node crashes and moves around */
int curPrimaryTCPPort;
public SimpleReplicaNode(Random random, int id, int tcpPort, Path indexPath, long curPrimaryGen, int primaryTCPPort, SearcherFactory searcherFactory, boolean doCheckIndexOnClose) throws IOException {
super(id, getDirectory(random, id, indexPath, doCheckIndexOnClose), searcherFactory);
public SimpleReplicaNode(Random random, int id, int tcpPort, Path indexPath, long curPrimaryGen, int primaryTCPPort,
SearcherFactory searcherFactory, boolean doCheckIndexOnClose) throws IOException {
super(id, getDirectory(random, id, indexPath, doCheckIndexOnClose), searcherFactory, System.out);
this.tcpPort = tcpPort;
this.random = new Random(random.nextLong());
@ -139,7 +140,7 @@ class SimpleReplicaNode extends ReplicaNode {
// Corrupt any index files not referenced by current commit point; this is important (increases test evilness) because we may have done
// a hard crash of the previous JVM writing to this directory and so MDW's corrupt-unknown-files-on-close never ran:
Node.nodeMessage(id, "top: corrupt unknown files");
Node.nodeMessage(System.out, id, "top: corrupt unknown files");
dir.corruptUnknownFiles();
return dir;

View File

@ -50,6 +50,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
import org.apache.lucene.util.LuceneTestCase;
@ -63,29 +64,17 @@ import com.carrotsearch.randomizedtesting.SeedUtils;
TODO
- fangs
- sometimes have one replica be really slow at copying / have random pauses (fake GC) / etc.
- graceful primary close
- why do we do the "rename temp to actual" all at the end...? what really does that buy us?
- replica should also track maxSegmentName its seen, and tap into inflateGens if it's later promoted to primary?
- test should not print scary exceptions and then succeed!
- since all nodes are local, we could have a different test only impl that just does local file copies instead of via tcp...
- are the pre-copied-completed-merged files not being cleared in primary?
- hmm the logic isn't right today? a replica may skip pulling a given copy state, that recorded the finished merged segments?
- beast & fix bugs
- graceful cluster restart
- better translog integration
- get "graceful primary shutdown" working
- there is still some global state we rely on for "correctness", e.g. lastPrimaryVersion
- clean up how version is persisted in commit data
- why am i not using hashes here? how does ES use them?
- later
- since all nodes are local, we could have a different test only impl that just does local file copies instead of via tcp...
- get all other "single shard" functions working too: this cluster should "act like" a single shard
- SLM
- controlled nrt reopen thread / returning long gen on write
- live field values
- add indexes
- make cluster level APIs to search, index, that deal w/ primary failover, etc.
- must prune xlog
- refuse to start primary unless we have quorum
- later
- replica should also track maxSegmentName its seen, and tap into inflateGens if it's later promoted to primary?
- if we named index files using segment's ID we wouldn't have file name conflicts after primary crash / rollback?
- back pressure on indexing if replicas can't keep up?
- get xlog working on top? needs to be checkpointed, so we can correlate IW ops to NRT reader version and prune xlog based on commit
@ -190,6 +179,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
final Set<Integer> crashingNodes = Collections.synchronizedSet(new HashSet<>());
@Nightly
public void test() throws Exception {
Node.globalStartNS = System.nanoTime();