better timeout handling waiting for primary to be active for indexing

take into account the correct delta timeout when scheduling it, since now we can retry again after removing a listener
This commit is contained in:
Shay Banon 2013-10-31 15:54:18 +01:00
parent fcfc41209b
commit c68016bb83
1 changed files with 57 additions and 45 deletions

View File

@ -313,6 +313,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private volatile ShardIterator shardIt;
private final AtomicBoolean primaryOperationStarted = new AtomicBoolean();
private final ReplicationType replicationType;
protected final long startTime = System.currentTimeMillis();
AsyncShardOperationAction(Request request, ActionListener<Response> listener) {
this.request = request;
@ -481,62 +482,73 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
void retry(boolean fromClusterEvent, @Nullable final Throwable failure) {
if (!fromClusterEvent) {
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
request.operationThreaded(true);
clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
// check if state version changed while we were adding this listener
long sampledVersion = clusterState.version();
long currentVersion = clusterService.state().version();
if (sampledVersion != currentVersion) {
logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion);
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
}
if (fromClusterEvent) {
logger.trace("retry scheduling ignored as it as we already have a listener in place");
return;
}
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
request.operationThreaded(true);
@Override
public void clusterChanged(ClusterChangedEvent event) {
logger.trace("cluster changed (version {}), trying to start again", event.state().version());
TimeValue timeout = new TimeValue(request.timeout().millis() - (System.currentTimeMillis() - startTime));
if (timeout.millis() <= 0) {
raiseTimeoutFailure(timeout, failure);
return;
}
clusterService.add(timeout, new TimeoutClusterStateListener() {
@Override
public void postAdded() {
// check if state version changed while we were adding this listener
long sampledVersion = clusterState.version();
long currentVersion = clusterService.state().version();
if (sampledVersion != currentVersion) {
logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion);
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
}
@Override
public void onTimeout(TimeValue timeValue) {
// just to be on the safe side, see if we can start it now?
if (start(true)) {
clusterService.remove(this);
return;
}
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
logger.trace("cluster changed (version {}), trying to start again", event.state().version());
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
Throwable listenerFailure = failure;
if (listenerFailure == null) {
if (shardIt == null) {
listenerFailure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeValue + "], request: " + request.toString());
} else {
listenerFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
}
}
listener.onFailure(listenerFailure);
}
});
} else {
logger.trace("retry scheduling ignored as it as we already have a listener in place");
}
@Override
public void onTimeout(TimeValue timeValue) {
// just to be on the safe side, see if we can start it now?
if (start(true)) {
clusterService.remove(this);
return;
}
clusterService.remove(this);
raiseTimeoutFailure(timeValue, failure);
}
});
}
void raiseTimeoutFailure(TimeValue timeout, @Nullable Throwable failure) {
if (failure == null) {
if (shardIt == null) {
failure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeout + "], request: " + request.toString());
} else {
failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + request.toString());
}
}
listener.onFailure(failure);
}
void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) {