SOLR-5714: You can now use one pool of memory for for the HDFS block cache that all collections share.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1573847 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-03-04 03:18:47 +00:00
parent 08b0073e28
commit f38c82b7a2
29 changed files with 522 additions and 143 deletions

View File

@ -88,6 +88,9 @@ New Features
* SOLR-5183: JSON updates now support nested child documents using a * SOLR-5183: JSON updates now support nested child documents using a
"_childDocument_" object key. (Varun Thacker, hossman) "_childDocument_" object key. (Varun Thacker, hossman)
* SOLR-5714: You can now use one pool of memory for for the HDFS block cache
that all collections share. (Mark Miller, Gregory Chanan)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -51,6 +51,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
public static final String BLOCKCACHE_SLAB_COUNT = "solr.hdfs.blockcache.slab.count"; public static final String BLOCKCACHE_SLAB_COUNT = "solr.hdfs.blockcache.slab.count";
public static final String BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "solr.hdfs.blockcache.direct.memory.allocation"; public static final String BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "solr.hdfs.blockcache.direct.memory.allocation";
public static final String BLOCKCACHE_ENABLED = "solr.hdfs.blockcache.enabled"; public static final String BLOCKCACHE_ENABLED = "solr.hdfs.blockcache.enabled";
public static final String BLOCKCACHE_GLOBAL = "solr.hdfs.blockcache.global";
public static final String BLOCKCACHE_READ_ENABLED = "solr.hdfs.blockcache.read.enabled"; public static final String BLOCKCACHE_READ_ENABLED = "solr.hdfs.blockcache.read.enabled";
public static final String BLOCKCACHE_WRITE_ENABLED = "solr.hdfs.blockcache.write.enabled"; public static final String BLOCKCACHE_WRITE_ENABLED = "solr.hdfs.blockcache.write.enabled";
@ -73,6 +74,8 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
private String confDir; private String confDir;
private static BlockCache globalBlockCache;
public static Metrics metrics; public static Metrics metrics;
private static Boolean kerberosInit; private static Boolean kerberosInit;
@ -102,6 +105,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
} }
boolean blockCacheEnabled = params.getBool(BLOCKCACHE_ENABLED, true); boolean blockCacheEnabled = params.getBool(BLOCKCACHE_ENABLED, true);
boolean blockCacheGlobal = params.getBool(BLOCKCACHE_GLOBAL, false); // default to false for back compat
boolean blockCacheReadEnabled = params.getBool(BLOCKCACHE_READ_ENABLED, boolean blockCacheReadEnabled = params.getBool(BLOCKCACHE_READ_ENABLED,
true); true);
boolean blockCacheWriteEnabled = params.getBool(BLOCKCACHE_WRITE_ENABLED, true); boolean blockCacheWriteEnabled = params.getBool(BLOCKCACHE_WRITE_ENABLED, true);
@ -117,8 +121,6 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
boolean directAllocation = params.getBool( boolean directAllocation = params.getBool(
BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true); BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);
BlockCache blockCache;
int slabSize = numberOfBlocksPerBank * blockSize; int slabSize = numberOfBlocksPerBank * blockSize;
LOG.info( LOG.info(
"Number of slabs of block cache [{}] with direct memory allocation set to [{}]", "Number of slabs of block cache [{}] with direct memory allocation set to [{}]",
@ -131,22 +133,13 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
int bufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", 128); int bufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", 128);
int bufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128); int bufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128);
BufferStore.initNewBuffer(bufferSize, bufferCount); BlockCache blockCache = getBlockDirectoryCache(path, numberOfBlocksPerBank,
long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank blockSize, bankCount, directAllocation, slabSize,
* (long) blockSize; bufferSize, bufferCount, blockCacheGlobal);
try {
blockCache = new BlockCache(metrics, directAllocation, totalMemory, Cache cache = new BlockDirectoryCache(blockCache, path, metrics);
slabSize, blockSize);
} catch (OutOfMemoryError e) {
throw new RuntimeException(
"The max direct memory is likely too low. Either increase it (by adding -XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages to your containers startup args)"
+ " or disable direct allocation using solr.hdfs.blockcache.direct.memory.allocation=false in solrconfig.xml. If you are putting the block cache on the heap,"
+ " your java heap size might not be large enough."
+ " Failed allocating ~" + totalMemory / 1000000.0 + " MB.", e);
}
Cache cache = new BlockDirectoryCache(blockCache, metrics);
HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), conf); HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), conf);
dir = new BlockDirectory("solrcore", hdfsDirectory, cache, null, dir = new BlockDirectory(path, hdfsDirectory, cache, null,
blockCacheReadEnabled, blockCacheWriteEnabled); blockCacheReadEnabled, blockCacheWriteEnabled);
} else { } else {
dir = new HdfsDirectory(new Path(path), conf); dir = new HdfsDirectory(new Path(path), conf);
@ -165,6 +158,45 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
return dir; return dir;
} }
private BlockCache getBlockDirectoryCache(String path,
int numberOfBlocksPerBank, int blockSize, int bankCount,
boolean directAllocation, int slabSize, int bufferSize, int bufferCount, boolean staticBlockCache) {
if (!staticBlockCache) {
LOG.info("Creating new single instance HDFS BlockCache");
return createBlockCache(numberOfBlocksPerBank, blockSize, bankCount, directAllocation, slabSize, bufferSize, bufferCount);
}
LOG.info("Creating new global HDFS BlockCache");
synchronized (HdfsDirectoryFactory.class) {
if (globalBlockCache == null) {
globalBlockCache = createBlockCache(numberOfBlocksPerBank, blockSize, bankCount,
directAllocation, slabSize, bufferSize, bufferCount);
}
}
return globalBlockCache;
}
private BlockCache createBlockCache(int numberOfBlocksPerBank, int blockSize,
int bankCount, boolean directAllocation, int slabSize, int bufferSize,
int bufferCount) {
BufferStore.initNewBuffer(bufferSize, bufferCount);
long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank
* (long) blockSize;
BlockCache blockCache;
try {
blockCache = new BlockCache(metrics, directAllocation, totalMemory, slabSize, blockSize);
} catch (OutOfMemoryError e) {
throw new RuntimeException(
"The max direct memory is likely too low. Either increase it (by adding -XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages to your containers startup args)"
+ " or disable direct allocation using solr.hdfs.blockcache.direct.memory.allocation=false in solrconfig.xml. If you are putting the block cache on the heap,"
+ " your java heap size might not be large enough."
+ " Failed allocating ~" + totalMemory / 1000000.0 + " MB.",
e);
}
return blockCache;
}
@Override @Override
public boolean exists(String path) { public boolean exists(String path) {
Path hdfsDirPath = new Path(path); Path hdfsDirPath = new Path(path);

View File

@ -24,6 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.EvictionListener; import com.googlecode.concurrentlinkedhashmap.EvictionListener;
/**
* @lucene.experimental
*/
public class BlockCache { public class BlockCache {
public static final int _128M = 134217728; public static final int _128M = 134217728;

View File

@ -16,11 +16,22 @@ package org.apache.solr.store.blockcache;
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
/**
* @lucene.experimental
*/
public class BlockCacheKey implements Cloneable { public class BlockCacheKey implements Cloneable {
private long block; private long block;
private int file; private int file;
private String path;
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public long getBlock() { public long getBlock() {
return block; return block;
@ -44,6 +55,7 @@ public class BlockCacheKey implements Cloneable {
int result = 1; int result = 1;
result = prime * result + (int) (block ^ (block >>> 32)); result = prime * result + (int) (block ^ (block >>> 32));
result = prime * result + file; result = prime * result + file;
result = prime * result + ((path == null) ? 0 : path.hashCode());
return result; return result;
} }
@ -55,6 +67,9 @@ public class BlockCacheKey implements Cloneable {
BlockCacheKey other = (BlockCacheKey) obj; BlockCacheKey other = (BlockCacheKey) obj;
if (block != other.block) return false; if (block != other.block) return false;
if (file != other.file) return false; if (file != other.file) return false;
if (path == null) {
if (other.path != null) return false;
} else if (!path.equals(other.path)) return false;
return true; return true;
} }

View File

@ -19,6 +19,9 @@ package org.apache.solr.store.blockcache;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/**
* @lucene.experimental
*/
public class BlockCacheLocation { public class BlockCacheLocation {
private int block; private int block;

View File

@ -34,6 +34,9 @@ import org.apache.solr.store.hdfs.HdfsDirectory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* @lucene.experimental
*/
public class BlockDirectory extends Directory { public class BlockDirectory extends Directory {
public static Logger LOG = LoggerFactory.getLogger(BlockDirectory.class); public static Logger LOG = LoggerFactory.getLogger(BlockDirectory.class);
@ -82,7 +85,7 @@ public class BlockDirectory extends Directory {
private Directory directory; private Directory directory;
private int blockSize; private int blockSize;
private String dirName; private String dirName;
private Cache cache; private final Cache cache;
private Set<String> blockCacheFileTypes; private Set<String> blockCacheFileTypes;
private final boolean blockCacheReadEnabled; private final boolean blockCacheReadEnabled;
private final boolean blockCacheWriteEnabled; private final boolean blockCacheWriteEnabled;
@ -265,6 +268,15 @@ public class BlockDirectory extends Directory {
return dirName + "/" + name; return dirName + "/" + name;
} }
/**
* Expert: mostly for tests
*
* @lucene.experimental
*/
public Cache getCache() {
return cache;
}
@Override @Override
public void copy(Directory to, String src, String dest, IOContext context) public void copy(Directory to, String src, String dest, IOContext context)
throws IOException { throws IOException {
@ -383,4 +395,13 @@ public class BlockDirectory extends Directory {
return directory; return directory;
} }
public boolean isBlockCacheReadEnabled() {
return blockCacheReadEnabled;
}
public boolean isBlockCacheWriteEnabled() {
return blockCacheWriteEnabled;
}
} }

View File

@ -21,17 +21,31 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/**
* @lucene.experimental
*/
public class BlockDirectoryCache implements Cache { public class BlockDirectoryCache implements Cache {
private BlockCache blockCache; private final BlockCache blockCache;
private AtomicInteger counter = new AtomicInteger(); private AtomicInteger counter = new AtomicInteger();
private Map<String,Integer> names = new ConcurrentHashMap<String,Integer>(); private Map<String,Integer> names = new ConcurrentHashMap<String,Integer>();
private String path;
private Metrics metrics; private Metrics metrics;
public BlockDirectoryCache(BlockCache blockCache, Metrics metrics) { public BlockDirectoryCache(BlockCache blockCache, String path, Metrics metrics) {
this.blockCache = blockCache; this.blockCache = blockCache;
this.path = path;
this.metrics = metrics; this.metrics = metrics;
} }
/**
* Expert: mostly for tests
*
* @lucene.experimental
*/
public BlockCache getBlockCache() {
return blockCache;
}
@Override @Override
public void delete(String name) { public void delete(String name) {
names.remove(name); names.remove(name);
@ -46,6 +60,7 @@ public class BlockDirectoryCache implements Cache {
names.put(name, file); names.put(name, file);
} }
BlockCacheKey blockCacheKey = new BlockCacheKey(); BlockCacheKey blockCacheKey = new BlockCacheKey();
blockCacheKey.setPath(path);
blockCacheKey.setBlock(blockId); blockCacheKey.setBlock(blockId);
blockCacheKey.setFile(file); blockCacheKey.setFile(file);
blockCache.store(blockCacheKey, blockOffset, buffer, offset, length); blockCache.store(blockCacheKey, blockOffset, buffer, offset, length);
@ -59,6 +74,7 @@ public class BlockDirectoryCache implements Cache {
return false; return false;
} }
BlockCacheKey blockCacheKey = new BlockCacheKey(); BlockCacheKey blockCacheKey = new BlockCacheKey();
blockCacheKey.setPath(path);
blockCacheKey.setBlock(blockId); blockCacheKey.setBlock(blockId);
blockCacheKey.setFile(file); blockCacheKey.setFile(file);
boolean fetch = blockCache.fetch(blockCacheKey, b, blockOffset, off, boolean fetch = blockCache.fetch(blockCacheKey, b, blockOffset, off,

View File

@ -21,6 +21,9 @@ import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.lucene.util.LongBitSet; import org.apache.lucene.util.LongBitSet;
/**
* @lucene.experimental
*/
public class BlockLocks { public class BlockLocks {
private AtomicLongArray bits; private AtomicLongArray bits;

View File

@ -22,7 +22,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
/**
* @lucene.experimental
*/
public class BufferStore implements Store { public class BufferStore implements Store {
private static final Store EMPTY = new Store() { private static final Store EMPTY = new Store() {

View File

@ -17,6 +17,9 @@ package org.apache.solr.store.blockcache;
* limitations under the License. * limitations under the License.
*/ */
/**
* @lucene.experimental
*/
public interface Cache { public interface Cache {
/** /**

View File

@ -21,10 +21,11 @@ import java.io.IOException;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
/* /**
* Cache the blocks as they are written. The cache file name is the name of * Cache the blocks as they are written. The cache file name is the name of
* the file until the file is closed, at which point the cache is updated * the file until the file is closed, at which point the cache is updated
* to include the last modified date (which is unknown until that point). * to include the last modified date (which is unknown until that point).
* @lucene.experimental
*/ */
public class CachedIndexOutput extends ReusedBufferedIndexOutput { public class CachedIndexOutput extends ReusedBufferedIndexOutput {
private final BlockDirectory directory; private final BlockDirectory directory;

View File

@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
/**
* @lucene.experimental
*/
public abstract class CustomBufferedIndexInput extends IndexInput { public abstract class CustomBufferedIndexInput extends IndexInput {
public static final int BUFFER_SIZE = 32768; public static final int BUFFER_SIZE = 32768;

View File

@ -29,6 +29,9 @@ import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater; import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics; import org.apache.hadoop.metrics.jvm.JvmMetrics;
/**
* @lucene.experimental
*/
public class Metrics implements Updater { public class Metrics implements Updater {
public static class MethodCall { public static class MethodCall {

View File

@ -21,6 +21,9 @@ import java.io.IOException;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
/**
* @lucene.experimental
*/
public abstract class ReusedBufferedIndexOutput extends IndexOutput { public abstract class ReusedBufferedIndexOutput extends IndexOutput {
public static final int BUFFER_SIZE = 1024; public static final int BUFFER_SIZE = 1024;

View File

@ -17,6 +17,9 @@ package org.apache.solr.store.blockcache;
* limitations under the License. * limitations under the License.
*/ */
/**
* @lucene.experimental
*/
public interface Store { public interface Store {
byte[] takeBuffer(int bufferSize); byte[] takeBuffer(int bufferSize);

View File

@ -28,6 +28,9 @@ import org.apache.lucene.store.DataInput;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* @lucene.experimental
*/
public class HdfsFileReader extends DataInput { public class HdfsFileReader extends DataInput {
public static Logger LOG = LoggerFactory.getLogger(HdfsFileReader.class); public static Logger LOG = LoggerFactory.getLogger(HdfsFileReader.class);

View File

@ -32,6 +32,9 @@ import org.apache.lucene.store.DataOutput;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* @lucene.experimental
*/
public class HdfsFileWriter extends DataOutput implements Closeable { public class HdfsFileWriter extends DataOutput implements Closeable {
public static Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class); public static Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class);

View File

@ -21,6 +21,9 @@ import java.io.IOException;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
/**
* @lucene.experimental
*/
public class NullIndexOutput extends IndexOutput { public class NullIndexOutput extends IndexOutput {
private long pos; private long pos;

View File

@ -28,6 +28,7 @@
<int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int> <int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
<str name="solr.hdfs.home">${solr.hdfs.home:}</str> <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
<str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str> <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
<str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
</directoryFactory> </directoryFactory>
<dataDir>${solr.data.dir:}</dataDir> <dataDir>${solr.data.dir:}</dataDir>

View File

@ -47,6 +47,7 @@
<double name="maxWriteMBPerSecRead">4000000</double> <double name="maxWriteMBPerSecRead">4000000</double>
<str name="solr.hdfs.home">${solr.hdfs.home:}</str> <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
<bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool> <bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
<str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
</directoryFactory> </directoryFactory>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion> <luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>

View File

@ -131,7 +131,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
int threadCount = 1; int threadCount = 1;
int i = 0; int i = 0;
for (i = 0; i < threadCount; i++) { for (i = 0; i < threadCount; i++) {
StopableIndexingThread indexThread = new StopableIndexingThread(Integer.toString(i), true); StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
threads.add(indexThread); threads.add(indexThread);
indexThread.start(); indexThread.start();
} }
@ -270,7 +270,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
public FullThrottleStopableIndexingThread(List<SolrServer> clients, public FullThrottleStopableIndexingThread(List<SolrServer> clients,
String id, boolean doDeletes) { String id, boolean doDeletes) {
super(id, doDeletes); super(controlClient, cloudClient, id, doDeletes);
setName("FullThrottleStopableIndexingThread"); setName("FullThrottleStopableIndexingThread");
setDaemon(true); setDaemon(true);
this.clients = clients; this.clients = clients;

View File

@ -108,7 +108,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>(); List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>();
int threadCount = 2; int threadCount = 2;
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < threadCount; i++) {
StopableIndexingThread indexThread = new StopableIndexingThread(Integer.toString(i), true); StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
threads.add(indexThread); threads.add(indexThread);
indexThread.start(); indexThread.start();
} }

View File

@ -66,10 +66,10 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
int maxDoc = maxDocList[random().nextInt(maxDocList.length - 1)]; int maxDoc = maxDocList[random().nextInt(maxDocList.length - 1)];
indexThread = new StopableIndexingThread("1", true, maxDoc); indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc);
indexThread.start(); indexThread.start();
indexThread2 = new StopableIndexingThread("2", true, maxDoc); indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc);
indexThread2.start(); indexThread2.start();
@ -100,7 +100,7 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
Thread.sleep(1000); Thread.sleep(1000);
waitForThingsToLevelOut(45); waitForThingsToLevelOut(90);
Thread.sleep(2000); Thread.sleep(2000);

View File

@ -64,6 +64,8 @@ public class HdfsTestUtil {
System.setProperty("solr.hdfs.home", "/solr_hdfs_home"); System.setProperty("solr.hdfs.home", "/solr_hdfs_home");
System.setProperty("solr.hdfs.blockcache.global", Boolean.toString(LuceneTestCase.random().nextBoolean()));
final MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null); final MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
dfsCluster.waitActive(); dfsCluster.waitActive();
@ -92,6 +94,7 @@ public class HdfsTestUtil {
System.clearProperty("test.build.data"); System.clearProperty("test.build.data");
System.clearProperty("test.cache.data"); System.clearProperty("test.cache.data");
System.clearProperty("solr.hdfs.home"); System.clearProperty("solr.hdfs.home");
System.clearProperty("solr.hdfs.blockcache.global");
if (dfsCluster != null) { if (dfsCluster != null) {
timers.remove(dfsCluster); timers.remove(dfsCluster);
dfsCluster.shutdown(); dfsCluster.shutdown();

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.hdfs;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.cloud.BasicDistributedZkTest;
import org.apache.solr.cloud.StopableIndexingThread;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.HdfsDirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.store.blockcache.BlockCache;
import org.apache.solr.store.blockcache.BlockDirectory;
import org.apache.solr.store.blockcache.BlockDirectoryCache;
import org.apache.solr.store.blockcache.Cache;
import org.apache.solr.util.RefCounted;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
@Slow
@Nightly
@ThreadLeakScope(Scope.NONE) // hdfs client currently leaks thread(s)
public class HdfsWriteToMultipleCollectionsTest extends BasicDistributedZkTest {
private static final String SOLR_HDFS_HOME = "solr.hdfs.home";
private static final String SOLR_HDFS_BLOCKCACHE_GLOBAL = "solr.hdfs.blockcache.global";
private static final String ACOLLECTION = "acollection";
private static MiniDFSCluster dfsCluster;
@BeforeClass
public static void setupClass() throws Exception {
schemaString = "schema15.xml"; // we need a string id
dfsCluster = HdfsTestUtil.setupClass(new File(TEMP_DIR,
HdfsBasicDistributedZk2Test.class.getName() + "_"
+ System.currentTimeMillis()).getAbsolutePath());
System.setProperty(SOLR_HDFS_HOME, dfsCluster.getURI().toString() + "/solr");
}
@AfterClass
public static void teardownClass() throws Exception {
HdfsTestUtil.teardownClass(dfsCluster);
System.clearProperty(SOLR_HDFS_HOME);
dfsCluster = null;
}
@Override
protected String getDataDir(String dataDir) throws IOException {
return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
}
public HdfsWriteToMultipleCollectionsTest() {
super();
sliceCount = 1;
shardCount = 3;
}
protected String getSolrXml() {
return "solr-no-core.xml";
}
@Override
public void doTest() throws Exception {
int docCount = random().nextInt(1313) + 1;
int cnt = random().nextInt(4) + 1;
for (int i = 0; i < cnt; i++) {
createCollection(ACOLLECTION + i, 2, 2, 9);
}
for (int i = 0; i < cnt; i++) {
waitForRecoveriesToFinish(ACOLLECTION + i, false);
}
List<CloudSolrServer> cloudServers = new ArrayList<CloudSolrServer>();
List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>();
for (int i = 0; i < cnt; i++) {
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
server.setDefaultCollection(ACOLLECTION + i);
cloudServers.add(server);
StopableIndexingThread indexThread = new StopableIndexingThread(null, server, "1", true, docCount);
threads.add(indexThread);
indexThread.start();
}
int addCnt = 0;
for (StopableIndexingThread thread : threads) {
thread.join();
addCnt += thread.getNumAdds() - thread.getNumDeletes();
}
long collectionsCount = 0;
for (CloudSolrServer server : cloudServers) {
server.commit();
collectionsCount += server.query(new SolrQuery("*:*")).getResults().getNumFound();
}
for (CloudSolrServer server : cloudServers) {
server.shutdown();
}
assertEquals(addCnt, collectionsCount);
BlockCache lastBlockCache = null;
// assert that we are using the block directory and that write and read caching are being used
for (JettySolrRunner jetty : jettys) {
CoreContainer cores = ((SolrDispatchFilter) jetty.getDispatchFilter()
.getFilter()).getCores();
Collection<SolrCore> solrCores = cores.getCores();
for (SolrCore core : solrCores) {
if (core.getCoreDescriptor().getCloudDescriptor().getCollectionName()
.startsWith(ACOLLECTION)) {
assertTrue(core.getDirectoryFactory() instanceof HdfsDirectoryFactory);
RefCounted<IndexWriter> iwRef = core.getUpdateHandler()
.getSolrCoreState().getIndexWriter(core);
try {
IndexWriter iw = iwRef.get();
NRTCachingDirectory directory = (NRTCachingDirectory) iw
.getDirectory();
BlockDirectory blockDirectory = (BlockDirectory) directory
.getDelegate();
assertTrue(blockDirectory.isBlockCacheReadEnabled());
assertTrue(blockDirectory.isBlockCacheWriteEnabled());
Cache cache = blockDirectory.getCache();
// we know its a BlockDirectoryCache, but future proof
assertTrue(cache instanceof BlockDirectoryCache);
BlockCache blockCache = ((BlockDirectoryCache) cache)
.getBlockCache();
if (lastBlockCache != null) {
if (Boolean.getBoolean(SOLR_HDFS_BLOCKCACHE_GLOBAL)) {
assertEquals(lastBlockCache, blockCache);
} else {
assertNotSame(lastBlockCache, blockCache);
}
}
lastBlockCache = blockCache;
} finally {
iwRef.decref();
}
}
}
}
}
}

View File

@ -51,6 +51,7 @@ public class BlockCacheTest extends LuceneTestCase {
int file = 0; int file = 0;
blockCacheKey.setBlock(block); blockCacheKey.setBlock(block);
blockCacheKey.setFile(file); blockCacheKey.setFile(file);
blockCacheKey.setPath("/");
if (blockCache.fetch(blockCacheKey, buffer)) { if (blockCache.fetch(blockCacheKey, buffer)) {
hitsInCache.incrementAndGet(); hitsInCache.incrementAndGet();
@ -91,6 +92,7 @@ public class BlockCacheTest extends LuceneTestCase {
BlockCacheKey blockCacheKey = new BlockCacheKey(); BlockCacheKey blockCacheKey = new BlockCacheKey();
blockCacheKey.setBlock(0); blockCacheKey.setBlock(0);
blockCacheKey.setFile(0); blockCacheKey.setFile(0);
blockCacheKey.setPath("/");
byte[] newData = new byte[blockSize*3]; byte[] newData = new byte[blockSize*3];
byte[] testData = testData(random, blockSize, newData); byte[] testData = testData(random, blockSize, newData);

View File

@ -129,6 +129,9 @@
<str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str> <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
<!-- Enable/Disable the hdfs cache. --> <!-- Enable/Disable the hdfs cache. -->
<str name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</str> <str name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</str>
<!-- Enable/Disable using one global cache for all SolrCores.
The settings used will be from the first HdfsDirectoryFactory created. -->
<str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:true}</str>
</directoryFactory> </directoryFactory>

View File

@ -1428,122 +1428,13 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return rsp; return rsp;
} }
abstract class StopableThread extends Thread { static abstract class StopableThread extends Thread {
public StopableThread(String name) { public StopableThread(String name) {
super(name); super(name);
} }
public abstract void safeStop(); public abstract void safeStop();
} }
class StopableIndexingThread extends StopableThread {
private volatile boolean stop = false;
protected final String id;
protected final List<String> deletes = new ArrayList<String>();
protected Set<String> addFails = new HashSet<String>();
protected Set<String> deleteFails = new HashSet<String>();
protected boolean doDeletes;
private int numCycles;
public StopableIndexingThread(String id, boolean doDeletes) {
this(id, doDeletes, -1);
}
public StopableIndexingThread(String id, boolean doDeletes, int numCycles) {
super("StopableIndexingThread");
this.id = id;
this.doDeletes = doDeletes;
this.numCycles = numCycles;
setDaemon(true);
}
@Override
public void run() {
int i = 0;
int numDone = 0;
int numDeletes = 0;
int numAdds = 0;
while (true && !stop) {
if (numCycles != -1) {
if (numDone > numCycles) {
break;
}
}
++numDone;
String id = this.id + "-" + i;
++i;
boolean addFailed = false;
if (doDeletes && random().nextBoolean() && deletes.size() > 0) {
String delete = deletes.remove(0);
try {
numDeletes++;
UpdateRequest req = new UpdateRequest();
req.deleteById(delete);
req.setParam("CONTROL", "TRUE");
req.process(controlClient);
cloudClient.deleteById(delete);
} catch (Exception e) {
System.err.println("REQUEST FAILED:");
e.printStackTrace();
if (e instanceof SolrServerException) {
System.err.println("ROOT CAUSE:");
((SolrServerException) e).getRootCause().printStackTrace();
}
deleteFails.add(id);
}
}
try {
numAdds++;
indexr("id", id, i1, 50, t1,
"to come to the aid of their country.");
} catch (Exception e) {
addFailed = true;
System.err.println("REQUEST FAILED:");
e.printStackTrace();
if (e instanceof SolrServerException) {
System.err.println("ROOT CAUSE:");
((SolrServerException) e).getRootCause().printStackTrace();
}
addFails.add(id);
}
if (!addFailed && doDeletes && random().nextBoolean()) {
deletes.add(id);
}
try {
Thread.currentThread().sleep(random().nextInt(100));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.err.println("added docs:" + numAdds + " with " + (addFails.size() + deleteFails.size()) + " fails"
+ " deletes:" + numDeletes);
}
@Override
public void safeStop() {
stop = true;
}
public Set<String> getAddFails() {
return addFails;
}
public Set<String> getDeleteFails() {
return deleteFails;
}
public int getFailCount() {
return addFails.size() + deleteFails.size();
}
};
class StopableSearchThread extends StopableThread { class StopableSearchThread extends StopableThread {
private volatile boolean stop = false; private volatile boolean stop = false;
protected final AtomicInteger queryFails = new AtomicInteger(); protected final AtomicInteger queryFails = new AtomicInteger();

View File

@ -0,0 +1,185 @@
package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class StopableIndexingThread extends AbstractFullDistribZkTestBase.StopableThread {
private static String t1 = "a_t";
private static String i1 = "a_si";
private volatile boolean stop = false;
protected final String id;
protected final List<String> deletes = new ArrayList<String>();
protected Set<String> addFails = new HashSet<String>();
protected Set<String> deleteFails = new HashSet<String>();
protected boolean doDeletes;
private int numCycles;
private SolrServer controlClient;
private SolrServer cloudClient;
private int numDeletes;
private int numAdds;
public StopableIndexingThread(SolrServer controlClient, SolrServer cloudClient, String id, boolean doDeletes) {
this(controlClient, cloudClient, id, doDeletes, -1);
}
public StopableIndexingThread(SolrServer controlClient, SolrServer cloudClient, String id, boolean doDeletes, int numCycles) {
super("StopableIndexingThread");
this.controlClient = controlClient;
this.cloudClient = cloudClient;
this.id = id;
this.doDeletes = doDeletes;
this.numCycles = numCycles;
setDaemon(true);
}
@Override
public void run() {
int i = 0;
int numDone = 0;
numDeletes = 0;
numAdds = 0;
while (true && !stop) {
if (numCycles != -1) {
if (numDone > numCycles) {
break;
}
}
++numDone;
String id = this.id + "-" + i;
++i;
boolean addFailed = false;
if (doDeletes && AbstractFullDistribZkTestBase.random().nextBoolean() && deletes.size() > 0) {
String delete = deletes.remove(0);
try {
numDeletes++;
if (controlClient != null) {
UpdateRequest req = new UpdateRequest();
req.deleteById(delete);
req.setParam("CONTROL", "TRUE");
req.process(controlClient);
}
cloudClient.deleteById(delete);
} catch (Exception e) {
System.err.println("REQUEST FAILED:");
e.printStackTrace();
if (e instanceof SolrServerException) {
System.err.println("ROOT CAUSE:");
((SolrServerException) e).getRootCause().printStackTrace();
}
deleteFails.add(id);
}
}
try {
numAdds++;
indexr("id", id, i1, 50, t1,
"to come to the aid of their country.");
} catch (Exception e) {
addFailed = true;
System.err.println("REQUEST FAILED:");
e.printStackTrace();
if (e instanceof SolrServerException) {
System.err.println("ROOT CAUSE:");
((SolrServerException) e).getRootCause().printStackTrace();
}
addFails.add(id);
}
if (!addFailed && doDeletes && AbstractFullDistribZkTestBase.random().nextBoolean()) {
deletes.add(id);
}
try {
Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(100));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.err.println("added docs:" + numAdds + " with " + (addFails.size() + deleteFails.size()) + " fails"
+ " deletes:" + numDeletes);
}
@Override
public void safeStop() {
stop = true;
}
public Set<String> getAddFails() {
return addFails;
}
public Set<String> getDeleteFails() {
return deleteFails;
}
public int getFailCount() {
return addFails.size() + deleteFails.size();
}
protected void addFields(SolrInputDocument doc, Object... fields) {
for (int i = 0; i < fields.length; i += 2) {
doc.addField((String) (fields[i]), fields[i + 1]);
}
}
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
addFields(doc, "rnd_b", true);
indexDoc(doc);
}
protected void indexDoc(SolrInputDocument doc) throws IOException,
SolrServerException {
if (controlClient != null) {
UpdateRequest req = new UpdateRequest();
req.add(doc);
req.setParam("CONTROL", "TRUE");
req.process(controlClient);
}
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
ureq.process(cloudClient);
}
public int getNumDeletes() {
return numDeletes;
}
public int getNumAdds() {
return numAdds;
}
}