SOLR-6359: Allow customization of the number of records and logs kept by UpdateLog

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1664825 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ramkumar Aiyengar 2015-03-07 11:40:37 +00:00
parent a0ff7fdef2
commit 65b033712b
7 changed files with 82 additions and 46 deletions

View File

@ -141,6 +141,9 @@ New Features
* SOLR-7073: Support adding a jar to a collections classpath (Noble Paul) * SOLR-7073: Support adding a jar to a collections classpath (Noble Paul)
* SOLR-6359: Allow number of logs and records kept by UpdateLog to be configured
(Ramkumar Aiyengar)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -259,7 +259,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
UpdateLog.RecentUpdates recentUpdates = null; UpdateLog.RecentUpdates recentUpdates = null;
try { try {
recentUpdates = ulog.getRecentUpdates(); recentUpdates = ulog.getRecentUpdates();
recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep); recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
} catch (Exception e) { } catch (Exception e) {
SolrException.log(log, "Corrupt tlog - ignoring. core=" + coreName, e); SolrException.log(log, "Corrupt tlog - ignoring. core=" + coreName, e);
recentVersions = new ArrayList<>(0); recentVersions = new ArrayList<>(0);
@ -382,7 +382,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
// System.out.println("Attempting to PeerSync from " + leaderUrl // System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName()); // + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core, PeerSync peerSync = new PeerSync(core,
Collections.singletonList(leaderUrl), ulog.numRecordsToKeep, false, false); Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
peerSync.setStartingVersions(recentVersions); peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync(); boolean syncSuccess = peerSync.sync();
if (syncSuccess) { if (syncSuccess) {

View File

@ -163,7 +163,7 @@ public class SyncStrategy {
// if we can't reach a replica for sync, we still consider the overall sync a success // if we can't reach a replica for sync, we still consider the overall sync a success
// TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach // TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
// to recover once more? // to recover once more?
PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true, true, peerSyncOnlyWithActive); PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive);
return peerSync.sync(); return peerSync.sync();
} }

View File

@ -84,7 +84,12 @@ public class HdfsUpdateLog extends UpdateLog {
defaultSyncLevel = SyncLevel.getSyncLevel((String) info.initArgs defaultSyncLevel = SyncLevel.getSyncLevel((String) info.initArgs
.get("syncLevel")); .get("syncLevel"));
numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100);
maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10);
log.info("Initializing HdfsUpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={}",
dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep);
} }
private Configuration getConf() { private Configuration getConf() {
@ -217,7 +222,7 @@ public class HdfsUpdateLog extends UpdateLog {
// non-complete tlogs. // non-complete tlogs.
HdfsUpdateLog.RecentUpdates startingUpdates = getRecentUpdates(); HdfsUpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
try { try {
startingVersions = startingUpdates.getVersions(numRecordsToKeep); startingVersions = startingUpdates.getVersions(getNumRecordsToKeep());
startingOperation = startingUpdates.getLatestOperation(); startingOperation = startingUpdates.getLatestOperation();
// populate recent deletes list (since we can't get that info from the // populate recent deletes list (since we can't get that info from the

View File

@ -130,7 +130,7 @@ public class UpdateLog implements PluginInfoInitialized {
protected TransactionLog tlog; protected TransactionLog tlog;
protected TransactionLog prevTlog; protected TransactionLog prevTlog;
protected Deque<TransactionLog> logs = new LinkedList<>(); // list of recent logs, newest first protected final Deque<TransactionLog> logs = new LinkedList<>(); // list of recent logs, newest first
protected LinkedList<TransactionLog> newestLogsOnStartup = new LinkedList<>(); protected LinkedList<TransactionLog> newestLogsOnStartup = new LinkedList<>();
protected int numOldRecords; // number of records in the recent logs protected int numOldRecords; // number of records in the recent logs
@ -142,7 +142,8 @@ public class UpdateLog implements PluginInfoInitialized {
protected final int numDeletesToKeep = 1000; protected final int numDeletesToKeep = 1000;
protected final int numDeletesByQueryToKeep = 100; protected final int numDeletesByQueryToKeep = 100;
public final int numRecordsToKeep = 100; protected int numRecordsToKeep;
protected int maxNumLogsToKeep;
// keep track of deletes only... this is not updated on an add // keep track of deletes only... this is not updated on an add
protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) { protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
@ -215,10 +216,31 @@ public class UpdateLog implements PluginInfoInitialized {
return versionInfo; return versionInfo;
} }
public int getNumRecordsToKeep() {
return numRecordsToKeep;
}
public int getMaxNumLogsToKeep() {
return maxNumLogsToKeep;
}
protected static int objToInt(Object obj, int def) {
if (obj != null) {
return Integer.parseInt(obj.toString());
}
else return def;
}
@Override @Override
public void init(PluginInfo info) { public void init(PluginInfo info) {
dataDir = (String)info.initArgs.get("dir"); dataDir = (String)info.initArgs.get("dir");
defaultSyncLevel = SyncLevel.getSyncLevel((String)info.initArgs.get("syncLevel")); defaultSyncLevel = SyncLevel.getSyncLevel((String)info.initArgs.get("syncLevel"));
numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100);
maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10);
log.info("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={}",
dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep);
} }
/* Note, when this is called, uhandler is not completely constructed. /* Note, when this is called, uhandler is not completely constructed.
@ -335,7 +357,7 @@ public class UpdateLog implements PluginInfoInitialized {
int nrec = log.numRecords(); int nrec = log.numRecords();
// remove oldest log if we don't need it to keep at least numRecordsToKeep, or if // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
// we already have the limit of 10 log files. // we already have the limit of 10 log files.
if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) { if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) {
currRecords -= nrec; currRecords -= nrec;
numOldRecords -= nrec; numOldRecords -= nrec;
logs.removeLast().decref(); // dereference so it will be deleted when no longer in use logs.removeLast().decref(); // dereference so it will be deleted when no longer in use

View File

@ -19,7 +19,6 @@ package org.apache.solr.search;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import org.apache.solr.common.SolrException;
import org.noggit.ObjectBuilder; import org.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
@ -28,7 +27,6 @@ import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateHandler; import org.apache.solr.update.UpdateHandler;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
@ -46,16 +44,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.noggit.ObjectBuilder;
public class TestRecovery extends SolrTestCaseJ4 { public class TestRecovery extends SolrTestCaseJ4 {
@ -770,46 +759,63 @@ public class TestRecovery extends SolrTestCaseJ4 {
createCore(); createCore();
int start = 0; int numIndexed = 0;
int maxReq = 50; int maxReq = 200;
LinkedList<Long> versions = new LinkedList<>(); LinkedList<Long> versions = new LinkedList<>();
addDocs(10, start, versions); start+=10;
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); int docsPerBatch = 3;
// we don't expect to reach numRecordsToKeep as yet, so the bottleneck is still number of logs to keep
int expectedToRetain = ulog.getMaxNumLogsToKeep() * docsPerBatch;
int versExpected;
for (int i = 1; i <= ulog.getMaxNumLogsToKeep() + 2; i ++) {
addDocs(docsPerBatch, numIndexed, versions); numIndexed += docsPerBatch;
versExpected = Math.min(numIndexed, expectedToRetain + docsPerBatch); // not yet committed, so one more tlog could slip in
assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
assertU(commit());
versExpected = Math.min(numIndexed, expectedToRetain);
assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
assertEquals(Math.min(i, ulog.getMaxNumLogsToKeep()), ulog.getLogList(logDir).length);
}
docsPerBatch = ulog.getNumRecordsToKeep() + 20;
// about to commit a lot of docs, so numRecordsToKeep becomes the bottleneck
expectedToRetain = ulog.getNumRecordsToKeep();
addDocs(docsPerBatch, numIndexed, versions); numIndexed+=docsPerBatch;
versExpected = Math.min(numIndexed, expectedToRetain);
assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
assertU(commit()); assertU(commit());
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record
versExpected = Math.min(numIndexed, expectedToRetain);
assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected)));
addDocs(10, start, versions); start+=10; // previous logs should be gone now
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
assertU(commit());
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
assertEquals(2, ulog.getLogList(logDir).length);
addDocs(105, start, versions); start+=105;
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
assertU(commit());
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
// previous two logs should be gone now
assertEquals(1, ulog.getLogList(logDir).length); assertEquals(1, ulog.getLogList(logDir).length);
addDocs(1, start, versions); start+=1; addDocs(1, numIndexed, versions); numIndexed+=1;
h.close(); h.close();
createCore(); // trigger recovery, make sure that tlog reference handling is correct createCore(); // trigger recovery, make sure that tlog reference handling is correct
// test we can get versions while replay is happening // test we can get versions while replay is happening
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, expectedToRetain)));
logReplay.release(1000); logReplay.release(1000);
assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS)); assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record made by recovery
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain)));
addDocs(105, start, versions); start+=105; docsPerBatch = ulog.getNumRecordsToKeep() + 20;
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); // about to commit a lot of docs, so numRecordsToKeep becomes the bottleneck
expectedToRetain = ulog.getNumRecordsToKeep();
addDocs(docsPerBatch, numIndexed, versions); numIndexed+=docsPerBatch;
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain)));
assertU(commit()); assertU(commit());
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record
assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain)));
// previous logs should be gone now // previous logs should be gone now
assertEquals(1, ulog.getLogList(logDir).length); assertEquals(1, ulog.getLogList(logDir).length);
@ -817,7 +823,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
// //
// test that a corrupt tlog file doesn't stop us from coming up, or seeing versions before that tlog file. // test that a corrupt tlog file doesn't stop us from coming up, or seeing versions before that tlog file.
// //
addDocs(1, start, new LinkedList<Long>()); // don't add this to the versions list because we are going to lose it... addDocs(1, numIndexed, new LinkedList<Long>()); // don't add this to the versions list because we are going to lose it...
h.close(); h.close();
files = ulog.getLogList(logDir); files = ulog.getLogList(logDir);
Arrays.sort(files); Arrays.sort(files);
@ -828,7 +834,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
ignoreException("Failure to open existing"); ignoreException("Failure to open existing");
createCore(); createCore();
// we should still be able to get the list of versions (not including the trashed log file) // we should still be able to get the list of versions (not including the trashed log file)
assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start))); assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, expectedToRetain)));
resetExceptionIgnores(); resetExceptionIgnores();
} finally { } finally {

View File

@ -243,7 +243,7 @@ class CollectionTester {
if (a >= expectedList.size() || b >=v.size()) { if (a >= expectedList.size() || b >=v.size()) {
popPath(); popPath();
setErr("List size mismatch"); setErr("List size mismatch (expected: " + expectedList.size() + ", got: " + v.size() + ")");
return false; return false;
} }