SOLR-6089: When using the HDFS block cache, if a file is deleted, its underlying data entries in the block cache are not removed, which is a problem with the global block cache option.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1619427 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2014-08-21 15:03:29 +00:00
parent 85d282a245
commit 8e69f04527
14 changed files with 150 additions and 35 deletions
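In outline, the fix threads a new releaseBlocksOnClose flag from HdfsDirectoryFactory through BlockDirectory into BlockDirectoryCache: when the shared (global) block cache is in use, the per-directory cache records every BlockCacheKey it stores and releases them all when the directory is closed. A minimal self-contained sketch of that bookkeeping, using simplified stand-ins for the real classes (the actual signatures appear in the diffs below):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Simplified stand-ins: only the key-tracking idea from this commit is
// shown, not the real cache internals.
class BlockCacheKey { /* identifies one cached block of one file */ }

class BlockCache {
  void store(BlockCacheKey key) { /* cache the block */ }
  void release(BlockCacheKey key) { /* evict the block, freeing its slot */ }
}

class BlockDirectoryCacheSketch {
  private final BlockCache blockCache;
  // Only allocated for a shared (global) cache; a per-core cache dies
  // with its core, so there is nothing to track.
  private final Set<BlockCacheKey> keys;

  BlockDirectoryCacheSketch(BlockCache blockCache, boolean releaseBlocks) {
    this.blockCache = blockCache;
    this.keys = releaseBlocks
        ? Collections.synchronizedSet(new HashSet<BlockCacheKey>())
        : null;
  }

  void store(BlockCacheKey key) {
    blockCache.store(key);
    if (keys != null) {
      keys.add(key); // remember what went into the shared cache
    }
  }

  // Called when the owning directory closes: without this, blocks for
  // deleted files linger in the global cache (the SOLR-6089 bug).
  void releaseResources() {
    if (keys != null) {
      for (BlockCacheKey key : keys) {
        blockCache.release(key);
      }
    }
  }
}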

View File

@@ -302,6 +302,10 @@ Bug Fixes
* SOLR-6268: HdfsUpdateLog has a race condition that can expose a closed HDFS FileSystem instance and should
  close its FileSystem instance if either inherited close method is called. (Mark Miller)
* SOLR-6089: When using the HDFS block cache, if a file is deleted, its underlying data entries in the
  block cache are not removed, which is a problem with the global block cache option.
  (Mark Miller, Patrick Hunt)
Optimizations
---------------------

View File

@@ -44,6 +44,7 @@ import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -65,6 +66,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.hadoop.hack.MiniMRClientCluster;
@@ -128,6 +130,10 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
@BeforeClass
public static void setupClass() throws Exception {
System.setProperty("solr.hdfs.blockcache.global", Boolean.toString(LuceneTestCase.random().nextBoolean()));
System.setProperty("solr.hdfs.blockcache.enabled", Boolean.toString(LuceneTestCase.random().nextBoolean()));
System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
solrHomeDirectory = createTempDir();
assumeTrue(
"Currently this test can only be run without the lucene test security policy in place",
@@ -156,8 +162,6 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
int taskTrackers = 2;
int dataNodes = 2;
System.setProperty("solr.hdfs.blockcache.enabled", "false");
JobConf conf = new JobConf();
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
@@ -218,6 +222,8 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
@AfterClass
public static void teardownClass() throws Exception {
System.clearProperty("solr.hdfs.blockcache.global");
System.clearProperty("solr.hdfs.blockcache.blocksperbank");
System.clearProperty("solr.hdfs.blockcache.enabled");
System.clearProperty("hadoop.log.dir");
System.clearProperty("test.build.dir");
@@ -627,8 +633,65 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
checkConsistency(replicatedCollection);
assertEquals(RECORD_COUNT, executeSolrQuery(cloudClient, "*:*").size());
}
}
// delete collection
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.DELETE.toString());
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
params.set(CoreAdminParams.DELETE_DATA_DIR, true);
params.set(CoreAdminParams.DELETE_INDEX, true);
params.set("name", replicatedCollection);
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
cloudClient.request(request);
long timeout = System.currentTimeMillis() + 10000;
while (cloudClient.getZkStateReader().getClusterState().hasCollection(replicatedCollection)) {
if (System.currentTimeMillis() > timeout) {
throw new AssertionError("Timeout waiting to see removed collection leave clusterstate");
}
Thread.sleep(200);
cloudClient.getZkStateReader().updateClusterState(true);
}
if (TEST_NIGHTLY) {
createCollection(replicatedCollection, 11, 3, 11);
} else {
createCollection(replicatedCollection, 2, 3, 2);
}
waitForRecoveriesToFinish(replicatedCollection, false);
printLayout();
assertEquals(0, executeSolrQuery(cloudClient, "*:*").getNumFound());
args = new String[] {
"--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
"--output-dir=" + outDir.toString(),
"--shards", "2",
"--mappers=3",
"--verbose",
"--go-live",
"--go-live-threads", Integer.toString(random().nextInt(15) + 1), dataDir.toString()
};
args = prependInitialArgs(args);
argList = new ArrayList<>();
getShardUrlArgs(argList, replicatedCollection);
args = concat(args, argList.toArray(new String[0]));
tool = new MapReduceIndexerTool();
res = ToolRunner.run(jobConf, tool, args);
assertEquals(0, res);
assertTrue(tool.job.isComplete());
assertTrue(tool.job.isSuccessful());
checkConsistency(replicatedCollection);
assertEquals(RECORD_COUNT, executeSolrQuery(cloudClient, "*:*").size());
}
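The delete-then-recreate sequence above is what exercises SOLR-6089: without the fix, blocks belonging to the deleted collection's files would survive in the global cache. The inline wait loop could equally be read as a small polling helper; a hypothetical extraction, assuming the SolrJ 4.x ZkStateReader API already used in this test:

// Hypothetical helper mirroring the inline wait loop above.
private void waitForCollectionToLeaveClusterState(String collection, long timeoutMs) throws Exception {
  long timeout = System.currentTimeMillis() + timeoutMs;
  while (cloudClient.getZkStateReader().getClusterState().hasCollection(collection)) {
    if (System.currentTimeMillis() > timeout) {
      throw new AssertionError("Timeout waiting to see removed collection leave clusterstate");
    }
    Thread.sleep(200);
    // force a fresh read of the cluster state on each pass
    cloudClient.getZkStateReader().updateClusterState(true);
  }
}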
private void getShardUrlArgs(List<String> args) {

View File

@@ -130,6 +130,7 @@
<bool name="solr.hdfs.nrtcachingdirectory.enable">${solr.hdfs.nrtcachingdirectory.enable:true}</bool>
<int name="solr.hdfs.nrtcachingdirectory.maxmergesizemb">${solr.hdfs.nrtcachingdirectory.maxmergesizemb:16}</int>
<int name="solr.hdfs.nrtcachingdirectory.maxcachedmb">${solr.hdfs.nrtcachingdirectory.maxcachedmb:192}</int>
<str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
</directoryFactory>
<!-- The CodecFactory for defining the format of the inverted index.
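The new solr.hdfs.blockcache.global flag defaults to false and is resolved through system-property substitution, so it can be flipped per JVM without editing solrconfig.xml. For example, as the randomized test setup earlier in this commit does:

// Enable the shared (global) HDFS block cache for this JVM; the
// ${solr.hdfs.blockcache.global:false} substitution in solrconfig.xml
// picks this up when the core loads.
System.setProperty("solr.hdfs.blockcache.global", "true");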

View File

@@ -133,6 +133,7 @@
<bool name="solr.hdfs.nrtcachingdirectory.enable">${solr.hdfs.nrtcachingdirectory.enable:true}</bool>
<int name="solr.hdfs.nrtcachingdirectory.maxmergesizemb">${solr.hdfs.nrtcachingdirectory.maxmergesizemb:16}</int>
<int name="solr.hdfs.nrtcachingdirectory.maxcachedmb">${solr.hdfs.nrtcachingdirectory.maxcachedmb:192}</int>
<str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
</directoryFactory>
<!-- The CodecFactory for defining the format of the inverted index.

View File

@@ -139,11 +139,11 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
int bufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", 128);
int bufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128);
BlockCache blockCache = getBlockDirectoryCache(path, numberOfBlocksPerBank,
BlockCache blockCache = getBlockDirectoryCache(numberOfBlocksPerBank,
blockSize, bankCount, directAllocation, slabSize,
bufferSize, bufferCount, blockCacheGlobal);
Cache cache = new BlockDirectoryCache(blockCache, path, metrics);
Cache cache = new BlockDirectoryCache(blockCache, path, metrics, blockCacheGlobal);
HdfsDirectory hdfsDirectory = new HdfsDirectory(new Path(path), conf);
dir = new BlockDirectory(path, hdfsDirectory, cache, null,
blockCacheReadEnabled, blockCacheWriteEnabled);
@@ -164,17 +164,16 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
return dir;
}
private BlockCache getBlockDirectoryCache(String path,
int numberOfBlocksPerBank, int blockSize, int bankCount,
private BlockCache getBlockDirectoryCache(int numberOfBlocksPerBank, int blockSize, int bankCount,
boolean directAllocation, int slabSize, int bufferSize, int bufferCount, boolean staticBlockCache) {
if (!staticBlockCache) {
LOG.info("Creating new single instance HDFS BlockCache");
return createBlockCache(numberOfBlocksPerBank, blockSize, bankCount, directAllocation, slabSize, bufferSize, bufferCount);
}
LOG.info("Creating new global HDFS BlockCache");
synchronized (HdfsDirectoryFactory.class) {
if (globalBlockCache == null) {
LOG.info("Creating new global HDFS BlockCache");
globalBlockCache = createBlockCache(numberOfBlocksPerBank, blockSize, bankCount,
directAllocation, slabSize, bufferSize, bufferCount);
}
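The refactored getBlockDirectoryCache drops the now-unused path parameter and, for the global case, lazily creates a single process-wide cache under a class lock that every core then shares. A condensed, self-contained sketch of the selection logic (BlockCacheStub stands in for the real BlockCache; sizing parameters and logging omitted):

class BlockCacheStub {}

class CacheSelectionSketch {
  private static BlockCacheStub globalBlockCache;

  BlockCacheStub getBlockDirectoryCache(boolean staticBlockCache) {
    if (!staticBlockCache) {
      // Per-core cache: discarded wholesale when its core closes.
      return createBlockCache();
    }
    // Global cache: one instance per JVM shared by all cores, which is
    // exactly why per-directory entries must be released on close.
    synchronized (CacheSelectionSketch.class) {
      if (globalBlockCache == null) {
        globalBlockCache = createBlockCache();
      }
      return globalBlockCache;
    }
  }

  private BlockCacheStub createBlockCache() {
    return new BlockCacheStub();
  }
}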

View File

@@ -80,6 +80,10 @@ public class BlockCache {
this.blockSize = blockSize;
}
public void release(BlockCacheKey key) {
releaseLocation(cache.get(key));
}
private void releaseLocation(BlockCacheLocation location) {
if (location == null) {
return;

View File

@@ -80,19 +80,27 @@ public class BlockDirectory extends Directory {
@Override
public void renameCacheFile(String source, String dest) {}
@Override
public void releaseResources() {}
};
private Directory directory;
private int blockSize;
private String dirName;
private final Directory directory;
private final int blockSize;
private final String dirName;
private final Cache cache;
private Set<String> blockCacheFileTypes;
private final Set<String> blockCacheFileTypes;
private final boolean blockCacheReadEnabled;
private final boolean blockCacheWriteEnabled;
public BlockDirectory(String dirName, Directory directory, Cache cache,
Set<String> blockCacheFileTypes, boolean blockCacheReadEnabled,
boolean blockCacheWriteEnabled) throws IOException {
this(dirName, directory, cache, blockCacheFileTypes, blockCacheReadEnabled, blockCacheWriteEnabled, false);
}
public BlockDirectory(String dirName, Directory directory, Cache cache,
Set<String> blockCacheFileTypes, boolean blockCacheReadEnabled,
boolean blockCacheWriteEnabled, boolean releaseBlocksOnClose) throws IOException {
this.dirName = dirName;
this.directory = directory;
blockSize = BLOCK_SIZE;
@@ -233,7 +241,10 @@ public class BlockDirectory extends Directory {
for (String file : files) {
cache.delete(getFileCacheName(file));
}
// segments.gen won't be removed above
cache.delete(dirName + "/" + "segments.gen");
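// With a global cache these per-directory entries would otherwise
// outlive the directory; releaseResources() below evicts every block
// this cache tracked (SOLR-6089).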
cache.releaseResources();
} catch (FileNotFoundException e) {
// the local file system folder may be gone
} finally {

View File

@@ -17,7 +17,10 @@ package org.apache.solr.store.blockcache;
* limitations under the License.
*/
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,15 +29,23 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class BlockDirectoryCache implements Cache {
private final BlockCache blockCache;
private AtomicInteger counter = new AtomicInteger();
private Map<String,Integer> names = new ConcurrentHashMap<>();
private String path;
private Metrics metrics;
private final AtomicInteger counter = new AtomicInteger();
private final Map<String,Integer> names = new ConcurrentHashMap<>();
private Set<BlockCacheKey> keys;
private final String path;
private final Metrics metrics;
public BlockDirectoryCache(BlockCache blockCache, String path, Metrics metrics) {
this(blockCache, path, metrics, false);
}
public BlockDirectoryCache(BlockCache blockCache, String path, Metrics metrics, boolean releaseBlocks) {
this.blockCache = blockCache;
this.path = path;
this.metrics = metrics;
if (releaseBlocks) {
keys = Collections.synchronizedSet(new HashSet<BlockCacheKey>());
}
}
/**
@@ -64,6 +75,9 @@ public class BlockDirectoryCache implements Cache {
blockCacheKey.setBlock(blockId);
blockCacheKey.setFile(file);
blockCache.store(blockCacheKey, blockOffset, buffer, offset, length);
if (keys != null) {
keys.add(blockCacheKey);
}
}
@Override
@@ -100,4 +114,13 @@ public class BlockDirectoryCache implements Cache {
names.put(dest, file);
}
}
@Override
public void releaseResources() {
if (keys != null) {
for (BlockCacheKey key : keys) {
blockCache.release(key);
}
}
}
}

View File

@@ -61,5 +61,10 @@
* final name
*/
void renameCacheFile(String source, String dest);
/**
* Release any resources associated with the cache.
*/
void releaseResources();
}
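Implementations that don't track per-directory keys can satisfy the new method with a no-op, as the test cache stub later in this commit does. A minimal sketch against a pared-down version of the interface:

// Pared-down sketch of the Cache contract extended above.
interface CacheSketch {
  /** Release any resources associated with the cache. */
  void releaseResources();
}

class NoOpReleaseCache implements CacheSketch {
  @Override
  public void releaseResources() {
    // nothing tracked per directory, so nothing to release
  }
}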

View File

@@ -172,14 +172,6 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
numShardsNumReplicas.add(1);
checkForCollection("testcollection",numShardsNumReplicas, null);
}
private void randomlyEnableAutoSoftCommit() {
if (r.nextBoolean()) {
enableAutoSoftCommit(1000);
} else {
log.info("Not turning on auto soft commit");
}
}
// skip the randoms - they can deadlock...
@Override

View File

@@ -13,7 +13,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
/*

View File

@@ -90,6 +90,8 @@ public class StressHdfsTest extends BasicDistributedZkTest {
@Override
public void doTest() throws Exception {
randomlyEnableAutoSoftCommit();
int cnt = random().nextInt(2) + 1;
for (int i = 0; i < cnt; i++) {
createAndDeleteCollection();
@@ -161,19 +163,19 @@
int i = 0;
for (SolrServer client : clients) {
HttpSolrServer c = new HttpSolrServer(getBaseUrl(client)
+ "/delete_data_dir");
HttpSolrServer c = new HttpSolrServer(getBaseUrl(client) + "/" + DELETE_DATA_DIR_COLLECTION);
try {
c.add(getDoc("id", i++));
if (random().nextBoolean()) c.add(getDoc("id", i++));
if (random().nextBoolean()) c.add(getDoc("id", i++));
int docCnt = random().nextInt(1000) + 1;
for (int j = 0; j < docCnt; j++) {
c.add(getDoc("id", i++, "txt_t", "just some random text for a doc"));
}
if (random().nextBoolean()) {
c.commit();
} else {
c.commit(true, true, true);
}
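// SolrJ's three-arg commit is commit(waitFlush, waitSearcher, softCommit),
// so the branch above randomizes between a hard commit and a soft commit.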
c.query(new SolrQuery("id:" + i));
c.setConnectionTimeout(30000);
NamedList<Object> response = c.query(
new SolrQuery().setRequestHandler("/admin/system")).getResponse();
@@ -192,6 +194,10 @@ public class StressHdfsTest extends BasicDistributedZkTest {
assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
}
cloudClient.commit();
cloudClient.query(new SolrQuery("*:*"));
// delete collection
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.DELETE.toString());
params.set("name", DELETE_DATA_DIR_COLLECTION);

View File

@@ -22,17 +22,13 @@ import java.io.IOException;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -89,6 +85,9 @@ public class BlockDirectoryTest extends SolrTestCaseJ4 {
@Override
public void renameCacheFile(String source, String dest) {
}
@Override
public void releaseResources() {}
}
private static final int MAX_NUMBER_OF_WRITES = 10000;

View File

@@ -1092,6 +1092,14 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
}
protected void randomlyEnableAutoSoftCommit() {
if (r.nextBoolean()) {
enableAutoSoftCommit(1000);
} else {
log.info("Not turning on auto soft commit");
}
}
protected void enableAutoSoftCommit(int time) {
log.info("Turning on auto soft commit: " + time);
for (List<CloudJettyRunner> jettyList : shardToJetty.values()) {