Cleanup Exception Handling in RobinEngine & raise write lock timeout

The write-lock timeout on the index writer is 1s by default. Given the
default lock poll interval of 1s, this gives an upper bound of 2 obtain
checks for a write lock, which might not be enough under load.
This commit cleans up exception handling and adds more warn logging
related to obtaining locks under load.
This commit is contained in:
Simon Willnauer 2013-08-15 22:11:02 +02:00
parent dbdef00a88
commit a16d1142a3
2 changed files with 35 additions and 42 deletions

View File

@ -129,7 +129,7 @@ public class Lucene {
try {
writer.close();
return true;
} catch (IOException e) {
} catch (Throwable e) {
return false;
}
}

View File

@ -26,7 +26,9 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
@ -80,8 +82,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.common.lucene.Lucene.safeClose;
/**
*
*/
@ -225,7 +225,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// ignore
} catch (FlushNotAllowedEngineException e) {
// ignore
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to flush after setting shard to inactive", e);
}
} else {
@ -286,11 +286,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} catch (IOException e1) {
// ignore
} finally {
try {
indexWriter.close();
} catch (IOException e1) {
// ignore
}
IOUtils.closeWhileHandlingException(indexWriter);
}
throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
}
@ -344,7 +340,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
final Versions.DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
} catch (Exception e) {
} catch (Throwable e) {
searcher.release();
//TODO: A better exception goes here
throw new EngineException(shardId(), "Couldn't resolve version", e);
@ -733,7 +729,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
failEngine(e);
}
throw new RefreshFailedEngineException(shardId, e);
} catch (Exception e) {
} catch (Throwable e) {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
} else if (currentWriter != indexWriter) {
@ -796,8 +792,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
SearcherManager current = this.searcherManager;
this.searcherManager = buildSearchManager(indexWriter);
current.close();
IOUtils.closeWhileHandlingException(current); // ignore
refreshVersioningTable(threadPool.estimatedTimeInMillis());
} catch (OutOfMemoryError e) {
failEngine(e);
@ -807,7 +802,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
failEngine(e);
}
throw new FlushFailedEngineException(shardId, e);
} catch (Exception e) {
} catch (Throwable e) {
throw new FlushFailedEngineException(shardId, e);
}
} finally {
@ -844,7 +839,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
failEngine(e);
}
throw new FlushFailedEngineException(shardId, e);
} catch (Exception e) {
} catch (Throwable e) {
translog.revertTransient();
throw new FlushFailedEngineException(shardId, e);
}
@ -877,7 +872,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
failEngine(e);
}
throw new FlushFailedEngineException(shardId, e);
} catch (Exception e) {
} catch (Throwable e) {
throw new FlushFailedEngineException(shardId, e);
}
} finally {
@ -892,7 +887,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
SegmentInfos infos = new SegmentInfos();
infos.read(store.directory());
lastCommittedSegmentInfos = infos;
} catch (Exception e) {
} catch (Throwable e) {
if (!closed) {
logger.warn("failed to read latest segment infos on flush", e);
}
@ -947,7 +942,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
failEngine(e);
}
throw new OptimizeFailedEngineException(shardId, e);
} catch (Exception e) {
} catch (Throwable e) {
throw new OptimizeFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
@ -981,7 +976,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
failEngine(e);
}
throw new OptimizeFailedEngineException(shardId, e);
} catch (Exception e) {
} catch (Throwable e) {
throw new OptimizeFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
@ -1008,7 +1003,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
snapshotIndexCommit = deletionPolicy.snapshot();
traslogSnapshot = translog.snapshot();
} catch (Exception e) {
} catch (Throwable e) {
if (snapshotIndexCommit != null) {
snapshotIndexCommit.release();
}
@ -1055,14 +1050,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
SnapshotIndexCommit phase1Snapshot;
try {
phase1Snapshot = deletionPolicy.snapshot();
} catch (Exception e) {
} catch (Throwable e) {
--onGoingRecoveries;
throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e);
}
try {
recoveryHandler.phase1(phase1Snapshot);
} catch (Exception e) {
} catch (Throwable e) {
--onGoingRecoveries;
phase1Snapshot.release();
if (closed) {
@ -1074,7 +1069,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
Translog.Snapshot phase2Snapshot;
try {
phase2Snapshot = translog.snapshot();
} catch (Exception e) {
} catch (Throwable e) {
--onGoingRecoveries;
phase1Snapshot.release();
if (closed) {
@ -1085,7 +1080,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
recoveryHandler.phase2(phase2Snapshot);
} catch (Exception e) {
} catch (Throwable e) {
--onGoingRecoveries;
phase1Snapshot.release();
phase2Snapshot.release();
@ -1100,7 +1095,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
phase3Snapshot = translog.snapshot(phase2Snapshot);
recoveryHandler.phase3(phase3Snapshot);
} catch (Exception e) {
} catch (Throwable e) {
throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
} finally {
--onGoingRecoveries;
@ -1226,9 +1221,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.versionMap.clear();
this.failedEngineListeners.clear();
try {
if (searcherManager != null) {
searcherManager.close();
}
IOUtils.closeWhileHandlingException(searcherManager);
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
if (indexWriter != null) {
try {
@ -1237,7 +1230,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// ignore
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to rollback writer on close", e);
} finally {
indexWriter = null;
@ -1271,7 +1264,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
private IndexWriter createWriter() throws IOException {
IndexWriter indexWriter = null;
try {
// release locks when started
if (IndexWriter.isLocked(store.directory())) {
@ -1294,13 +1286,19 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
config.setReaderTermsIndexDivisor(termIndexDivisor);
config.setMaxThreadStates(indexConcurrency);
config.setCodec(codecService.codec(codecName));
/* We set this timeout to a highish value to work around
* the default poll interval in the Lucene lock, which is
* 1000ms. Obtaining the lock may require polling multiple
* times, but with a 1s poll interval and the default
* write-lock timeout it is checked only twice at most. */
config.setWriteLockTimeout(5000);
config.setUseCompoundFile(this.compoundOnFlush);
indexWriter = new IndexWriter(store.directory(), config);
} catch (IOException e) {
safeClose(indexWriter);
throw e;
return new IndexWriter(store.directory(), config);
} catch (LockObtainFailedException ex) {
boolean isLocked = IndexWriter.isLocked(store.directory());
logger.warn("Could not lock IndexWriter isLocked [{}]", ex, isLocked);
throw ex;
}
return indexWriter;
}
public static final String INDEX_TERM_INDEX_INTERVAL = "index.term_index_interval";
@ -1484,7 +1482,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
new SimpleSearcher(newSearcher));
warmer.warm(context);
}
} catch (Exception e) {
} catch (Throwable e) {
if (!closed) {
logger.warn("failed to prepare/warm", e);
}
@ -1494,12 +1492,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
currentSearcher.release();
}
if (newSearcher != null && closeNewSearcher) {
try {
// close the reader since we want decRef the inner readers
newSearcher.getIndexReader().close();
} catch (IOException e) {
// ignore
}
IOUtils.closeWhileHandlingException(newSearcher.getIndexReader()); // ignore
}
}
}