Don't guard IndexShard#refresh calls by a check to isRefreshNeeded

While conceptually correct this call can be confusing since if a
realtime GET is exectued after refresh is called with realtime=true it might
fetch the wrong document out of the translog since the isRefreshNeeded guard
might return false once the new searcher is published but it will not
wait until the verision map is flushed and that can cause a slight race condition
if a subsequent call GETs a document that it expects to come from the index.

Ie. the `_size` mapper tests do this and expecte the GET call to return the document
from the index but it comes from the t-log due to this race.

This change only uses the guard in an async refresh that we schedule to prevent unnecessary
refresh calls but will allow API Refresh calls to have the somewhat less confusing semantics.

This was introduces lately only on unrelease major version branches.
This commit is contained in:
Simon Willnauer 2016-01-20 11:42:04 +01:00
parent ebe3fd2825
commit 054465287e
2 changed files with 27 additions and 16 deletions

View File

@ -635,7 +635,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
case STARTED: case STARTED:
case RELOCATED: case RELOCATED:
try { try {
shard.refresh("schedule"); if (shard.isRefreshNeeded()) {
shard.refresh("schedule");
}
} catch (EngineClosedException | AlreadyClosedException ex) { } catch (EngineClosedException | AlreadyClosedException ex) {
// fine - continue; // fine - continue;
} }

View File

@ -541,25 +541,23 @@ public class IndexShard extends AbstractIndexShardComponent {
/** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
public void refresh(String source) { public void refresh(String source) {
verifyNotClosed(); verifyNotClosed();
if (getEngine().refreshNeeded()) { if (canIndex()) {
if (canIndex()) { long bytes = getEngine().getIndexBufferRAMBytesUsed();
long bytes = getEngine().getIndexBufferRAMBytesUsed(); writingBytes.addAndGet(bytes);
writingBytes.addAndGet(bytes); try {
try { logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
writingBytes.addAndGet(-bytes);
}
} else {
logger.debug("refresh with source [{}]", source);
long time = System.nanoTime(); long time = System.nanoTime();
getEngine().refresh(source); getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time); refreshMetric.inc(System.nanoTime() - time);
} finally {
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
writingBytes.addAndGet(-bytes);
} }
} else {
logger.debug("refresh with source [{}]", source);
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} }
} }
@ -1514,4 +1512,15 @@ public class IndexShard extends AbstractIndexShardComponent {
return engineFactory; return engineFactory;
} }
/**
* Returns <code>true</code> iff one or more changes to the engine are not visible to via the current searcher.
* Otherwise <code>false</code>.
*
* @throws EngineClosedException if the engine is already closed
* @throws AlreadyClosedException if the internal indexwriter in the engine is already closed
*/
public boolean isRefreshNeeded() {
return getEngine().refreshNeeded();
}
} }