From a0c10850d94951c9f5637dca07f8976c25083624 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sun, 13 Sep 2015 21:09:48 +0200 Subject: [PATCH] Use Supplier instead of Reflection Java 8 allows for method references which in-turn will cause compile errors if a method is not visible while reflection fails late and maybe too late. We can now register Request instances via FooRequest::new instead of passing FooRequest.class and call it's ctor via reflection. --- .../health/TransportClusterHealthAction.java | 2 +- .../TransportNodesHotThreadsAction.java | 2 +- .../node/info/TransportNodesInfoAction.java | 2 +- .../liveness/TransportLivenessAction.java | 2 +- .../node/stats/TransportNodesStatsAction.java | 2 +- .../TransportDeleteRepositoryAction.java | 2 +- .../get/TransportGetRepositoriesAction.java | 2 +- .../put/TransportPutRepositoryAction.java | 2 +- .../TransportVerifyRepositoryAction.java | 2 +- .../TransportClusterRerouteAction.java | 2 +- .../TransportClusterUpdateSettingsAction.java | 2 +- .../TransportClusterSearchShardsAction.java | 2 +- .../create/TransportCreateSnapshotAction.java | 2 +- .../delete/TransportDeleteSnapshotAction.java | 2 +- .../get/TransportGetSnapshotsAction.java | 2 +- .../TransportRestoreSnapshotAction.java | 2 +- .../status/TransportNodesSnapshotsStatus.java | 2 +- .../TransportSnapshotsStatusAction.java | 2 +- .../state/TransportClusterStateAction.java | 2 +- .../stats/TransportClusterStatsAction.java | 2 +- .../TransportPendingClusterTasksAction.java | 2 +- .../alias/TransportIndicesAliasesAction.java | 2 +- .../exists/TransportAliasesExistAction.java | 2 +- .../alias/get/TransportGetAliasesAction.java | 2 +- .../analyze/TransportAnalyzeAction.java | 2 +- .../TransportClearIndicesCacheAction.java | 2 +- .../close/TransportCloseIndexAction.java | 2 +- .../create/TransportCreateIndexAction.java | 2 +- .../delete/TransportDeleteIndexAction.java | 2 +- .../indices/TransportIndicesExistsAction.java | 2 +- .../types/TransportTypesExistsAction.java | 2 +- .../indices/flush/TransportFlushAction.java | 2 +- .../flush/TransportShardFlushAction.java | 2 +- .../indices/get/TransportGetIndexAction.java | 2 +- .../get/TransportGetFieldMappingsAction.java | 2 +- .../TransportGetFieldMappingsIndexAction.java | 2 +- .../get/TransportGetMappingsAction.java | 2 +- .../put/TransportPutMappingAction.java | 2 +- .../open/TransportOpenIndexAction.java | 2 +- .../optimize/TransportOptimizeAction.java | 2 +- .../recovery/TransportRecoveryAction.java | 2 +- .../refresh/TransportRefreshAction.java | 2 +- .../refresh/TransportShardRefreshAction.java | 2 +- .../TransportIndicesSegmentsAction.java | 2 +- .../get/TransportGetSettingsAction.java | 2 +- .../put/TransportUpdateSettingsAction.java | 2 +- .../TransportIndicesShardStoresAction.java | 2 +- .../stats/TransportIndicesStatsAction.java | 2 +- .../TransportDeleteIndexTemplateAction.java | 2 +- .../get/TransportGetIndexTemplatesAction.java | 2 +- .../put/TransportPutIndexTemplateAction.java | 2 +- .../get/TransportUpgradeStatusAction.java | 2 +- .../upgrade/post/TransportUpgradeAction.java | 2 +- .../post/TransportUpgradeSettingsAction.java | 2 +- .../query/TransportValidateQueryAction.java | 2 +- .../TransportRenderSearchTemplateAction.java | 2 +- .../delete/TransportDeleteWarmerAction.java | 2 +- .../warmer/get/TransportGetWarmersAction.java | 2 +- .../warmer/put/TransportPutWarmerAction.java | 2 +- .../action/bulk/TransportBulkAction.java | 2 +- .../action/bulk/TransportShardBulkAction.java | 2 +- .../action/delete/TransportDeleteAction.java | 2 +- .../action/exists/TransportExistsAction.java | 2 +- .../explain/TransportExplainAction.java | 2 +- .../TransportFieldStatsTransportAction.java | 2 +- .../action/get/TransportGetAction.java | 2 +- .../action/get/TransportMultiGetAction.java | 2 +- .../get/TransportShardMultiGetAction.java | 2 +- .../action/index/TransportIndexAction.java | 2 +- .../TransportDeleteIndexedScriptAction.java | 2 +- .../get/TransportGetIndexedScriptAction.java | 2 +- .../put/TransportPutIndexedScriptAction.java | 2 +- .../TransportMultiPercolateAction.java | 2 +- .../percolate/TransportPercolateAction.java | 2 +- .../TransportShardMultiPercolateAction.java | 2 +- .../search/TransportClearScrollAction.java | 2 +- .../search/TransportMultiSearchAction.java | 2 +- .../action/search/TransportSearchAction.java | 2 +- .../search/TransportSearchScrollAction.java | 2 +- .../suggest/TransportSuggestAction.java | 2 +- .../support/HandledTransportAction.java | 4 +- .../broadcast/TransportBroadcastAction.java | 3 +- .../node/TransportBroadcastByNodeAction.java | 3 +- .../master/TransportMasterNodeAction.java | 4 +- .../master/TransportMasterNodeReadAction.java | 4 +- .../info/TransportClusterInfoAction.java | 4 +- .../support/nodes/TransportNodesAction.java | 3 +- .../TransportBroadcastReplicationAction.java | 3 +- .../TransportReplicationAction.java | 5 ++- ...ransportInstanceSingleOperationAction.java | 3 +- .../shard/TransportSingleShardAction.java | 4 +- .../TransportMultiTermVectorsAction.java | 2 +- .../TransportShardMultiTermsVectorAction.java | 2 +- .../TransportTermVectorsAction.java | 2 +- .../dfs/TransportDfsOnlyAction.java | 2 +- .../action/update/TransportUpdateAction.java | 2 +- .../action/index/NodeIndexDeletedAction.java | 4 +- .../index/NodeMappingRefreshAction.java | 2 +- .../action/shard/ShardStateAction.java | 4 +- .../discovery/zen/ZenDiscovery.java | 2 +- .../zen/fd/MasterFaultDetection.java | 2 +- .../discovery/zen/fd/NodesFaultDetection.java | 2 +- .../zen/membership/MembershipAction.java | 6 +-- .../zen/ping/unicast/UnicastZenPing.java | 2 +- .../publish/PublishClusterStateAction.java | 2 +- .../gateway/LocalAllocateDangledIndices.java | 2 +- .../TransportNodesListGatewayMetaState.java | 2 +- ...ransportNodesListGatewayStartedShards.java | 2 +- .../indices/flush/SyncedFlushService.java | 6 +-- .../indices/recovery/RecoverySource.java | 2 +- .../indices/recovery/RecoveryTarget.java | 12 +++--- .../indices/store/IndicesStore.java | 2 +- .../TransportNodesListShardStoreMetaData.java | 2 +- .../VerifyNodeRepositoryAction.java | 2 +- .../action/SearchServiceTransportAction.java | 24 ++++++------ .../snapshots/RestoreService.java | 2 +- .../snapshots/SnapshotShardsService.java | 2 +- .../transport/RequestHandlerRegistry.java | 37 ++---------------- .../transport/TransportService.java | 16 ++------ .../action/IndicesRequestIT.java | 5 ++- .../TransportBroadcastByNodeActionTests.java | 5 ++- .../BroadcastReplicationTests.java | 2 +- .../replication/ShardReplicationTests.java | 2 +- .../BenchmarkNettyLargeMessages.java | 2 +- .../transport/TransportBenchmark.java | 2 +- .../AbstractSimpleTransportTestCase.java | 38 +++++++++---------- .../netty/NettyScheduledPingTests.java | 2 +- .../TransportDeleteByQueryAction.java | 2 +- 128 files changed, 194 insertions(+), 215 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 6f9180ed14f..f291bd61b90 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -48,7 +48,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ClusterName clusterName, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) { - super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterHealthRequest.class); + super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterHealthRequest::new); this.clusterName = clusterName; this.gatewayAllocator = gatewayAllocator; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java index 4a5a9bbd4e8..f26177a0ce7 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java @@ -49,7 +49,7 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction extends TransportAction{ - protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Class request) { + protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler()); } diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 2386a82650a..00f04e5c0ff 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; /** * @@ -54,7 +55,7 @@ public abstract class TransportBroadcastAction request, Class shardRequest, String shardExecutor) { + Supplier request, Supplier shardRequest, String shardExecutor) { super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); this.clusterService = clusterService; this.transportService = transportService; diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 76b3995b1ec..77f96650854 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; /** * Abstraction for transporting aggregated shard-level operations in a single request (NodeRequest) per-node @@ -75,7 +76,7 @@ public abstract class TransportBroadcastByNodeAction request, + Supplier request, String executor) { super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 50c5a3f2582..ccaef701747 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -42,6 +42,8 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; +import java.util.function.Supplier; + /** * A base class for operations that needs to be performed on the master node. */ @@ -54,7 +56,7 @@ public abstract class TransportMasterNodeAction request) { + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); this.transportService = transportService; this.clusterService = clusterService; diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java index 3faeb50bba2..d0f64cbb9be 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.function.Supplier; + /** * A base class for read operations that needs to be performed on the master node. * Can also be executed on the local node if needed. @@ -39,7 +41,7 @@ public abstract class TransportMasterNodeReadAction request) { + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request); this.forceLocal = settings.getAsBoolean(FORCE_LOCAL_SETTING, null); } diff --git a/core/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java b/core/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java index 560a699ddf1..ec9c3eb46c3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java @@ -29,13 +29,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.function.Supplier; + /** */ public abstract class TransportClusterInfoAction extends TransportMasterNodeReadAction { public TransportClusterInfoAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Class request) { + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request); } diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 8383189f3ef..118e1124c33 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.transport.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; /** * @@ -50,7 +51,7 @@ public abstract class TransportNodesAction request, Class nodeRequest, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier nodeRequest, String nodeExecutor) { super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); this.clusterName = clusterName; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index 7f0d1b3bc69..ddd4d42f7a6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Supplier; /** * Base class for requests that should be executed on all shards of an index or several indices. @@ -57,7 +58,7 @@ public abstract class TransportBroadcastReplicationAction request, Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportBroadcastReplicationAction(String name, Supplier request, Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) { super(settings, name, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 608575007f4..18890dc2afd 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -72,6 +72,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; /** */ @@ -93,8 +94,8 @@ public abstract class TransportReplicationAction request, - Class replicaRequest, String executor) { + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor) { super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); this.transportService = transportService; this.clusterService = clusterService; diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index 875e4715d98..2e815da3835 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -43,6 +43,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * @@ -57,7 +58,7 @@ public abstract class TransportInstanceSingleOperationAction request) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); this.clusterService = clusterService; this.transportService = transportService; diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index b5d41af939c..6b419abece1 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -41,6 +41,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import java.util.function.Supplier; + import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; /** @@ -59,7 +61,7 @@ public abstract class TransportSingleShardAction request, String executor) { + Supplier request, String executor) { super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); this.clusterService = clusterService; this.transportService = transportService; diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java index b381cf72d20..84365cfaf6b 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java @@ -49,7 +49,7 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction implemen this.joinThreadControl = new JoinThreadControl(threadPool); - transportService.registerRequestHandler(DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest.class, ThreadPool.Names.SAME, new RejoinClusterRequestHandler()); + transportService.registerRequestHandler(DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler()); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 5ec3f9a899a..b20ed81f1a5 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -75,7 +75,7 @@ public class MasterFaultDetection extends FaultDetection { logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); - transportService.registerRequestHandler(MASTER_PING_ACTION_NAME, MasterPingRequest.class, ThreadPool.Names.SAME, new MasterPingRequestHandler()); + transportService.registerRequestHandler(MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, new MasterPingRequestHandler()); } public DiscoveryNode masterNode() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index a79c00357c8..5619b58dc53 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -64,7 +64,7 @@ public class NodesFaultDetection extends FaultDetection { logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); - transportService.registerRequestHandler(PING_ACTION_NAME, PingRequest.class, ThreadPool.Names.SAME, new PingRequestHandler()); + transportService.registerRequestHandler(PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, new PingRequestHandler()); } public void setLocalNode(DiscoveryNode localNode) { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index 419ed94078c..4260b992ddb 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -69,9 +69,9 @@ public class MembershipAction extends AbstractComponent { this.listener = listener; this.clusterService = clusterService; - transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest.class, ThreadPool.Names.GENERIC, new JoinRequestRequestHandler()); - transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest.class, ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler()); - transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest.class, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); + transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new, ThreadPool.Names.GENERIC, new JoinRequestRequestHandler()); + transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new, ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler()); + transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); } public void close() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 7e24ebc85ec..c9c4d298cc4 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -167,7 +167,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]); - transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest.class, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); + transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 7db74c734e3..63907b38c54 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -88,7 +88,7 @@ public class PublishClusterStateAction extends AbstractComponent { this.nodesProvider = nodesProvider; this.listener = listener; this.discoverySettings = discoverySettings; - transportService.registerRequestHandler(ACTION_NAME, BytesTransportRequest.class, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler()); + transportService.registerRequestHandler(ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler()); } public void close() { diff --git a/core/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java b/core/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java index e783007c8ca..24292fc3c30 100644 --- a/core/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java +++ b/core/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java @@ -65,7 +65,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent { this.clusterService = clusterService; this.allocationService = allocationService; this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; - transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest.class, ThreadPool.Names.SAME, new AllocateDangledRequestHandler()); + transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest::new, ThreadPool.Names.SAME, new AllocateDangledRequestHandler()); } public void allocateDangled(Collection indices, final Listener listener) { diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java index 240c00a03cd..a117eb709af 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java @@ -61,7 +61,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction()); - transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest.class, ThreadPool.Names.SAME, new FreeContextTransportHandler()); - transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest.class, ThreadPool.Names.SAME, new ClearScrollContextsTransportHandler()); - transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchDfsTransportHandler()); - transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchQueryTransportHandler()); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest.class, ThreadPool.Names.SEARCH, new SearchQueryByIdTransportHandler()); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest.class, ThreadPool.Names.SEARCH, new SearchQueryScrollTransportHandler()); - transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler()); - transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest.class, ThreadPool.Names.SEARCH, new SearchQueryQueryFetchTransportHandler()); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest.class, ThreadPool.Names.SEARCH, new SearchQueryFetchScrollTransportHandler()); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest.class, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler<>()); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest.class, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler()); + transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME, new FreeContextTransportHandler<>()); + transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, new FreeContextTransportHandler()); + transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME, new ClearScrollContextsTransportHandler()); + transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchDfsTransportHandler()); + transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryTransportHandler()); + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryByIdTransportHandler()); + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryScrollTransportHandler()); + transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler()); + transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryQueryFetchTransportHandler()); + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchScrollTransportHandler()); + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler<>()); + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler()); } public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 105f8513ff0..39c3c5b9a71 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -175,7 +175,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis this.createIndexService = createIndexService; this.dynamicSettings = dynamicSettings; this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; - transportService.registerRequestHandler(UPDATE_RESTORE_ACTION_NAME, UpdateIndexShardRestoreStatusRequest.class, ThreadPool.Names.SAME, new UpdateRestoreStateRequestHandler()); + transportService.registerRequestHandler(UPDATE_RESTORE_ACTION_NAME, UpdateIndexShardRestoreStatusRequest::new, ThreadPool.Names.SAME, new UpdateRestoreStateRequestHandler()); clusterService.add(this); } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 43d71baf87f..301ceededc5 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -113,7 +113,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent { private final TransportRequestHandler handler; private final boolean forceExecution; private final String executor; - private final Callable requestFactory; + private final Supplier requestFactory; - RequestHandlerRegistry(String action, Class request, TransportRequestHandler handler, - String executor, boolean forceExecution) { - this(action, new ReflectionFactory<>(request), handler, executor, forceExecution); - } - - public RequestHandlerRegistry(String action, Callable requestFactory, TransportRequestHandler handler, String executor, boolean forceExecution) { + public RequestHandlerRegistry(String action, Supplier requestFactory, TransportRequestHandler handler, String executor, boolean forceExecution) { this.action = action; this.requestFactory = requestFactory; assert newRequest() != null; @@ -53,11 +49,7 @@ public class RequestHandlerRegistry { } public Request newRequest() { - try { - return requestFactory.call(); - } catch (Exception e) { - throw new IllegalStateException("failed to instantiate request ", e); - } + return requestFactory.get(); } public TransportRequestHandler getHandler() { @@ -71,25 +63,4 @@ public class RequestHandlerRegistry { public String getExecutor() { return executor; } - - private final static class ReflectionFactory implements Callable { - private final Constructor requestConstructor; - - public ReflectionFactory(Class request) { - try { - this.requestConstructor = request.getDeclaredConstructor(); - } catch (NoSuchMethodException e) { - throw new IllegalStateException("failed to create constructor (does it have a default constructor?) for request " + request, e); - } - } - - @Override - public Request call() throws Exception { - try { - return requestConstructor.newInstance(); - } catch (IllegalAccessException e) { - throw new IllegalStateException("Could not access '" + requestConstructor + "'. Implementations must be a public class and have a public no-arg ctor.", e); - } - } - } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 40fa908c2b3..172a34e7e43 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -52,6 +52,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS; @@ -395,17 +396,6 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Class request, String executor, TransportRequestHandler handler) { - registerRequestHandler(action, request, executor, false, handler); - } - /** * Registers a new request handler * @param action The action the request handler is associated with @@ -413,7 +403,7 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Callable requestFactory, String executor, TransportRequestHandler handler) { + public void registerRequestHandler(String action, Supplier requestFactory, String executor, TransportRequestHandler handler) { RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, handler, executor, false); registerRequestHandler(reg); } @@ -426,7 +416,7 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Class request, String executor, boolean forceExecution, TransportRequestHandler handler) { + public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, TransportRequestHandler handler) { RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, handler, executor, forceExecution); registerRequestHandler(reg); } diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 30dd587380f..cecee0fb31d 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -103,6 +103,7 @@ import org.junit.Test; import java.util.*; import java.util.concurrent.Callable; +import java.util.function.Supplier; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -851,12 +852,12 @@ public class IndicesRequestIT extends ESIntegTestCase { } @Override - public void registerRequestHandler(String action, Class request, String executor, boolean forceExecution, TransportRequestHandler handler) { + public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, TransportRequestHandler handler) { super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler(action, handler)); } @Override - public void registerRequestHandler(String action, Callable requestFactory, String executor, TransportRequestHandler handler) { + public void registerRequestHandler(String action, Supplier requestFactory, String executor, TransportRequestHandler handler) { super.registerRequestHandler(action, requestFactory, executor, new InterceptingRequestHandler(action, handler)); } diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index d892a3d0834..14ee78a7cfb 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -71,6 +71,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; @@ -109,7 +110,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { private final Map shards = new HashMap<>(); - public TestTransportBroadcastByNodeAction(Settings settings, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Class request, String executor) { + public TestTransportBroadcastByNodeAction(Settings settings, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, String executor) { super(settings, "indices:admin/test", THREAD_POOL, TransportBroadcastByNodeActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request, executor); } @@ -191,7 +192,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { transportService, new ActionFilters(new HashSet()), new MyResolver(), - Request.class, + Request::new, ThreadPool.Names.SAME ); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index b77fe66b0a3..2fe04bb9238 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -185,7 +185,7 @@ public class BroadcastReplicationTests extends ESTestCase { protected final Set>> capturedShardRequests = ConcurrentCollections.newConcurrentSet(); public TestBroadcastReplicationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) { - super("test-broadcast-replication-action", BroadcastRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction); + super("test-broadcast-replication-action", BroadcastRequest::new, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index 03a6bb7b4cd..8d4591730f1 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -589,7 +589,7 @@ public class ShardReplicationTests extends ESTestCase { ThreadPool threadPool) { super(settings, actionName, transportService, clusterService, null, threadPool, new ShardStateAction(settings, clusterService, transportService, null, null), null, - new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY), Request.class, Request.class, ThreadPool.Names.SAME); + new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME); } @Override diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index 84f53d15f46..d8a518e3ea0 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -71,7 +71,7 @@ public class BenchmarkNettyLargeMessages { transportServiceClient.connectToNode(bigNode); transportServiceClient.connectToNode(smallNode); - transportServiceServer.registerRequestHandler("benchmark", BenchmarkMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + transportServiceServer.registerRequestHandler("benchmark", BenchmarkMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(BenchmarkMessageRequest request, TransportChannel channel) throws Exception { channel.sendResponse(new BenchmarkMessageResponse(request)); diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java b/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java index 3e5b23b5bfb..5ccc264399b 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java @@ -80,7 +80,7 @@ public class TransportBenchmark { final DiscoveryNode node = new DiscoveryNode("server", serverTransportService.boundAddress().publishAddress(), Version.CURRENT); - serverTransportService.registerRequestHandler("benchmark", BenchmarkMessageRequest.class, executor, new TransportRequestHandler() { + serverTransportService.registerRequestHandler("benchmark", BenchmarkMessageRequest::new, executor, new TransportRequestHandler() { @Override public void messageReceived(BenchmarkMessageRequest request, TransportChannel channel) throws Exception { channel.sendResponse(new BenchmarkMessageResponse(request)); diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 85315be1ff5..50cb00a5c78 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -125,7 +125,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testHelloWorld() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); @@ -211,7 +211,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceA.disconnectFromNode(nodeA); } final AtomicReference exception = new AtomicReference<>(); - serviceA.registerRequestHandler("localNode", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { try { @@ -253,7 +253,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testVoidMessageCompressed() { - serviceA.registerRequestHandler("sayHello", TransportRequest.Empty.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { @@ -300,7 +300,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testHelloWorldCompressed() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); @@ -349,7 +349,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testErrorMessage() { - serviceA.registerRequestHandler("sayHelloException", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { assertThat("moshe", equalTo(request.message)); @@ -413,7 +413,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testNotifyOnShutdown() throws Exception { final CountDownLatch latch2 = new CountDownLatch(1); - serviceA.registerRequestHandler("foobar", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { try { @@ -439,7 +439,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception { - serviceA.registerRequestHandler("sayHelloTimeoutNoResponse", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); @@ -488,7 +488,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testTimeoutSendExceptionWithDelayedResponse() throws Exception { - serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep"); @@ -619,10 +619,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }; - serviceA.registerRequestHandler("test", StringMessageRequest.class, ThreadPool.Names.SAME, handler); - serviceA.registerRequestHandler("testError", StringMessageRequest.class, ThreadPool.Names.SAME, handlerWithError); - serviceB.registerRequestHandler("test", StringMessageRequest.class, ThreadPool.Names.SAME, handler); - serviceB.registerRequestHandler("testError", StringMessageRequest.class, ThreadPool.Names.SAME, handlerWithError); + serviceA.registerRequestHandler("test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); + serviceA.registerRequestHandler("testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); + serviceB.registerRequestHandler("test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); + serviceB.registerRequestHandler("testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); final Tracer tracer = new Tracer(); serviceA.addTracer(tracer); @@ -882,7 +882,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testVersion_from0to1() throws Exception { - serviceB.registerRequestHandler("/version", Version1Request.class, ThreadPool.Names.SAME, new TransportRequestHandler() { + serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { @Override public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { assertThat(request.value1, equalTo(1)); @@ -924,7 +924,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testVersion_from1to0() throws Exception { - serviceA.registerRequestHandler("/version", Version0Request.class, ThreadPool.Names.SAME, new TransportRequestHandler() { + serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { @Override public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { assertThat(request.value1, equalTo(1)); @@ -967,7 +967,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testVersion_from1to1() throws Exception { - serviceB.registerRequestHandler("/version", Version1Request.class, ThreadPool.Names.SAME, new TransportRequestHandler() { + serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { @Override public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { assertThat(request.value1, equalTo(1)); @@ -1012,7 +1012,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testVersion_from0to0() throws Exception { - serviceA.registerRequestHandler("/version", Version0Request.class, ThreadPool.Names.SAME, new TransportRequestHandler() { + serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { @Override public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { assertThat(request.value1, equalTo(1)); @@ -1052,7 +1052,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testMockFailToSendNoConnectRule() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { assertThat("moshe", equalTo(request.message)); @@ -1111,7 +1111,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Test public void testMockUnresponsiveRule() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { assertThat("moshe", equalTo(request.message)); @@ -1174,7 +1174,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final CountDownLatch latch = new CountDownLatch(2); final AtomicReference addressA = new AtomicReference<>(); final AtomicReference addressB = new AtomicReference<>(); - serviceB.registerRequestHandler("action1", TestRequest.class, ThreadPool.Names.SAME, new TransportRequestHandler() { + serviceB.registerRequestHandler("action1", TestRequest::new, ThreadPool.Names.SAME, new TransportRequestHandler() { @Override public void messageReceived(TestRequest request, TransportChannel channel) throws Exception { addressA.set(request.remoteAddress()); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index d5371999c1e..d26890748bd 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -73,7 +73,7 @@ public class NettyScheduledPingTests extends ESTestCase { assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0l)); assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0l)); - serviceA.registerRequestHandler("sayHello", TransportRequest.Empty.class, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java index 890a60ce3c8..252befd85b9 100644 --- a/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java @@ -62,7 +62,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction