add internal source to refresh

can be used to more easily understand where its coming from
This commit is contained in:
Shay Banon 2013-09-22 17:58:40 +02:00
parent 57f962d620
commit 167538ef0d
12 changed files with 30 additions and 20 deletions

View File

@ -112,7 +112,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
@Override @Override
protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticSearchException { protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId()); IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.refresh(new Engine.Refresh().force(request.force())); indexShard.refresh(new Engine.Refresh().force(request.force()).source("api"));
logger.trace("{} refresh request executed, force: [{}]", indexShard.shardId(), request.force()); logger.trace("{} refresh request executed, force: [{}]", indexShard.shardId(), request.force());
return new ShardRefreshResponse(request.index(), request.shardId()); return new ShardRefreshResponse(request.index(), request.shardId());
} }

View File

@ -338,7 +338,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_bulk"));
} catch (Throwable e) { } catch (Throwable e) {
// ignore // ignore
} }
@ -553,7 +553,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_bulk"));
} catch (Throwable e) { } catch (Throwable e) {
// ignore // ignore
} }

View File

@ -187,7 +187,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }
@ -208,7 +208,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }

View File

@ -100,7 +100,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }
@ -121,7 +121,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_delete"));
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }

View File

@ -97,7 +97,7 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
IndexShard indexShard = indexService.shardSafe(shardId); IndexShard indexShard = indexService.shardSafe(shardId);
if (request.refresh() && !request.realtime()) { if (request.refresh() && !request.realtime()) {
indexShard.refresh(new Engine.Refresh().force(REFRESH_FORCE)); indexShard.refresh(new Engine.Refresh().force(REFRESH_FORCE).source("refresh_flag_get"));
} }
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(), GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),

View File

@ -106,7 +106,7 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
IndexShard indexShard = indexService.shardSafe(shardId); IndexShard indexShard = indexService.shardSafe(shardId);
if (request.refresh() && !request.realtime()) { if (request.refresh() && !request.realtime()) {
indexShard.refresh(new Engine.Refresh().force(TransportGetAction.REFRESH_FORCE)); indexShard.refresh(new Engine.Refresh().force(TransportGetAction.REFRESH_FORCE).source("refresh_flag_mget"));
} }
MultiGetShardResponse response = new MultiGetShardResponse(); MultiGetShardResponse response = new MultiGetShardResponse();

View File

@ -218,7 +218,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
} }
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_index"));
} catch (Throwable e) { } catch (Throwable e) {
// ignore // ignore
} }
@ -252,7 +252,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
} }
if (request.refresh()) { if (request.refresh()) {
try { try {
indexShard.refresh(new Engine.Refresh().force(false)); indexShard.refresh(new Engine.Refresh().force(false).source("refresh_flag_index"));
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }

View File

@ -193,6 +193,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class Refresh { static class Refresh {
private boolean force = false; private boolean force = false;
private String source = "";
/** /**
* Forces calling refresh, overriding the check that dirty operations even happened. Defaults * Forces calling refresh, overriding the check that dirty operations even happened. Defaults
@ -207,9 +208,18 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.force; return this.force;
} }
public Refresh source(String source) {
this.source = source;
return this;
}
public String source() {
return this.source;
}
@Override @Override
public String toString() { public String toString() {
return "force[" + force + "]"; return "force[" + force + "], source [" + source + "]";
} }
} }

View File

@ -909,7 +909,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private void refreshVersioningTable(long time) { private void refreshVersioningTable(long time) {
// we need to refresh in order to clear older version values // we need to refresh in order to clear older version values
refresh(new Refresh().force(true)); refresh(new Refresh().force(true).source("version_table"));
for (Map.Entry<HashedBytesRef, VersionValue> entry : versionMap.entrySet()) { for (Map.Entry<HashedBytesRef, VersionValue> entry : versionMap.entrySet()) {
HashedBytesRef uid = entry.getKey(); HashedBytesRef uid = entry.getKey();
synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? synchronized (dirtyLock(uid.bytes)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?

View File

@ -183,7 +183,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
indexShard.start("post recovery from gateway"); indexShard.start("post recovery from gateway");
} }
// refresh the shard // refresh the shard
indexShard.refresh(new Engine.Refresh().force(true)); indexShard.refresh(new Engine.Refresh().force(true).source("post_gateway"));
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime()); recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE); recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);

View File

@ -226,7 +226,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
private void loadQueries(IndexShard shard) { private void loadQueries(IndexShard shard) {
try { try {
shard.refresh(new Engine.Refresh().force(true)); shard.refresh(new Engine.Refresh().force(true).source("percolator_load_queries"));
Engine.Searcher searcher = shard.acquireSearcher(); Engine.Searcher searcher = shard.acquireSearcher();
try { try {
Query query = new XConstantScoreQuery( Query query = new XConstantScoreQuery(

View File

@ -256,7 +256,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// make sure we refresh on state change due to cluster state changes // make sure we refresh on state change due to cluster state changes
if (newRouting.state() == ShardRoutingState.STARTED && (currentRouting == null || currentRouting.state() != ShardRoutingState.STARTED)) { if (newRouting.state() == ShardRoutingState.STARTED && (currentRouting == null || currentRouting.state() != ShardRoutingState.STARTED)) {
try { try {
engine.refresh(new Engine.Refresh().force(true)); engine.refresh(new Engine.Refresh().force(true).source("cluster_state_started"));
} catch (Throwable t) { } catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t); logger.debug("failed to refresh due to move to cluster wide started", t);
} }
@ -655,7 +655,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
// clear unreferenced files // clear unreferenced files
translog.clearUnreferenced(); translog.clearUnreferenced();
engine.refresh(new Engine.Refresh().force(true)); engine.refresh(new Engine.Refresh().force(true).source("recovery_finalization"));
synchronized (mutex) { synchronized (mutex) {
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED); logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED; state = IndexShardState.STARTED;
@ -825,7 +825,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public void run() { public void run() {
try { try {
if (engine.refreshNeeded()) { if (engine.refreshNeeded()) {
refresh(new Engine.Refresh().force(false)); refresh(new Engine.Refresh().force(false).source("scheduled"));
} }
} catch (EngineClosedException e) { } catch (EngineClosedException e) {
// we are being closed, ignore // we are being closed, ignore