fix relocation of primaries and perform the operation on the relocated primary replica as well

This commit is contained in:
kimchy 2010-07-19 09:54:52 +03:00
parent 0d20790ffe
commit df602054fb
6 changed files with 60 additions and 49 deletions

View File

@ -57,7 +57,7 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
return new ShardReplicationPingResponse();
}
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
}
@Override protected ShardsIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {

View File

@ -61,7 +61,7 @@ public class TransportShardGatewaySnapshotAction extends TransportShardReplicati
return new ShardGatewaySnapshotResponse();
}
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
// silently ignore, we disable it with #ignoreBackups anyhow
}
@ -72,7 +72,7 @@ public class TransportShardGatewaySnapshotAction extends TransportShardReplicati
/**
* Snapshot should only happen on primary shards.
*/
@Override protected boolean ignoreBackups() {
@Override protected boolean ignoreReplicas() {
return true;
}
}

View File

@ -100,7 +100,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
return new DeleteResponse(request.index(), request.type(), request.id());
}
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
indexShard(shardRequest).delete(request.type(), request.id());
}

View File

@ -66,7 +66,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
return new ShardDeleteByQueryResponse();
}
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
}

View File

@ -137,7 +137,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
return new IndexResponse(request.index(), request.type(), request.id());
}
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
IndexRequest request = shardRequest.request;
if (request.opType() == IndexRequest.OpType.INDEX) {
indexShard(shardRequest).index(request.type(), request.id(), request.source());

View File

@ -83,7 +83,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.shardStateAction = shardStateAction;
transportService.registerHandler(transportAction(), new OperationTransportHandler());
transportService.registerHandler(transportBackupAction(), new BackupOperationTransportHandler());
transportService.registerHandler(transportReplicaAction(), new ReplicaOperationTransportHandler());
this.defaultReplicationType = ReplicationType.fromString(settings.get("action.replication_type", "sync"));
}
@ -100,7 +100,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract Response shardOperationOnPrimary(ShardOperationRequest shardRequest);
protected abstract void shardOperationOnBackup(ShardOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);
protected abstract ShardsIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
@ -109,15 +109,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
/**
* Should the operations be performed on the backups as well. Defaults to <tt>false</tt> meaning operations
* will be executed on the backup.
* Should the operations be performed on the replicas as well. Defaults to <tt>false</tt> meaning operations
* will be executed on the replica.
*/
protected boolean ignoreBackups() {
protected boolean ignoreReplicas() {
return false;
}
private String transportBackupAction() {
return transportAction() + "/backup";
private String transportReplicaAction() {
return transportAction() + "/replica";
}
protected IndexShard indexShard(ShardOperationRequest shardRequest) {
@ -159,19 +159,19 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
private class BackupOperationTransportHandler extends BaseTransportRequestHandler<ShardOperationRequest> {
private class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ShardOperationRequest> {
@Override public ShardOperationRequest newInstance() {
return new ShardOperationRequest();
}
@Override public void messageReceived(ShardOperationRequest request, TransportChannel channel) throws Exception {
shardOperationOnBackup(request);
shardOperationOnReplica(request);
channel.sendResponse(VoidStreamable.INSTANCE);
}
/**
* We spawn, since we want to perform the operation on the backup on a different thread.
* We spawn, since we want to perform the operation on the replica on a different thread.
*/
@Override public boolean spawn() {
return true;
@ -356,7 +356,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard) {
try {
Response response = shardOperationOnPrimary(new ShardOperationRequest(primaryShardId, request));
performBackups(response, alreadyThreaded);
performReplicas(response, alreadyThreaded);
} catch (IndexShardNotStartedException e) {
// still in recovery, retry (we know that its not UNASSIGNED OR INITIALIZING since we are checking it in the calling method)
retryPrimary(fromDiscoveryListener, shard.shardId());
@ -368,8 +368,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
private void performBackups(final Response response, boolean alreadyThreaded) {
if (ignoreBackups() || shards.size() == 1 /* no backups */) {
private void performReplicas(final Response response, boolean alreadyThreaded) {
if (ignoreReplicas() || shards.size() == 1 /* no replicas */) {
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response);
} else {
@ -383,7 +383,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
// initialize the counter
int backupCounter = 0;
int replicaCounter = 0;
if (replicationType == ReplicationType.ASYNC) {
// async replication, notify the listener
@ -397,28 +397,37 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
});
}
// now, trick the counter so it won't decrease to 0
backupCounter = -100;
replicaCounter = -100;
}
for (final ShardRouting shard : shards.reset()) {
// if the shard is primary and relocating, add one to the counter since we perform it on the replica as well
if (shard.primary()) {
continue;
if (shard.relocating()) {
replicaCounter++;
}
backupCounter++;
// if we are relocating the backup, we want to perform the index operation on both the relocating
} else {
replicaCounter++;
// if we are relocating the replica, we want to perform the index operation on both the relocating
// shard and the target shard. This means that we won't loose index operations between end of recovery
// and reassignment of the shard by the master node
if (shard.relocating()) {
backupCounter++;
replicaCounter++;
}
}
}
AtomicInteger counter = new AtomicInteger(backupCounter);
AtomicInteger counter = new AtomicInteger(replicaCounter);
for (final ShardRouting shard : shards.reset()) {
boolean doOnlyOnRelocating = false;
if (shard.primary()) {
if (shard.relocating()) {
doOnlyOnRelocating = true;
} else {
continue;
}
// we index on a backup that is initializing as well since we might not have got the event
}
// we index on a replica that is initializing as well since we might not have got the event
// yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it
@ -439,26 +448,28 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
continue;
}
performOnBackup(response, counter, shard, shard.currentNodeId());
if (!doOnlyOnRelocating) {
performOnReplica(response, counter, shard, shard.currentNodeId());
}
if (shard.relocating()) {
performOnBackup(response, counter, shard, shard.relocatingNodeId());
performOnReplica(response, counter, shard, shard.relocatingNodeId());
}
}
}
private void performOnBackup(final Response response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
private void performOnReplica(final Response response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
final ShardOperationRequest shardRequest = new ShardOperationRequest(shards.shardId().id(), request);
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportBackupAction(), shardRequest, new VoidTransportResponseHandler() {
transportService.sendRequest(node, transportReplicaAction(), shardRequest, new VoidTransportResponseHandler() {
@Override public void handleResponse(VoidStreamable vResponse) {
finishIfPossible();
}
@Override public void handleException(RemoteTransportException exp) {
if (!ignoreBackupException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction() + " on backup " + shards.shardId(), exp);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on backup, message [" + detailedMessage(exp) + "]");
if (!ignoreReplicaException(exp.unwrapCause())) {
logger.warn("Failed to perform " + transportAction() + " on replica " + shards.shardId(), exp);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(exp) + "]");
}
finishIfPossible();
}
@ -488,11 +499,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
threadPool.execute(new Runnable() {
@Override public void run() {
try {
shardOperationOnBackup(shardRequest);
shardOperationOnReplica(shardRequest);
} catch (Exception e) {
if (!ignoreBackupException(e)) {
logger.warn("Failed to perform " + transportAction() + " on backup " + shards.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on backup, message [" + detailedMessage(e) + "]");
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction() + " on replica " + shards.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
@ -502,11 +513,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
});
} else {
try {
shardOperationOnBackup(shardRequest);
shardOperationOnReplica(shardRequest);
} catch (Exception e) {
if (!ignoreBackupException(e)) {
logger.warn("Failed to perform " + transportAction() + " on backup " + shards.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on backup, message [" + detailedMessage(e) + "]");
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction() + " on replica" + shards.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(e) + "]");
}
}
if (counter.decrementAndGet() == 0) {
@ -525,7 +536,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
/**
* Should an exception be ignored when the operation is performed on the backup. The exception
* Should an exception be ignored when the operation is performed on the replica. The exception
* is ignored if it is:
*
* <ul>
@ -533,7 +544,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
* <li><tt>IndexMissingException</tt>/<tt>IndexShardMissingException</tt>: The shard has not yet started to initialize on the target node.
* </ul>
*/
private boolean ignoreBackupException(Throwable e) {
private boolean ignoreReplicaException(Throwable e) {
if (e instanceof IllegalIndexShardStateException) {
return true;
}