SOLR-4030: Allow rate limiting Directory IO based on the IO context.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1420336 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2012-12-11 19:19:07 +00:00
parent 52536c12a2
commit a81f749ee1
7 changed files with 75 additions and 12 deletions
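
In short: CachingDirectoryFactory can now wrap each Directory it creates in
Lucene's RateLimitedDirectoryWrapper, with an independent maximum write rate
(in MB/sec) per IO context. A minimal sketch of the Lucene API the commit
builds on, as exercised by the hunks below (the index path and the rates are
illustrative, not taken from the commit):

    import java.io.File;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.store.IOContext.Context;
    import org.apache.lucene.store.RateLimitedDirectoryWrapper;

    public class RateLimitSketch {
      public static void main(String[] args) throws Exception {
        Directory raw = FSDirectory.open(new File("/tmp/index")); // illustrative path
        RateLimitedDirectoryWrapper dir = new RateLimitedDirectoryWrapper(raw);
        dir.setMaxWriteMBPerSec(20.0, Context.MERGE); // cap background merge writes
        dir.setMaxWriteMBPerSec(40.0, Context.FLUSH); // flushes get a higher budget
        // READ and DEFAULT get no cap here, so writes under those contexts
        // pass through unthrottled
        dir.close();
      }
    }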

MockDirectoryWrapper.java

@@ -120,6 +120,10 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
this.lockFactory = new MockLockFactoryWrapper(this, delegate.getLockFactory());
init();
}
public Directory getDelegate() {
return this.delegate;
}
public int getInputCloneCount() {
return inputCloneCount.get();
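
The new getDelegate() accessor lets test code reach through
MockDirectoryWrapper to the Directory it wraps; the BasicFunctionalityTest
hunk at the end of this commit uses it to unwrap the mock before asserting
on the RateLimitedDirectoryWrapper underneath.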

solr/CHANGES.txt

@@ -130,6 +130,9 @@ New Features
* SOLR-3948: Calculate/display deleted documents in admin interface.
(Shawn Heisey via Mark Miller)
* SOLR-4030: Allow rate limiting Directory IO based on the IO context.
(Mark Miller, Radim Kolar)
Optimizations
----------------------

CachingDirectoryFactory.java

@@ -26,8 +26,10 @@ import java.util.Locale;
import java.util.Map;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.solr.common.SolrException;
@@ -61,6 +63,14 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
protected Map<Directory,CacheValue> byDirectoryCache = new HashMap<Directory,CacheValue>();
protected Map<Directory,List<CloseListener>> closeListeners = new HashMap<Directory,List<CloseListener>>();
private Double maxWriteMBPerSecFlush;
private Double maxWriteMBPerSecMerge;
private Double maxWriteMBPerSecRead;
private Double maxWriteMBPerSecDefault;
public interface CloseListener {
public void postClose();
@@ -233,6 +243,8 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
if (directory == null || forceNew) {
directory = create(fullPath);
directory = rateLimit(directory);
CacheValue newCacheValue = new CacheValue();
newCacheValue.directory = directory;
newCacheValue.path = fullPath;
@@ -249,6 +261,25 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
return directory;
}
}
private Directory rateLimit(Directory directory) {
if (maxWriteMBPerSecDefault != null || maxWriteMBPerSecFlush != null || maxWriteMBPerSecMerge != null || maxWriteMBPerSecRead != null) {
directory = new RateLimitedDirectoryWrapper(directory);
if (maxWriteMBPerSecDefault != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecDefault, Context.DEFAULT);
}
if (maxWriteMBPerSecFlush != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecFlush, Context.FLUSH);
}
if (maxWriteMBPerSecMerge != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecMerge, Context.MERGE);
}
if (maxWriteMBPerSecRead != null) {
((RateLimitedDirectoryWrapper)directory).setMaxWriteMBPerSec(maxWriteMBPerSecRead, Context.READ);
}
}
return directory;
}
/*
* (non-Javadoc)
@@ -270,7 +301,12 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
@Override
public void init(NamedList args) {}
public void init(NamedList args) {
maxWriteMBPerSecFlush = (Double) args.get("maxWriteMBPerSecFlush");
maxWriteMBPerSecMerge = (Double) args.get("maxWriteMBPerSecMerge");
maxWriteMBPerSecRead = (Double) args.get("maxWriteMBPerSecRead");
maxWriteMBPerSecDefault = (Double) args.get("maxWriteMBPerSecDefault");
}
/*
* (non-Javadoc)
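
The four settings arrive through the factory's init NamedList, i.e. the
<double> elements under <directoryFactory> in solrconfig.xml (see the test
config below). Note that rateLimit() only allocates the wrapper when at
least one cap is configured, so a factory with no settings keeps handing out
the unwrapped Directory. A hedged sketch of driving the same path
programmatically (StandardDirectoryFactory is one concrete subclass; the
parameter values are illustrative):

    import org.apache.solr.common.util.NamedList;
    import org.apache.solr.core.DirectoryFactory;
    import org.apache.solr.core.StandardDirectoryFactory;

    NamedList<Object> args = new NamedList<Object>();
    args.add("maxWriteMBPerSecMerge", 8.0);  // throttle merge writes to ~8 MB/s
    args.add("maxWriteMBPerSecFlush", 16.0); // let flushes burst higher
    DirectoryFactory factory = new StandardDirectoryFactory();
    factory.init(args);
    // Directories created by this factory are now wrapped in
    // RateLimitedDirectoryWrapper; contexts left null (READ and DEFAULT
    // here) are not throttled.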

MMapDirectoryFactory.java

@@ -46,6 +46,7 @@ public class MMapDirectoryFactory extends StandardDirectoryFactory {
@Override
public void init(NamedList args) {
super.init(args);
SolrParams params = SolrParams.toSolrParams( args );
maxChunk = params.getInt("maxChunkSize", MMapDirectory.DEFAULT_MAX_BUFF);
if (maxChunk <= 0){

NRTCachingDirectoryFactory.java

@@ -35,6 +35,7 @@ public class NRTCachingDirectoryFactory extends StandardDirectoryFactory {
@Override
public void init(NamedList args) {
super.init(args);
SolrParams params = SolrParams.toSolrParams(args);
maxMergeSizeMB = params.getDouble("maxMergeSizeMB", 4);
if (maxMergeSizeMB <= 0){
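
Both MMapDirectoryFactory and NRTCachingDirectoryFactory previously skipped
the base-class init (it was an empty method, as the CachingDirectoryFactory
hunk above shows); the added super.init(args) calls are what let the new
maxWriteMBPerSec* settings take effect when one of these factories is
configured.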

solrconfig.xml (a test configuration)

@@ -40,7 +40,12 @@
<!-- The DirectoryFactory to use for indexes.
solr.StandardDirectoryFactory, the default, is filesystem based.
solr.RAMDirectoryFactory is memory based and not persistent. -->
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
<double name="maxWriteMBPerSecDefault">1000000</double>
<double name="maxWriteMBPerSecFlush">2000000</double>
<double name="maxWriteMBPerSecMerge">3000000</double>
<double name="maxWriteMBPerSecRead">4000000</double>
</directoryFactory>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
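
The rates in this test config (1,000,000 to 4,000,000 MB/sec) are far beyond
any real device, so the settings plumbing is exercised without actually
slowing the test suite; using a distinct value per context also lets the
test below verify that each setting landed on the matching IOContext.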

BasicFunctionalityTest.java

@@ -21,20 +21,22 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.StorableField;
import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.util.English;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
@@ -55,8 +57,6 @@ import org.apache.solr.search.DocIterator;
import org.apache.solr.search.DocList;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.RefCounted;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -126,15 +126,28 @@ public class BasicFunctionalityTest extends SolrTestCaseJ4 {
clearIndex();
// test merge factor picked up
// and for rate limited settings
SolrCore core = h.getCore();
RefCounted<IndexWriter> iw = ((DirectUpdateHandler2) core
RefCounted<IndexWriter> iwr = ((DirectUpdateHandler2) core
.getUpdateHandler()).getSolrCoreState().getIndexWriter(core);
try {
assertEquals("Mergefactor was not picked up", 8, ((LogMergePolicy) iw
.get().getConfig().getMergePolicy()).getMergeFactor());
IndexWriter iw = iwr.get();
assertEquals("Mergefactor was not picked up", 8, ((LogMergePolicy) iw.getConfig().getMergePolicy()).getMergeFactor());
Directory dir = iw.getDirectory();
if (dir instanceof MockDirectoryWrapper) {
dir = ((MockDirectoryWrapper)dir).getDelegate();
}
assertTrue(dir.getClass().getName(), dir instanceof RateLimitedDirectoryWrapper);
assertEquals(Double.valueOf(1000000), ((RateLimitedDirectoryWrapper)dir).getMaxWriteMBPerSec(Context.DEFAULT));
assertEquals(Double.valueOf(2000000), ((RateLimitedDirectoryWrapper)dir).getMaxWriteMBPerSec(Context.FLUSH));
assertEquals(Double.valueOf(3000000), ((RateLimitedDirectoryWrapper)dir).getMaxWriteMBPerSec(Context.MERGE));
assertEquals(Double.valueOf(4000000), ((RateLimitedDirectoryWrapper)dir).getMaxWriteMBPerSec(Context.READ));
} finally {
iw.decref();
iwr.decref();
}
// test stats call
NamedList stats = core.getStatistics();