more eager refresh logic, execute a refresh even if one is on going so no operations are missed

This commit is contained in:
kimchy 2011-02-08 12:52:45 +02:00
parent 37b1415b57
commit 4084db378d
1 changed files with 27 additions and 35 deletions

View File

@ -82,8 +82,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final AtomicBoolean refreshMutex = new AtomicBoolean();
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
private final Store store;
@ -589,43 +587,40 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
@Override public void refresh(Refresh refresh) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
rwl.readLock().lock();
if (indexWriter == null) {
throw new EngineClosedException(shardId);
}
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
rwl.readLock().lock();
try {
// this engine always acts as if waitForOperations=true
if (refreshMutex.compareAndSet(false, true)) {
IndexWriter currentWriter = indexWriter;
if (currentWriter == null) {
throw new EngineClosedException(shardId);
IndexWriter currentWriter = indexWriter;
if (currentWriter == null) {
throw new EngineClosedException(shardId);
}
try {
if (dirty) {
// we eagerly set dirty to false so we won't miss refresh requests
dirty = false;
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
IndexReader newReader = current.resource().reader().reopen(true);
if (newReader != current.resource().reader()) {
ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(newReader);
indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
nrtResource = newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
current.markForClose();
}
}
try {
if (dirty) {
dirty = false;
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
IndexReader newReader = current.resource().reader().reopen(true);
if (newReader != current.resource().reader()) {
ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(newReader);
indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
nrtResource = newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
current.markForClose();
}
}
} catch (AlreadyClosedException e) {
} catch (AlreadyClosedException e) {
// an index writer got replaced on us, ignore
} catch (Exception e) {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
} else if (currentWriter != indexWriter) {
// an index writer got replaced on us, ignore
} catch (Exception e) {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
} else if (currentWriter != indexWriter) {
// an index writer got replaced on us, ignore
} else {
throw new RefreshFailedEngineException(shardId, e);
}
} finally {
refreshMutex.set(false);
} else {
throw new RefreshFailedEngineException(shardId, e);
}
}
} finally {
@ -677,7 +672,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (flush.full()) {
// disable refreshing, not dirty
dirty = false;
refreshMutex.set(true);
try {
// that's ok if the index writer failed and is in inconsistent state
// we will get an exception on a dirty operation, and will cause the shard
@ -690,8 +684,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
translog.newTranslog(newTransactionLogId());
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
} finally {
refreshMutex.set(false);
}
} else {
try {