From f38c82b7a2b4570e8aef8c24d3405a455e80cf7a Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Tue, 4 Mar 2014 03:18:47 +0000 Subject: [PATCH] SOLR-5714: You can now use one pool of memory for for the HDFS block cache that all collections share. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1573847 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../solr/core/HdfsDirectoryFactory.java | 66 +++++-- .../solr/store/blockcache/BlockCache.java | 3 + .../solr/store/blockcache/BlockCacheKey.java | 21 +- .../store/blockcache/BlockCacheLocation.java | 3 + .../solr/store/blockcache/BlockDirectory.java | 25 ++- .../store/blockcache/BlockDirectoryCache.java | 20 +- .../solr/store/blockcache/BlockLocks.java | 3 + .../solr/store/blockcache/BufferStore.java | 4 +- .../apache/solr/store/blockcache/Cache.java | 3 + .../store/blockcache/CachedIndexOutput.java | 3 +- .../blockcache/CustomBufferedIndexInput.java | 3 + .../apache/solr/store/blockcache/Metrics.java | 3 + .../blockcache/ReusedBufferedIndexOutput.java | 3 + .../apache/solr/store/blockcache/Store.java | 3 + .../solr/store/hdfs/HdfsFileReader.java | 3 + .../solr/store/hdfs/HdfsFileWriter.java | 3 + .../solr/store/hdfs/NullIndexOutput.java | 3 + .../solr/collection1/conf/solrconfig-tlog.xml | 1 + .../solr/collection1/conf/solrconfig.xml | 3 +- .../cloud/ChaosMonkeyNothingIsSafeTest.java | 4 +- .../solr/cloud/ChaosMonkeySafeLeaderTest.java | 2 +- .../org/apache/solr/cloud/RecoveryZkTest.java | 6 +- .../apache/solr/cloud/hdfs/HdfsTestUtil.java | 3 + .../HdfsWriteToMultipleCollectionsTest.java | 170 ++++++++++++++++ .../solr/store/blockcache/BlockCacheTest.java | 2 + .../solr/collection1/conf/solrconfig.xml | 3 + .../cloud/AbstractFullDistribZkTestBase.java | 111 +---------- .../solr/cloud/StopableIndexingThread.java | 185 ++++++++++++++++++ 29 files changed, 522 insertions(+), 143 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java create mode 100644 solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index f990f84d160..04430677f41 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -88,6 +88,9 @@ New Features * SOLR-5183: JSON updates now support nested child documents using a "_childDocument_" object key. (Varun Thacker, hossman) +* SOLR-5714: You can now use one pool of memory for for the HDFS block cache + that all collections share. (Mark Miller, Gregory Chanan) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java index 466246ca664..af104c02475 100644 --- a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java +++ b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java @@ -51,6 +51,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory { public static final String BLOCKCACHE_SLAB_COUNT = "solr.hdfs.blockcache.slab.count"; public static final String BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "solr.hdfs.blockcache.direct.memory.allocation"; public static final String BLOCKCACHE_ENABLED = "solr.hdfs.blockcache.enabled"; + public static final String BLOCKCACHE_GLOBAL = "solr.hdfs.blockcache.global"; public static final String BLOCKCACHE_READ_ENABLED = "solr.hdfs.blockcache.read.enabled"; public static final String BLOCKCACHE_WRITE_ENABLED = "solr.hdfs.blockcache.write.enabled"; @@ -72,6 +73,8 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory { private String hdfsDataDir; private String confDir; + + private static BlockCache globalBlockCache; public static Metrics metrics; private static Boolean kerberosInit; @@ -102,6 +105,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory { } boolean blockCacheEnabled = params.getBool(BLOCKCACHE_ENABLED, true); + boolean blockCacheGlobal = params.getBool(BLOCKCACHE_GLOBAL, false); // default to false for back compat boolean blockCacheReadEnabled = params.getBool(BLOCKCACHE_READ_ENABLED, true); boolean blockCacheWriteEnabled = params.getBool(BLOCKCACHE_WRITE_ENABLED, true); @@ -117,8 +121,6 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory { boolean directAllocation = params.getBool( BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true); - BlockCache blockCache; - int slabSize = numberOfBlocksPerBank * blockSize; LOG.info( "Number of slabs of block cache [{}] with direct memory allocation set to [{}]", @@ -131,22 +133,13 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory { int bufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", 128); int bufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128); - BufferStore.initNewBuffer(bufferSize, bufferCount); - long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank - * (long) blockSize; - try { - blockCache = new BlockCache(metrics, directAllocation, totalMemory, - slabSize, blockSize); - } catch (OutOfMemoryError e) { - throw new RuntimeException( - "The max direct memory is likely too low. Either increase it (by adding -XX:MaxDirectMemorySize=g -XX:+UseLargePages to your containers startup args)" - + " or disable direct allocation using solr.hdfs.blockcache.direct.memory.allocation=false in solrconfig.xml. If you are putting the block cache on the heap," - + " your java heap size might not be large enough." - + " Failed allocating ~" + totalMemory / 1000000.0 + " MB.", e); - } - Cache cache = new BlockDirectoryCache(blockCache, metrics); + BlockCache blockCache = getBlockDirectoryCache(path, numberOfBlocksPerBank, + blockSize, bankCount, directAllocation, slabSize, + bufferSize, bufferCount, blockCacheGlobal); + + Cache cache = new BlockDirectoryCache(blockCache, path, metrics); HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), conf); - dir = new BlockDirectory("solrcore", hdfsDirectory, cache, null, + dir = new BlockDirectory(path, hdfsDirectory, cache, null, blockCacheReadEnabled, blockCacheWriteEnabled); } else { dir = new HdfsDirectory(new Path(path), conf); @@ -164,6 +157,45 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory { } return dir; } + + private BlockCache getBlockDirectoryCache(String path, + int numberOfBlocksPerBank, int blockSize, int bankCount, + boolean directAllocation, int slabSize, int bufferSize, int bufferCount, boolean staticBlockCache) { + if (!staticBlockCache) { + LOG.info("Creating new single instance HDFS BlockCache"); + return createBlockCache(numberOfBlocksPerBank, blockSize, bankCount, directAllocation, slabSize, bufferSize, bufferCount); + } + LOG.info("Creating new global HDFS BlockCache"); + synchronized (HdfsDirectoryFactory.class) { + + if (globalBlockCache == null) { + globalBlockCache = createBlockCache(numberOfBlocksPerBank, blockSize, bankCount, + directAllocation, slabSize, bufferSize, bufferCount); + } + } + return globalBlockCache; + } + + private BlockCache createBlockCache(int numberOfBlocksPerBank, int blockSize, + int bankCount, boolean directAllocation, int slabSize, int bufferSize, + int bufferCount) { + BufferStore.initNewBuffer(bufferSize, bufferCount); + long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank + * (long) blockSize; + + BlockCache blockCache; + try { + blockCache = new BlockCache(metrics, directAllocation, totalMemory, slabSize, blockSize); + } catch (OutOfMemoryError e) { + throw new RuntimeException( + "The max direct memory is likely too low. Either increase it (by adding -XX:MaxDirectMemorySize=g -XX:+UseLargePages to your containers startup args)" + + " or disable direct allocation using solr.hdfs.blockcache.direct.memory.allocation=false in solrconfig.xml. If you are putting the block cache on the heap," + + " your java heap size might not be large enough." + + " Failed allocating ~" + totalMemory / 1000000.0 + " MB.", + e); + } + return blockCache; + } @Override public boolean exists(String path) { diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java index a6cdf64923a..a520c6b6c29 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java @@ -24,6 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import com.googlecode.concurrentlinkedhashmap.EvictionListener; +/** + * @lucene.experimental + */ public class BlockCache { public static final int _128M = 134217728; diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheKey.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheKey.java index d0daefe0658..cf05c6936bc 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheKey.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheKey.java @@ -16,12 +16,23 @@ package org.apache.solr.store.blockcache; * See the License for the specific language governing permissions and * limitations under the License. */ - +/** + * @lucene.experimental + */ public class BlockCacheKey implements Cloneable { private long block; private int file; + private String path; + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + public long getBlock() { return block; } @@ -44,9 +55,10 @@ public class BlockCacheKey implements Cloneable { int result = 1; result = prime * result + (int) (block ^ (block >>> 32)); result = prime * result + file; + result = prime * result + ((path == null) ? 0 : path.hashCode()); return result; } - + @Override public boolean equals(Object obj) { if (this == obj) return true; @@ -55,9 +67,12 @@ public class BlockCacheKey implements Cloneable { BlockCacheKey other = (BlockCacheKey) obj; if (block != other.block) return false; if (file != other.file) return false; + if (path == null) { + if (other.path != null) return false; + } else if (!path.equals(other.path)) return false; return true; } - + @Override public BlockCacheKey clone() { try { diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheLocation.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheLocation.java index 968628f058a..d2a124dda8c 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheLocation.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCacheLocation.java @@ -19,6 +19,9 @@ package org.apache.solr.store.blockcache; import java.util.concurrent.atomic.AtomicBoolean; +/** + * @lucene.experimental + */ public class BlockCacheLocation { private int block; diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java index 9982197a574..028fd55aecb 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java @@ -34,6 +34,9 @@ import org.apache.solr.store.hdfs.HdfsDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * @lucene.experimental + */ public class BlockDirectory extends Directory { public static Logger LOG = LoggerFactory.getLogger(BlockDirectory.class); @@ -82,11 +85,11 @@ public class BlockDirectory extends Directory { private Directory directory; private int blockSize; private String dirName; - private Cache cache; + private final Cache cache; private Set blockCacheFileTypes; private final boolean blockCacheReadEnabled; private final boolean blockCacheWriteEnabled; - + public BlockDirectory(String dirName, Directory directory, Cache cache, Set blockCacheFileTypes, boolean blockCacheReadEnabled, boolean blockCacheWriteEnabled) throws IOException { @@ -265,6 +268,15 @@ public class BlockDirectory extends Directory { return dirName + "/" + name; } + /** + * Expert: mostly for tests + * + * @lucene.experimental + */ + public Cache getCache() { + return cache; + } + @Override public void copy(Directory to, String src, String dest, IOContext context) throws IOException { @@ -383,4 +395,13 @@ public class BlockDirectory extends Directory { return directory; } + + public boolean isBlockCacheReadEnabled() { + return blockCacheReadEnabled; + } + + public boolean isBlockCacheWriteEnabled() { + return blockCacheWriteEnabled; + } + } diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectoryCache.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectoryCache.java index 41ca9bb4775..592831b0dad 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectoryCache.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectoryCache.java @@ -21,17 +21,31 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +/** + * @lucene.experimental + */ public class BlockDirectoryCache implements Cache { - private BlockCache blockCache; + private final BlockCache blockCache; private AtomicInteger counter = new AtomicInteger(); private Map names = new ConcurrentHashMap(); + private String path; private Metrics metrics; - public BlockDirectoryCache(BlockCache blockCache, Metrics metrics) { + public BlockDirectoryCache(BlockCache blockCache, String path, Metrics metrics) { this.blockCache = blockCache; + this.path = path; this.metrics = metrics; } + /** + * Expert: mostly for tests + * + * @lucene.experimental + */ + public BlockCache getBlockCache() { + return blockCache; + } + @Override public void delete(String name) { names.remove(name); @@ -46,6 +60,7 @@ public class BlockDirectoryCache implements Cache { names.put(name, file); } BlockCacheKey blockCacheKey = new BlockCacheKey(); + blockCacheKey.setPath(path); blockCacheKey.setBlock(blockId); blockCacheKey.setFile(file); blockCache.store(blockCacheKey, blockOffset, buffer, offset, length); @@ -59,6 +74,7 @@ public class BlockDirectoryCache implements Cache { return false; } BlockCacheKey blockCacheKey = new BlockCacheKey(); + blockCacheKey.setPath(path); blockCacheKey.setBlock(blockId); blockCacheKey.setFile(file); boolean fetch = blockCache.fetch(blockCacheKey, b, blockOffset, off, diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockLocks.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockLocks.java index e91ffb2ab4b..ba696506362 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockLocks.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockLocks.java @@ -21,6 +21,9 @@ import java.util.concurrent.atomic.AtomicLongArray; import org.apache.lucene.util.LongBitSet; +/** + * @lucene.experimental + */ public class BlockLocks { private AtomicLongArray bits; diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java b/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java index 3e637d59d0b..f54b2757041 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java @@ -22,7 +22,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; - +/** + * @lucene.experimental + */ public class BufferStore implements Store { private static final Store EMPTY = new Store() { diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/Cache.java b/solr/core/src/java/org/apache/solr/store/blockcache/Cache.java index 7e70ad0a775..dafa4ffcd9d 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/Cache.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/Cache.java @@ -17,6 +17,9 @@ package org.apache.solr.store.blockcache; * limitations under the License. */ +/** + * @lucene.experimental + */ public interface Cache { /** diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/CachedIndexOutput.java b/solr/core/src/java/org/apache/solr/store/blockcache/CachedIndexOutput.java index 6e3c92ee1ac..858214cf83b 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/CachedIndexOutput.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/CachedIndexOutput.java @@ -21,10 +21,11 @@ import java.io.IOException; import org.apache.lucene.store.IndexOutput; -/* +/** * Cache the blocks as they are written. The cache file name is the name of * the file until the file is closed, at which point the cache is updated * to include the last modified date (which is unknown until that point). + * @lucene.experimental */ public class CachedIndexOutput extends ReusedBufferedIndexOutput { private final BlockDirectory directory; diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java b/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java index be8f260b902..aa79fb99804 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java @@ -23,6 +23,9 @@ import java.io.IOException; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +/** + * @lucene.experimental + */ public abstract class CustomBufferedIndexInput extends IndexInput { public static final int BUFFER_SIZE = 32768; diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/Metrics.java b/solr/core/src/java/org/apache/solr/store/blockcache/Metrics.java index fce1b9d9a73..052e70442f8 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/Metrics.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/Metrics.java @@ -29,6 +29,9 @@ import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.Updater; import org.apache.hadoop.metrics.jvm.JvmMetrics; +/** + * @lucene.experimental + */ public class Metrics implements Updater { public static class MethodCall { diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java b/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java index 6b12c982e44..92018fce7af 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java @@ -21,6 +21,9 @@ import java.io.IOException; import org.apache.lucene.store.IndexOutput; +/** + * @lucene.experimental + */ public abstract class ReusedBufferedIndexOutput extends IndexOutput { public static final int BUFFER_SIZE = 1024; diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/Store.java b/solr/core/src/java/org/apache/solr/store/blockcache/Store.java index 3a491b3db83..8fb4e48cf38 100644 --- a/solr/core/src/java/org/apache/solr/store/blockcache/Store.java +++ b/solr/core/src/java/org/apache/solr/store/blockcache/Store.java @@ -17,6 +17,9 @@ package org.apache.solr.store.blockcache; * limitations under the License. */ +/** + * @lucene.experimental + */ public interface Store { byte[] takeBuffer(int bufferSize); diff --git a/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileReader.java b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileReader.java index 8a537935ea0..0294496c097 100644 --- a/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileReader.java +++ b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileReader.java @@ -28,6 +28,9 @@ import org.apache.lucene.store.DataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * @lucene.experimental + */ public class HdfsFileReader extends DataInput { public static Logger LOG = LoggerFactory.getLogger(HdfsFileReader.class); diff --git a/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java index 459a6d13fe9..d73e353a71e 100644 --- a/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java +++ b/solr/core/src/java/org/apache/solr/store/hdfs/HdfsFileWriter.java @@ -32,6 +32,9 @@ import org.apache.lucene.store.DataOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * @lucene.experimental + */ public class HdfsFileWriter extends DataOutput implements Closeable { public static Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class); diff --git a/solr/core/src/java/org/apache/solr/store/hdfs/NullIndexOutput.java b/solr/core/src/java/org/apache/solr/store/hdfs/NullIndexOutput.java index 044687c41d4..942dfd73f4f 100644 --- a/solr/core/src/java/org/apache/solr/store/hdfs/NullIndexOutput.java +++ b/solr/core/src/java/org/apache/solr/store/hdfs/NullIndexOutput.java @@ -21,6 +21,9 @@ import java.io.IOException; import org.apache.lucene.store.IndexOutput; +/** + * @lucene.experimental + */ public class NullIndexOutput extends IndexOutput { private long pos; diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml index 22c5b3ff57b..95a57ab23dd 100644 --- a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml @@ -28,6 +28,7 @@ ${solr.hdfs.blockcache.blocksperbank:1024} ${solr.hdfs.home:} ${solr.hdfs.confdir:} + ${solr.hdfs.blockcache.global:false} ${solr.data.dir:} diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml index d2413b09654..5fe25d7db76 100644 --- a/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml @@ -46,7 +46,8 @@ 3000000 4000000 ${solr.hdfs.home:} - ${solr.hdfs.blockcache.enabled:true} + ${solr.hdfs.blockcache.enabled:true} + ${solr.hdfs.blockcache.global:false} ${tests.luceneMatchVersion:LUCENE_CURRENT} diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java index 8650f216431..ba0f0817843 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java @@ -131,7 +131,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase int threadCount = 1; int i = 0; for (i = 0; i < threadCount; i++) { - StopableIndexingThread indexThread = new StopableIndexingThread(Integer.toString(i), true); + StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true); threads.add(indexThread); indexThread.start(); } @@ -270,7 +270,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase public FullThrottleStopableIndexingThread(List clients, String id, boolean doDeletes) { - super(id, doDeletes); + super(controlClient, cloudClient, id, doDeletes); setName("FullThrottleStopableIndexingThread"); setDaemon(true); this.clients = clients; diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java index 98353476ff0..19e40bfd2db 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java @@ -108,7 +108,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase { List threads = new ArrayList(); int threadCount = 2; for (int i = 0; i < threadCount; i++) { - StopableIndexingThread indexThread = new StopableIndexingThread(Integer.toString(i), true); + StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true); threads.add(indexThread); indexThread.start(); } diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java index 6bb1328cfbd..a15a021706a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java @@ -66,10 +66,10 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase { int maxDoc = maxDocList[random().nextInt(maxDocList.length - 1)]; - indexThread = new StopableIndexingThread("1", true, maxDoc); + indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc); indexThread.start(); - indexThread2 = new StopableIndexingThread("2", true, maxDoc); + indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc); indexThread2.start(); @@ -100,7 +100,7 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase { Thread.sleep(1000); - waitForThingsToLevelOut(45); + waitForThingsToLevelOut(90); Thread.sleep(2000); diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java index 1788aa715ad..6dae9b555ce 100644 --- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java +++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTestUtil.java @@ -64,6 +64,8 @@ public class HdfsTestUtil { System.setProperty("solr.hdfs.home", "/solr_hdfs_home"); + System.setProperty("solr.hdfs.blockcache.global", Boolean.toString(LuceneTestCase.random().nextBoolean())); + final MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null); dfsCluster.waitActive(); @@ -92,6 +94,7 @@ public class HdfsTestUtil { System.clearProperty("test.build.data"); System.clearProperty("test.cache.data"); System.clearProperty("solr.hdfs.home"); + System.clearProperty("solr.hdfs.blockcache.global"); if (dfsCluster != null) { timers.remove(dfsCluster); dfsCluster.shutdown(); diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java new file mode 100644 index 00000000000..5a737826566 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsWriteToMultipleCollectionsTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud.hdfs; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.store.NRTCachingDirectory; +import org.apache.lucene.util.LuceneTestCase.Nightly; +import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrServer; +import org.apache.solr.cloud.BasicDistributedZkTest; +import org.apache.solr.cloud.StopableIndexingThread; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.HdfsDirectoryFactory; +import org.apache.solr.core.SolrCore; +import org.apache.solr.servlet.SolrDispatchFilter; +import org.apache.solr.store.blockcache.BlockCache; +import org.apache.solr.store.blockcache.BlockDirectory; +import org.apache.solr.store.blockcache.BlockDirectoryCache; +import org.apache.solr.store.blockcache.Cache; +import org.apache.solr.util.RefCounted; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope; + +@Slow +@Nightly +@ThreadLeakScope(Scope.NONE) // hdfs client currently leaks thread(s) +public class HdfsWriteToMultipleCollectionsTest extends BasicDistributedZkTest { + private static final String SOLR_HDFS_HOME = "solr.hdfs.home"; + private static final String SOLR_HDFS_BLOCKCACHE_GLOBAL = "solr.hdfs.blockcache.global"; + private static final String ACOLLECTION = "acollection"; + private static MiniDFSCluster dfsCluster; + + @BeforeClass + public static void setupClass() throws Exception { + schemaString = "schema15.xml"; // we need a string id + dfsCluster = HdfsTestUtil.setupClass(new File(TEMP_DIR, + HdfsBasicDistributedZk2Test.class.getName() + "_" + + System.currentTimeMillis()).getAbsolutePath()); + System.setProperty(SOLR_HDFS_HOME, dfsCluster.getURI().toString() + "/solr"); + } + + @AfterClass + public static void teardownClass() throws Exception { + HdfsTestUtil.teardownClass(dfsCluster); + System.clearProperty(SOLR_HDFS_HOME); + dfsCluster = null; + } + + @Override + protected String getDataDir(String dataDir) throws IOException { + return HdfsTestUtil.getDataDir(dfsCluster, dataDir); + } + + public HdfsWriteToMultipleCollectionsTest() { + super(); + sliceCount = 1; + shardCount = 3; + } + + protected String getSolrXml() { + return "solr-no-core.xml"; + } + + @Override + public void doTest() throws Exception { + int docCount = random().nextInt(1313) + 1; + int cnt = random().nextInt(4) + 1; + for (int i = 0; i < cnt; i++) { + createCollection(ACOLLECTION + i, 2, 2, 9); + } + for (int i = 0; i < cnt; i++) { + waitForRecoveriesToFinish(ACOLLECTION + i, false); + } + List cloudServers = new ArrayList(); + List threads = new ArrayList(); + for (int i = 0; i < cnt; i++) { + CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress()); + server.setDefaultCollection(ACOLLECTION + i); + cloudServers.add(server); + StopableIndexingThread indexThread = new StopableIndexingThread(null, server, "1", true, docCount); + threads.add(indexThread); + indexThread.start(); + } + + int addCnt = 0; + for (StopableIndexingThread thread : threads) { + thread.join(); + addCnt += thread.getNumAdds() - thread.getNumDeletes(); + } + + long collectionsCount = 0; + for (CloudSolrServer server : cloudServers) { + server.commit(); + collectionsCount += server.query(new SolrQuery("*:*")).getResults().getNumFound(); + } + + for (CloudSolrServer server : cloudServers) { + server.shutdown(); + } + + assertEquals(addCnt, collectionsCount); + + BlockCache lastBlockCache = null; + // assert that we are using the block directory and that write and read caching are being used + for (JettySolrRunner jetty : jettys) { + CoreContainer cores = ((SolrDispatchFilter) jetty.getDispatchFilter() + .getFilter()).getCores(); + Collection solrCores = cores.getCores(); + for (SolrCore core : solrCores) { + if (core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + .startsWith(ACOLLECTION)) { + assertTrue(core.getDirectoryFactory() instanceof HdfsDirectoryFactory); + RefCounted iwRef = core.getUpdateHandler() + .getSolrCoreState().getIndexWriter(core); + try { + IndexWriter iw = iwRef.get(); + NRTCachingDirectory directory = (NRTCachingDirectory) iw + .getDirectory(); + BlockDirectory blockDirectory = (BlockDirectory) directory + .getDelegate(); + assertTrue(blockDirectory.isBlockCacheReadEnabled()); + assertTrue(blockDirectory.isBlockCacheWriteEnabled()); + Cache cache = blockDirectory.getCache(); + // we know its a BlockDirectoryCache, but future proof + assertTrue(cache instanceof BlockDirectoryCache); + BlockCache blockCache = ((BlockDirectoryCache) cache) + .getBlockCache(); + if (lastBlockCache != null) { + if (Boolean.getBoolean(SOLR_HDFS_BLOCKCACHE_GLOBAL)) { + assertEquals(lastBlockCache, blockCache); + } else { + assertNotSame(lastBlockCache, blockCache); + } + } + lastBlockCache = blockCache; + } finally { + iwRef.decref(); + } + } + } + } + } +} diff --git a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java index 70fd813aead..bc5e75c844a 100644 --- a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java +++ b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java @@ -51,6 +51,7 @@ public class BlockCacheTest extends LuceneTestCase { int file = 0; blockCacheKey.setBlock(block); blockCacheKey.setFile(file); + blockCacheKey.setPath("/"); if (blockCache.fetch(blockCacheKey, buffer)) { hitsInCache.incrementAndGet(); @@ -91,6 +92,7 @@ public class BlockCacheTest extends LuceneTestCase { BlockCacheKey blockCacheKey = new BlockCacheKey(); blockCacheKey.setBlock(0); blockCacheKey.setFile(0); + blockCacheKey.setPath("/"); byte[] newData = new byte[blockSize*3]; byte[] testData = testData(random, blockSize, newData); diff --git a/solr/example/solr/collection1/conf/solrconfig.xml b/solr/example/solr/collection1/conf/solrconfig.xml index 3126c21d74d..192cbcf24bc 100755 --- a/solr/example/solr/collection1/conf/solrconfig.xml +++ b/solr/example/solr/collection1/conf/solrconfig.xml @@ -129,6 +129,9 @@ ${solr.hdfs.confdir:} ${solr.hdfs.blockcache.enabled:true} + + ${solr.hdfs.blockcache.global:true} diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index e3f193bbe71..66a3adf468a 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -1428,122 +1428,13 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes return rsp; } - abstract class StopableThread extends Thread { + static abstract class StopableThread extends Thread { public StopableThread(String name) { super(name); } public abstract void safeStop(); } - class StopableIndexingThread extends StopableThread { - private volatile boolean stop = false; - protected final String id; - protected final List deletes = new ArrayList(); - protected Set addFails = new HashSet(); - protected Set deleteFails = new HashSet(); - protected boolean doDeletes; - private int numCycles; - - public StopableIndexingThread(String id, boolean doDeletes) { - this(id, doDeletes, -1); - } - - public StopableIndexingThread(String id, boolean doDeletes, int numCycles) { - super("StopableIndexingThread"); - this.id = id; - this.doDeletes = doDeletes; - this.numCycles = numCycles; - setDaemon(true); - } - - @Override - public void run() { - int i = 0; - int numDone = 0; - int numDeletes = 0; - int numAdds = 0; - - while (true && !stop) { - if (numCycles != -1) { - if (numDone > numCycles) { - break; - } - } - ++numDone; - String id = this.id + "-" + i; - ++i; - boolean addFailed = false; - - if (doDeletes && random().nextBoolean() && deletes.size() > 0) { - String delete = deletes.remove(0); - try { - numDeletes++; - UpdateRequest req = new UpdateRequest(); - req.deleteById(delete); - req.setParam("CONTROL", "TRUE"); - req.process(controlClient); - - cloudClient.deleteById(delete); - } catch (Exception e) { - System.err.println("REQUEST FAILED:"); - e.printStackTrace(); - if (e instanceof SolrServerException) { - System.err.println("ROOT CAUSE:"); - ((SolrServerException) e).getRootCause().printStackTrace(); - } - deleteFails.add(id); - } - } - - try { - numAdds++; - indexr("id", id, i1, 50, t1, - "to come to the aid of their country."); - } catch (Exception e) { - addFailed = true; - System.err.println("REQUEST FAILED:"); - e.printStackTrace(); - if (e instanceof SolrServerException) { - System.err.println("ROOT CAUSE:"); - ((SolrServerException) e).getRootCause().printStackTrace(); - } - addFails.add(id); - } - - if (!addFailed && doDeletes && random().nextBoolean()) { - deletes.add(id); - } - - try { - Thread.currentThread().sleep(random().nextInt(100)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - System.err.println("added docs:" + numAdds + " with " + (addFails.size() + deleteFails.size()) + " fails" - + " deletes:" + numDeletes); - } - - @Override - public void safeStop() { - stop = true; - } - - public Set getAddFails() { - return addFails; - } - - public Set getDeleteFails() { - return deleteFails; - } - - public int getFailCount() { - return addFails.size() + deleteFails.size(); - } - - }; - class StopableSearchThread extends StopableThread { private volatile boolean stop = false; protected final AtomicInteger queryFails = new AtomicInteger(); diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java b/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java new file mode 100644 index 00000000000..8446f086849 --- /dev/null +++ b/solr/test-framework/src/java/org/apache/solr/cloud/StopableIndexingThread.java @@ -0,0 +1,185 @@ +package org.apache.solr.cloud; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class StopableIndexingThread extends AbstractFullDistribZkTestBase.StopableThread { + private static String t1 = "a_t"; + private static String i1 = "a_si"; + private volatile boolean stop = false; + protected final String id; + protected final List deletes = new ArrayList(); + protected Set addFails = new HashSet(); + protected Set deleteFails = new HashSet(); + protected boolean doDeletes; + private int numCycles; + private SolrServer controlClient; + private SolrServer cloudClient; + private int numDeletes; + private int numAdds; + + public StopableIndexingThread(SolrServer controlClient, SolrServer cloudClient, String id, boolean doDeletes) { + this(controlClient, cloudClient, id, doDeletes, -1); + } + + public StopableIndexingThread(SolrServer controlClient, SolrServer cloudClient, String id, boolean doDeletes, int numCycles) { + super("StopableIndexingThread"); + this.controlClient = controlClient; + this.cloudClient = cloudClient; + this.id = id; + this.doDeletes = doDeletes; + this.numCycles = numCycles; + setDaemon(true); + } + + @Override + public void run() { + int i = 0; + int numDone = 0; + numDeletes = 0; + numAdds = 0; + + while (true && !stop) { + if (numCycles != -1) { + if (numDone > numCycles) { + break; + } + } + ++numDone; + String id = this.id + "-" + i; + ++i; + boolean addFailed = false; + + if (doDeletes && AbstractFullDistribZkTestBase.random().nextBoolean() && deletes.size() > 0) { + String delete = deletes.remove(0); + try { + numDeletes++; + if (controlClient != null) { + UpdateRequest req = new UpdateRequest(); + req.deleteById(delete); + req.setParam("CONTROL", "TRUE"); + req.process(controlClient); + } + + cloudClient.deleteById(delete); + } catch (Exception e) { + System.err.println("REQUEST FAILED:"); + e.printStackTrace(); + if (e instanceof SolrServerException) { + System.err.println("ROOT CAUSE:"); + ((SolrServerException) e).getRootCause().printStackTrace(); + } + deleteFails.add(id); + } + } + + try { + numAdds++; + indexr("id", id, i1, 50, t1, + "to come to the aid of their country."); + } catch (Exception e) { + addFailed = true; + System.err.println("REQUEST FAILED:"); + e.printStackTrace(); + if (e instanceof SolrServerException) { + System.err.println("ROOT CAUSE:"); + ((SolrServerException) e).getRootCause().printStackTrace(); + } + addFails.add(id); + } + + if (!addFailed && doDeletes && AbstractFullDistribZkTestBase.random().nextBoolean()) { + deletes.add(id); + } + + try { + Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(100)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + System.err.println("added docs:" + numAdds + " with " + (addFails.size() + deleteFails.size()) + " fails" + + " deletes:" + numDeletes); + } + + @Override + public void safeStop() { + stop = true; + } + + public Set getAddFails() { + return addFails; + } + + public Set getDeleteFails() { + return deleteFails; + } + + public int getFailCount() { + return addFails.size() + deleteFails.size(); + } + + protected void addFields(SolrInputDocument doc, Object... fields) { + for (int i = 0; i < fields.length; i += 2) { + doc.addField((String) (fields[i]), fields[i + 1]); + } + } + + protected void indexr(Object... fields) throws Exception { + SolrInputDocument doc = new SolrInputDocument(); + addFields(doc, fields); + addFields(doc, "rnd_b", true); + indexDoc(doc); + } + + protected void indexDoc(SolrInputDocument doc) throws IOException, + SolrServerException { + + if (controlClient != null) { + UpdateRequest req = new UpdateRequest(); + req.add(doc); + req.setParam("CONTROL", "TRUE"); + req.process(controlClient); + } + + + UpdateRequest ureq = new UpdateRequest(); + ureq.add(doc); + ureq.process(cloudClient); + } + + public int getNumDeletes() { + return numDeletes; + } + + public int getNumAdds() { + return numAdds; + } + +} \ No newline at end of file