[ML-DataFrame] fix wire serialization issues in data frame response objects (#39790)

fix wire serialization issues in data frame response objects
This commit is contained in:
Hendrik Muhs 2019-03-07 19:22:34 +01:00
parent af4e740500
commit 4d41310be5
14 changed files with 241 additions and 14 deletions

View File

@ -49,7 +49,7 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
}
public Request() {
private Request() {
}
public Request(StreamInput in) throws IOException {
@ -113,7 +113,7 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}

View File

@ -63,7 +63,7 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
}
}
public Request() {}
private Request() {}
public Request(StreamInput in) throws IOException {
super(in);
@ -149,7 +149,7 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
}
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}
@ -173,6 +173,7 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
List<String> invalidTransforms = new ArrayList<>();
builder.startObject();
toXContentCommon(builder, params);
builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size());
// XContentBuilder does not support passing the params object for Iterables
builder.field(DataFrameField.TRANSFORMS.getPreferredName());

View File

@ -55,7 +55,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
}
}
public Request() {}
private Request() {}
public Request(StreamInput in) throws IOException {
super(in);
@ -138,7 +138,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
}
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}
@ -161,6 +161,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field(DataFrameField.COUNT.getPreferredName(), transformsStateAndStats.size());
builder.field(DataFrameField.TRANSFORMS.getPreferredName(), transformsStateAndStats);
builder.endObject();

View File

@ -46,7 +46,7 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
}
public Request() {
private Request() {
}
public Request(StreamInput in) throws IOException {
@ -108,7 +108,7 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
}
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}
@ -136,6 +136,7 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field("started", started);
builder.endObject();
return builder;

View File

@ -56,7 +56,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
}
public Request() {
private Request() {
this(null, false, null);
}
@ -149,7 +149,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
}
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}
@ -177,6 +177,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field("stopped", stopped);
builder.endObject();
return builder;

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.junit.Before;
import static java.util.Collections.emptyList;
public abstract class AbstractWireSerializingDataFrameTestCase<T extends Writeable> extends AbstractWireSerializingTestCase<T> {
/**
* Test case that ensures aggregation named objects are registered
*/
private NamedWriteableRegistry namedWriteableRegistry;
private NamedXContentRegistry namedXContentRegistry;
@Before
public void registerAggregationNamedObjects() throws Exception {
// register aggregations as NamedWriteable
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return namedWriteableRegistry;
}
@Override
protected NamedXContentRegistry xContentRegistry() {
return namedXContentRegistry;
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;
public class DeleteDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@ -6,23 +6,23 @@
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.watch.Payload.XContent;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.watcher.watch.Payload.XContent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class GetDataFrameTransformsActionResponseTests extends ESTestCase {
public class GetDataFrameTransformsActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
public void testInvalidTransforms() throws IOException {
List<DataFrameTransformConfig> transforms = new ArrayList<>();
@ -66,4 +66,19 @@ public class GetDataFrameTransformsActionResponseTests extends ESTestCase {
assertEquals(null, XContentMapValues.extractValue("headers", transformsResponse.get(i)));
}
}
@Override
protected Response createTestInstance() {
List<DataFrameTransformConfig> configs = new ArrayList<>();
for (int i = 0; i < randomInt(10); ++i) {
configs.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
}
return new Response(configs);
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
import java.util.ArrayList;
import java.util.List;
public class GetDataFrameTransformsStatsActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
for (int i = 0; i < randomInt(10); ++i) {
stats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
}
return new Response(stats);
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PreviewDataFrameTransformsActionResponseWireTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
int size = randomIntBetween(0, 10);
List<Map<String, Object>> data = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Map<String, Object> datum = new HashMap<>();
Map<String, Object> entry = new HashMap<>();
entry.put("value1", randomIntBetween(1, 100));
datum.put(randomAlphaOfLength(10), entry);
data.add(datum);
}
return new Response(data);
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response;
public class PutDataFrameTransformActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createBlankInstance() {
return new Response();
}
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
}

View File

@ -10,7 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Request;
public class StartDataFrameTransformActionTests extends AbstractWireSerializingTestCase<Request> {
public class StartDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response;
public class StartDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Response;
public class StopDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}