SOLR-11277: Add auto hard commit setting based on tlog size (this closes #358)

This commit is contained in:
Anshum Gupta 2018-05-03 15:00:47 -07:00
parent ab11867364
commit b617489638
10 changed files with 642 additions and 20 deletions

View File

@ -110,6 +110,8 @@ New Features
* SOLR-11278: Add IgnoreLargeDocumentProcessFactory (Cao Manh Dat, David Smiley)
* SOLR-11277: Add auto hard-commit settings based on tlog size (Rupa Shankar, Anshum Gupta)
Bug Fixes
----------------------

View File

@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathConstants;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.util.Version;
@ -448,6 +449,7 @@ public class SolrConfig extends Config implements MapSerializable {
return new UpdateHandlerInfo(get("updateHandler/@class", null),
getInt("updateHandler/autoCommit/maxDocs", -1),
getInt("updateHandler/autoCommit/maxTime", -1),
convertHeapOptionStyleConfigStringToBytes(get("updateHandler/autoCommit/maxSize", "")),
getBool("updateHandler/indexWriter/closeWaitsForMerges", true),
getBool("updateHandler/autoCommit/openSearcher", true),
getInt("updateHandler/autoSoftCommit/maxDocs", -1),
@ -455,6 +457,44 @@ public class SolrConfig extends Config implements MapSerializable {
getBool("updateHandler/commitWithin/softCommit", true));
}
/**
* Converts a Java heap option-like config string to bytes. Valid suffixes are: 'k', 'm', 'g'
* (case insensitive). If there is no suffix, the default unit is bytes.
* For example, 50k = 50KB, 20m = 20MB, 4g = 4GB, 300 = 300 bytes
* @param configStr the config setting to parse
* @return the size, in bytes. -1 if the given config string is empty
*/
protected static long convertHeapOptionStyleConfigStringToBytes(String configStr) {
if (configStr.isEmpty()) {
return -1;
}
long multiplier = 1;
String numericValueStr = configStr;
char suffix = Character.toLowerCase(configStr.charAt(configStr.length() - 1));
if (Character.isLetter(suffix)) {
if (suffix == 'k') {
multiplier = FileUtils.ONE_KB;
}
else if (suffix == 'm') {
multiplier = FileUtils.ONE_MB;
}
else if (suffix == 'g') {
multiplier = FileUtils.ONE_GB;
} else {
throw new RuntimeException("Invalid suffix. Valid suffixes are 'k' (KB), 'm' (MB), 'g' (G). "
+ "No suffix means the amount is in bytes. ");
}
numericValueStr = configStr.substring(0, configStr.length() - 1);
}
try {
return Long.parseLong(numericValueStr) * multiplier;
} catch (NumberFormatException e) {
throw new RuntimeException("Invalid format. The config setting should be a long with an "
+ "optional letter suffix. Valid suffixes are 'k' (KB), 'm' (MB), 'g' (G). "
+ "No suffix means the amount is in bytes.");
}
}
private void loadPluginInfo(SolrPluginInfo pluginInfo) {
boolean requireName = pluginInfo.options.contains(REQUIRE_NAME);
boolean requireClass = pluginInfo.options.contains(REQUIRE_CLASS);
@ -631,6 +671,7 @@ public class SolrConfig extends Config implements MapSerializable {
public final String className;
public final int autoCommmitMaxDocs, autoCommmitMaxTime,
autoSoftCommmitMaxDocs, autoSoftCommmitMaxTime;
public final long autoCommitMaxSizeBytes;
public final boolean indexWriterCloseWaitsForMerges;
public final boolean openSearcher; // is opening a new searcher part of hard autocommit?
public final boolean commitWithinSoftCommit;
@ -638,12 +679,14 @@ public class SolrConfig extends Config implements MapSerializable {
/**
* @param autoCommmitMaxDocs set -1 as default
* @param autoCommmitMaxTime set -1 as default
* @param autoCommitMaxSize set -1 as default
*/
public UpdateHandlerInfo(String className, int autoCommmitMaxDocs, int autoCommmitMaxTime, boolean indexWriterCloseWaitsForMerges, boolean openSearcher,
public UpdateHandlerInfo(String className, int autoCommmitMaxDocs, int autoCommmitMaxTime, long autoCommitMaxSize, boolean indexWriterCloseWaitsForMerges, boolean openSearcher,
int autoSoftCommmitMaxDocs, int autoSoftCommmitMaxTime, boolean commitWithinSoftCommit) {
this.className = className;
this.autoCommmitMaxDocs = autoCommmitMaxDocs;
this.autoCommmitMaxTime = autoCommmitMaxTime;
this.autoCommitMaxSizeBytes = autoCommitMaxSize;
this.indexWriterCloseWaitsForMerges = indexWriterCloseWaitsForMerges;
this.openSearcher = openSearcher;

View File

@ -49,10 +49,13 @@ public final class CommitTracker implements Runnable {
// scheduler delay for maxDoc-triggered autocommits
public static final int DOC_COMMIT_DELAY_MS = 1;
// scheduler delay for maxSize-triggered autocommits
public static final int SIZE_COMMIT_DELAY_MS = 1;
// settings, not final so we can change them in testing
private int docsUpperBound;
private long timeUpperBound;
private long tLogFileSizeUpperBound;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1, new DefaultSolrThreadFactory("commitScheduler"));
@ -70,13 +73,15 @@ public final class CommitTracker implements Runnable {
private String name;
public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, boolean openSearcher, boolean softCommit) {
public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, long tLogFileSizeUpperBound,
boolean openSearcher, boolean softCommit) {
this.core = core;
this.name = name;
pending = null;
this.docsUpperBound = docsUpperBound;
this.timeUpperBound = timeUpperBound;
this.tLogFileSizeUpperBound = tLogFileSizeUpperBound;
this.softCommit = softCommit;
this.openSearcher = openSearcher;
@ -154,9 +159,34 @@ public final class CommitTracker implements Runnable {
/**
* Indicate that documents have been added
* @param commitWithin amount of time (in ms) within which a commit should be scheduled
*/
public void addedDocument(int commitWithin) {
// maxDocs-triggered autoCommit. Use == instead of > so we only trigger once on the way up
addedDocument(commitWithin, -1);
}
/**
* Indicate that documents have been added
* @param commitWithin amount of time (in ms) within which a commit should be scheduled
* @param currentTlogSize current tlog size (in bytes). Use -1 if we don't want to check for a max size triggered commit
*/
public void addedDocument(int commitWithin, long currentTlogSize) {
// maxDocs-triggered autoCommit
_scheduleMaxDocsTriggeredCommitIfNeeded();
// maxTime-triggered autoCommit
_scheduleCommitWithinIfNeeded(commitWithin);
// maxSize-triggered autoCommit
_scheduleMaxSizeTriggeredCommitIfNeeded(currentTlogSize);
}
/**
* If a doc size upper bound is set, and the current number of documents has exceeded it, then
* schedule a commit and reset the counter
*/
private void _scheduleMaxDocsTriggeredCommitIfNeeded() {
// Use == instead of > so we only trigger once on the way up
if (docsUpperBound > 0) {
long docs = docsSinceCommit.incrementAndGet();
if (docs == docsUpperBound + 1) {
@ -165,9 +195,6 @@ public final class CommitTracker implements Runnable {
_scheduleCommitWithin(DOC_COMMIT_DELAY_MS);
}
}
// maxTime-triggered autoCommit
_scheduleCommitWithinIfNeeded(commitWithin);
}
/**
@ -177,6 +204,26 @@ public final class CommitTracker implements Runnable {
_scheduleCommitWithinIfNeeded(commitWithin);
}
/**
* If the given current tlog size is greater than the file size upper bound, then schedule a commit
* @param currentTlogSize current tlog size (in bytes)
*/
public void scheduleMaxSizeTriggeredCommitIfNeeded(long currentTlogSize) {
_scheduleMaxSizeTriggeredCommitIfNeeded(currentTlogSize);
}
/**
* If the given current tlog size is greater than the file size upper bound, then schedule a commit
* @param currentTlogSize current tlog size (in bytes)
*/
private void _scheduleMaxSizeTriggeredCommitIfNeeded(long currentTlogSize) {
if (tLogFileSizeUpperBound > 0 && currentTlogSize > tLogFileSizeUpperBound) {
docsSinceCommit.set(0);
_scheduleCommitWithin(SIZE_COMMIT_DELAY_MS);
}
}
/** Inform tracker that a commit has occurred */
public void didCommit() {
}
@ -254,6 +301,10 @@ public final class CommitTracker implements Runnable {
return docsUpperBound;
}
long getTLogFileSizeUpperBound() {
return tLogFileSizeUpperBound;
}
void setDocsUpperBound(int docsUpperBound) {
this.docsUpperBound = docsUpperBound;
}
@ -263,6 +314,11 @@ public final class CommitTracker implements Runnable {
this.timeUpperBound = timeUpperBound;
}
// only for testing - not thread safe
public void setTLogFileSizeUpperBound(int sizeUpperBound) {
this.tLogFileSizeUpperBound = sizeUpperBound;
}
// only for testing - not thread safe
public void setOpenSearcher(boolean openSearcher) {
this.openSearcher = openSearcher;

View File

@ -73,6 +73,9 @@ import org.slf4j.LoggerFactory;
* directly to the main Lucene index as opposed to adding to a separate smaller index.
*/
public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState.IndexWriterCloser, SolrMetricProducer {
private static final int NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER = -1;
protected final SolrCoreState solrCoreState;
// stats
@ -118,13 +121,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig()
.getUpdateHandlerInfo();
int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs; // getInt("updateHandler/autoCommit/maxDocs", -1);
int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime; // getInt("updateHandler/autoCommit/maxTime", -1);
commitTracker = new CommitTracker("Hard", core, docsUpperBound, timeUpperBound, updateHandlerInfo.openSearcher, false);
int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs;
int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime;
long fileSizeUpperBound = updateHandlerInfo.autoCommitMaxSizeBytes;
commitTracker = new CommitTracker("Hard", core, docsUpperBound, timeUpperBound, fileSizeUpperBound, updateHandlerInfo.openSearcher, false);
int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs; // getInt("updateHandler/autoSoftCommit/maxDocs", -1);
int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime; // getInt("updateHandler/autoSoftCommit/maxTime", -1);
softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs;
int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime;
softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER, true, true);
commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
@ -143,13 +147,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig()
.getUpdateHandlerInfo();
int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs; // getInt("updateHandler/autoCommit/maxDocs", -1);
int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime; // getInt("updateHandler/autoCommit/maxTime", -1);
commitTracker = new CommitTracker("Hard", core, docsUpperBound, timeUpperBound, updateHandlerInfo.openSearcher, false);
int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs;
int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime;
long fileSizeUpperBound = updateHandlerInfo.autoCommitMaxSizeBytes;
commitTracker = new CommitTracker("Hard", core, docsUpperBound, timeUpperBound, fileSizeUpperBound, updateHandlerInfo.openSearcher, false);
int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs; // getInt("updateHandler/autoSoftCommit/maxDocs", -1);
int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime; // getInt("updateHandler/autoSoftCommit/maxTime", -1);
softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, updateHandlerInfo.openSearcher, true);
int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs;
int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime;
softCommitTracker = new CommitTracker("Soft", core, softCommitDocsUpperBound, softCommitTimeUpperBound, NO_FILE_SIZE_UPPER_BOUND_PLACEHOLDER, updateHandlerInfo.openSearcher, true);
commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
@ -178,6 +183,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
manager.registerGauge(this, registryName, () -> "" + commitTracker.getTimeUpperBound() + "ms", tag, true, "autoCommitMaxTime",
getCategory().toString(), scope);
}
if (commitTracker.getTLogFileSizeUpperBound() > 0) {
manager.registerGauge(this, registryName, () -> commitTracker.getTLogFileSizeUpperBound(), tag, true, "autoCommitMaxSize",
getCategory().toString(), scope);
}
if (softCommitTracker.getDocsUpperBound() > 0) {
manager.registerGauge(this, registryName, () -> softCommitTracker.getDocsUpperBound(), tag, true, "softAutoCommitMaxDocs",
getCategory().toString(), scope);
@ -279,12 +288,13 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
long currentTlogSize = getCurrentTLogSize();
if (commitWithinSoftCommit) {
commitTracker.addedDocument(-1);
commitTracker.addedDocument(-1, currentTlogSize);
softCommitTracker.addedDocument(cmd.commitWithin);
} else {
softCommitTracker.addedDocument(-1);
commitTracker.addedDocument(cmd.commitWithin);
commitTracker.addedDocument(cmd.commitWithin, currentTlogSize);
}
}
@ -419,6 +429,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
}
long currentTlogSize = getCurrentTLogSize();
commitTracker.scheduleMaxSizeTriggeredCommitIfNeeded(currentTlogSize);
if (softCommitTracker.getTimeUpperBound() > 0) {
softCommitTracker.scheduleCommitWithin(softCommitTracker
.getTimeUpperBound());
@ -990,6 +1003,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
return solrCoreState;
}
private long getCurrentTLogSize() {
return ulog != null && ulog.hasUncommittedChanges() ? ulog.getCurrentLogSizeFromStream() : -1;
}
// allow access for tests
public CommitTracker getCommitTracker() {
return commitTracker;

View File

@ -630,6 +630,13 @@ public class TransactionLog implements Closeable {
return 0;
}
/**
* @return the FastOutputStream size
*/
public synchronized long getLogSizeFromStream() {
return fos.size();
}
/** Returns a reader that can be used while a log is still in use.
* Currently only *one* LogReader may be outstanding, and that log may only
* be used from a single thread. */

View File

@ -292,6 +292,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
return size;
}
/**
* @return the current transaction log's size (based on its output stream)
*/
public long getCurrentLogSizeFromStream() {
return tlog.getLogSizeFromStream();
}
public long getTotalLogsNumber() {
synchronized (this) {
return logs.size();

View File

@ -0,0 +1,52 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Minimal solrconfig.xml with basic autoCommit settings, but without a valid "autoCommit" tag, to test
autoCommit-related defaults -->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<schemaFactory class="ClassicIndexSchemaFactory"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateHandler class="solr.DirectUpdateHandler2">
<!-- autocommit pending docs if certain criteria are met -->
<!-- <autoCommit> -->
<maxSize>5k</maxSize>
<!-- </autoCommit> -->
<updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
</updateHandler>
<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="indent">true</str>
<str name="df">text</str>
</lst>
</requestHandler>
</config>

View File

@ -54,6 +54,10 @@
</peerSync>
<updateHandler class="solr.DirectUpdateHandler2">
<!-- autocommit pending docs if certain criteria are met -->
<autoCommit>
<maxSize>${solr.autoCommit.maxSize:}</maxSize>
</autoCommit>
<updateLog class="${solr.tests.ulog:solr.UpdateLog}">
<str name="dir">${solr.ulog.dir:}</str>
<str name="maxNumLogsToKeep">${solr.ulog.maxNumLogsToKeep:10}</str>

View File

@ -30,6 +30,7 @@ import org.apache.solr.handler.admin.ShowFileRequestHandler;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.IndexSchemaFactory;
import org.apache.solr.update.SolrIndexConfig;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Node;
@ -181,6 +182,41 @@ public class TestConfig extends SolrTestCaseJ4 {
assertEquals("numDefaultsTested vs. numDefaultsMapped+numNullDefaults ="+sic.toMap(new LinkedHashMap<>()).keySet(), numDefaultsTested, numDefaultsMapped+numNullDefaults);
}
@Test
public void testConvertAutoCommitMaxSizeStringToBytes() {
// Valid values
Assert.assertEquals(300, SolrConfig.convertHeapOptionStyleConfigStringToBytes("300"));
Assert.assertEquals(307200, SolrConfig.convertHeapOptionStyleConfigStringToBytes("300k"));
Assert.assertEquals(307200, SolrConfig.convertHeapOptionStyleConfigStringToBytes("300K"));
Assert.assertEquals(314572800, SolrConfig.convertHeapOptionStyleConfigStringToBytes("300m"));
Assert.assertEquals(314572800, SolrConfig.convertHeapOptionStyleConfigStringToBytes("300M"));
Assert.assertEquals(322122547200L, SolrConfig.convertHeapOptionStyleConfigStringToBytes("300g"));
Assert.assertEquals(322122547200L, SolrConfig.convertHeapOptionStyleConfigStringToBytes("300G"));
Assert.assertEquals(-1, SolrConfig.convertHeapOptionStyleConfigStringToBytes(""));
// Invalid values
try {
SolrConfig.convertHeapOptionStyleConfigStringToBytes("3jbk32k"); // valid suffix but non-numeric prefix
Assert.fail();
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains("Invalid"));
}
try {
SolrConfig.convertHeapOptionStyleConfigStringToBytes("300x"); // valid prefix but invalid suffix
Assert.fail();
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains("Invalid"));
}
}
@Test
public void testMaxSizeSettingWithoutAutoCommit() throws Exception {
SolrConfig solrConfig = new SolrConfig(new SolrResourceLoader(TEST_PATH().resolve("collection1")), "bad-solrconfig-no-autocommit-tag.xml", null);
Assert.assertEquals(-1, solrConfig.getUpdateHandlerInfo().autoCommitMaxSizeBytes);
Assert.assertEquals(-1, solrConfig.getUpdateHandlerInfo().autoCommmitMaxDocs);
Assert.assertEquals(-1, solrConfig.getUpdateHandlerInfo().autoCommmitMaxTime);
}
// sanity check that sys propertis are working as expected
public void testSanityCheckTestSysPropsAreUsed() throws Exception {

View File

@ -0,0 +1,398 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.UpdateRequestHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class MaxSizeAutoCommitTest extends SolrTestCaseJ4 {
// Given an ID, returns an XML string for an "add document" request
private static final Function<Integer, String> ADD_DOC_FN = (id) -> adoc("id", Integer.toString(id));
// Given an ID, returns an XML string for a "delete document" request
private static final Function<Integer, String> DELETE_DOC_FN = (id) -> delI(Integer.toString(id));
private ObjectMapper objectMapper; // for JSON parsing
private SolrCore core;
private DirectUpdateHandler2 updateHandler;
private CommitTracker hardCommitTracker;
private UpdateRequestHandler updateRequestHandler;
private String tlogDirPath;
@Before
public void setup() throws Exception {
objectMapper = new ObjectMapper();
System.setProperty("solr.autoCommit.maxSize", "5k");
System.setProperty("solr.ulog", "solr.UpdateLog");
initCore("solrconfig-tlog.xml", "schema.xml");
core = h.getCore();
updateHandler = (DirectUpdateHandler2) core.getUpdateHandler();
hardCommitTracker = updateHandler.commitTracker;
// Only testing file-size based auto hard commits - disable other checks
hardCommitTracker.setTimeUpperBound(-1);
hardCommitTracker.setDocsUpperBound(-1);
updateRequestHandler = new UpdateRequestHandler();
updateRequestHandler.init( null );
tlogDirPath = core.getDataDir() + "/tlog";
}
@After
public void tearDown() throws Exception {
super.tearDown();
System.clearProperty("solr.autoCommit.maxSize");
System.clearProperty("solr.ulog");
deleteCore();
}
@Test
public void simpleTest() throws Exception {
int maxFileSizeBound = 1000;
int maxFileSizeBoundWithBuffer = (int) (maxFileSizeBound * 1.25);
// Set max size bound
hardCommitTracker.setTLogFileSizeUpperBound(maxFileSizeBound);
// Adding these docs will place the tlog size just under the threshold
int numDocs = 27;
int batchSize = 3;
int numBatches = numDocs / batchSize;
SolrQueryResponse updateResp = new SolrQueryResponse();
int numTlogs = -1;
TreeMap<String, Long> tlogsInfo = null;
for (int batchCounter = 0; batchCounter < numBatches; batchCounter++) {
int docStartId = batchSize * batchCounter;
// Send batch update request
updateRequestHandler.handleRequest(constructBatchAddDocRequest(docStartId, batchSize), updateResp);
// The sleep is to allow existing commits to finish (or at least mostly finish) before querying/submitting more documents
waitForCommit(200);
// There should just be 1 tlog and its size should be within the (buffered) file size bound
tlogsInfo = getTlogFileSizes(tlogDirPath, maxFileSizeBoundWithBuffer);
numTlogs = parseTotalNumTlogs(tlogsInfo);
Assert.assertEquals(1, numTlogs);
}
// Now that the core's tlog size is just under the threshold, one more update should induce a commit
int docStartId = batchSize * numBatches;
updateRequestHandler.handleRequest(constructBatchAddDocRequest(docStartId, batchSize), updateResp);
waitForCommit(200);
// Verify that a commit happened. There should now be 2 tlogs, both of which are < maxFileSizeBound.
TreeMap<String, Long> tlogsInfoPostCommit = getTlogFileSizes(tlogDirPath, maxFileSizeBoundWithBuffer);
Assert.assertEquals(2, parseTotalNumTlogs(tlogsInfoPostCommit));
// And the current tlog's size should be less than the previous tlog's size
Assert.assertTrue(tlogsInfoPostCommit.lastEntry().getValue() < tlogsInfo.lastEntry().getValue());
}
@Test
public void testRedundantDeletes() throws Exception {
int maxFileSizeBound = 1000;
int maxFileSizeBoundWithBuffer = (int) (maxFileSizeBound * 1.25);
// Set max size bound
hardCommitTracker.setTLogFileSizeUpperBound(maxFileSizeBound);
// Add docs
int numDocsToAdd = 150;
SolrQueryResponse updateResp = new SolrQueryResponse();
updateRequestHandler.handleRequest(constructBatchAddDocRequest(0, numDocsToAdd), updateResp);
waitForCommit(200);
// Get the tlog file info
TreeMap<String, Long> tlogsInfoPreDeletes = getTlogFileSizes(tlogDirPath);
// Send a bunch of redundant deletes
int numDeletesToSend = 5000;
int docIdToDelete = 100;
SolrQueryRequestBase requestWithOneDelete = new SolrQueryRequestBase(core, new MapSolrParams(new HashMap<String, String>())) {};
List<String> docs = new ArrayList<>();
docs.add(delI(Integer.toString(docIdToDelete)));
requestWithOneDelete.setContentStreams(toContentStreams(docs));
for (int i = 0; i < numDeletesToSend; i++) {
if (i % 50 == 0) {
// Wait periodically to allow existing commits to finish before
// sending more delete requests
waitForCommit(200);
}
updateRequestHandler.handleRequest(requestWithOneDelete, updateResp);
}
// Verify that new tlogs have been created, and that their sizes are as expected
TreeMap<String, Long> tlogsInfoPostDeletes = getTlogFileSizes(tlogDirPath, maxFileSizeBoundWithBuffer);
Assert.assertTrue(parseTotalNumTlogs(tlogsInfoPreDeletes) < parseTotalNumTlogs(tlogsInfoPostDeletes));
}
@Test
public void deleteTest() throws Exception {
int maxFileSizeBound = 1000;
int maxFileSizeBoundWithBuffer = (int) (maxFileSizeBound * 1.25);
// Set max size bound
hardCommitTracker.setTLogFileSizeUpperBound(maxFileSizeBound);
// Add docs
int numDocsToAdd = 150;
SolrQueryResponse updateResp = new SolrQueryResponse();
updateRequestHandler.handleRequest(constructBatchAddDocRequest(0, numDocsToAdd), updateResp);
waitForCommit(200);
// Get the tlog file info
TreeMap<String, Long> tlogsInfoPreDeletes = getTlogFileSizes(tlogDirPath);
// Delete documents (in batches, so we can allow commits to finish and new tlog files to be created)
int batchSize = 15;
int numBatches = numDocsToAdd / batchSize;
for (int batchCounter = 0; batchCounter < numBatches; batchCounter++) {
int docStartId = batchSize * batchCounter;
// Send batch delete doc request
updateRequestHandler.handleRequest(constructBatchDeleteDocRequest(docStartId, batchSize), updateResp);
// The sleep is to allow existing commits to finish before deleting more documents
waitForCommit(200);
}
// Verify that the commit happened by seeing if a new tlog file was opened
TreeMap<String, Long> tlogsInfoPostDeletes = getTlogFileSizes(tlogDirPath, maxFileSizeBoundWithBuffer);
Assert.assertTrue(parseTotalNumTlogs(tlogsInfoPreDeletes) < parseTotalNumTlogs(tlogsInfoPostDeletes));
}
@Test
@Repeat(iterations = 5)
public void endToEndTest() throws Exception {
int maxFileSizeBound = 5000;
// Set max size bound
hardCommitTracker.setTLogFileSizeUpperBound(maxFileSizeBound);
// Giving a 10% buffer for the max size bound
int maxFileSizeBoundWithBuffer = (int) (maxFileSizeBound * 1.1);
SolrQueryRequest selectQuery = req("*:*");
List<Integer> docCounts = new ArrayList<>();
int numDocs = 1000;
int batchSize = 20;
int numBatches = numDocs / batchSize;
for (int batchCounter = 0; batchCounter < numBatches; batchCounter++) {
SolrQueryResponse updateResp = new SolrQueryResponse();
int docStartId = batchSize * batchCounter;
// Send batch add doc request
updateRequestHandler.handleRequest(constructBatchAddDocRequest(docStartId, batchSize), updateResp);
// The sleep is to allow existing commits to finish before querying/submitting more documents
waitForCommit(200);
// Check tlog file sizes
getTlogFileSizes(tlogDirPath, maxFileSizeBoundWithBuffer);
// See how many documents are currently visible. This should increase as more commits occur.
docCounts.add(queryCore(selectQuery));
}
// One final commit, after which all documents should be visible
CommitUpdateCommand commitUpdateCommand = new CommitUpdateCommand(req(), false);
updateHandler.commit(commitUpdateCommand);
waitForCommit(200);
docCounts.add(queryCore(selectQuery));
// Evaluate the document counts
checkNumFoundDocuments(docCounts, numDocs);
}
/**
* Sleeps in increments of 50 ms while checking to see if a commit completed. If it did, then return. If not, continue
* this cycle for at most the amount of time specified
* @param maxTotalWaitTimeMillis the max amount of time (in ms) to wait/check for a commit
*/
private void waitForCommit(long maxTotalWaitTimeMillis) throws Exception {
long startTimeNanos = System.nanoTime();
long maxTotalWaitTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxTotalWaitTimeMillis);
while (System.nanoTime() - startTimeNanos < maxTotalWaitTimeNanos) {
Thread.sleep(50);
if (!updateHandler.getUpdateLog().hasUncommittedChanges()) {
return;
}
}
}
/**
* Returns the total number of tlogs that have been created for the core.
*
* The tlogs in a core's tlog directory are named: tlog.0000000000000000000, tlog.0000000000000000001, tlog.0000000000000000002, etc.
* Because old tlogs are periodically deleted, we can't just count the number of existing files. Instead, we take the
* highest ordering tlog file name (which would be the newest) and parse the extension.
*
* e.g if the most recently created tlog file is tlog.0000000000000000003, we know that this core has had 4 tlogs.
*
* @param tlogsInfo TreeMap of (tlog file name, tlog file size (in bytes)) pairs
* @return total number of tlogs created for this core
*/
private int parseTotalNumTlogs(TreeMap<String, Long> tlogsInfo) {
String mostRecentFileName = tlogsInfo.lastKey();
int extensionDelimiterIndex = mostRecentFileName.lastIndexOf(".");
if (extensionDelimiterIndex == -1) {
throw new RuntimeException("Invalid tlog filename: " + mostRecentFileName);
}
String extension = mostRecentFileName.substring(extensionDelimiterIndex + 1);
try {
return Integer.parseInt(extension) + 1;
} catch (NumberFormatException e) {
throw new RuntimeException("Could not parse tlog filename: " + mostRecentFileName, e);
}
}
/**
* Construct a batch add document request with a series of very simple Solr docs with increasing IDs.
* @param startId the document ID to begin with
* @param batchSize the number of documents to include in the batch
* @return a SolrQueryRequestBase
*/
private SolrQueryRequestBase constructBatchAddDocRequest(int startId, int batchSize) {
return constructBatchRequestHelper(startId, batchSize, ADD_DOC_FN);
}
/**
* Construct a batch delete document request, with IDs incrementing from startId
* @param startId the document ID to begin with
* @param batchSize the number of documents to include in the batch
* @return a SolrQueryRequestBase
*/
private SolrQueryRequestBase constructBatchDeleteDocRequest(int startId, int batchSize) {
return constructBatchRequestHelper(startId, batchSize, DELETE_DOC_FN);
}
/**
* Helper for constructing a batch update request
* @param startId the document ID to begin with
* @param batchSize the number of documents to include in the batch
* @param requestFn a function that takes an (int) ID and returns an XML string of the request to add to the batch request
* @return a SolrQueryRequestBase
*/
private SolrQueryRequestBase constructBatchRequestHelper(int startId, int batchSize, Function<Integer, String> requestFn) {
SolrQueryRequestBase updateReq = new SolrQueryRequestBase(core, new MapSolrParams(new HashMap<>())) {};
List<String> docs = new ArrayList<>();
for (int i = startId; i < startId + batchSize; i++) {
docs.add(requestFn.apply(i));
}
updateReq.setContentStreams(toContentStreams(docs));
return updateReq;
}
/**
* Executes the given query
* @param query the query to execute
* @return the number of documents found
*/
public int queryCore(SolrQueryRequest query) throws Exception {
String responseStr = h.query(query);
try {
Map<String, Object> root = (Map<String, Object>) objectMapper.readValue(responseStr, Object.class);
Map<String, Object> rootResponse = (Map<String, Object>) root.get("response");
return (int) rootResponse.get("numFound");
} catch (Exception e) {
throw new RuntimeException("Unable to parse Solr query response", e);
}
}
/**
* Checks the given list of document counts to make sure that they are increasing (as commits occur).
* @param numDocList list of the number of documents found in a given core. Ascending from oldest to newest
*/
private void checkNumFoundDocuments(List<Integer> numDocList, int finalValue) {
long currentTotal = 0;
for (Integer count : numDocList) {
Assert.assertTrue(count >= currentTotal);
currentTotal = count;
}
Assert.assertEquals(finalValue, numDocList.get(numDocList.size() - 1).intValue());
}
/**
* Goes through the given tlog directory and inspects each tlog.
* @param tlogDirPath tlog directory path
* @return a TreeMap of (tlog file name, tlog file size (in bytes)) pairs
*/
private TreeMap<String, Long> getTlogFileSizes(String tlogDirPath) {
return getTlogFileSizes(tlogDirPath, Integer.MAX_VALUE);
}
/**
* Goes through the given tlog directory and inspects each tlog. Asserts that each tlog's size is <= the given max size bound.
* @param tlogDirPath tlog directory path
* @param maxSizeBound the max tlog size
* @return a TreeMap of (tlog file name, tlog file size (in bytes)) pairs
*/
private TreeMap<String, Long> getTlogFileSizes(String tlogDirPath, int maxSizeBound) {
File tlogDir = new File(tlogDirPath);
File[] tlogs = tlogDir.listFiles();
TreeMap<String, Long> tlogInfo = new TreeMap<>();
if (tlogs != null) {
for (File tlog : tlogs) {
String message = String.format(Locale.getDefault(), "Tlog size exceeds the max size bound. Tlog path: %s, tlog size: %d",
tlog.getPath(), tlog.length());
Assert.assertTrue(message, tlog.length() <= maxSizeBound);
tlogInfo.put(tlog.getName(), tlog.length());
}
}
return tlogInfo;
}
/**
* Convert the given list of strings into a list of streams, for Solr update requests
* @param strs strings to convert into streams
* @return list of streams
*/
private List<ContentStream> toContentStreams(List<String> strs) {
ArrayList<ContentStream> streams = new ArrayList<>();
for (String str : strs) {
streams.addAll(ClientUtils.toContentStreams(str, "text/xml"));
}
return streams;
}
}