From 106a26b399d730e6fba1d960767cdd6e4a046a22 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 3 May 2017 09:23:39 -0500 Subject: [PATCH] Remove unneeded usages of listenable futures (elastic/x-pack-elasticsearch#1261) This is related to elastic/elasticsearch#24412. That commit changed how ListenableActionFuture implementations are created. This commit updates x-pack to be compatible with those changes. In particular, all the usages of ListenableActionFuture in x-pack could be replaced with PlainActionFuture as the "listening" functionality was not being used. Original commit: elastic/x-pack-elasticsearch@7c8d8e3df925c23e28f7ff2d489eba3d68739893 --- .../ml/client/MachineLearningClient.java | 85 +++++++------------ .../MachineLearningLicensingTests.java | 74 ++++++++-------- 2 files changed, 63 insertions(+), 96 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java index 7d9d4de6e36..c249438af26 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.ml.client; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; @@ -51,8 +51,7 @@ public class MachineLearningClient { } public ActionFuture closeJob(CloseJobAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(CloseJobAction.INSTANCE, request, listener); return listener; } @@ -64,8 +63,7 @@ public class MachineLearningClient { public ActionFuture deleteDatafeed( DeleteDatafeedAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(DeleteDatafeedAction.INSTANCE, request, listener); return listener; } @@ -77,8 +75,7 @@ public class MachineLearningClient { public ActionFuture deleteFilter( DeleteFilterAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(DeleteFilterAction.INSTANCE, request, listener); return listener; } @@ -89,8 +86,7 @@ public class MachineLearningClient { } public ActionFuture deleteJob(DeleteJobAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(DeleteJobAction.INSTANCE, request, listener); return listener; } @@ -102,8 +98,7 @@ public class MachineLearningClient { public ActionFuture deleteModelSnapshot( DeleteModelSnapshotAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(DeleteModelSnapshotAction.INSTANCE, request, listener); return listener; } @@ -114,8 +109,7 @@ public class MachineLearningClient { } public ActionFuture flushJob(FlushJobAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(FlushJobAction.INSTANCE, request, listener); return listener; } @@ -126,8 +120,7 @@ public class MachineLearningClient { } public ActionFuture getBuckets(GetBucketsAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetBucketsAction.INSTANCE, request, listener); return listener; } @@ -139,8 +132,7 @@ public class MachineLearningClient { public ActionFuture getCategories( GetCategoriesAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetCategoriesAction.INSTANCE, request, listener); return listener; } @@ -152,8 +144,7 @@ public class MachineLearningClient { public ActionFuture getDatafeeds( GetDatafeedsAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetDatafeedsAction.INSTANCE, request, listener); return listener; } @@ -165,8 +156,7 @@ public class MachineLearningClient { public ActionFuture getDatafeedsStats( GetDatafeedsStatsAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetDatafeedsStatsAction.INSTANCE, request, listener); return listener; } @@ -177,21 +167,19 @@ public class MachineLearningClient { } public ActionFuture getFilters(GetFiltersAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetFiltersAction.INSTANCE, request, listener); return listener; } public void getInfluencers(GetInfluencersAction.Request request, - ActionListener listener) { + ActionListener listener) { client.execute(GetInfluencersAction.INSTANCE, request, listener); } public ActionFuture getInfluencers( GetInfluencersAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetInfluencersAction.INSTANCE, request, listener); return listener; } @@ -202,8 +190,7 @@ public class MachineLearningClient { } public ActionFuture getJobs(GetJobsAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetJobsAction.INSTANCE, request, listener); return listener; } @@ -215,8 +202,7 @@ public class MachineLearningClient { public ActionFuture getJobsStats( GetJobsStatsAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetJobsStatsAction.INSTANCE, request, listener); return listener; } @@ -228,8 +214,7 @@ public class MachineLearningClient { public ActionFuture getModelSnapshots( GetModelSnapshotsAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetModelSnapshotsAction.INSTANCE, request, listener); return listener; } @@ -240,8 +225,7 @@ public class MachineLearningClient { } public ActionFuture getRecords(GetRecordsAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(GetRecordsAction.INSTANCE, request, listener); return listener; } @@ -252,8 +236,7 @@ public class MachineLearningClient { } public ActionFuture openJob(OpenJobAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(OpenJobAction.INSTANCE, request, listener); return listener; } @@ -264,8 +247,7 @@ public class MachineLearningClient { } public ActionFuture postData(PostDataAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(PostDataAction.INSTANCE, request, listener); return listener; } @@ -276,8 +258,7 @@ public class MachineLearningClient { } public ActionFuture putDatafeed(PutDatafeedAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(PutDatafeedAction.INSTANCE, request, listener); return listener; } @@ -288,8 +269,7 @@ public class MachineLearningClient { } public ActionFuture putFilter(PutFilterAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(PutFilterAction.INSTANCE, request, listener); return listener; } @@ -300,8 +280,7 @@ public class MachineLearningClient { } public ActionFuture putJob(PutJobAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(PutJobAction.INSTANCE, request, listener); return listener; } @@ -313,8 +292,7 @@ public class MachineLearningClient { public ActionFuture revertModelSnapshot( RevertModelSnapshotAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(RevertModelSnapshotAction.INSTANCE, request, listener); return listener; } @@ -326,8 +304,7 @@ public class MachineLearningClient { public ActionFuture startDatafeed( StartDatafeedAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(StartDatafeedAction.INSTANCE, request, listener); return listener; } @@ -339,8 +316,7 @@ public class MachineLearningClient { public ActionFuture stopDatafeed( StopDatafeedAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(StopDatafeedAction.INSTANCE, request, listener); return listener; } @@ -352,8 +328,7 @@ public class MachineLearningClient { public ActionFuture updateDatafeed( UpdateDatafeedAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(UpdateDatafeedAction.INSTANCE, request, listener); return listener; } @@ -364,8 +339,7 @@ public class MachineLearningClient { } public ActionFuture updateJob(UpdateJobAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(UpdateJobAction.INSTANCE, request, listener); return listener; } @@ -377,8 +351,7 @@ public class MachineLearningClient { public ActionFuture updateModelSnapshot( UpdateModelSnapshotAction.Request request) { - PlainListenableActionFuture listener = - new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(UpdateModelSnapshotAction.INSTANCE, request, listener); return listener; } diff --git a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java index 836390c6263..5ae2fd0cc83 100644 --- a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java +++ b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java @@ -6,7 +6,7 @@ package org.elasticsearch.license; import org.elasticsearch.ElasticsearchSecurityException; -import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.unit.TimeValue; @@ -58,7 +58,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do not work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture. newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), listener); listener.actionGet(); fail("put job action should not be enabled!"); @@ -75,7 +75,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), listener); PutJobAction.Response response = listener.actionGet(); assertNotNull(response); @@ -88,7 +88,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response response = putJobListener.actionGet(); assertNotNull(response); @@ -101,7 +101,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do not work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), listener); listener.actionGet(); fail("open job action should not be enabled!"); @@ -125,7 +125,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), listener); OpenJobAction.Response response = listener.actionGet(); assertNotNull(response); @@ -138,7 +138,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response putJobResponse = putJobListener.actionGet(); assertNotNull(putJobResponse); @@ -151,7 +151,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do not work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putDatafeed( new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), listener); listener.actionGet(); @@ -169,7 +169,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putDatafeed( new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), listener); PutDatafeedAction.Response response = listener.actionGet(); @@ -183,24 +183,23 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); // put job - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response putJobResponse = putJobListener.actionGet(); assertNotNull(putJobResponse); // put datafeed - PlainListenableActionFuture putDatafeedListener = new PlainListenableActionFuture<>( - client.threadPool()); + PlainActionFuture putDatafeedListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putDatafeed( new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener); PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); assertNotNull(putDatafeedResponse); // open job - PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture openJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); OpenJobAction.Response openJobResponse = openJobListener.actionGet(); assertNotNull(openJobResponse); // start datafeed - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener); listener.actionGet(); } @@ -231,12 +230,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); // open job - PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture openJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); OpenJobAction.Response openJobResponse = openJobListener.actionGet(); assertNotNull(openJobResponse); // start datafeed - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener); listener.actionGet(); } @@ -281,17 +280,16 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response putJobResponse = putJobListener.actionGet(); assertNotNull(putJobResponse); - PlainListenableActionFuture putDatafeedListener = new PlainListenableActionFuture<>( - client.threadPool()); + PlainActionFuture putDatafeedListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putDatafeed( new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener); PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); assertNotNull(putDatafeedResponse); - PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture openJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); OpenJobAction.Response openJobResponse = openJobListener.actionGet(); assertNotNull(openJobResponse); @@ -314,7 +312,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do not work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener); listener.actionGet(); fail("start datafeed action should not be enabled!"); @@ -332,12 +330,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); // re-open job now that the license is valid again - PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture openJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); OpenJobAction.Response openJobResponse = openJobListener.actionGet(); assertNotNull(openJobResponse); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener); StartDatafeedAction.Response response = listener.actionGet(); assertNotNull(response); @@ -351,22 +349,20 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response putJobResponse = putJobListener.actionGet(); assertNotNull(putJobResponse); - PlainListenableActionFuture putDatafeedListener = new PlainListenableActionFuture<>( - client.threadPool()); + PlainActionFuture putDatafeedListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putDatafeed( new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener); PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); assertNotNull(putDatafeedResponse); - PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture openJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); OpenJobAction.Response openJobResponse = openJobListener.actionGet(); assertNotNull(openJobResponse); - PlainListenableActionFuture startDatafeedListener = new PlainListenableActionFuture<>( - client.threadPool()); + PlainActionFuture startDatafeedListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), startDatafeedListener); StartDatafeedAction.Response startDatafeedResponse = startDatafeedListener.actionGet(); assertNotNull(startDatafeedResponse); @@ -381,8 +377,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>( - client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).stopDatafeed(new StopDatafeedAction.Request("foobar"), listener); if (invalidLicense) { // the stop datafeed due to invalid license happens async, so check if the datafeed turns into stopped state: @@ -403,11 +398,11 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response putJobResponse = putJobListener.actionGet(); assertNotNull(putJobResponse); - PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture openJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); OpenJobAction.Response openJobResponse = openJobListener.actionGet(); assertNotNull(openJobResponse); @@ -422,7 +417,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); CloseJobAction.Request request = new CloseJobAction.Request("foo"); request.setCloseTimeout(TimeValue.timeValueSeconds(20)); if (invalidLicense) { @@ -445,7 +440,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response putJobResponse = putJobListener.actionGet(); assertNotNull(putJobResponse); @@ -457,7 +452,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).deleteJob(new DeleteJobAction.Request("foo"), listener); listener.actionGet(); } @@ -469,12 +464,11 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture putJobListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener); PutJobAction.Response putJobResponse = putJobListener.actionGet(); assertNotNull(putJobResponse); - PlainListenableActionFuture putDatafeedListener = new PlainListenableActionFuture<>( - client.threadPool()); + PlainActionFuture putDatafeedListener = PlainActionFuture.newFuture(); new MachineLearningClient(client).putDatafeed( new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener); PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); @@ -487,7 +481,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + PlainActionFuture listener = PlainActionFuture.newFuture(); new MachineLearningClient(client).deleteDatafeed(new DeleteDatafeedAction.Request("foobar"), listener); listener.actionGet(); }