SOLR-4640: CachingDirectoryFactory can fail to close directories in some race conditions.

SOLR-4637: Replication can sometimes wait until shutdown or core unload before removing some tmp directories.
SOLR-4638: DefaultSolrCoreState#getIndexWriter(null) is a way to avoid creating the IndexWriter earlier than necessary, but it's not implemented quite right.
SOLR-4597: fixes, improvements
SOLR-4629: Stronger testing.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1460510 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-03-25 02:34:42 +00:00
parent a6a1c199c7
commit a7a799c403
12 changed files with 365 additions and 251 deletions

View File

@ -225,11 +225,18 @@ Bug Fixes
longer and it appears to be causing a missing close directory bug. forceNew
is no longer respected and will be removed in 4.3. (Mark Miller)
* SOLR-4626: getIndexWriter(null) should also respect pauseWriter. (Mark Miller)
* SOLR-3819: Grouped faceting (group.facet=true) did not respect filter
exclusions. (Petter Remen, yonik)
* SOLR-4637: Replication can sometimes wait until shutdown or core unload before
  removing some tmp directories. (Mark Miller)
* SOLR-4638: DefaultSolrCoreState#getIndexWriter(null) is a way to avoid
creating the IndexWriter earlier than necessary, but it's not
implemented quite right. (Mark Miller)
* SOLR-4640: CachingDirectoryFactory can fail to close directories in some race
conditions. (Mark Miller)
Optimizations
----------------------

View File

@ -38,6 +38,7 @@ import org.apache.lucene.store.RateLimitedDirectoryWrapper;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,7 +55,8 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
protected class CacheValue {
final public String path;
final public Directory directory;
// for debug
//final Exception originTrace;
// use the setter!
private boolean deleteOnClose = false;
@ -62,10 +64,12 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
this.path = path;
this.directory = directory;
this.closeEntries.add(this);
// for debug
// this.originTrace = new RuntimeException("Originated from:");
}
public int refCnt = 1;
// has close(Directory) been called on this?
public boolean closeDirectoryCalled = false;
// has doneWithDirectory(Directory) been called on this?
public boolean closeCacheValueCalled = false;
public boolean doneWithDir = false;
private boolean deleteAfterCoreClose = false;
public Set<CacheValue> removeEntries = new HashSet<CacheValue>();
@ -81,7 +85,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
@Override
public String toString() {
return "CachedDir<<" + directory.toString() + ";refCount=" + refCnt + ";path=" + path + ";done=" + doneWithDir + ">>";
return "CachedDir<<" + "refCount=" + refCnt + ";path=" + path + ";done=" + doneWithDir + ">>";
}
}
@ -139,10 +143,11 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
+ " " + byDirectoryCache);
}
cacheValue.doneWithDir = true;
if (cacheValue.refCnt == 0) {
closeDirectory(cacheValue);
byDirectoryCache.remove(directory);
byPathCache.remove(cacheValue.path);
if (cacheValue.refCnt == 0 && !closed) {
boolean cl = closeCacheValue(cacheValue);
if (cl) {
removeFromCache(cacheValue);
}
}
}
}
@ -155,9 +160,9 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
@Override
public void close() throws IOException {
synchronized (this) {
log.info("Closing " + this.getClass().getSimpleName() + " - " + byDirectoryCache.size() + " directories currently being tracked");
this.closed = true;
Collection<CacheValue> values = new ArrayList<CacheValue>();
values.addAll(byDirectoryCache.values());
Collection<CacheValue> values = byDirectoryCache.values();
for (CacheValue val : values) {
try {
// if there are still refs out, we have to wait for them
@ -165,9 +170,12 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
while(val.refCnt != 0) {
wait(100);
if (cnt++ >= 1200) {
log.error("Timeout waiting for all directory ref counts to be released");
break;
if (cnt++ >= 120) {
String msg = "Timeout waiting for all directory ref counts to be released - gave up waiting on " + val;
log.error(msg);
// debug
// val.originTrace.printStackTrace();
throw new SolrException(ErrorCode.SERVER_ERROR, msg);
}
}
assert val.refCnt == 0 : val.refCnt;
@ -177,52 +185,47 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
values = byDirectoryCache.values();
Set<CacheValue> closedDirs = new HashSet<CacheValue>();
for (CacheValue val : values) {
try {
assert val.refCnt == 0 : val.refCnt;
log.info("Closing directory when closing factory: " + val.path);
closeDirectory(val);
for (CacheValue v : val.closeEntries) {
assert v.refCnt == 0 : val.refCnt;
log.debug("Closing directory when closing factory: " + v.path);
boolean cl = closeCacheValue(v);
if (cl) {
closedDirs.add(v);
}
}
} catch (Throwable t) {
SolrException.log(log, "Error closing directory", t);
}
}
byDirectoryCache.clear();
byPathCache.clear();
for (CacheValue val : removeEntries) {
log.info("Removing directory: " + val.path);
removeDirectory(val);
log.info("Removing directory after core close: " + val.path);
try {
removeDirectory(val);
} catch (Throwable t) {
SolrException.log(log, "Error removing directory", t);
}
}
}
}
private void close(Directory directory) throws IOException {
synchronized (this) {
// don't check if already closed here - we need to able to release
// while #close() waits.
CacheValue cacheValue = byDirectoryCache.get(directory);
if (cacheValue == null) {
throw new IllegalArgumentException("Unknown directory: " + directory
+ " " + byDirectoryCache);
}
log.debug("Releasing directory: " + cacheValue.path);
cacheValue.refCnt--;
if (cacheValue.refCnt == 0 && cacheValue.doneWithDir) {
closeDirectory(cacheValue);
byDirectoryCache.remove(directory);
byPathCache.remove(cacheValue.path);
for (CacheValue v : closedDirs) {
removeFromCache(v);
}
}
}
private void closeDirectory(CacheValue cacheValue) {
// Evicts a CacheValue from both lookup maps (by Directory instance and by
// path). Must be called while holding the factory's sync lock, after the
// value has actually been closed (see closeCacheValue).
private void removeFromCache(CacheValue v) {
byDirectoryCache.remove(v.directory);
byPathCache.remove(v.path);
}
// be sure this is called with the this sync lock
// returns true if we closed the cacheValue, false if it will be closed later
private boolean closeCacheValue(CacheValue cacheValue) {
log.info("looking to close " + cacheValue.path + " " + cacheValue.closeEntries.toString());
List<CloseListener> listeners = closeListeners.remove(cacheValue.directory);
if (listeners != null) {
for (CloseListener listener : listeners) {
@ -233,21 +236,16 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
}
}
cacheValue.closeDirectoryCalled = true;
cacheValue.closeCacheValueCalled = true;
if (cacheValue.deleteOnClose) {
// see if we are a subpath
Collection<CacheValue> values = byPathCache.values();
Collection<CacheValue> cacheValues = new ArrayList<CacheValue>();
cacheValues.addAll(values);
Collection<CacheValue> cacheValues = new ArrayList<CacheValue>(values);
cacheValues.remove(cacheValue);
for (CacheValue otherCacheValue : cacheValues) {
// if we are a parent path and all our sub children are not already closed,
// get a sub path to close us later
if (otherCacheValue.path.startsWith(cacheValue.path) && !otherCacheValue.closeDirectoryCalled) {
// if we are a parent path and a sub path is not already closed, get a sub path to close us later
if (isSubPath(cacheValue, otherCacheValue) && !otherCacheValue.closeCacheValueCalled) {
// we let the sub dir remove and close us
if (!otherCacheValue.deleteAfterCoreClose && cacheValue.deleteAfterCoreClose) {
otherCacheValue.deleteAfterCoreClose = true;
@ -255,15 +253,24 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
otherCacheValue.removeEntries.addAll(cacheValue.removeEntries);
otherCacheValue.closeEntries.addAll(cacheValue.closeEntries);
cacheValue.closeEntries.clear();
break;
cacheValue.removeEntries.clear();
return false;
}
}
}
boolean cl = false;
for (CacheValue val : cacheValue.closeEntries) {
close(val);
if (val == cacheValue) {
cl = true;
}
}
for (CacheValue val : cacheValue.removeEntries) {
if (!val.deleteAfterCoreClose) {
log.info("Removing directory before core close: " + val.path);
try {
log.info("Removing directory: " + val.path);
removeDirectory(val);
} catch (Throwable t) {
SolrException.log(log, "Error removing directory", t);
@ -273,16 +280,6 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
}
for (CacheValue val : cacheValue.closeEntries) {
try {
log.info("Closing directory: " + val.path);
val.directory.close();
} catch (Throwable t) {
SolrException.log(log, "Error closing directory", t);
}
}
if (listeners != null) {
for (CloseListener listener : listeners) {
try {
@ -292,6 +289,23 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
}
}
return cl;
}
// Best-effort close of a single cached Directory. Any failure (including
// Errors) is logged and swallowed on purpose: factory shutdown must keep
// going and close the remaining directories even if one of them fails.
private void close(CacheValue val) {
try {
log.info("Closing directory: " + val.path);
val.directory.close();
} catch (Throwable t) {
SolrException.log(log, "Error closing directory", t);
}
}
// Returns true when otherCacheValue's path lies strictly below
// cacheValue's path in the path hierarchy (i.e. it is a sub-directory).
private boolean isSubPath(CacheValue cacheValue, CacheValue otherCacheValue) {
// appending "/" avoids false positives between sibling paths that share
// a prefix (e.g. "path1" vs "path10")
boolean descendsFrom = otherCacheValue.path.startsWith(cacheValue.path + "/");
int parentLastSlash = cacheValue.path.lastIndexOf('/');
int childLastSlash = otherCacheValue.path.lastIndexOf('/');
// the candidate child must also have its final separator deeper than the
// parent's, i.e. at least one more path segment
return descendsFrom && childLastSlash > parentLastSlash;
}
@Override
@ -374,6 +388,9 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
@Override
public void incRef(Directory directory) {
synchronized (this) {
if (closed) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
}
CacheValue cacheValue = byDirectoryCache.get(directory);
if (cacheValue == null) {
throw new IllegalArgumentException("Unknown directory: " + directory);
@ -403,7 +420,28 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
if (directory == null) {
throw new NullPointerException();
}
close(directory);
synchronized (this) {
// don't check if already closed here - we need to able to release
// while #close() waits.
CacheValue cacheValue = byDirectoryCache.get(directory);
if (cacheValue == null) {
throw new IllegalArgumentException("Unknown directory: " + directory
+ " " + byDirectoryCache);
}
log.debug("Releasing directory: " + cacheValue.path + " " + (cacheValue.refCnt - 1) + " " + cacheValue.doneWithDir);
cacheValue.refCnt--;
assert cacheValue.refCnt >= 0 : cacheValue.refCnt;
if (cacheValue.refCnt == 0 && cacheValue.doneWithDir && !closed) {
boolean cl = closeCacheValue(cacheValue);
if (cl) {
removeFromCache(cacheValue);
}
}
}
}
@Override
@ -466,8 +504,8 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
return dir;
}
protected void removeDirectory(CacheValue cacheValue) throws IOException {
empty(cacheValue.directory);
protected synchronized void removeDirectory(CacheValue cacheValue) throws IOException {
// this page intentionally left blank
}
@Override
@ -482,4 +520,9 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
return path;
}
// for tests
// Exposes the set of currently-cached directory paths so tests can assert
// on what the factory is tracking. NOTE: this is the live keySet view of
// byPathCache, not a snapshot — callers should hold the factory lock while
// iterating (as CachingDirectoryFactoryTest / TestReplicationHandler do).
public synchronized Set<String> getPaths() {
return byPathCache.keySet();
}
}

View File

@ -16,12 +16,9 @@ package org.apache.solr.core;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.store.Directory;
import org.apache.solr.core.CachingDirectoryFactory.CacheValue;
/**
* Directory provider for implementations that do not persist over reboots.

View File

@ -485,7 +485,6 @@ public final class SolrCore implements SolrInfoMBean {
"Index locked for write for core " + name);
}
directoryFactory.release(dir);
}
} finally {
directoryFactory.release(dir);
@ -982,12 +981,13 @@ public final class SolrCore implements SolrInfoMBean {
SolrException.log(log,e);
}
boolean coreStateClosed = false;
try {
if (solrCoreState != null) {
if (updateHandler instanceof IndexWriterCloser) {
solrCoreState.decrefSolrCoreState((IndexWriterCloser) updateHandler);
coreStateClosed = solrCoreState.decrefSolrCoreState((IndexWriterCloser) updateHandler);
} else {
solrCoreState.decrefSolrCoreState(null);
coreStateClosed = solrCoreState.decrefSolrCoreState(null);
}
}
} catch (Throwable e) {
@ -1013,13 +1013,12 @@ public final class SolrCore implements SolrInfoMBean {
SolrException.log(log,e);
}
if (solrCoreState != null) { // bad startup case
if (solrCoreState.getSolrCoreStateRefCnt() == 0) {
try {
directoryFactory.close();
} catch (Throwable t) {
SolrException.log(log, t);
}
if (coreStateClosed) {
try {
directoryFactory.close();
} catch (Throwable t) {
SolrException.log(log, t);
}
}
@ -1362,20 +1361,24 @@ public final class SolrCore implements SolrInfoMBean {
DirectoryReader newReader;
DirectoryReader currentReader = newestSearcher.get().getIndexReader();
if (updateHandlerReopens) {
// SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
RefCounted<IndexWriter> writer = getUpdateHandler().getSolrCoreState().getIndexWriter(this);
try {
newReader = DirectoryReader.openIfChanged(currentReader, writer.get(), true);
} finally {
// SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
RefCounted<IndexWriter> writer = getUpdateHandler().getSolrCoreState()
.getIndexWriter(null);
try {
if (writer != null) {
newReader = DirectoryReader.openIfChanged(currentReader,
writer.get(), true);
} else {
// verbose("start reopen without writer, reader=", currentReader);
newReader = DirectoryReader.openIfChanged(currentReader);
// verbose("reopen result", newReader);
}
} finally {
if (writer != null) {
writer.decref();
}
} else {
// verbose("start reopen without writer, reader=", currentReader);
newReader = DirectoryReader.openIfChanged(currentReader);
// verbose("reopen result", newReader);
}
if (newReader == null) {

View File

@ -49,6 +49,13 @@ public class StandardDirectoryFactory extends CachingDirectoryFactory {
return super.normalize(cpath);
}
@Override
public boolean exists(String path) throws IOException {
// we go by the persistent storage ...
File dirFile = new File(path);
// File.list() returns null when the path is not a directory or when an
// I/O error occurs; without this guard a readable plain file at 'path'
// would cause a NullPointerException instead of returning false.
String[] contents = dirFile.list();
return dirFile.canRead() && contents != null && contents.length > 0;
}
// This factory is backed by the filesystem, so its directories survive
// restarts (unlike RAM-based factories).
public boolean isPersistent() {
return true;
}

View File

@ -84,7 +84,6 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CachingDirectoryFactory.CloseListener;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
@ -382,10 +381,9 @@ public class SnapPuller {
tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
// make sure it's the newest known index dir...
indexDirPath = core.getNewIndexDir();
// cindex dir...
indexDirPath = core.getIndexDir();
indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
Directory oldDirectory = null;
try {
@ -407,6 +405,16 @@ public class SnapPuller {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
if (indexDir != null) {
LOG.info("removing old index directory " + indexDir);
core.getDirectoryFactory().doneWithDirectory(indexDir);
core.getDirectoryFactory().remove(indexDir);
}
}
LOG.info("Configuration files are modified, core will be reloaded");
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.
reloadCore();
@ -416,12 +424,6 @@ public class SnapPuller {
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
try {
oldDirectory = iw.get().getDirectory();
} finally {
iw.decref();
}
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
@ -429,43 +431,16 @@ public class SnapPuller {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
}
}
if (isFullCopyNeeded) {
// we have to do this before commit
final Directory freezeIndexDir = indexDir;
final String freezeIndexDirPath = indexDirPath;
core.getDirectoryFactory().addCloseListener(oldDirectory, new CloseListener(){
@Override
public void preClose() {
LOG.info("removing old index files " + freezeIndexDir);
try {
if (core.getDirectoryFactory().exists(freezeIndexDirPath)) {
DirectoryFactory.empty(freezeIndexDir);
}
} catch (IOException e) {
SolrException.log(LOG, null, e);
}
}
@Override
public void postClose() {
LOG.info("removing old index directory " + freezeIndexDir);
try {
core.getDirectoryFactory().remove(freezeIndexDir);
} catch (IOException e) {
SolrException.log(LOG, "Error removing directory " + freezeIndexDir, e);
}
}
});
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
core.getDirectoryFactory().doneWithDirectory(oldDirectory);
if (indexDir != null) {
LOG.info("removing old index directory " + indexDir);
core.getDirectoryFactory().doneWithDirectory(indexDir);
core.getDirectoryFactory().remove(indexDir);
}
}
openNewWriterAndSearcher(isFullCopyNeeded);
}
@ -499,6 +474,7 @@ public class SnapPuller {
} finally {
if (deleteTmpIdxDir && tmpIndexDir != null) {
try {
core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
core.getDirectoryFactory().remove(tmpIndexDir);
} catch (IOException e) {
SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
@ -647,8 +623,6 @@ public class SnapPuller {
RefCounted<SolrIndexSearcher> searcher = null;
IndexCommit commitPoint;
try {
// first try to open an NRT searcher so that the new
// IndexWriter is registered with the reader
Future[] waitSearcher = new Future[1];
searcher = solrCore.getSearcher(true, true, waitSearcher, true);
if (waitSearcher[0] != null) {

View File

@ -159,7 +159,9 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
private DirectoryFactory directoryFactory;
private final AtomicReader atomicReader;
private String path;
private String path;
private final boolean reserveDirectory;
private final boolean createdDirectory;
private static DirectoryReader getReader(SolrCore core, SolrIndexConfig config, DirectoryFactory directoryFactory, String path) throws IOException {
DirectoryReader reader = null;
@ -179,19 +181,21 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
}
public SolrIndexSearcher(SolrCore core, String path, IndexSchema schema, SolrIndexConfig config, String name, DirectoryReader r, boolean closeReader, boolean enableCache, boolean reserveDirectory, DirectoryFactory directoryFactory) throws IOException {
super(r = (r == null ? getReader(core, config, directoryFactory, path) : r));
super(r == null ? getReader(core, config, directoryFactory, path) : r);
this.path = path;
this.directoryFactory = directoryFactory;
this.reader = r;
this.atomicReader = SlowCompositeReaderWrapper.wrap(r);
this.reader = (DirectoryReader) super.readerContext.reader();
this.atomicReader = SlowCompositeReaderWrapper.wrap(this.reader);
this.core = core;
this.schema = schema;
this.name = "Searcher@" + Integer.toHexString(hashCode()) + (name!=null ? " "+name : "");
log.info("Opening " + this.name);
Directory dir = r.directory();
Directory dir = this.reader.directory();
this.reserveDirectory = reserveDirectory;
this.createdDirectory = r == null;
if (reserveDirectory) {
// keep the directory from being released while we use it
directoryFactory.incRef(dir);
@ -336,8 +340,12 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
cache.close();
}
directoryFactory.release(getIndexReader().directory());
if (reserveDirectory) {
directoryFactory.release(getIndexReader().directory());
}
if (createdDirectory) {
directoryFactory.release(getIndexReader().directory());
}
// do this at the end so it only gets done if there are no exceptions

View File

@ -48,7 +48,6 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
private volatile boolean recoveryRunning;
private RecoveryStrategy recoveryStrat;
private volatile boolean closed = false;
private RefCounted<IndexWriter> refCntWriter;
@ -80,13 +79,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
@Override
public RefCounted<IndexWriter> getIndexWriter(SolrCore core)
throws IOException {
if (closed) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCoreState already closed");
}
synchronized (writerPauseLock) {
if (closed) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCoreState already closed");
}
while (pauseWriter) {
try {
writerPauseLock.wait(100);
@ -100,11 +97,15 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
if (core == null) {
// core == null is a signal to just return the current writer, or null
// if none.
initRefCntWriter();
if (refCntWriter == null) return null;
writerFree = false;
writerPauseLock.notifyAll();
if (refCntWriter != null) refCntWriter.incref();
return refCntWriter;
}
if (indexWriter == null) {
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
}
@ -117,7 +118,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
private void initRefCntWriter() {
if (refCntWriter == null) {
if (refCntWriter == null && indexWriter != null) {
refCntWriter = new RefCounted<IndexWriter>(indexWriter) {
@Override
public void close() {
@ -132,12 +133,13 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
@Override
public synchronized void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
if (closed) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
}
log.info("Creating new IndexWriter...");
String coreName = core.getName();
synchronized (writerPauseLock) {
if (closed) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
}
// we need to wait for the Writer to fall out of use
// first lets stop it from being lent out
pauseWriter = true;

View File

@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
public abstract class SolrCoreState {
public static Logger log = LoggerFactory.getLogger(SolrCoreState.class);
protected boolean closed = false;
private final Object deleteLock = new Object();
public Object getUpdateLock() {
@ -44,10 +45,6 @@ public abstract class SolrCoreState {
}
private int solrCoreStateRefCnt = 1;
public synchronized int getSolrCoreStateRefCnt() {
return solrCoreStateRefCnt;
}
public void increfSolrCoreState() {
synchronized (this) {
@ -58,11 +55,13 @@ public abstract class SolrCoreState {
}
}
public void decrefSolrCoreState(IndexWriterCloser closer) {
public boolean decrefSolrCoreState(IndexWriterCloser closer) {
boolean close = false;
synchronized (this) {
solrCoreStateRefCnt--;
assert solrCoreStateRefCnt >= 0;
if (solrCoreStateRefCnt == 0) {
closed = true;
close = true;
}
}
@ -75,6 +74,7 @@ public abstract class SolrCoreState {
log.error("Error closing SolrCoreState", t);
}
}
return close;
}
public abstract Lock getCommitLock();

View File

@ -175,6 +175,7 @@ public class SolrIndexWriter extends IndexWriter {
@Override
public void rollback() throws IOException {
Directory dir = getDirectory();
try {
while (true) {
try {
@ -187,7 +188,7 @@ public class SolrIndexWriter extends IndexWriter {
}
} finally {
isClosed = true;
directoryFactory.release(getDirectory());
directoryFactory.release(dir);
numCloses.incrementAndGet();
}
}

View File

@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.junit.Test;
@ -122,32 +123,33 @@ public class CachingDirectoryFactoryTest extends SolrTestCaseJ4 {
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
try {
synchronized (dirs) {
int sz = dirs.size();
List<Tracker> dirsList = new ArrayList<Tracker>();
dirsList.addAll(dirs.values());
if (sz > 0) {
Tracker tracker = dirsList.get(Math.min(dirsList.size() - 1,
random.nextInt(sz + 1)));
synchronized (dirs) {
int sz = dirs.size();
List<Tracker> dirsList = new ArrayList<Tracker>();
dirsList.addAll(dirs.values());
if (sz > 0) {
Tracker tracker = dirsList.get(Math.min(dirsList.size() - 1,
random.nextInt(sz + 1)));
try {
if (tracker.refCnt.get() > 0) {
if (random.nextInt(10) > 7) {
df.doneWithDirectory(tracker.dir);
}
if (random.nextBoolean()) {
df.remove(tracker.dir);
}
if (random.nextBoolean()) {
} else {
df.remove(tracker.path);
}
tracker.refCnt.decrementAndGet();
df.release(tracker.dir);
}
} catch (Exception e) {
throw new RuntimeException("path:" + tracker.path + "ref cnt:" + tracker.refCnt, e);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
@ -170,7 +172,16 @@ public class CachingDirectoryFactoryTest extends SolrTestCaseJ4 {
throw new RuntimeException(e1);
}
try {
String path = "path" + random.nextInt(20);
String path;
if (random.nextBoolean()) {
path = "path" + random.nextInt(20);
} else {
if (random.nextBoolean()) {
path = "path" + random.nextInt(20) + "/" + random.nextInt(20);
} else {
path = "path" + random.nextInt(20) + "/" + random.nextInt(20) + "/" + random.nextInt(20);
}
}
synchronized (dirs) {
Tracker tracker = dirs.get(path);
if (tracker == null) {
@ -216,8 +227,14 @@ public class CachingDirectoryFactoryTest extends SolrTestCaseJ4 {
Tracker tracker = dirs.get(path);
if (tracker != null && tracker.refCnt.get() > 0) {
try {
df.incRef(tracker.dir);
} catch (SolrException e) {
log.warn("", e);
continue;
}
tracker.refCnt.incrementAndGet();
df.incRef(tracker.dir);
}
}

View File

@ -29,6 +29,8 @@ import java.io.Writer;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -55,6 +57,11 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CachingDirectoryFactory;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.StandardDirectoryFactory;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.AbstractSolrTestCase;
import org.junit.After;
import org.junit.Before;
@ -66,7 +73,6 @@ import org.junit.Test;
*
* @since 1.4
*/
// TODO: can this test be sped up? it used to not be so slow...
@Slow
public class TestReplicationHandler extends SolrTestCaseJ4 {
@ -185,7 +191,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
docList = (SolrDocumentList) res.get("response");
timeSlept += 100;
Thread.sleep(100);
} while(docList.getNumFound() != expectedDocCount && timeSlept < 45000);
} while(docList.getNumFound() != expectedDocCount && timeSlept < 30000);
return res;
}
@ -467,6 +473,9 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
assertTrue(slaveXsltDir.isDirectory());
assertTrue(slaveXsl.exists());
checkForSingleIndex(masterJetty);
checkForSingleIndex(slaveJetty);
}
@Test
@ -650,81 +659,126 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
NamedList<Object> details = getDetails(masterClient);
details = getDetails(slaveClient);
// NOTE: at this point, the slave is not polling any more
// restore it.
slave.copyConfigFile(CONF_DIR + "solrconfig-slave.xml", "solrconfig.xml");
slaveJetty.stop();
slaveJetty = createJetty(slave);
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
checkForSingleIndex(masterJetty);
checkForSingleIndex(slaveJetty);
}
@Test
public void doTestStressReplication() throws Exception {
//change solrconfig on slave
//this has no entry for pollinginterval
slave.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml", "solrconfig.xml");
slaveJetty.stop();
slaveJetty = createJetty(slave);
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
// change solrconfig on slave
// this has no entry for pollinginterval
master.copyConfigFile(CONF_DIR + "solrconfig-master3.xml", "solrconfig.xml");
masterJetty.stop();
masterJetty = createJetty(master);
masterClient = createNewSolrServer(masterJetty.getLocalPort());
masterClient.deleteByQuery("*:*");
slaveClient.deleteByQuery("*:*");
slaveClient.commit();
int maxDocs = TEST_NIGHTLY ? 1000 : 200;
int rounds = TEST_NIGHTLY ? 80 : 10;
int totalDocs = 0;
int id = 0;
for (int x = 0; x < rounds; x++) {
// we randomly trigger a configuration replication
if (random().nextBoolean()) {
master.copyConfigFile(CONF_DIR + "schema-replication" + (random().nextInt(2) + 1) + ".xml", "schema.xml");
}
int docs = random().nextInt(maxDocs);
for (int i = 0; i < docs; i++) {
index(masterClient, "id", id++, "name", "name = " + i);
}
totalDocs += docs;
masterClient.commit();
NamedList masterQueryRsp = rQuery(totalDocs, "*:*", masterClient);
SolrDocumentList masterQueryResult = (SolrDocumentList) masterQueryRsp
.get("response");
assertEquals(totalDocs, masterQueryResult.getNumFound());
// snappull
pullFromMasterToSlave();
// get docs from slave and check if number is equal to master
NamedList slaveQueryRsp = rQuery(totalDocs, "*:*", slaveClient);
SolrDocumentList slaveQueryResult = (SolrDocumentList) slaveQueryRsp
.get("response");
assertEquals(totalDocs, slaveQueryResult.getNumFound());
// compare results
String cmp = BaseDistributedSearchTestCase.compare(masterQueryResult,
slaveQueryResult, 0, null);
assertEquals(null, cmp);
assertVersions(masterClient, slaveClient);
// get us a straight standard fs dir rather than mock*dir
boolean useStraightStandardDirectory = random().nextBoolean();
if (useStraightStandardDirectory) {
useFactory(null);
}
try {
slave
.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml", "solrconfig.xml");
slaveJetty.stop();
slaveJetty = createJetty(slave);
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
master.copyConfigFile(CONF_DIR + "solrconfig-master3.xml",
"solrconfig.xml");
masterJetty.stop();
masterJetty = createJetty(master);
masterClient = createNewSolrServer(masterJetty.getLocalPort());
masterClient.deleteByQuery("*:*");
slaveClient.deleteByQuery("*:*");
slaveClient.commit();
int maxDocs = TEST_NIGHTLY ? 1000 : 200;
int rounds = TEST_NIGHTLY ? 80 : 8;
int totalDocs = 0;
int id = 0;
for (int x = 0; x < rounds; x++) {
// we randomly trigger a configuration replication
// if (random().nextBoolean()) {
master.copyConfigFile(CONF_DIR + "schema-replication"
+ (random().nextInt(2) + 1) + ".xml", "schema.xml");
// }
int docs = random().nextInt(maxDocs);
for (int i = 0; i < docs; i++) {
index(masterClient, "id", id++, "name", "name = " + i);
}
totalDocs += docs;
masterClient.commit();
NamedList masterQueryRsp = rQuery(totalDocs, "*:*", masterClient);
SolrDocumentList masterQueryResult = (SolrDocumentList) masterQueryRsp
.get("response");
assertEquals(totalDocs, masterQueryResult.getNumFound());
// snappull
pullFromMasterToSlave();
// get docs from slave and check if number is equal to master
NamedList slaveQueryRsp = rQuery(totalDocs, "*:*", slaveClient);
SolrDocumentList slaveQueryResult = (SolrDocumentList) slaveQueryRsp
.get("response");
assertEquals(totalDocs, slaveQueryResult.getNumFound());
// compare results
String cmp = BaseDistributedSearchTestCase.compare(masterQueryResult,
slaveQueryResult, 0, null);
assertEquals(null, cmp);
assertVersions(masterClient, slaveClient);
checkForSingleIndex(masterJetty);
checkForSingleIndex(slaveJetty);
if (random().nextBoolean()) {
// move the slave ahead
for (int i = 0; i < 3; i++) {
index(slaveClient, "id", id++, "name", "name = " + i);
}
slaveClient.commit();
}
}
} finally {
if (useStraightStandardDirectory) {
resetFactory();
}
}
}
// NOTE: at this point, the slave is not polling any more
// restore it.
slave.copyConfigFile(CONF_DIR + "solrconfig-slave.xml", "solrconfig.xml");
slaveJetty.stop();
slaveJetty = createJetty(slave);
slaveClient = createNewSolrServer(slaveJetty.getLocalPort());
// Asserts that, after replication, each core on the given Jetty instance is
// tracking exactly one index: the directory factory should hold exactly two
// cached paths (the data dir and the single index dir), and — when a real
// filesystem-backed factory is in use — the data dir on disk should contain
// exactly one index directory (ignoring *.properties files).
private void checkForSingleIndex(JettySolrRunner jetty) {
CoreContainer cores = ((SolrDispatchFilter) jetty.getDispatchFilter().getFilter()).getCores();
Collection<SolrCore> theCores = cores.getCores();
for (SolrCore core : theCores) {
String ddir = core.getDataDir();
CachingDirectoryFactory dirFactory = (CachingDirectoryFactory) core.getDirectoryFactory();
// getPaths() returns a live view, so hold the factory lock while reading it
synchronized (dirFactory) {
assertEquals(dirFactory.getPaths().toString(), 2, dirFactory.getPaths().size());
}
// only filesystem-backed factories have an on-disk layout we can inspect
if (dirFactory instanceof StandardDirectoryFactory) {
System.out.println(Arrays.asList(new File(ddir).list()));
assertEquals(Arrays.asList(new File(ddir).list()).toString(), 1, indexDirCount(ddir));
}
}
}
// Counts entries in the given data directory that are index directories,
// i.e. everything except bookkeeping files such as index.properties and
// replication.properties.
private int indexDirCount(String ddir) {
String[] list = new File(ddir).list();
// File.list() returns null if ddir is not a directory or on I/O error;
// treat that as "no index dirs" rather than throwing an NPE in the test.
if (list == null) {
return 0;
}
int cnt = 0;
for (String file : list) {
if (!file.endsWith(".properties")) {
cnt++;
}
}
return cnt;
}
private void pullFromMasterToSlave() throws MalformedURLException,
@ -830,7 +884,6 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
// check vs /replication?command=indexversion call
resp = client2.request(req);
version = (Long) resp.get("indexversion");
assertEquals(maxVersionClient2, version);
}
@ -1127,6 +1180,8 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
SolrDocument d = ((SolrDocumentList) slaveQueryRsp.get("response")).get(0);
assertEquals("newname = 2001", (String) d.getFieldValue("newname"));
checkForSingleIndex(masterJetty);
checkForSingleIndex(slaveJetty);
}