SOLR-9219: Make hdfs blockcache read buffer sizes configurable and improve cache concurrency.

markrmiller 2016-06-21 13:06:03 -04:00
parent 281af8b89c
commit 740198f33d
13 changed files with 39 additions and 142 deletions
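In short, the change introduces two JVM system properties (solr.hdfs.blockcache.blockshift and solr.hdfs.readbuffer.size.default) plus three directoryFactory parameters (solr.hdfs.blockcache.bufferstore.buffersize, solr.hdfs.blockcache.bufferstore.buffercount and solr.hdfs.blockcache.read.buffersize), and sizes the block-cache maps for higher concurrency. The hunks below show where each knob lands; as a quick orientation, here is a minimal sketch of how the two JVM properties are resolved, mirroring the Integer.getInteger lookups further down (the class name and printed output are illustrative only, not part of the commit):

// Illustration only: the two new JVM knobs are ordinary system properties,
// read once when the owning class is loaded.
public class HdfsBufferKnobsSketch {
  public static void main(String[] args) {
    // Defaults match the previously hard-coded constants: shift 13 (8 KB blocks)
    // and a 32 KB default read buffer. Set e.g. -Dsolr.hdfs.blockcache.blockshift=14
    // on the Solr JVM to change them.
    int blockShift = Integer.getInteger("solr.hdfs.blockcache.blockshift", 13);
    int readBufferDefault = Integer.getInteger("solr.hdfs.readbuffer.size.default", 32768);
    System.out.println("block size = " + (1 << blockShift) + " bytes");
    System.out.println("default read buffer = " + readBufferDefault + " bytes");
  }
}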

View File

@@ -79,6 +79,11 @@ Bug Fixes
* SOLR-9234: srcField parameter works only when all fields are captured in the /update/json/docs
  endpoint (noble)

Optimizations
----------------------

* SOLR-9219: Make hdfs blockcache read buffer sizes configurable and improve cache concurrency. (Mark Miller)

Other Changes
----------------------

View File

@@ -55,7 +55,7 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public final static List<String> allowedConfigDirectories = new ArrayList<>(
Arrays.asList(new String[] { "conf", "lib", "solr.xml" }));
Arrays.asList(new String[] { "conf", "lib", "solr.xml", "core1" }));
public final static Set<String> requiredConfigDirectories = new HashSet<>();
@@ -160,8 +160,7 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
CoreContainer container = new CoreContainer(loader);
container.load();
SolrCore core = container.create("core1", ImmutableMap.of(CoreDescriptor.CORE_DATADIR, dataDirStr));
SolrCore core = container.create("", ImmutableMap.of(CoreDescriptor.CORE_DATADIR, dataDirStr));
if (!(core.getDirectoryFactory() instanceof HdfsDirectoryFactory)) {
throw new UnsupportedOperationException(
@@ -169,7 +168,7 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
+ HdfsDirectoryFactory.class.getSimpleName());
}
EmbeddedSolrServer solr = new EmbeddedSolrServer(container, "core1");
EmbeddedSolrServer solr = new EmbeddedSolrServer(container, "");
return solr;
}

View File

@@ -39,7 +39,6 @@ import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.misc.IndexMergeTool;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
@@ -96,7 +95,7 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
writeShardNumberFile(context);
heartBeater.needHeartBeat();
try {
Directory mergedIndex = new HdfsDirectory(workDir, NoLockFactory.INSTANCE, context.getConfiguration());
Directory mergedIndex = new HdfsDirectory(workDir, context.getConfiguration());
// TODO: shouldn't we pull the Version from the solrconfig.xml?
IndexWriterConfig writerConfig = new IndexWriterConfig(null)
@@ -130,7 +129,7 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
Directory[] indexes = new Directory[shards.size()];
for (int i = 0; i < shards.size(); i++) {
indexes[i] = new HdfsDirectory(shards.get(i), NoLockFactory.INSTANCE, context.getConfiguration());
indexes[i] = new HdfsDirectory(shards.get(i), context.getConfiguration());
}
context.setStatus("Logically merging " + shards.size() + " shards into one shard");

View File

@@ -214,18 +214,19 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
new Object[] {slabSize, bankCount,
((long) bankCount * (long) slabSize)});
int bufferSize = getConfig("solr.hdfs.blockcache.bufferstore.buffersize", 128);
int bufferCount = getConfig("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128);
int bsBufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", blockSize);
int bsBufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 0); // this is actually total size
BlockCache blockCache = getBlockDirectoryCache(numberOfBlocksPerBank,
blockSize, bankCount, directAllocation, slabSize,
bufferSize, bufferCount, blockCacheGlobal);
bsBufferSize, bsBufferCount, blockCacheGlobal);
Cache cache = new BlockDirectoryCache(blockCache, path, metrics, blockCacheGlobal);
hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
int readBufferSize = params.getInt("solr.hdfs.blockcache.read.buffersize", blockSize);
hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf, readBufferSize);
dir = new BlockDirectory(path, hdfsDir, cache, null, blockCacheReadEnabled, false, cacheMerges, cacheReadOnce);
} else {
hdfsDir = new HdfsDirectory(new Path(path), lockFactory, conf);
hdfsDir = new HdfsDirectory(new Path(path), conf);
dir = hdfsDir;
}
if (params.getBool(LOCALITYMETRICS_ENABLED, false)) {
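The hunk above moves the buffer-store settings from getConfig() onto the directoryFactory params and adds a separate read-buffer knob: solr.hdfs.blockcache.bufferstore.buffersize (defaults to the cache block size), solr.hdfs.blockcache.bufferstore.buffercount (default 0; the inline comment notes it is actually a total size) and solr.hdfs.blockcache.read.buffersize (defaults to the block size). A sketch of the resulting plumbing, assuming the four-argument HdfsDirectory constructor introduced further down in this commit; the class and method names here are hypothetical, not the factory's actual code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.Directory;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.store.blockcache.BlockDirectory;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.apache.solr.store.hdfs.HdfsLockFactory;

// Sketch only: resolve the new read-buffer parameter and hand it to HdfsDirectory.
public class ReadBufferPlumbingSketch {
  public static Directory open(String path, SolrParams params, Configuration conf) throws IOException {
    // Defaults to the block-cache block size (8 KB unless solr.hdfs.blockcache.blockshift is set).
    int readBufferSize = params.getInt("solr.hdfs.blockcache.read.buffersize", BlockDirectory.BLOCK_SIZE);
    return new HdfsDirectory(new Path(path), HdfsLockFactory.INSTANCE, conf, readBufferSize);
  }
}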

View File

@@ -40,9 +40,10 @@ import org.slf4j.LoggerFactory;
public class BlockDirectory extends FilterDirectory implements ShutdownAwareDirectory {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final long BLOCK_SHIFT = 13; // 2^13 = 8,192 bytes per block
public static final long BLOCK_MOD = 0x1FFF;
public static final long BLOCK_SHIFT = Integer.getInteger("solr.hdfs.blockcache.blockshift", 13);
public static final int BLOCK_SIZE = 1 << BLOCK_SHIFT;
public static final long BLOCK_MOD = BLOCK_SIZE - 1;
public static long getBlock(long pos) {
return pos >>> BLOCK_SHIFT;
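With the shift now read from solr.hdfs.blockcache.blockshift, BLOCK_SIZE and BLOCK_MOD are derived from it rather than hard-coded, so the three values cannot drift apart. A small standalone worked example of the block/offset arithmetic with the default shift of 13 (not Solr code):

// Standalone illustration of the block math above.
public class BlockMathSketch {
  public static void main(String[] args) {
    long blockShift = 13;             // solr.hdfs.blockcache.blockshift default
    int blockSize = 1 << blockShift;  // 8192 bytes per block
    long blockMod = blockSize - 1;    // 0x1FFF, same as the old constant

    long pos = 20_000L;               // an arbitrary file position
    long block = pos >>> blockShift;  // 2    -> the position falls in the third block
    long offset = pos & blockMod;     // 3616 -> byte offset inside that block
    System.out.println("block=" + block + " offset=" + offset);
  }
}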

View File

@@ -17,7 +17,6 @@
package org.apache.solr.store.blockcache;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class BlockDirectoryCache implements Cache {
private final BlockCache blockCache;
private final AtomicInteger counter = new AtomicInteger();
private final Map<String,Integer> names = new ConcurrentHashMap<>();
private final Map<String,Integer> names = new ConcurrentHashMap<>(8192, 0.75f, 512);
private Set<BlockCacheKey> keysToRelease;
private final String path;
private final Metrics metrics;
@@ -43,7 +42,7 @@ public class BlockDirectoryCache implements Cache {
this.path = path;
this.metrics = metrics;
if (releaseBlocks) {
keysToRelease = Collections.synchronizedSet(new HashSet<BlockCacheKey>());
keysToRelease = Collections.newSetFromMap(new ConcurrentHashMap<BlockCacheKey,Boolean>(1024, 0.75f, 512));
}
}
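The two edits above are the concurrency half of the change: the per-file name map gets an explicit concurrency level of 512, and the synchronizedSet of keys to release becomes a concurrent Set view over a ConcurrentHashMap, avoiding the single monitor that Collections.synchronizedSet serializes on. A self-contained illustration of that pattern (String keys here stand in for BlockCacheKey):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustration of the pattern adopted above: a lock-free Set backed by ConcurrentHashMap
// instead of Collections.synchronizedSet, sized with a higher concurrency level.
public class ConcurrentSetSketch {
  public static void main(String[] args) {
    Set<String> keysToRelease = Collections.newSetFromMap(
        new ConcurrentHashMap<String,Boolean>(1024, 0.75f, 512)); // capacity, load factor, concurrency level
    keysToRelease.add("core1:block:42");
    System.out.println(keysToRelease.contains("core1:block:42")); // true
  }
}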

View File

@@ -39,7 +39,7 @@ public class BufferStore implements Store {
}
};
private final static ConcurrentMap<Integer, BufferStore> bufferStores = new ConcurrentHashMap<>();
private final static ConcurrentMap<Integer, BufferStore> bufferStores = new ConcurrentHashMap<>(8192, 0.75f, 512);
private final BlockingQueue<byte[]> buffers;

View File

@@ -28,7 +28,7 @@ import org.apache.lucene.store.IndexOutput;
*/
public abstract class CustomBufferedIndexInput extends IndexInput {
public static final int BUFFER_SIZE = 32768;
public static final int BUFFER_SIZE = Integer.getInteger("solr.hdfs.readbuffer.size.default", 32768);
private int bufferSize = BUFFER_SIZE;

View File

@@ -43,24 +43,25 @@ import org.slf4j.LoggerFactory;
public class HdfsDirectory extends BaseDirectory {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final int BUFFER_SIZE = 8192;
private static final String LF_EXT = ".lf";
protected final Path hdfsDirPath;
protected final Configuration configuration;
private final FileSystem fileSystem;
private final FileContext fileContext;
private final int bufferSize;
public HdfsDirectory(Path hdfsDirPath, Configuration configuration) throws IOException {
this(hdfsDirPath, HdfsLockFactory.INSTANCE, configuration);
this(hdfsDirPath, HdfsLockFactory.INSTANCE, configuration, 4096);
}
public HdfsDirectory(Path hdfsDirPath, LockFactory lockFactory, Configuration configuration)
public HdfsDirectory(Path hdfsDirPath, LockFactory lockFactory, Configuration configuration, int bufferSize)
throws IOException {
super(lockFactory);
this.hdfsDirPath = hdfsDirPath;
this.configuration = configuration;
this.bufferSize = bufferSize;
fileSystem = FileSystem.get(hdfsDirPath.toUri(), configuration);
fileContext = FileContext.getFileContext(hdfsDirPath.toUri(), configuration);
@@ -134,12 +135,8 @@ public class HdfsDirectory extends BaseDirectory {
@Override
public IndexInput openInput(String name, IOContext context)
throws IOException {
return openInput(name, BUFFER_SIZE);
}
private IndexInput openInput(String name, int bufferSize) throws IOException {
return new HdfsIndexInput(name, getFileSystem(), new Path(
hdfsDirPath, name), BUFFER_SIZE);
hdfsDirPath, name), bufferSize);
}
@Override
@@ -158,8 +155,8 @@ public class HdfsDirectory extends BaseDirectory {
@Override
public long fileLength(String name) throws IOException {
return HdfsFileReader.getLength(getFileSystem(),
new Path(hdfsDirPath, name));
FileStatus fileStatus = fileSystem.getFileStatus(new Path(hdfsDirPath, name));
return fileStatus.getLen();
}
public long fileModified(String name) throws IOException {
@@ -203,7 +200,7 @@ public class HdfsDirectory extends BaseDirectory {
public HdfsIndexInput(String name, FileSystem fileSystem, Path path,
int bufferSize) throws IOException {
super(name);
super(name, bufferSize);
this.path = path;
LOG.debug("Opening normal index input on {}", path);
FileStatus fileStatus = fileSystem.getFileStatus(path);
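Taken together, the HdfsDirectory changes above make the read buffer a constructor argument: the two-argument constructor now uses a small 4 KB default, openInput passes the configured size into HdfsIndexInput (whose super call now forwards it), and fileLength comes straight from a FileStatus instead of the removed HdfsFileReader. A usage sketch, assuming a reachable HDFS URI and an existing index file (both hypothetical):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.apache.solr.store.hdfs.HdfsLockFactory;

// Sketch only: constructing the directory with an explicit read buffer size.
public class HdfsDirectoryUsageSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path index = new Path("hdfs://namenode:8020/solr/core1/data/index"); // hypothetical path

    // 64 KB read buffers for every IndexInput this directory opens.
    HdfsDirectory dir = new HdfsDirectory(index, HdfsLockFactory.INSTANCE, conf, 64 * 1024);
    // The two-argument form keeps a 4 KB default, per the constructor above.
    HdfsDirectory small = new HdfsDirectory(index, conf);

    System.out.println(dir.fileLength("segments_1")); // hypothetical file name
    try (IndexInput in = dir.openInput("segments_1", IOContext.READ)) {
      System.out.println("first byte: " + in.readByte());
    }
    dir.close();
    small.close();
  }
}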

View File

@@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.store.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.DataInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @lucene.experimental
*/
public class HdfsFileReader extends DataInput {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Path path;
private FSDataInputStream inputStream;
private long length;
private boolean isClone;
public HdfsFileReader(FileSystem fileSystem, Path path, int bufferSize)
throws IOException {
this.path = path;
LOG.debug("Opening reader on {}", path);
if (!fileSystem.exists(path)) {
throw new FileNotFoundException(path.toString());
}
inputStream = fileSystem.open(path, bufferSize);
FileStatus fileStatus = fileSystem.getFileStatus(path);
length = fileStatus.getLen();
}
public HdfsFileReader(FileSystem fileSystem, Path path) throws IOException {
this(fileSystem, path, HdfsDirectory.BUFFER_SIZE);
}
public long length() {
return length;
}
public void seek(long pos) throws IOException {
inputStream.seek(pos);
}
public void close() throws IOException {
if (!isClone) {
inputStream.close();
}
LOG.debug("Closing reader on {}", path);
}
/**
* This method should never be used!
*/
@Override
public byte readByte() throws IOException {
LOG.warn("Should not be used!");
return inputStream.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
while (len > 0) {
int lenRead = inputStream.read(b, offset, len);
offset += lenRead;
len -= lenRead;
}
}
public static long getLength(FileSystem fileSystem, Path path)
throws IOException {
FileStatus fileStatus = fileSystem.getFileStatus(path);
return fileStatus.getLen();
}
@Override
public DataInput clone() {
HdfsFileReader reader = (HdfsFileReader) super.clone();
reader.isClone = true;
return reader;
}
}

View File

@@ -47,7 +47,10 @@
<double name="maxWriteMBPerSecRead">4000000</double>
<str name="solr.hdfs.home">${solr.hdfs.home:}</str>
<bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
<str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
<bool name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:true}</bool>
<bool name="solr.hdfs.blockcache.write.enabled">${solr.hdfs.blockcache.write.enabled:false}</bool>
<int name="solr.hdfs.blockcache.blocksperbank">10</int>
<int name="solr.hdfs.blockcache.slab.count">1</int>
</directoryFactory>
<schemaFactory class="ClassicIndexSchemaFactory"/>
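The test config above also turns solr.hdfs.blockcache.global into a proper bool defaulting to true and pins a tiny cache (10 blocks per bank, one slab). The ${property:default} placeholders resolve against system properties; a rough stand-in for that substitution, not Solr's actual implementation (which can draw on more sources than JVM system properties):

// Rough stand-in for the ${property:default} substitution used in solrconfig.xml above.
public class PropertyDefaultSketch {
  public static void main(String[] args) {
    boolean blockCacheGlobal =
        Boolean.parseBoolean(System.getProperty("solr.hdfs.blockcache.global", "true"));
    boolean writeCacheEnabled =
        Boolean.parseBoolean(System.getProperty("solr.hdfs.blockcache.write.enabled", "false"));
    System.out.println("global=" + blockCacheGlobal + " writeCache=" + writeCacheEnabled);
  }
}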

View File

@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.index.BaseTestCheckIndex;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
@@ -78,7 +77,7 @@ public class CheckHdfsIndexTest extends AbstractFullDistribZkTestBase {
Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
directory = new HdfsDirectory(path, NoLockFactory.INSTANCE, conf);
directory = new HdfsDirectory(path, conf);
}
@Override

View File

@@ -28,7 +28,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
@@ -74,7 +73,7 @@ public class HdfsDirectoryTest extends SolrTestCaseJ4 {
Configuration conf = HdfsTestUtil.getClientConfiguration(dfsCluster);
conf.set("dfs.permissions.enabled", "false");
directory = new HdfsDirectory(new Path(HdfsTestUtil.getURI(dfsCluster) + createTempDir().toFile().getAbsolutePath() + "/hdfs"), NoLockFactory.INSTANCE, conf);
directory = new HdfsDirectory(new Path(dfsCluster.getURI().toString() + createTempDir().toFile().getAbsolutePath() + "/hdfs"), conf);
random = random();
}