SOLR-9408: Fix TreeMergeOutputFormat to add timestamp metadata to commits

This commit is contained in:
Varun Thacker 2016-09-12 19:11:49 +05:30
parent ab5afedd55
commit 08453fb7f0
5 changed files with 31 additions and 38 deletions

View File

@ -138,6 +138,9 @@ Bug Fixes
* SOLR-9494: Use of {!collapse} sometimes doesn't correctly return true for Collector.needsScores(), especially when the
query was cached. This can cause an exception when 'q' is a SpanQuery or potentially others. (David Smiley)
* SOLR-9408: Fix TreeMergeOutputFormat to add timestamp metadata to a commit. SolrCloud replication relies on this.
(Jessica Cheng Mallet via Varun Thacker)
================== 6.2.0 ==================

View File

@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@ -40,12 +41,11 @@ import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.misc.IndexMergeTool;
import org.apache.lucene.store.Directory;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* See {@link IndexMergeTool}.
*/
@ -151,7 +151,7 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
LOG.info("Optimizing Solr: forcing tree merge down to {} segments", maxSegments);
timer = new RTimer();
if (maxSegments < Integer.MAX_VALUE) {
writer.forceMerge(maxSegments);
writer.forceMerge(maxSegments);
// TODO: consider perf enhancement for no-deletes merges: bulk-copy the postings data
// see http://lucene.472066.n3.nabble.com/Experience-with-large-merge-factors-tp1637832p1647046.html
}
@ -161,6 +161,10 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
}
LOG.info("Optimizing Solr: done forcing tree merge down to {} segments in {}ms", maxSegments, timer.getTime());
// Set Solr's commit data so the created index is usable by SolrCloud. E.g. Currently SolrCloud relies on
// commitTimeMSec in the commit data to do replication.
SolrIndexWriter.setCommitData(writer);
timer = new RTimer();
LOG.info("Optimizing Solr: Closing index writer");
writer.close();

View File

@ -20,10 +20,8 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
@ -47,7 +45,6 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
@ -516,19 +513,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
return rc;
}
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
/*
Also see SolrIndexSplitter.setCommitData
*/
private void setCommitData(IndexWriter iw) {
final Map<String,String> commitData = new HashMap<>();
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
String.valueOf(System.currentTimeMillis()));
iw.setLiveCommitData(commitData.entrySet());
}
public void prepareCommit(CommitUpdateCommand cmd) throws IOException {
boolean error=true;
@ -537,7 +522,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
log.info("start "+cmd);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
try {
setCommitData(iw.get());
SolrIndexWriter.setCommitData(iw.get());
iw.get().prepareCommit();
} finally {
iw.decref();
@ -618,7 +603,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// SolrCore.verbose("writer.commit() start writer=",writer);
if (writer.hasUncommittedChanges()) {
setCommitData(writer);
SolrIndexWriter.setCommitData(writer);
writer.commit();
} else {
log.info("No uncommitted changes. Skipping IW.commit.");
@ -803,7 +788,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
// todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used)
setCommitData(writer);
SolrIndexWriter.setCommitData(writer);
writer.commit();
synchronized (solrCoreState.getUpdateLock()) {

View File

@ -19,17 +19,15 @@ package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterCodecReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
@ -42,7 +40,6 @@ import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.BitsFilteredPostingsEnum;
@ -140,7 +137,7 @@ public class SolrIndexSplitter {
// we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
// because the sub-shard cores will just ignore such a commit because the update log is not
// in active state at this time.
setCommitData(iw);
SolrIndexWriter.setCommitData(iw);
iw.commit();
success = true;
} finally {
@ -159,14 +156,6 @@ public class SolrIndexSplitter {
}
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
private void setCommitData(IndexWriter iw) {
final Map<String,String> commitData = new HashMap<>();
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
iw.setLiveCommitData(commitData.entrySet());
}
FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
LeafReader reader = readerContext.reader();
FixedBitSet[] docSets = new FixedBitSet[numPieces];

View File

@ -18,6 +18,8 @@ package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.codecs.Codec;
@ -27,8 +29,9 @@ import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.IndexSchema;
import org.slf4j.Logger;
@ -86,7 +89,16 @@ public class SolrIndexWriter extends IndexWriter {
this.directory = directory;
numOpens.incrementAndGet();
}
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
public static void setCommitData(IndexWriter iw) {
log.info("Calling setCommitData with IW:" + iw.toString());
final Map<String,String> commitData = new HashMap<>();
commitData.put(COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
iw.setLiveCommitData(commitData.entrySet());
}
private void setDirectoryFactory(DirectoryFactory factory) {
this.directoryFactory = factory;
}