diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 17cc4fc0dec..5ea39f9e832 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -100,6 +100,10 @@ Bug Fixes
 * SOLR-7518: New Facet Module should respect shards.tolerant and process all non-failing shards
   instead of throwing an exception. (yonik)
 
+* SOLR-4506: Clean up old (unused) index directories in the background after initializing a new index;
+  previously, Solr would leave old index.yyyyMMddHHmmssSSS directories behind in the data directory
+  after failed recoveries, unnecessarily consuming disk space. (Mark Miller, Timothy Potter)
+
 Optimizations
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index 7d3fdc026e7..6bf5c66c906 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -500,4 +500,15 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
     }
     return livePaths;
   }
+
+  @Override
+  protected boolean deleteOldIndexDirectory(String oldDirPath) throws IOException {
+    Set<String> livePaths = getLivePaths();
+    if (livePaths.contains(oldDirPath)) {
+      log.warn("Cannot delete directory {} as it is still being referenced in the cache!", oldDirPath);
+      return false;
+    }
+
+    return super.deleteOldIndexDirectory(oldDirPath);
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
index d889d1ae364..8413e12d509 100644
--- a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
@@ -19,10 +19,12 @@ package org.apache.solr.core;
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FlushInfo;
 import org.apache.lucene.store.IOContext;
@@ -45,6 +47,8 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
   // A large estimate should currently have no other side effects.
   public static final IOContext IOCONTEXT_NO_CACHE = new IOContext(new FlushInfo(10*1000*1000, 100L*1000*1000*1000));
 
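+  // index directories created by replication/recovery are named index.yyyyMMddHHmmssSSS,
+  // i.e. "index." followed by exactly 17 digits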
+  protected static final String INDEX_W_TIMESTAMP_REGEX = "index\\.[0-9]{17}"; // see SnapShooter.DATE_FMT
+
   // hint about what the directory contains - default is index directory
   public enum DirContext {DEFAULT, META_DATA}
 
@@ -271,4 +275,48 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
   public Collection<SolrInfoMBean> offerMBeans() {
     return Collections.emptySet();
   }
+
+  public void cleanupOldIndexDirectories(final String dataDirPath, final String currentIndexDirPath) {
+    File dataDir = new File(dataDirPath);
+    if (!dataDir.isDirectory()) {
+      log.warn("{} does not point to a valid data directory; skipping clean-up of old index directories.", dataDirPath);
+      return;
+    }
+
+    final File currentIndexDir = new File(currentIndexDirPath);
+    File[] oldIndexDirs = dataDir.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        String fileName = file.getName();
+        return file.isDirectory() &&
+            !file.equals(currentIndexDir) &&
+            (fileName.equals("index") || fileName.matches(INDEX_W_TIMESTAMP_REGEX));
+      }
+    });
+
+    if (oldIndexDirs == null || oldIndexDirs.length == 0)
+      return; // nothing to do (no log message needed)
+
+    log.info("Found {} old index directories to clean up under {}", oldIndexDirs.length, dataDirPath);
+    for (File dir : oldIndexDirs) {
+      String dirToRmPath = dir.getAbsolutePath();
+      try {
+        if (deleteOldIndexDirectory(dirToRmPath)) {
+          log.info("Deleted old index directory: {}", dirToRmPath);
+        } else {
+          log.warn("Failed to delete old index directory: {}", dirToRmPath);
+        }
+      } catch (IOException ioExc) {
+        log.error("Failed to delete old directory {} due to: {}", dirToRmPath, ioExc.toString());
+      }
+    }
+  }
+
+  // Extension point to allow sub-classes to infuse additional code when deleting old index directories
+  protected boolean deleteOldIndexDirectory(String oldDirPath) throws IOException {
+    File dirToRm = new File(oldDirPath);
+    FileUtils.deleteDirectory(dirToRm);
+    return !dirToRm.isDirectory();
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
index 42f29cc4f2f..08577def536 100644
--- a/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
@@ -24,13 +24,16 @@ import java.net.URLEncoder;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Locale;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.LockFactory;
@@ -442,4 +445,77 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
   public Collection<SolrInfoMBean> offerMBeans() {
     return Arrays.asList(MetricsHolder.metrics);
   }
+
+  @Override
+  public void cleanupOldIndexDirectories(final String dataDir, final String currentIndexDir) {
+
+    // Get the FileSystem object
+    final Path dataDirPath = new Path(dataDir);
+    final Configuration conf = getConf();
+    FileSystem fileSystem = null;
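+    // tmpFsCache re-uses the FileSystem instance for a given data dir across calls
+    // rather than opening a new one on every clean-up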
+    try {
+      fileSystem = tmpFsCache.get(dataDir, new Callable<FileSystem>() {
+        @Override
+        public FileSystem call() throws IOException {
+          return FileSystem.get(dataDirPath.toUri(), conf);
+        }
+      });
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+
+    boolean pathExists = false;
+    try {
+      pathExists = fileSystem.exists(dataDirPath);
+    } catch (IOException e) {
+      LOG.error("Error checking if hdfs path "+dataDir+" exists", e);
+    }
+    if (!pathExists) {
+      LOG.warn("{} does not point to a valid data directory; skipping clean-up of old index directories.", dataDir);
+      return;
+    }
+
+    final Path currentIndexDirPath = new Path(currentIndexDir); // make sure we don't delete the current index
+    final FileSystem fs = fileSystem;
+    FileStatus[] oldIndexDirs = null;
+    try {
+      oldIndexDirs = fileSystem.listStatus(dataDirPath, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          boolean accept = false;
+          String pathName = path.getName();
+          try {
+            accept = fs.isDirectory(path) && !path.equals(currentIndexDirPath) &&
+                (pathName.equals("index") || pathName.matches(INDEX_W_TIMESTAMP_REGEX));
+          } catch (IOException e) {
+            LOG.error("Error checking if path {} is an old index directory, caused by: {}", path, e);
+          }
+          return accept;
+        }
+      });
+    } catch (IOException ioExc) {
+      LOG.error("Error checking for old index directories to clean up.", ioExc);
+    }
+
+    if (oldIndexDirs == null || oldIndexDirs.length == 0)
+      return; // nothing to clean up
+
+    Set<String> livePaths = getLivePaths();
+    for (FileStatus oldDir : oldIndexDirs) {
+      Path oldDirPath = oldDir.getPath();
+      if (livePaths.contains(oldDirPath.toString())) {
+        LOG.warn("Cannot delete directory {} because it is still being referenced in the cache.", oldDirPath);
+      } else {
+        try {
+          if (fileSystem.delete(oldDirPath, true)) {
+            LOG.info("Deleted old index directory {}", oldDirPath);
+          } else {
+            LOG.warn("Failed to delete old index directory {}", oldDirPath);
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to delete old index directory {} due to: {}", oldDirPath, e);
+        }
+      }
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 84b4642846f..5251a02707e 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -500,51 +500,54 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
 
   void initIndex(boolean reload) throws IOException {
-      String indexDir = getNewIndexDir();
-      boolean indexExists = getDirectoryFactory().exists(indexDir);
-      boolean firstTime;
-      synchronized (SolrCore.class) {
-        firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
-      }
-      boolean removeLocks = solrConfig.unlockOnStartup;
+    String indexDir = getNewIndexDir();
+    boolean indexExists = getDirectoryFactory().exists(indexDir);
+    boolean firstTime;
+    synchronized (SolrCore.class) {
+      firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
+    }
+    boolean removeLocks = solrConfig.unlockOnStartup;
 
-      initIndexReaderFactory();
+    initIndexReaderFactory();
 
-      if (indexExists && firstTime && !reload) {
-
-        Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
-            getSolrConfig().indexConfig.lockType);
-        try {
-          if (IndexWriter.isLocked(dir)) {
-            if (removeLocks) {
-              log.warn(
-                  logid
-                      + "WARNING: Solr index directory '{}' is locked. Unlocking...",
-                  indexDir);
-              dir.makeLock(IndexWriter.WRITE_LOCK_NAME).close();
-            } else {
-              log.error(logid
-                  + "Solr index directory '{}' is locked. Throwing exception",
-                  indexDir);
-              throw new LockObtainFailedException(
-                  "Index locked for write for core " + name);
-            }
+    if (indexExists && firstTime && !reload) {
+      Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
+          getSolrConfig().indexConfig.lockType);
+      try {
+        if (IndexWriter.isLocked(dir)) {
+          if (removeLocks) {
+            log.warn(
+                logid
+                    + "WARNING: Solr index directory '{}' is locked. Unlocking...",
+                indexDir);
+            dir.makeLock(IndexWriter.WRITE_LOCK_NAME).close();
+          } else {
+            log.error(logid
+                + "Solr index directory '{}' is locked. Throwing exception",
+                indexDir);
+            throw new LockObtainFailedException(
+                "Index locked for write for core " + name);
           }
-        } finally {
-          directoryFactory.release(dir);
+        }
+      } finally {
+        directoryFactory.release(dir);
       }
+    }
 
-      // Create the index if it doesn't exist.
-      if(!indexExists) {
-        log.warn(logid+"Solr index directory '" + new File(indexDir) + "' doesn't exist."
-            + " Creating new index...");
+    // Create the index if it doesn't exist.
+    if(!indexExists) {
+      log.warn(logid+"Solr index directory '" + new File(indexDir) + "' doesn't exist."
+          + " Creating new index...");
 
-        SolrIndexWriter writer = SolrIndexWriter.create(this, "SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
-                                                        getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
-        writer.close();
-      }
+      SolrIndexWriter writer = SolrIndexWriter.create(this, "SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
+                                                      getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
+      writer.close();
+    }
+
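+    // clean up any stale index directories (e.g. left over from failed recoveries) in the background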
+    cleanupOldIndexDirectories();
   }
 
@@ -1662,7 +1665,6 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
         newestSearcher.decref();
       }
     }
-
   }
 
   /**
@@ -2628,6 +2630,29 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     }
     return false;
   }
+
+  public void cleanupOldIndexDirectories() {
+    final DirectoryFactory myDirFactory = getDirectoryFactory();
+    final String myDataDir = getDataDir();
+    final String myIndexDir = getIndexDir();
+    final String coreName = getName();
+    if (myDirFactory != null && myDataDir != null && myIndexDir != null) {
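+      // run the clean-up on a daemon thread so it never blocks core init or recovery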
+      Thread cleanupThread = new Thread() {
+        @Override
+        public void run() {
+          log.info("Looking for old index directories to cleanup for core {} in {}", coreName, myDataDir);
+          try {
+            myDirFactory.cleanupOldIndexDirectories(myDataDir, myIndexDir);
+          } catch (Exception exc) {
+            log.error("Failed to cleanup old index directories for core "+coreName, exc);
+          }
+        }
+      };
+      cleanupThread.setName("OldIndexDirectoryCleanupThreadForCore-"+coreName);
+      cleanupThread.setDaemon(true);
+      cleanupThread.start();
+    }
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CleanupOldIndexTest.java b/solr/core/src/test/org/apache/solr/cloud/CleanupOldIndexTest.java
new file mode 100644
index 00000000000..e4a00419fc6
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/CleanupOldIndexTest.java
@@ -0,0 +1,165 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.SnapShooter;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Slow
+public class CleanupOldIndexTest extends AbstractFullDistribZkTestBase {
+
+  private static Logger log = LoggerFactory.getLogger(CleanupOldIndexTest.class);
+  private StoppableIndexingThread indexThread;
+
+  public CleanupOldIndexTest() {
+    super();
+    sliceCount = 1;
+    fixShardCount(2);
+    schemaString = "schema15.xml";
+  }
+
+  public static String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
+  public static RandVal[] randVals = new RandVal[]{rint, rfloat, rdouble, rlong, rdate};
+
+  protected String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  protected RandVal[] getRandValues() {
+    return randVals;
+  }
+
+  @Test
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    int[] maxDocList = new int[] {300, 700, 1200};
+    int maxDoc = maxDocList[random().nextInt(maxDocList.length)];
+
+    indexThread = new StoppableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true);
+    indexThread.start();
+
+    // give some time to index...
+    int[] waitTimes = new int[] {200, 2000, 3000};
+    Thread.sleep(waitTimes[random().nextInt(waitTimes.length)]);
+
+    // create some "old" index directories
+    JettySolrRunner jetty = chaosMonkey.getShard("shard1", 1);
+    SolrDispatchFilter filter = (SolrDispatchFilter)jetty.getDispatchFilter().getFilter();
+    CoreContainer coreContainer = filter.getCores();
+    File dataDir = null;
+    try (SolrCore solrCore = coreContainer.getCore("collection1")) {
+      dataDir = new File(solrCore.getDataDir());
+    }
+    assertTrue(dataDir.isDirectory());
+
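+    // use fixed epoch offsets for the timestamps so the two fake directory names are distinct and deterministic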
+    long msInDay = 1000L*60*60*24;
+    String timestamp1 = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date(1*msInDay));
+    String timestamp2 = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date(2*msInDay));
+    File oldIndexDir1 = new File(dataDir, "index."+timestamp1);
+    FileUtils.forceMkdir(oldIndexDir1);
+    File oldIndexDir2 = new File(dataDir, "index."+timestamp2);
+    FileUtils.forceMkdir(oldIndexDir2);
+
+    // verify the "old" index directories exist
+    assertTrue(oldIndexDir1.isDirectory());
+    assertTrue(oldIndexDir2.isDirectory());
+
+    // bring shard replica down
+    JettySolrRunner replica = chaosMonkey.stopShard("shard1", 1).jetty;
+
+    // wait a moment - let some docs be indexed so the replication time is non-zero
+    Thread.sleep(waitTimes[random().nextInt(waitTimes.length)]);
+
+    // bring shard replica up
+    replica.start();
+
+    // make sure replication can start
+    Thread.sleep(3000);
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+
+    // stop indexing threads
+    indexThread.safeStop();
+    indexThread.join();
+
+    Thread.sleep(1000);
+
+    waitForThingsToLevelOut(120);
+    waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, false, true);
+
+    // test that the leader and replica have the same doc count
+    String fail = checkShardConsistency("shard1", false, false);
+    if (fail != null)
+      fail(fail);
+
+    SolrQuery query = new SolrQuery("*:*");
+    query.setParam("distrib", "false");
+    long client1Docs = shardToJetty.get("shard1").get(0).client.solrClient.query(query).getResults().getNumFound();
+    long client2Docs = shardToJetty.get("shard1").get(1).client.solrClient.query(query).getResults().getNumFound();
+
+    assertTrue(client1Docs > 0);
+    assertEquals(client1Docs, client2Docs);
+
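+    // recovery on the restarted replica re-ran initIndex, which kicks off the background clean-up of the fake dirs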
+    assertFalse(oldIndexDir1.isDirectory());
+    assertFalse(oldIndexDir2.isDirectory());
+  }
+
+  @Override
+  protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException {
+    controlClient.add(doc);
+    cloudClient.add(doc);
+  }
+
+  @Override
+  public void distribTearDown() throws Exception {
+    // make sure the indexing thread has been stopped...
+    if (indexThread != null) {
+      indexThread.safeStop();
+      indexThread.join();
+    }
+    super.distribTearDown();
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java b/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java
index 6ad07f02393..979222b07c6 100644
--- a/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/core/HdfsDirectoryFactoryTest.java
@@ -18,7 +18,11 @@ package org.apache.solr.core;
 
 import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.NoLockFactory;
@@ -26,6 +30,7 @@ import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.hdfs.HdfsTestUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory.DirContext;
+import org.apache.solr.handler.SnapShooter;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
 import org.junit.AfterClass;
@@ -53,7 +58,7 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
     System.clearProperty(HdfsDirectoryFactory.NRTCACHINGDIRECTORY_MAXMERGESIZEMB);
     dfsCluster = null;
   }
-  
+
   @Test
   public void testInitArgsOrSysPropConfig() throws Exception {
 
@@ -130,4 +135,35 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
     hdfsFactory.close();
   }
 
+  @Test
+  public void testCleanupOldIndexDirectories() throws Exception {
+
+    HdfsDirectoryFactory hdfsFactory = new HdfsDirectoryFactory();
+
+    System.setProperty("solr.hdfs.home", HdfsTestUtil.getURI(dfsCluster) + "/solr1");
+    hdfsFactory.init(new NamedList<>());
+    String dataHome = hdfsFactory.getDataHome(new MockCoreDescriptor());
+    assertTrue(dataHome.endsWith("/solr1/mock/data"));
+    System.clearProperty("solr.hdfs.home");
+
+    FileSystem hdfs = dfsCluster.getFileSystem();
+
+    org.apache.hadoop.fs.Path dataHomePath = new org.apache.hadoop.fs.Path(dataHome);
+    org.apache.hadoop.fs.Path currentIndexDirPath = new org.apache.hadoop.fs.Path(dataHomePath, "index");
+    assertFalse(hdfs.isDirectory(currentIndexDirPath));
+    hdfs.mkdirs(currentIndexDirPath);
+    assertTrue(hdfs.isDirectory(currentIndexDirPath));
+
+    String timestamp1 = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+    org.apache.hadoop.fs.Path oldIndexDirPath = new org.apache.hadoop.fs.Path(dataHomePath, "index."+timestamp1);
+    assertFalse(hdfs.isDirectory(oldIndexDirPath));
+    hdfs.mkdirs(oldIndexDirPath);
+    assertTrue(hdfs.isDirectory(oldIndexDirPath));
+
+    hdfsFactory.cleanupOldIndexDirectories(dataHomePath.toString(), currentIndexDirPath.toString());
+
+    assertTrue(hdfs.isDirectory(currentIndexDirPath));
+    assertFalse(hdfs.isDirectory(oldIndexDirPath));
+
+    hdfsFactory.close();
+  }
 }