From 4fa92cbf49326b45509ce2593f962f276282dca9 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Tue, 31 Jul 2018 16:11:17 +0200 Subject: [PATCH] Changed ReindexRequest to use Writeable.Reader (#32401) -- This is a pre-stage for adding the reindex API to the REST high-level-client -- Follows the pattern set in #26315 --- .../reindex/TransportDeleteByQueryAction.java | 4 +-- .../index/reindex/TransportReindexAction.java | 3 +- .../reindex/TransportUpdateByQueryAction.java | 4 +-- .../index/reindex/RoundTripTests.java | 35 ++++++++----------- .../index/reindex/DeleteByQueryRequest.java | 7 ++++ .../index/reindex/ReindexRequest.java | 12 ++++--- .../index/reindex/UpdateByQueryRequest.java | 8 +++-- 7 files changed, 42 insertions(+), 31 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java index c1defe56adc..706f2c0b8f8 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java @@ -27,13 +27,13 @@ import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.function.Supplier; public class TransportDeleteByQueryAction extends HandledTransportAction { @@ -46,7 +46,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction) DeleteByQueryRequest::new); + (Writeable.Reader) DeleteByQueryRequest::new); this.threadPool = threadPool; this.client = client; this.scriptService = scriptService; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index e54b5f50ae6..1d80c28b585 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.action.index.IndexRequest; @@ -104,7 +105,7 @@ public class TransportReindexAction extends HandledTransportAction)ReindexRequest::new); this.threadPool = threadPool; this.clusterService = clusterService; this.scriptService = scriptService; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 34ae3fdd0c6..00d14822ba0 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -43,7 +44,6 @@ import org.elasticsearch.transport.TransportService; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Supplier; public class TransportUpdateByQueryAction extends HandledTransportAction { @@ -56,7 +56,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction) UpdateByQueryRequest::new); + (Writeable.Reader) UpdateByQueryRequest::new); this.threadPool = threadPool; this.client = client; this.scriptService = scriptService; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 2dc4b59e8d9..97809c9bc8d 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -67,19 +67,17 @@ public class RoundTripTests extends ESTestCase { new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, null, query, username, password, headers, socketTimeout, connectTimeout)); } - ReindexRequest tripped = new ReindexRequest(); - roundTrip(reindex, tripped); + ReindexRequest tripped = new ReindexRequest(toInputByteStream(reindex)); assertRequestEquals(reindex, tripped); // Try slices=auto with a version that doesn't support it, which should fail reindex.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, reindex, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, reindex)); assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); // Try regular slices with a version that doesn't support slices=auto, which should succeed - tripped = new ReindexRequest(); reindex.setSlices(between(1, Integer.MAX_VALUE)); - roundTrip(Version.V_6_0_0_alpha1, reindex, tripped); + tripped = new ReindexRequest(toInputByteStream(reindex)); assertRequestEquals(Version.V_6_0_0_alpha1, reindex, tripped); } @@ -89,20 +87,18 @@ public class RoundTripTests extends ESTestCase { if (randomBoolean()) { update.setPipeline(randomAlphaOfLength(5)); } - UpdateByQueryRequest tripped = new UpdateByQueryRequest(); - roundTrip(update, tripped); + UpdateByQueryRequest tripped = new UpdateByQueryRequest(toInputByteStream(update)); assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); // Try slices=auto with a version that doesn't support it, which should fail update.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, update, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, update)); assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); // Try regular slices with a version that doesn't support slices=auto, which should succeed - tripped = new UpdateByQueryRequest(); update.setSlices(between(1, Integer.MAX_VALUE)); - roundTrip(Version.V_6_0_0_alpha1, update, tripped); + tripped = new UpdateByQueryRequest(toInputByteStream(update)); assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); } @@ -110,19 +106,17 @@ public class RoundTripTests extends ESTestCase { public void testDeleteByQueryRequest() throws IOException { DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest()); randomRequest(delete); - DeleteByQueryRequest tripped = new DeleteByQueryRequest(); - roundTrip(delete, tripped); + DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete)); assertRequestEquals(delete, tripped); // Try slices=auto with a version that doesn't support it, which should fail delete.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, delete, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, delete)); assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); // Try regular slices with a version that doesn't support slices=auto, which should succeed - tripped = new DeleteByQueryRequest(); delete.setSlices(between(1, Integer.MAX_VALUE)); - roundTrip(Version.V_6_0_0_alpha1, delete, tripped); + tripped = new DeleteByQueryRequest(toInputByteStream(delete)); assertRequestEquals(delete, tripped); } @@ -198,23 +192,24 @@ public class RoundTripTests extends ESTestCase { request.setTaskId(new TaskId(randomAlphaOfLength(5), randomLong())); } RethrottleRequest tripped = new RethrottleRequest(); - roundTrip(request, tripped); + // We use readFrom here because Rethrottle does not support the Writeable.Reader interface + tripped.readFrom(toInputByteStream(request)); assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001); assertArrayEquals(request.getActions(), tripped.getActions()); assertEquals(request.getTaskId(), tripped.getTaskId()); } - private void roundTrip(Streamable example, Streamable empty) throws IOException { - roundTrip(Version.CURRENT, example, empty); + private StreamInput toInputByteStream(Streamable example) throws IOException { + return toInputByteStream(Version.CURRENT, example); } - private void roundTrip(Version version, Streamable example, Streamable empty) throws IOException { + private StreamInput toInputByteStream(Version version, Streamable example) throws IOException { BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(version); example.writeTo(out); StreamInput in = out.bytes().streamInput(); in.setVersion(version); - empty.readFrom(in); + return in; } private Script randomScript() { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java index aa8543175d9..f848e8722c7 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java @@ -23,8 +23,11 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.tasks.TaskId; +import java.io.IOException; + import static org.elasticsearch.action.ValidateActions.addValidationError; /** @@ -53,6 +56,10 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest