SOLR-1781: Replication index directories not always cleaned up.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1362994 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-07-18 16:06:22 +00:00
parent 15542518ba
commit 6c08d7f2a4
6 changed files with 119 additions and 12 deletions

View File

@ -88,9 +88,10 @@ Bug Fixes
* SOLR-3377: edismax fails to correctly parse a fielded query wrapped by parens.
This regression was introduced in 3.6. (Bernd Fehling, Jan Høydahl, yonik)
* SOLR-3621: fix rare concurrency issue when opening a new IndexWriter for replication or rollback.
* SOLR-3621: Fix rare concurrency issue when opening a new IndexWriter for replication or rollback.
(Mark Miller)
* SOLR-1781: Replication index directories not always cleaned up. (Terje Sten Bjerkseth, Mark Miller)
Other Changes

View File

@ -19,7 +19,9 @@ package org.apache.solr.core;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -54,6 +56,46 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
protected Map<Directory,CacheValue> byDirectoryCache = new HashMap<Directory,CacheValue>();
protected Map<Directory,List<CloseListener>> closeListeners = new HashMap<Directory,List<CloseListener>>();
public interface CloseListener {
public void onClose();
}
@Override
public void addCloseListener(Directory dir, CloseListener closeListener) {
synchronized (this) {
if (!byDirectoryCache.containsKey(dir)) {
throw new IllegalArgumentException("Unknown directory: " + dir
+ " " + byDirectoryCache);
}
List<CloseListener> listeners = closeListeners.get(dir);
if (listeners == null) {
listeners = new ArrayList<CloseListener>();
closeListeners.put(dir, listeners);
}
listeners.add(closeListener);
closeListeners.put(dir, listeners);
}
}
@Override
public void doneWithDirectory(Directory directory) throws IOException {
synchronized (this) {
CacheValue cacheValue = byDirectoryCache.get(directory);
if (cacheValue == null) {
throw new IllegalArgumentException("Unknown directory: " + directory
+ " " + byDirectoryCache);
}
cacheValue.doneWithDir = true;
if (cacheValue.refCnt == 0) {
cacheValue.refCnt++; // this will go back to 0 in close
close(directory);
}
}
}
/*
* (non-Javadoc)
*
@ -82,6 +124,13 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
directory.close();
byDirectoryCache.remove(directory);
byPathCache.remove(cacheValue.path);
List<CloseListener> listeners = closeListeners.remove(directory);
if (listeners != null) {
for (CloseListener listener : listeners) {
listener.onClose();
}
closeListeners.remove(directory);
}
}
}
}

View File

@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.solr.core.CachingDirectoryFactory.CloseListener;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
/**
@ -30,6 +31,24 @@ import org.apache.solr.util.plugin.NamedListInitializedPlugin;
public abstract class DirectoryFactory implements NamedListInitializedPlugin,
Closeable {
/**
* Indicates a Directory will no longer be used, and when it's ref count
* hits 0, it can be closed. On shutdown all directories will be closed
* with this has been called or not. This is simply to allow early cleanup.
*
* @param directory
* @throws IOException
*/
public abstract void doneWithDirectory(Directory directory) throws IOException;
/**
* Adds a close listener for a Directory.
*
* @param dir
* @param closeListener
*/
public abstract void addCloseListener(Directory dir, CloseListener closeListener);
/**
* Close the this and all of the Directories it contains.
*
@ -62,7 +81,9 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
/**
* Returns the Directory for a given path, using the specified rawLockType.
* Will return the same Directory instance for the same path unless forceNew,
* in which case a new Directory is returned.
* in which case a new Directory is returned. There is no need to call
* {@link #doneWithDirectory(Directory)} in this case - the old Directory
* will be closed when it's ref count hits 0.
*
* @throws IOException
*/

View File

@ -20,6 +20,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpClient;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
@ -32,6 +33,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.util.FileUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CachingDirectoryFactory.CloseListener;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import static org.apache.solr.handler.ReplicationHandler.*;
@ -321,9 +323,10 @@ public class SnapPuller {
LOG.info("Starting download to " + tmpIndexDir + " fullCopy=" + isFullCopyNeeded);
successfulInstall = false;
boolean deleteTmpIdxDir = true;
File indexDir = null ;
final File indexDir = new File(core.getIndexDir());
Directory oldDirectory = null;
try {
indexDir = new File(core.getIndexDir());
downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestGeneration);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
@ -345,14 +348,42 @@ public class SnapPuller {
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIndexDir.getName());
deleteTmpIdxDir = false;
RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
try {
oldDirectory = iw.get().getDirectory();
} finally {
iw.decref();
}
} else {
successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
doCommit();
}
}
if (isFullCopyNeeded) {
// we have to do this before commit
core.getDirectoryFactory().addCloseListener(oldDirectory, new CloseListener(){
@Override
public void onClose() {
LOG.info("removing old index directory " + indexDir);
delTree(indexDir);
}
});
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
core.getDirectoryFactory().doneWithDirectory(oldDirectory);
}
doCommit();
}
replicationStartTime = 0;
return successfulInstall;
} catch (ReplicationHandlerException e) {
@ -368,10 +399,7 @@ public class SnapPuller {
if (deleteTmpIdxDir) {
LOG.info("removing temporary index download directory " + tmpIndexDir);
delTree(tmpIndexDir);
} else {
LOG.info("removing old index directory " + indexDir);
delTree(indexDir);
}
}
}
} finally {
if (!successfulInstall) {

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
@ -92,10 +93,16 @@ public final class DefaultSolrCoreState extends SolrCoreState {
wait();
} catch (InterruptedException e) {}
}
try {
if (indexWriter != null) {
if (indexWriter != null) {
try {
indexWriter.close();
} catch (Exception e) {
SolrException.log(log, "Error closing old IndexWriter", e);
}
}
try {
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2", false,
true);
// we need to null this so it picks up the new writer next get call

View File

@ -122,10 +122,11 @@ public class SolrIndexWriter extends IndexWriter {
final InfoStream infoStream = isClosed ? null : getConfig().getInfoStream();
try {
super.close();
} finally {
if(infoStream != null) {
infoStream.close();
}
} finally {
isClosed = true;
directoryFactory.release(directory);