Internal: Rename TransportShardReplicationOperationAction to TransportReplicationAction

TransportShardReplicationOperationAction is a mouthful and is the only thing we mean when we say replication.  This commit also changes some related friends.
This commit is contained in:
Boaz Leskes 2015-05-25 16:43:23 +03:00
parent e31049988b
commit 20d575d257
13 changed files with 49 additions and 67 deletions

View File

@ -19,9 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,7 +30,7 @@ import java.util.List;
/**
*
*/
public class BulkShardRequest extends ShardReplicationOperationRequest<BulkShardRequest> {
public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
private int shardId;

View File

@ -28,7 +28,7 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
@ -65,7 +65,7 @@ import java.util.Map;
/**
* Performs the index operation.
*/
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
public class TransportShardBulkAction extends TransportReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
private final static String OP_TYPE_UPDATE = "update";
private final static String OP_TYPE_DELETE = "delete";

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -44,7 +44,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
* @see org.elasticsearch.client.Requests#deleteRequest(String)
*/
public class DeleteRequest extends ShardReplicationOperationRequest<DeleteRequest> implements DocumentRequest<DeleteRequest> {
public class DeleteRequest extends ReplicationRequest<DeleteRequest> implements DocumentRequest<DeleteRequest> {
private String type;
private String id;

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.VersionType;
@ -27,7 +27,7 @@ import org.elasticsearch.index.VersionType;
/**
* A delete document action request builder.
*/
public class DeleteRequestBuilder extends ShardReplicationOperationRequestBuilder<DeleteRequest, DeleteResponse, DeleteRequestBuilder> {
public class DeleteRequestBuilder extends ReplicationRequestBuilder<DeleteRequest, DeleteResponse, DeleteRequestBuilder> {
public DeleteRequestBuilder(ElasticsearchClient client, DeleteAction action) {
super(client, action, new DeleteRequest());

View File

@ -27,7 +27,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
@ -50,7 +50,7 @@ import org.elasticsearch.transport.TransportService;
/**
* Performs the delete operation.
*/
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
public class TransportDeleteAction extends TransportReplicationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;

View File

@ -20,11 +20,10 @@
package org.elasticsearch.action.index;
import com.google.common.base.Charsets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -63,7 +62,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* @see org.elasticsearch.client.Requests#indexRequest(String)
* @see org.elasticsearch.client.Client#index(IndexRequest)
*/
public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
public class IndexRequest extends ReplicationRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
/**
* Operation type controls if the type of the index operation.

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.index;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
@ -32,7 +32,7 @@ import java.util.Map;
/**
* An index document action request builder.
*/
public class IndexRequestBuilder extends ShardReplicationOperationRequestBuilder<IndexRequest, IndexResponse, IndexRequestBuilder> {
public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest, IndexResponse, IndexRequestBuilder> {
public IndexRequestBuilder(ElasticsearchClient client, IndexAction action) {
super(client, action, new IndexRequest());

View File

@ -27,7 +27,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
@ -61,7 +61,7 @@ import org.elasticsearch.transport.TransportService;
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*/
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
public class TransportIndexAction extends TransportReplicationAction<IndexRequest, IndexRequest, IndexResponse> {
private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;

View File

@ -37,7 +37,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
*
*/
public abstract class ShardReplicationOperationRequest<T extends ShardReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
public abstract class ReplicationRequest<T extends ReplicationRequest> extends ActionRequest<T> implements IndicesRequest {
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
@ -50,21 +50,21 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private volatile boolean canHaveDuplicates = false;
protected ShardReplicationOperationRequest() {
protected ReplicationRequest() {
}
/**
* Creates a new request that inherits headers and context from the request provided as argument.
*/
protected ShardReplicationOperationRequest(ActionRequest request) {
protected ReplicationRequest(ActionRequest request) {
super(request);
}
/**
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
*/
protected ShardReplicationOperationRequest(T request) {
protected ReplicationRequest(T request) {
this(request, request);
}
@ -72,7 +72,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
protected ShardReplicationOperationRequest(T request, ActionRequest originalRequest) {
protected ReplicationRequest(T request, ActionRequest originalRequest) {
super(originalRequest);
this.timeout = request.timeout();
this.index = request.index();

View File

@ -23,16 +23,15 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
/**
*/
public abstract class ShardReplicationOperationRequestBuilder<Request extends ShardReplicationOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends ShardReplicationOperationRequestBuilder<Request, Response, RequestBuilder>>
public abstract class ReplicationRequestBuilder<Request extends ReplicationRequest<Request>, Response extends ActionResponse, RequestBuilder extends ReplicationRequestBuilder<Request, Response, RequestBuilder>>
extends ActionRequestBuilder<Request, Response, RequestBuilder> {
protected ShardReplicationOperationRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
protected ReplicationRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
super(client, action, request);
}

View File

@ -26,8 +26,8 @@ import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
@ -76,7 +76,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
protected final TransportService transportService;
protected final ClusterService clusterService;
@ -90,11 +90,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
final String executor;
final boolean checkWriteConsistency;
protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
Class<Request> request, Class<ReplicaRequest> replicaRequest, String executor) {
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
Class<Request> request, Class<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, threadPool, actionFilters);
this.transportService = transportService;
this.clusterService = clusterService;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.support.replication;
import com.google.common.base.Predicate;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -42,13 +41,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
@ -83,17 +76,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.hamcrest.Matchers.*;
public class ShardReplicationOperationTests extends ElasticsearchTestCase {
public class ShardReplicationTests extends ElasticsearchTestCase {
private static ThreadPool threadPool;
@ -102,13 +88,13 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
private CapturingTransport transport;
private Action action;
/* *
* TransportShardReplicationOperationAction needs an instance of IndexShard to count operations.
* TransportReplicationAction needs an instance of IndexShard to count operations.
* indexShards is reset to null before each test and will be initialized upon request in the tests.
*/
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool("ShardReplicationOperationTests");
threadPool = new ThreadPool("ShardReplicationTests");
}
@Before
@ -145,7 +131,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
assertFalse("primary phase should stop execution", primaryPhase.checkBlocks());
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
@ -277,7 +263,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
Request request = new Request(shardId).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
@ -309,7 +295,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
assertTrue(primaryPhase.checkBlocks());
primaryPhase.routeRequestOrPerformLocally(shardRoutingTable.primaryShard(), shardRoutingTable.shardsIt());
if (primaryNodeId.equals(clusterService.localNode().id())) {
@ -374,7 +360,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
if (passesWriteConsistency) {
assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue());
primaryPhase.run();
@ -457,11 +443,11 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
final TransportShardReplicationOperationAction<Request, Request, Response>.InternalRequest internalRequest = action.new InternalRequest(request);
final TransportReplicationAction<Request, Request, Response>.InternalRequest internalRequest = action.new InternalRequest(request);
internalRequest.concreteIndex(shardId.index().name());
Releasable reference = getOrCreateIndexShardOperationsCounter();
assertIndexShardCounter(2);
TransportShardReplicationOperationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(shardIt, request,
new Response(), new ClusterStateObserver(clusterService, logger),
primaryShard, internalRequest, listener, reference);
@ -532,7 +518,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
* However, this failure would only become apparent once listener.get is called. Seems a little implicit.
* */
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
final TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
Thread t = new Thread() {
public void run() {
primaryPhase.run();
@ -568,7 +554,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
assertIndexShardCounter(2);
assertThat(transport.capturedRequests().length, equalTo(1));
@ -635,7 +621,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
// no replica request should have been sent yet
assertThat(transport.capturedRequests().length, equalTo(0));
@ -662,7 +648,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
};
}
static class Request extends ShardReplicationOperationRequest<Request> {
static class Request extends ReplicationRequest<Request> {
int shardId;
public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public AtomicInteger processedOnReplicas = new AtomicInteger();
@ -694,7 +680,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
static class Response extends ActionWriteResponse {
}
class Action extends TransportShardReplicationOperationAction<Request, Request, Response> {
class Action extends TransportReplicationAction<Request, Request, Response> {
Action(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService,

View File

@ -1512,7 +1512,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
replicaEngine.create(index);
fail();
} catch (VersionConflictEngineException e) {
// we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException
// we ignore version conflicts on replicas, see TransportReplicationAction.ignoreReplicaException
}
replicaEngine.refresh("test");
Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");
@ -1556,7 +1556,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
replicaEngine.create(secondIndexRequestReplica);
fail();
} catch (VersionConflictEngineException e) {
// we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException.
// we ignore version conflicts on replicas, see TransportReplicationAction.ignoreReplicaException.
}
replicaEngine.refresh("test");
Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");