Acquire ReadLock before reading SegmentInfos in RobinEngine

Before reading SegmentInfos we should acquire the read lock to prevent
reading the segment infos on an already closed RobinEngine.
This commit is contained in:
Simon Willnauer 2013-09-24 13:56:08 +02:00
parent 5b1f263569
commit df0112358d
1 changed file with 27 additions and 31 deletions

View File

@ -278,9 +278,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
translog.newTranslog(translogIdGenerator.get()); translog.newTranslog(translogIdGenerator.get());
this.searcherManager = buildSearchManager(indexWriter); this.searcherManager = buildSearchManager(indexWriter);
SegmentInfos infos = new SegmentInfos(); readLastCommittedSegmentsInfo();
infos.read(store.directory());
lastCommittedSegmentInfos = infos;
} catch (IOException e) { } catch (IOException e) {
try { try {
indexWriter.rollback(); indexWriter.rollback();
@ -296,6 +294,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
} }
// Reads the segment metadata of the last commit point from the store's
// directory and caches it in lastCommittedSegmentInfos.
// NOTE(review): per the commit message, callers are expected to hold the
// engine read lock so the directory cannot be closed concurrently while
// SegmentInfos.read() is in flight — confirm at each call site.
private void readLastCommittedSegmentsInfo() throws IOException {
SegmentInfos infos = new SegmentInfos();
// May throw IOException if the directory is unreadable or no commit exists.
infos.read(store.directory());
lastCommittedSegmentInfos = infos;
}
@Override @Override
public TimeValue defaultRefreshInterval() { public TimeValue defaultRefreshInterval() {
return new TimeValue(1, TimeUnit.SECONDS); return new TimeValue(1, TimeUnit.SECONDS);
@ -750,9 +754,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
@Override @Override
public void flush(Flush flush) throws EngineException { public void flush(Flush flush) throws EngineException {
if (indexWriter == null) { ensureOpen();
throw new EngineClosedException(shardId, failedEngine);
}
if (flush.type() == Flush.Type.NEW_WRITER || flush.type() == Flush.Type.COMMIT_TRANSLOG) { if (flush.type() == Flush.Type.NEW_WRITER || flush.type() == Flush.Type.COMMIT_TRANSLOG) {
// check outside the lock as well so we can check without blocking on the write lock // check outside the lock as well so we can check without blocking on the write lock
if (onGoingRecoveries.get() > 0) { if (onGoingRecoveries.get() > 0) {
@ -770,9 +772,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (flush.type() == Flush.Type.NEW_WRITER) { if (flush.type() == Flush.Type.NEW_WRITER) {
rwl.writeLock().lock(); rwl.writeLock().lock();
try { try {
if (indexWriter == null) { ensureOpen();
throw new EngineClosedException(shardId, failedEngine);
}
if (onGoingRecoveries.get() > 0) { if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
} }
@ -820,9 +820,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) { } else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
if (indexWriter == null) { ensureOpen();
throw new EngineClosedException(shardId, failedEngine);
}
if (onGoingRecoveries.get() > 0) { if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
} }
@ -862,9 +860,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// of that translog // of that translog
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
if (indexWriter == null) { ensureOpen();
throw new EngineClosedException(shardId, failedEngine);
}
// we allow to *just* commit if there is an ongoing recovery happening... // we allow to *just* commit if there is an ongoing recovery happening...
// its ok to use this, only a flush will cause a new translogId, and we are locked here from // its ok to use this, only a flush will cause a new translogId, and we are locked here from
// other flushes use flushLock // other flushes use flushLock
@ -892,21 +888,30 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
// reread the last committed segment infos // reread the last committed segment infos
rwl.readLock().lock();
try { try {
SegmentInfos infos = new SegmentInfos(); ensureOpen();
infos.read(store.directory()); readLastCommittedSegmentsInfo();
lastCommittedSegmentInfos = infos;
} catch (Throwable e) { } catch (Throwable e) {
if (!closed) { if (!closed) {
logger.warn("failed to read latest segment infos on flush", e); logger.warn("failed to read latest segment infos on flush", e);
} }
} finally {
rwl.readLock().unlock();
} }
} finally { } finally {
flushLock.unlock(); flushLock.unlock();
flushing.decrementAndGet(); flushing.decrementAndGet();
} }
} }
// Fails fast if the engine has been closed: a null indexWriter is used as
// the "closed" marker, and failedEngine (the stored failure cause, if any)
// is propagated inside the EngineClosedException.
// NOTE(review): presumably callers hold rwl's read or write lock so this
// check stays valid for their critical section — confirm call sites.
private void ensureOpen() {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
}
private void refreshVersioningTable(long time) { private void refreshVersioningTable(long time) {
// we need to refresh in order to clear older version values // we need to refresh in order to clear older version values
refresh(new Refresh("version_table").force(true)); refresh(new Refresh("version_table").force(true));
@ -939,9 +944,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
possibleMergeNeeded = false; possibleMergeNeeded = false;
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
if (indexWriter == null) { ensureOpen();
throw new EngineClosedException(shardId, failedEngine);
}
indexWriter.maybeMerge(); indexWriter.maybeMerge();
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
failEngine(e); failEngine(e);
@ -966,9 +969,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (optimizeMutex.compareAndSet(false, true)) { if (optimizeMutex.compareAndSet(false, true)) {
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
if (indexWriter == null) { ensureOpen();
throw new EngineClosedException(shardId, failedEngine);
}
if (optimize.onlyExpungeDeletes()) { if (optimize.onlyExpungeDeletes()) {
indexWriter.forceMergeDeletes(false); indexWriter.forceMergeDeletes(false);
} else if (optimize.maxNumSegments() <= 0) { } else if (optimize.maxNumSegments() <= 0) {
@ -1031,9 +1032,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true)); flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
if (indexWriter == null) { ensureOpen();
throw new EngineClosedException(shardId, failedEngine);
}
return deletionPolicy.snapshot(); return deletionPolicy.snapshot();
} catch (IOException e) { } catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e); throw new SnapshotFailedEngineException(shardId, e);
@ -1121,10 +1120,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
public List<Segment> segments() { public List<Segment> segments() {
rwl.readLock().lock(); rwl.readLock().lock();
try { try {
IndexWriter indexWriter = this.indexWriter; ensureOpen();
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
Map<String, Segment> segments = new HashMap<String, Segment>(); Map<String, Segment> segments = new HashMap<String, Segment>();
// first, go over and compute the search ones... // first, go over and compute the search ones...