SOLR-4506: Clean up old (unused) index directories in the background after initializing a new index

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1683601 13f79535-47bb-0310-9956-ffa450edef68
Timothy Potter 2015-06-04 17:36:43 +00:00
parent ae5a65e4b9
commit 44d4a8adb2
7 changed files with 404 additions and 39 deletions

solr/CHANGES.txt

@@ -100,6 +100,10 @@ Bug Fixes
* SOLR-7518: New Facet Module should respect shards.tolerant and process all non-failing shards
  instead of throwing an exception. (yonik)

* SOLR-4506: Clean up old (unused) index directories in the background after initializing a new index;
  previously, Solr left stale index.yyyyMMddHHmmssSSS directories behind in the data directory after
  failed recoveries, unnecessarily consuming disk space. (Mark Miller, Timothy Potter)

Optimizations
----------------------

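The clean-up targets directories named after SnapShooter's timestamp format (yyyyMMddHHmmssSSS, 17 digits), as the CHANGES entry notes. A minimal standalone sketch of that naming convention, with the pattern inlined rather than referenced from SnapShooter:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;

    public class IndexDirNameSketch {
      public static void main(String[] args) {
        // SnapShooter.DATE_FMT produces 17 digits, e.g. "index.20150604173643123",
        // which is exactly what INDEX_W_TIMESTAMP_REGEX ("index\\.[0-9]{17}") matches
        String name = "index." + new SimpleDateFormat("yyyyMMddHHmmssSSS", Locale.ROOT).format(new Date());
        System.out.println(name + " matches: " + name.matches("index\\.[0-9]{17}"));
      }
    }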
solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java

@@ -500,4 +500,15 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
    }
    return livePaths;
  }

  @Override
  protected boolean deleteOldIndexDirectory(String oldDirPath) throws IOException {
    Set<String> livePaths = getLivePaths();
    if (livePaths.contains(oldDirPath)) {
      log.warn("Cannot delete directory {} as it is still being referenced in the cache!", oldDirPath);
      return false;
    }
    return super.deleteOldIndexDirectory(oldDirPath);
  }
}

solr/core/src/java/org/apache/solr/core/DirectoryFactory.java

@@ -19,10 +19,12 @@ package org.apache.solr.core;

import java.io.Closeable;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

import org.apache.commons.io.FileUtils;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
@@ -45,6 +47,8 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
  // A large estimate should currently have no other side effects.
  public static final IOContext IOCONTEXT_NO_CACHE = new IOContext(new FlushInfo(10*1000*1000, 100L*1000*1000*1000));

  protected static final String INDEX_W_TIMESTAMP_REGEX = "index\\.[0-9]{17}"; // see SnapShooter.DATE_FMT

  // hint about what the directory contains - default is index directory
  public enum DirContext {DEFAULT, META_DATA}
@@ -271,4 +275,48 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
  public Collection<SolrInfoMBean> offerMBeans() {
    return Collections.emptySet();
  }

  /**
   * Deletes old index directories (index or index.yyyyMMddHHmmssSSS) found under
   * dataDirPath, skipping the index directory currently in use.
   */
  public void cleanupOldIndexDirectories(final String dataDirPath, final String currentIndexDirPath) {
    File dataDir = new File(dataDirPath);
    if (!dataDir.isDirectory()) {
      log.warn("{} does not point to a valid data directory; skipping clean-up of old index directories.", dataDirPath);
      return;
    }

    final File currentIndexDir = new File(currentIndexDirPath);
    File[] oldIndexDirs = dataDir.listFiles(new FileFilter() {
      @Override
      public boolean accept(File file) {
        String fileName = file.getName();
        return file.isDirectory() &&
               !file.equals(currentIndexDir) &&
               (fileName.equals("index") || fileName.matches(INDEX_W_TIMESTAMP_REGEX));
      }
    });

    if (oldIndexDirs == null || oldIndexDirs.length == 0)
      return; // nothing to do (no log message needed)

    log.info("Found {} old index directories to clean up under {}", oldIndexDirs.length, dataDirPath);
    for (File dir : oldIndexDirs) {
      String dirToRmPath = dir.getAbsolutePath();
      try {
        if (deleteOldIndexDirectory(dirToRmPath)) {
          log.info("Deleted old index directory: {}", dirToRmPath);
        } else {
          log.warn("Failed to delete old index directory {}.", dirToRmPath);
        }
      } catch (IOException ioExc) {
        log.error("Failed to delete old directory {} due to: {}", dir.getAbsolutePath(), ioExc.toString());
      }
    }
  }

  // Extension point to allow sub-classes to infuse additional code when deleting old index directories
  protected boolean deleteOldIndexDirectory(String oldDirPath) throws IOException {
    File dirToRm = new File(oldDirPath);
    FileUtils.deleteDirectory(dirToRm);
    return !dirToRm.isDirectory();
  }
}
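The deleteOldIndexDirectory hook above is the extension point subclasses use to veto or augment deletion; CachingDirectoryFactory (shown earlier) uses it to skip paths still referenced in its cache. A hypothetical subclass sketch, assuming the standard factory as the base (not part of this patch):

    import java.io.IOException;
    import org.apache.solr.core.StandardDirectoryFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // hypothetical example of the extension point; not part of this commit
    public class AuditingDirectoryFactory extends StandardDirectoryFactory {
      private static final Logger LOG = LoggerFactory.getLogger(AuditingDirectoryFactory.class);

      @Override
      protected boolean deleteOldIndexDirectory(String oldDirPath) throws IOException {
        LOG.info("Removing old index directory {}", oldDirPath);
        // super runs CachingDirectoryFactory's live-path check before the actual delete
        return super.deleteOldIndexDirectory(oldDirPath);
      }
    }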

solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java

@@ -24,13 +24,16 @@ import java.net.URLEncoder;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
@@ -442,4 +445,77 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
  public Collection<SolrInfoMBean> offerMBeans() {
    return Arrays.<SolrInfoMBean>asList(MetricsHolder.metrics);
  }

  @Override
  public void cleanupOldIndexDirectories(final String dataDir, final String currentIndexDir) {

    // Get the FileSystem object
    final Path dataDirPath = new Path(dataDir);
    final Configuration conf = getConf();
    FileSystem fileSystem = null;
    try {
      fileSystem = tmpFsCache.get(dataDir, new Callable<FileSystem>() {
        @Override
        public FileSystem call() throws IOException {
          return FileSystem.get(dataDirPath.toUri(), conf);
        }
      });
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }

    boolean pathExists = false;
    try {
      pathExists = fileSystem.exists(dataDirPath);
    } catch (IOException e) {
      LOG.error("Error checking if hdfs path {} exists", dataDir, e);
    }
    if (!pathExists) {
      LOG.warn("{} does not point to a valid data directory; skipping clean-up of old index directories.", dataDir);
      return;
    }

    final Path currentIndexDirPath = new Path(currentIndexDir); // make sure we don't delete the current index
    final FileSystem fs = fileSystem;
    FileStatus[] oldIndexDirs = null;
    try {
      oldIndexDirs = fileSystem.listStatus(dataDirPath, new PathFilter() {
        @Override
        public boolean accept(Path path) {
          boolean accept = false;
          String pathName = path.getName();
          try {
            accept = fs.isDirectory(path) && !path.equals(currentIndexDirPath) &&
                     (pathName.equals("index") || pathName.matches(INDEX_W_TIMESTAMP_REGEX));
          } catch (IOException e) {
            LOG.error("Error checking if path {} is an old index directory.", path, e);
          }
          return accept;
        }
      });
    } catch (IOException ioExc) {
      LOG.error("Error checking for old index directories to clean up.", ioExc);
    }

    if (oldIndexDirs == null || oldIndexDirs.length == 0)
      return; // nothing to clean up

    Set<String> livePaths = getLivePaths();
    for (FileStatus oldDir : oldIndexDirs) {
      Path oldDirPath = oldDir.getPath();
      if (livePaths.contains(oldDirPath.toString())) {
        LOG.warn("Cannot delete directory {} because it is still being referenced in the cache.", oldDirPath);
      } else {
        try {
          if (fileSystem.delete(oldDirPath, true)) {
            LOG.info("Deleted old index directory {}", oldDirPath);
          } else {
            LOG.warn("Failed to delete old index directory {}", oldDirPath);
          }
        } catch (IOException e) {
          LOG.error("Failed to delete old index directory {}", oldDirPath, e);
        }
      }
    }
  }
}
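The tmpFsCache.get(key, Callable) call above (the ExecutionException catch implies a Guava Cache) reuses one FileSystem handle per data-dir URI across clean-up calls instead of reconnecting each time. A sketch of the same idiom in isolation; the expiry policy here is an assumption, not taken from the patch:

    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsCacheSketch {
      // expiry chosen for illustration only
      private final Cache<String, FileSystem> tmpFsCache =
          CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();

      FileSystem fileSystemFor(final String dataDir, final Configuration conf) throws Exception {
        // computes and caches the FileSystem on first access, returns the cached handle afterwards
        return tmpFsCache.get(dataDir, () -> FileSystem.get(new Path(dataDir).toUri(), conf));
      }
    }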

solr/core/src/java/org/apache/solr/core/SolrCore.java

@@ -500,51 +500,54 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
  void initIndex(boolean reload) throws IOException {

    String indexDir = getNewIndexDir();
    boolean indexExists = getDirectoryFactory().exists(indexDir);
    boolean firstTime;
    synchronized (SolrCore.class) {
      firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
    }
    boolean removeLocks = solrConfig.unlockOnStartup;

    initIndexReaderFactory();

    if (indexExists && firstTime && !reload) {
      Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
          getSolrConfig().indexConfig.lockType);
      try {
        if (IndexWriter.isLocked(dir)) {
          if (removeLocks) {
            log.warn(logid + "WARNING: Solr index directory '{}' is locked. Unlocking...", indexDir);
            dir.makeLock(IndexWriter.WRITE_LOCK_NAME).close();
          } else {
            log.error(logid + "Solr index directory '{}' is locked. Throwing exception", indexDir);
            throw new LockObtainFailedException("Index locked for write for core " + name);
          }
        }
      } finally {
        directoryFactory.release(dir);
      }
    }

    // Create the index if it doesn't exist.
    if (!indexExists) {
      log.warn(logid + "Solr index directory '" + new File(indexDir) + "' doesn't exist. Creating new index...");

      SolrIndexWriter writer = SolrIndexWriter.create(this, "SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
          getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
      writer.close();
    }

    cleanupOldIndexDirectories();
  }
@@ -1662,7 +1665,6 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
        newestSearcher.decref();
      }
    }
  }
/**
@@ -2628,6 +2630,29 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
    }
    return false;
  }

  public void cleanupOldIndexDirectories() {
    final DirectoryFactory myDirFactory = getDirectoryFactory();
    final String myDataDir = getDataDir();
    final String myIndexDir = getIndexDir();
    final String coreName = getName();
    if (myDirFactory != null && myDataDir != null && myIndexDir != null) {
      Thread cleanupThread = new Thread() {
        @Override
        public void run() {
          log.info("Looking for old index directories to clean up for core {} in {}", coreName, myDataDir);
          try {
            myDirFactory.cleanupOldIndexDirectories(myDataDir, myIndexDir);
          } catch (Exception exc) {
            log.error("Failed to clean up old index directories for core " + coreName, exc);
          }
        }
      };
      cleanupThread.setName("OldIndexDirectoryCleanupThreadForCore-" + coreName);
      cleanupThread.setDaemon(true);
      cleanupThread.start();
    }
  }
}
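SolrCore runs the clean-up on a named daemon thread so a slow delete (HDFS in particular) never blocks core initialization or JVM shutdown. An equivalent hypothetical sketch using an executor instead of a raw Thread (not part of this patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.solr.core.DirectoryFactory;

    // hypothetical alternative to the raw Thread above; not part of this commit
    static void cleanupInBackground(final DirectoryFactory dirFactory, final String dataDir,
                                    final String indexDir, final String coreName) {
      ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "OldIndexDirectoryCleanupThreadForCore-" + coreName);
        t.setDaemon(true); // daemon: never hold up JVM shutdown for clean-up
        return t;
      });
      exec.submit(() -> dirFactory.cleanupOldIndexDirectories(dataDir, indexDir));
      exec.shutdown(); // thread exits once the single task completes
    }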

solr/core/src/test/org/apache/solr/cloud/CleanupOldIndexTest.java

@@ -0,0 +1,165 @@
package org.apache.solr.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.commons.io.FileUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.SnapShooter;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slow
public class CleanupOldIndexTest extends AbstractFullDistribZkTestBase {

  private static Logger log = LoggerFactory.getLogger(CleanupOldIndexTest.class);

  private StoppableIndexingThread indexThread;

  public CleanupOldIndexTest() {
    super();
    sliceCount = 1;
    fixShardCount(2);
    schemaString = "schema15.xml";
  }

  public static String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
  public static RandVal[] randVals = new RandVal[]{rint, rfloat, rdouble, rlong, rdate};

  protected String[] getFieldNames() {
    return fieldNames;
  }

  protected RandVal[] getRandValues() {
    return randVals;
  }

  @Test
  public void test() throws Exception {
    handle.clear();
    handle.put("timestamp", SKIPVAL);

    int[] maxDocList = new int[] {300, 700, 1200};
    int maxDoc = maxDocList[random().nextInt(maxDocList.length)];

    indexThread = new StoppableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true);
    indexThread.start();

    // give some time to index...
    int[] waitTimes = new int[] {200, 2000, 3000};
    Thread.sleep(waitTimes[random().nextInt(waitTimes.length)]);

    // create some "old" index directories
    JettySolrRunner jetty = chaosMonkey.getShard("shard1", 1);
    SolrDispatchFilter filter = (SolrDispatchFilter)jetty.getDispatchFilter().getFilter();
    CoreContainer coreContainer = filter.getCores();
    File dataDir = null;
    try (SolrCore solrCore = coreContainer.getCore("collection1")) {
      dataDir = new File(solrCore.getDataDir());
    }
    assertTrue(dataDir.isDirectory());

    long msInDay = 1000L*60*60*24;
    String timestamp1 = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date(1*msInDay));
    String timestamp2 = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date(2*msInDay));
    File oldIndexDir1 = new File(dataDir, "index."+timestamp1);
    FileUtils.forceMkdir(oldIndexDir1);
    File oldIndexDir2 = new File(dataDir, "index."+timestamp2);
    FileUtils.forceMkdir(oldIndexDir2);

    // verify the "old" index directories exist
    assertTrue(oldIndexDir1.isDirectory());
    assertTrue(oldIndexDir2.isDirectory());

    // bring the shard replica down
    JettySolrRunner replica = chaosMonkey.stopShard("shard1", 1).jetty;

    // wait a moment - let some docs be indexed so replication time is non-zero
    Thread.sleep(waitTimes[random().nextInt(waitTimes.length)]);

    // bring the shard replica back up
    replica.start();

    // make sure replication can start
    Thread.sleep(3000);
    ZkStateReader zkStateReader = cloudClient.getZkStateReader();

    // stop indexing threads
    indexThread.safeStop();
    indexThread.join();

    Thread.sleep(1000);

    waitForThingsToLevelOut(120);
    waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, false, true);

    // test that leader and replica have the same doc count
    String fail = checkShardConsistency("shard1", false, false);
    if (fail != null)
      fail(fail);

    SolrQuery query = new SolrQuery("*:*");
    query.setParam("distrib", "false");
    long client1Docs = shardToJetty.get("shard1").get(0).client.solrClient.query(query).getResults().getNumFound();
    long client2Docs = shardToJetty.get("shard1").get(1).client.solrClient.query(query).getResults().getNumFound();
    assertTrue(client1Docs > 0);
    assertEquals(client1Docs, client2Docs);

    // the old index directories should have been cleaned up during recovery
    assertTrue(!oldIndexDir1.isDirectory());
    assertTrue(!oldIndexDir2.isDirectory());
  }

  @Override
  protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException {
    controlClient.add(doc);
    cloudClient.add(doc);
  }

  @Override
  public void distribTearDown() throws Exception {
    // make sure threads have been stopped...
    indexThread.safeStop();
    indexThread.join();
    super.distribTearDown();
  }

  // skip the randoms - they can deadlock...
  @Override
  protected void indexr(Object... fields) throws Exception {
    SolrInputDocument doc = new SolrInputDocument();
    addFields(doc, fields);
    addFields(doc, "rnd_b", true);
    indexDoc(doc);
  }
}

solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java

@@ -18,7 +18,11 @@
package org.apache.solr.core;

import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NoLockFactory;
@@ -26,6 +30,7 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.handler.SnapShooter;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
import org.junit.AfterClass;
@@ -53,7 +58,7 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
    System.clearProperty(HdfsDirectoryFactory.NRTCACHINGDIRECTORY_MAXMERGESIZEMB);
    dfsCluster = null;
  }

  @Test
  public void testInitArgsOrSysPropConfig() throws Exception {
@@ -130,4 +135,35 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
    hdfsFactory.close();
  }

  @Test
  public void testCleanupOldIndexDirectories() throws Exception {
    HdfsDirectoryFactory hdfsFactory = new HdfsDirectoryFactory();
    System.setProperty("solr.hdfs.home", HdfsTestUtil.getURI(dfsCluster) + "/solr1");
    hdfsFactory.init(new NamedList<>());
    String dataHome = hdfsFactory.getDataHome(new MockCoreDescriptor());
    assertTrue(dataHome.endsWith("/solr1/mock/data"));
    System.clearProperty("solr.hdfs.home");

    FileSystem hdfs = dfsCluster.getFileSystem();

    org.apache.hadoop.fs.Path dataHomePath = new org.apache.hadoop.fs.Path(dataHome);
    org.apache.hadoop.fs.Path currentIndexDirPath = new org.apache.hadoop.fs.Path(dataHomePath, "index");
    assertTrue(!hdfs.isDirectory(currentIndexDirPath));
    hdfs.mkdirs(currentIndexDirPath);
    assertTrue(hdfs.isDirectory(currentIndexDirPath));

    String timestamp1 = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
    org.apache.hadoop.fs.Path oldIndexDirPath = new org.apache.hadoop.fs.Path(dataHomePath, "index."+timestamp1);
    assertTrue(!hdfs.isDirectory(oldIndexDirPath));
    hdfs.mkdirs(oldIndexDirPath);
    assertTrue(hdfs.isDirectory(oldIndexDirPath));

    hdfsFactory.cleanupOldIndexDirectories(dataHomePath.toString(), currentIndexDirPath.toString());

    assertTrue(hdfs.isDirectory(currentIndexDirPath));
    assertTrue(!hdfs.isDirectory(oldIndexDirPath));
  }
}