From 65b033712b921f24ea01c4cf4971d5d3f6e4349e Mon Sep 17 00:00:00 2001 From: Ramkumar Aiyengar Date: Sat, 7 Mar 2015 11:40:37 +0000 Subject: [PATCH] SOLR-6359: Allow customization of the number of records and logs kept by UpdateLog git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1664825 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../apache/solr/cloud/RecoveryStrategy.java | 4 +- .../org/apache/solr/cloud/SyncStrategy.java | 2 +- .../org/apache/solr/update/HdfsUpdateLog.java | 9 ++- .../org/apache/solr/update/UpdateLog.java | 28 ++++++- .../org/apache/solr/search/TestRecovery.java | 80 ++++++++++--------- .../java/org/apache/solr/JSONTestUtil.java | 2 +- 7 files changed, 82 insertions(+), 46 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 4eb1c8d1e1c..2288a5fa7d1 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -141,6 +141,9 @@ New Features * SOLR-7073: Support adding a jar to a collections classpath (Noble Paul) +* SOLR-6359: Allow number of logs and records kept by UpdateLog to be configured + (Ramkumar Aiyengar) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 7e6bdf668ab..688957a0f05 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -259,7 +259,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { UpdateLog.RecentUpdates recentUpdates = null; try { recentUpdates = ulog.getRecentUpdates(); - recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep); + recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep()); } catch (Exception e) { SolrException.log(log, "Corrupt tlog - ignoring. core=" + coreName, e); recentVersions = new ArrayList<>(0); @@ -382,7 +382,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread { // System.out.println("Attempting to PeerSync from " + leaderUrl // + " i am:" + zkController.getNodeName()); PeerSync peerSync = new PeerSync(core, - Collections.singletonList(leaderUrl), ulog.numRecordsToKeep, false, false); + Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false); peerSync.setStartingVersions(recentVersions); boolean syncSuccess = peerSync.sync(); if (syncSuccess) { diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index 1ec4859366e..9625024b6ac 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -163,7 +163,7 @@ public class SyncStrategy { // if we can't reach a replica for sync, we still consider the overall sync a success // TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach // to recover once more? - PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().numRecordsToKeep, true, true, peerSyncOnlyWithActive); + PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive); return peerSync.sync(); } diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java index b1bf9ff5f2a..c5cce54851c 100644 --- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java @@ -84,7 +84,12 @@ public class HdfsUpdateLog extends UpdateLog { defaultSyncLevel = SyncLevel.getSyncLevel((String) info.initArgs .get("syncLevel")); - + + numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100); + maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10); + + log.info("Initializing HdfsUpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={}", + dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep); } private Configuration getConf() { @@ -217,7 +222,7 @@ public class HdfsUpdateLog extends UpdateLog { // non-complete tlogs. HdfsUpdateLog.RecentUpdates startingUpdates = getRecentUpdates(); try { - startingVersions = startingUpdates.getVersions(numRecordsToKeep); + startingVersions = startingUpdates.getVersions(getNumRecordsToKeep()); startingOperation = startingUpdates.getLatestOperation(); // populate recent deletes list (since we can't get that info from the diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index 16bbf329f77..b852eb62a05 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -130,7 +130,7 @@ public class UpdateLog implements PluginInfoInitialized { protected TransactionLog tlog; protected TransactionLog prevTlog; - protected Deque logs = new LinkedList<>(); // list of recent logs, newest first + protected final Deque logs = new LinkedList<>(); // list of recent logs, newest first protected LinkedList newestLogsOnStartup = new LinkedList<>(); protected int numOldRecords; // number of records in the recent logs @@ -142,7 +142,8 @@ public class UpdateLog implements PluginInfoInitialized { protected final int numDeletesToKeep = 1000; protected final int numDeletesByQueryToKeep = 100; - public final int numRecordsToKeep = 100; + protected int numRecordsToKeep; + protected int maxNumLogsToKeep; // keep track of deletes only... this is not updated on an add protected LinkedHashMap oldDeletes = new LinkedHashMap(numDeletesToKeep) { @@ -215,10 +216,31 @@ public class UpdateLog implements PluginInfoInitialized { return versionInfo; } + public int getNumRecordsToKeep() { + return numRecordsToKeep; + } + + public int getMaxNumLogsToKeep() { + return maxNumLogsToKeep; + } + + protected static int objToInt(Object obj, int def) { + if (obj != null) { + return Integer.parseInt(obj.toString()); + } + else return def; + } + @Override public void init(PluginInfo info) { dataDir = (String)info.initArgs.get("dir"); defaultSyncLevel = SyncLevel.getSyncLevel((String)info.initArgs.get("syncLevel")); + + numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100); + maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10); + + log.info("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={}", + dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep); } /* Note, when this is called, uhandler is not completely constructed. @@ -335,7 +357,7 @@ public class UpdateLog implements PluginInfoInitialized { int nrec = log.numRecords(); // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if // we already have the limit of 10 log files. - if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) { + if (currRecords - nrec >= numRecordsToKeep || (maxNumLogsToKeep > 0 && logs.size() >= maxNumLogsToKeep)) { currRecords -= nrec; numOldRecords -= nrec; logs.removeLast().decref(); // dereference so it will be deleted when no longer in use diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java index 870eb3bfd60..5d6ced464c1 100644 --- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java +++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java @@ -19,7 +19,6 @@ package org.apache.solr.search; import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; -import org.apache.solr.common.SolrException; import org.noggit.ObjectBuilder; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.request.SolrQueryRequest; @@ -28,7 +27,6 @@ import org.apache.solr.update.UpdateLog; import org.apache.solr.update.UpdateHandler; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -46,16 +44,7 @@ import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.request.SolrQueryRequest; -import org.apache.solr.update.DirectUpdateHandler2; -import org.apache.solr.update.UpdateHandler; -import org.apache.solr.update.UpdateLog; import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.noggit.ObjectBuilder; public class TestRecovery extends SolrTestCaseJ4 { @@ -770,46 +759,63 @@ public class TestRecovery extends SolrTestCaseJ4 { createCore(); - int start = 0; - int maxReq = 50; + int numIndexed = 0; + int maxReq = 200; LinkedList versions = new LinkedList<>(); - addDocs(10, start, versions); start+=10; - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); + + int docsPerBatch = 3; + // we don't expect to reach numRecordsToKeep as yet, so the bottleneck is still number of logs to keep + int expectedToRetain = ulog.getMaxNumLogsToKeep() * docsPerBatch; + int versExpected; + + for (int i = 1; i <= ulog.getMaxNumLogsToKeep() + 2; i ++) { + addDocs(docsPerBatch, numIndexed, versions); numIndexed += docsPerBatch; + versExpected = Math.min(numIndexed, expectedToRetain + docsPerBatch); // not yet committed, so one more tlog could slip in + assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected))); + assertU(commit()); + versExpected = Math.min(numIndexed, expectedToRetain); + assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected))); + assertEquals(Math.min(i, ulog.getMaxNumLogsToKeep()), ulog.getLogList(logDir).length); + } + + docsPerBatch = ulog.getNumRecordsToKeep() + 20; + // about to commit a lot of docs, so numRecordsToKeep becomes the bottleneck + expectedToRetain = ulog.getNumRecordsToKeep(); + + addDocs(docsPerBatch, numIndexed, versions); numIndexed+=docsPerBatch; + versExpected = Math.min(numIndexed, expectedToRetain); + assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected))); assertU(commit()); - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); + expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record + versExpected = Math.min(numIndexed, expectedToRetain); + assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, versExpected))); - addDocs(10, start, versions); start+=10; - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); - assertU(commit()); - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); - - assertEquals(2, ulog.getLogList(logDir).length); - - addDocs(105, start, versions); start+=105; - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); - assertU(commit()); - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); - - // previous two logs should be gone now + // previous logs should be gone now assertEquals(1, ulog.getLogList(logDir).length); - addDocs(1, start, versions); start+=1; + addDocs(1, numIndexed, versions); numIndexed+=1; h.close(); createCore(); // trigger recovery, make sure that tlog reference handling is correct // test we can get versions while replay is happening - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); + assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, expectedToRetain))); logReplay.release(1000); assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS)); - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); + expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record made by recovery + assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain))); - addDocs(105, start, versions); start+=105; - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); + docsPerBatch = ulog.getNumRecordsToKeep() + 20; + // about to commit a lot of docs, so numRecordsToKeep becomes the bottleneck + expectedToRetain = ulog.getNumRecordsToKeep(); + + addDocs(docsPerBatch, numIndexed, versions); numIndexed+=docsPerBatch; + assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain))); assertU(commit()); - assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start))); + expectedToRetain = expectedToRetain - 1; // we lose a log entry due to the commit record + assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,expectedToRetain))); // previous logs should be gone now assertEquals(1, ulog.getLogList(logDir).length); @@ -817,7 +823,7 @@ public class TestRecovery extends SolrTestCaseJ4 { // // test that a corrupt tlog file doesn't stop us from coming up, or seeing versions before that tlog file. // - addDocs(1, start, new LinkedList()); // don't add this to the versions list because we are going to lose it... + addDocs(1, numIndexed, new LinkedList()); // don't add this to the versions list because we are going to lose it... h.close(); files = ulog.getLogList(logDir); Arrays.sort(files); @@ -828,7 +834,7 @@ public class TestRecovery extends SolrTestCaseJ4 { ignoreException("Failure to open existing"); createCore(); // we should still be able to get the list of versions (not including the trashed log file) - assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start))); + assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, expectedToRetain))); resetExceptionIgnores(); } finally { diff --git a/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java b/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java index b964358d9c6..72018696f1d 100644 --- a/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java +++ b/solr/test-framework/src/java/org/apache/solr/JSONTestUtil.java @@ -243,7 +243,7 @@ class CollectionTester { if (a >= expectedList.size() || b >=v.size()) { popPath(); - setErr("List size mismatch"); + setErr("List size mismatch (expected: " + expectedList.size() + ", got: " + v.size() + ")"); return false; }