move the internal source for a refresh to be required
This commit is contained in:
parent
78af818d72
commit
ab6715b292
|
@ -112,7 +112,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
|
||||||
@Override
|
@Override
|
||||||
protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticSearchException {
|
protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticSearchException {
|
||||||
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
|
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
|
||||||
indexShard.refresh(new Engine.Refresh().force(request.force()).source("api"));
|
indexShard.refresh(new Engine.Refresh("api").force(request.force()));
|
||||||
logger.trace("{} refresh request executed, force: [{}]", indexShard.shardId(), request.force());
|
logger.trace("{} refresh request executed, force: [{}]", indexShard.shardId(), request.force());
|
||||||
return new ShardRefreshResponse(request.index(), request.shardId());
|
return new ShardRefreshResponse(request.index(), request.shardId());
|
||||||
}
|
}
|
||||||
|
|
|
@ -338,7 +338,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
|
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_bulk"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_bulk").force(false));
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
@ -553,7 +553,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
|
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_bulk"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_bulk").force(false));
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
|
@ -187,7 +187,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
|
||||||
|
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
@ -208,7 +208,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
|
||||||
|
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
|
||||||
|
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
@ -121,7 +121,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
|
||||||
|
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
|
||||||
IndexShard indexShard = indexService.shardSafe(shardId);
|
IndexShard indexShard = indexService.shardSafe(shardId);
|
||||||
|
|
||||||
if (request.refresh() && !request.realtime()) {
|
if (request.refresh() && !request.realtime()) {
|
||||||
indexShard.refresh(new Engine.Refresh().force(REFRESH_FORCE).source("refresh_flag_get"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_get").force(REFRESH_FORCE));
|
||||||
}
|
}
|
||||||
|
|
||||||
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),
|
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),
|
||||||
|
|
|
@ -106,7 +106,7 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
|
||||||
IndexShard indexShard = indexService.shardSafe(shardId);
|
IndexShard indexShard = indexService.shardSafe(shardId);
|
||||||
|
|
||||||
if (request.refresh() && !request.realtime()) {
|
if (request.refresh() && !request.realtime()) {
|
||||||
indexShard.refresh(new Engine.Refresh().force(TransportGetAction.REFRESH_FORCE).source("refresh_flag_mget"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_mget").force(TransportGetAction.REFRESH_FORCE));
|
||||||
}
|
}
|
||||||
|
|
||||||
MultiGetShardResponse response = new MultiGetShardResponse();
|
MultiGetShardResponse response = new MultiGetShardResponse();
|
||||||
|
|
|
@ -218,7 +218,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
}
|
}
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_index"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
@ -252,7 +252,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
}
|
}
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
try {
|
try {
|
||||||
indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_index"));
|
indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
|
@ -204,8 +204,12 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||||
|
|
||||||
static class Refresh {
|
static class Refresh {
|
||||||
|
|
||||||
|
private final String source;
|
||||||
private boolean force = false;
|
private boolean force = false;
|
||||||
private String source = "";
|
|
||||||
|
public Refresh(String source) {
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Forces calling refresh, overriding the check that dirty operations even happened. Defaults
|
* Forces calling refresh, overriding the check that dirty operations even happened. Defaults
|
||||||
|
@ -220,11 +224,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||||
return this.force;
|
return this.force;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Refresh source(String source) {
|
|
||||||
this.source = source;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String source() {
|
public String source() {
|
||||||
return this.source;
|
return this.source;
|
||||||
}
|
}
|
||||||
|
|
|
@ -909,7 +909,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||||
|
|
||||||
private void refreshVersioningTable(long time) {
|
private void refreshVersioningTable(long time) {
|
||||||
// we need to refresh in order to clear older version values
|
// we need to refresh in order to clear older version values
|
||||||
refresh(new Refresh().force(true).source("version_table"));
|
refresh(new Refresh("version_table").force(true));
|
||||||
for (Map.Entry<HashedBytesRef, VersionValue> entry : versionMap.entrySet()) {
|
for (Map.Entry<HashedBytesRef, VersionValue> entry : versionMap.entrySet()) {
|
||||||
HashedBytesRef uid = entry.getKey();
|
HashedBytesRef uid = entry.getKey();
|
||||||
synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
|
synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
|
||||||
|
|
|
@ -183,7 +183,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
||||||
indexShard.start("post recovery from gateway");
|
indexShard.start("post recovery from gateway");
|
||||||
}
|
}
|
||||||
// refresh the shard
|
// refresh the shard
|
||||||
indexShard.refresh(new Engine.Refresh().force(true).source("post_gateway"));
|
indexShard.refresh(new Engine.Refresh("post_gateway").force(true));
|
||||||
|
|
||||||
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
|
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
|
||||||
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
|
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
|
||||||
|
|
|
@ -226,7 +226,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
|
||||||
|
|
||||||
private void loadQueries(IndexShard shard) {
|
private void loadQueries(IndexShard shard) {
|
||||||
try {
|
try {
|
||||||
shard.refresh(new Engine.Refresh().force(true).source("percolator_load_queries"));
|
shard.refresh(new Engine.Refresh("percolator_load_queries").force(true));
|
||||||
Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries");
|
Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries");
|
||||||
try {
|
try {
|
||||||
Query query = new XConstantScoreQuery(
|
Query query = new XConstantScoreQuery(
|
||||||
|
|
|
@ -256,7 +256,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
// make sure we refresh on state change due to cluster state changes
|
// make sure we refresh on state change due to cluster state changes
|
||||||
if (newRouting.state() == ShardRoutingState.STARTED && (currentRouting == null || currentRouting.state() != ShardRoutingState.STARTED)) {
|
if (newRouting.state() == ShardRoutingState.STARTED && (currentRouting == null || currentRouting.state() != ShardRoutingState.STARTED)) {
|
||||||
try {
|
try {
|
||||||
engine.refresh(new Engine.Refresh().force(true).source("cluster_state_started"));
|
engine.refresh(new Engine.Refresh("cluster_state_started").force(true));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
logger.debug("failed to refresh due to move to cluster wide started", t);
|
logger.debug("failed to refresh due to move to cluster wide started", t);
|
||||||
}
|
}
|
||||||
|
@ -655,7 +655,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
}
|
}
|
||||||
// clear unreferenced files
|
// clear unreferenced files
|
||||||
translog.clearUnreferenced();
|
translog.clearUnreferenced();
|
||||||
engine.refresh(new Engine.Refresh().force(true).source("recovery_finalization"));
|
engine.refresh(new Engine.Refresh("recovery_finalization").force(true));
|
||||||
synchronized (mutex) {
|
synchronized (mutex) {
|
||||||
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED);
|
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED);
|
||||||
state = IndexShardState.STARTED;
|
state = IndexShardState.STARTED;
|
||||||
|
@ -825,7 +825,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
if (engine.refreshNeeded()) {
|
if (engine.refreshNeeded()) {
|
||||||
refresh(new Engine.Refresh().force(false).source("scheduled"));
|
refresh(new Engine.Refresh("scheduled").force(false));
|
||||||
}
|
}
|
||||||
} catch (EngineClosedException e) {
|
} catch (EngineClosedException e) {
|
||||||
// we are being closed, ignore
|
// we are being closed, ignore
|
||||||
|
|
|
@ -209,7 +209,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
|
|
||||||
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false);
|
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false);
|
||||||
engine.create(new Engine.Create(null, newUid("2"), doc2));
|
engine.create(new Engine.Create(null, newUid("2"), doc2));
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
segments = engine.segments();
|
segments = engine.segments();
|
||||||
assertThat(segments.size(), equalTo(1));
|
assertThat(segments.size(), equalTo(1));
|
||||||
|
@ -233,7 +233,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
|
|
||||||
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
|
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
|
||||||
engine.create(new Engine.Create(null, newUid("3"), doc3));
|
engine.create(new Engine.Create(null, newUid("3"), doc3));
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
segments = engine.segments();
|
segments = engine.segments();
|
||||||
assertThat(segments.size(), equalTo(2));
|
assertThat(segments.size(), equalTo(2));
|
||||||
|
@ -253,7 +253,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
|
|
||||||
|
|
||||||
engine.delete(new Engine.Delete("test", "1", newUid("1")));
|
engine.delete(new Engine.Delete("test", "1", newUid("1")));
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
segments = engine.segments();
|
segments = engine.segments();
|
||||||
assertThat(segments.size(), equalTo(2));
|
assertThat(segments.size(), equalTo(2));
|
||||||
|
@ -273,7 +273,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true).build());
|
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true).build());
|
||||||
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
|
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
|
||||||
engine.create(new Engine.Create(null, newUid("4"), doc4));
|
engine.create(new Engine.Create(null, newUid("4"), doc4));
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
segments = engine.segments();
|
segments = engine.segments();
|
||||||
assertThat(segments.size(), equalTo(3));
|
assertThat(segments.size(), equalTo(3));
|
||||||
|
@ -327,7 +327,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
assertThat(getResult.exists(), equalTo(false));
|
assertThat(getResult.exists(), equalTo(false));
|
||||||
getResult.release();
|
getResult.release();
|
||||||
// refresh and it should be there
|
// refresh and it should be there
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
// now its there...
|
// now its there...
|
||||||
searchResult = engine.acquireSearcher("test");
|
searchResult = engine.acquireSearcher("test");
|
||||||
|
@ -363,7 +363,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
|
||||||
// refresh and it should be updated
|
// refresh and it should be updated
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
searchResult = engine.acquireSearcher("test");
|
searchResult = engine.acquireSearcher("test");
|
||||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||||
|
@ -387,7 +387,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
getResult.release();
|
getResult.release();
|
||||||
|
|
||||||
// refresh and it should be deleted
|
// refresh and it should be deleted
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
searchResult = engine.acquireSearcher("test");
|
searchResult = engine.acquireSearcher("test");
|
||||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||||
|
@ -409,7 +409,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
searchResult.release();
|
searchResult.release();
|
||||||
|
|
||||||
// refresh and it should be there
|
// refresh and it should be there
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
// now its there...
|
// now its there...
|
||||||
searchResult = engine.acquireSearcher("test");
|
searchResult = engine.acquireSearcher("test");
|
||||||
|
@ -443,7 +443,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
searchResult.release();
|
searchResult.release();
|
||||||
|
|
||||||
// refresh and it should be updated
|
// refresh and it should be updated
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
searchResult = engine.acquireSearcher("test");
|
searchResult = engine.acquireSearcher("test");
|
||||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||||
|
@ -471,7 +471,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
searchResult.release();
|
searchResult.release();
|
||||||
|
|
||||||
// refresh and it should be there
|
// refresh and it should be there
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
|
|
||||||
// now its there...
|
// now its there...
|
||||||
searchResult = engine.acquireSearcher("test");
|
searchResult = engine.acquireSearcher("test");
|
||||||
|
@ -481,7 +481,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||||
|
|
||||||
// delete, refresh and do a new search, it should not be there
|
// delete, refresh and do a new search, it should not be there
|
||||||
engine.delete(new Engine.Delete("test", "1", newUid("1")));
|
engine.delete(new Engine.Delete("test", "1", newUid("1")));
|
||||||
engine.refresh(new Engine.Refresh().force(false));
|
engine.refresh(new Engine.Refresh("test").force(false));
|
||||||
Engine.Searcher updateSearchResult = engine.acquireSearcher("test");
|
Engine.Searcher updateSearchResult = engine.acquireSearcher("test");
|
||||||
MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||||
updateSearchResult.release();
|
updateSearchResult.release();
|
||||||
|
|
Loading…
Reference in New Issue