improve flush concurrency by refreshing outside of the write lock

This commit is contained in:
kimchy 2011-05-14 00:49:17 +03:00
parent c5305a0545
commit 603d462d4a
1 changed files with 64 additions and 52 deletions

View File

@ -663,70 +663,78 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// We can't do prepareCommit here, since we rely on the the segment version for the translog version
rwl.writeLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
rwl.writeLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
if (flush.full()) {
// disable refreshing, not dirty
dirty = false;
try {
// that's ok if the index writer failed and is in inconsistent state
// we will get an exception on a dirty operation, and will cause the shard
// to be allocated to a different node
indexWriter.close();
indexWriter = createWriter();
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
nrtResource = buildNrtResource(indexWriter);
current.markForClose();
translog.newTranslog(newTransactionLogId());
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
}
} else {
try {
indexWriter.commit();
translog.newTranslog(newTransactionLogId());
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
rwl.writeLock().unlock();
}
// we need to refresh in order to clear older version values
long time = threadPool.estimatedTimeInMillis(); // mark time here, before we refresh, and then delete all older values
refresh(new Refresh(true).force(true));
if (indexingSearcher.get() != null) {
indexingSearcher.get().release();
indexingSearcher.set(null);
}
if (flush.full()) {
// disable refreshing, not dirty
dirty = false;
try {
// that's ok if the index writer failed and is in inconsistent state
// we will get an exception on a dirty operation, and will cause the shard
// to be allocated to a different node
indexWriter.close();
indexWriter = createWriter();
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
nrtResource = buildNrtResource(indexWriter);
current.markForClose();
translog.newTranslog(newTransactionLogId());
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
}
} else {
try {
indexWriter.commit();
translog.newTranslog(newTransactionLogId());
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) {
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
}
}
// remove all version except for deletes, which we expire based on GC value
long time = threadPool.estimatedTimeInMillis();
for (Map.Entry<String, VersionValue> entry : versionMap.entrySet()) {
if (entry.getValue().delete()) {
if ((time - entry.getValue().time()) > gcDeletesInMillis) {
versionMap.remove(entry.getKey());
String id = entry.getKey();
synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
VersionValue versionValue = versionMap.get(id);
if (versionValue == null) {
continue;
}
if (time - versionValue.time() <= 0) {
continue; // its a newer value, from after/during we refreshed, don't clear it
}
if (versionValue.delete()) {
if ((time - versionValue.time()) > gcDeletesInMillis) {
versionMap.remove(id);
}
} else {
versionMap.remove(id);
}
} else {
versionMap.remove(entry.getKey());
}
}
dirty = true; // force a refresh
// we need to do a refresh here so we sync versioning support
refresh(new Refresh(true).force(true));
} finally {
rwl.writeLock().unlock();
flushing.set(false);
}
// we refresh anyhow before...
// if (flush.refresh()) {
// refresh(new Refresh(false));
// }
}
@Override public void maybeMerge() throws EngineException {
@ -944,8 +952,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
private Object dirtyLock(String id) {
return dirtyLocks[Math.abs(id.hashCode()) % dirtyLocks.length];
}
private Object dirtyLock(Term uid) {
return dirtyLocks[Math.abs(uid.hashCode()) % dirtyLocks.length];
return dirtyLock(uid.text());
}
private long loadCurrentVersionFromIndex(Term uid) {