Fix node discovery to ignore unknown DruidServices (#12157)

* Fix node discovery to ignore unknown DruidServices

* ignore all runtime exceptions

* fix test

* add custom deserializer

* custom serializer

* log host for unparseable druidService
This commit is contained in:
Jihoon Son 2022-01-18 22:08:59 -08:00 committed by GitHub
parent 53c0e489c2
commit cc2ffc6c0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1025 additions and 31 deletions

View File

@ -177,7 +177,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
node.getDruidNode().getHostAndPort(), node.getDruidNode().getHostAndPort(),
node.getDruidNode().getHostAndTlsPort(), node.getDruidNode().getHostAndTlsPort(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(), ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getType(), ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getServerType(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(), ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority() ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority()
); );

View File

@ -22,45 +22,94 @@ package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects; import java.util.Objects;
/** /**
* Metadata announced by any node that serves segments. * Metadata announced by any node that serves segments.
*
* Note for JSON serialization and deserialization.
*
* This class has a bug that it has the "type" property which is the duplicate name
* with the subtype key of {@link DruidService}. It seems to happen to work
* if the "type" subtype key appears first in the serialized JSON since
* Jackson uses the first "type" property as the subtype key.
* To always enforce this property order, this class does not use the {@link JsonProperty} annotation for serialization,
* but uses {@link org.apache.druid.jackson.DruidServiceSerializer}.
* Since this is a hacky-way to not break compatibility, a new "serverType" field is added
* to replace the deprecated "type" field. Once we completely remove the "type" field from this class,
* we can remove DruidServiceSerializer as well.
*
* The set of properties to serialize is hard-coded in DruidServiceSerializer.
* If you want to add a new field in this class before we remove the "type" field,
* you must add a proper handling of that new field in DruidServiceSerializer as well.
*
* For deserialization, DruidServices are deserialized as a part of {@link DiscoveryDruidNode}.
* To handle the bug of the duplicate "type" key, DiscoveryDruidNode first deserializes
* the JSON to {@link org.apache.druid.jackson.StringObjectPairList},
* handles the duplicate "type" keys in the StringObjectPairList,
* and then finally converts it to a DruidService. See {@link DiscoveryDruidNode#toMap(List)}.
*
* @see org.apache.druid.jackson.DruidServiceSerializer
* @see DiscoveryDruidNode#toMap(List)
*/ */
public class DataNodeService extends DruidService public class DataNodeService extends DruidService
{ {
public static final String DISCOVERY_SERVICE_KEY = "dataNodeService"; public static final String DISCOVERY_SERVICE_KEY = "dataNodeService";
public static final String SERVER_TYPE_PROP_KEY = "serverType";
private final String tier; private final String tier;
private final long maxSize; private final long maxSize;
private final ServerType type; private final ServerType serverType;
private final int priority; private final int priority;
private final boolean isDiscoverable; private final boolean isDiscoverable;
/**
* This JSON creator requires for the "type" subtype key of {@link DruidService} to appear before
* the "type" property of this class in the serialized JSON. Deserialization can fail otherwise.
* See the Javadoc of this class for more details.
*/
@JsonCreator @JsonCreator
public DataNodeService( public static DataNodeService fromJson(
@JsonProperty("tier") String tier, @JsonProperty("tier") String tier,
@JsonProperty("maxSize") long maxSize, @JsonProperty("maxSize") long maxSize,
@JsonProperty("type") ServerType type, @JsonProperty("type") @Deprecated @Nullable ServerType type,
@JsonProperty(SERVER_TYPE_PROP_KEY) @Nullable ServerType serverType,
@JsonProperty("priority") int priority @JsonProperty("priority") int priority
) )
{ {
this(tier, maxSize, type, priority, true); if (type == null && serverType == null) {
throw new IAE("ServerType is missing");
}
final ServerType theServerType = serverType == null ? type : serverType;
return new DataNodeService(tier, maxSize, theServerType, priority);
} }
public DataNodeService( public DataNodeService(
String tier, String tier,
long maxSize, long maxSize,
ServerType type, ServerType serverType,
int priority
)
{
this(tier, maxSize, serverType, priority, true);
}
public DataNodeService(
String tier,
long maxSize,
ServerType serverType,
int priority, int priority,
boolean isDiscoverable boolean isDiscoverable
) )
{ {
this.tier = tier; this.tier = tier;
this.maxSize = maxSize; this.maxSize = maxSize;
this.type = type; this.serverType = serverType;
this.priority = priority; this.priority = priority;
this.isDiscoverable = isDiscoverable; this.isDiscoverable = isDiscoverable;
} }
@ -71,30 +120,28 @@ public class DataNodeService extends DruidService
return DISCOVERY_SERVICE_KEY; return DISCOVERY_SERVICE_KEY;
} }
@JsonProperty
public String getTier() public String getTier()
{ {
return tier; return tier;
} }
@JsonProperty
public long getMaxSize() public long getMaxSize()
{ {
return maxSize; return maxSize;
} }
@JsonProperty public ServerType getServerType()
public ServerType getType()
{ {
return type; return serverType;
} }
@JsonProperty
public int getPriority() public int getPriority()
{ {
return priority; return priority;
} }
// leaving the "JsonIgnore" annotation to remember that "discoverable" is ignored in serialization,
// even though the annotation is not actually used.
@Override @Override
@JsonIgnore @JsonIgnore
public boolean isDiscoverable() public boolean isDiscoverable()
@ -115,13 +162,13 @@ public class DataNodeService extends DruidService
return maxSize == that.maxSize && return maxSize == that.maxSize &&
priority == that.priority && priority == that.priority &&
Objects.equals(tier, that.tier) && Objects.equals(tier, that.tier) &&
type == that.type; serverType == that.serverType;
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(tier, maxSize, type, priority); return Objects.hash(tier, maxSize, serverType, priority);
} }
@Override @Override
@ -130,7 +177,7 @@ public class DataNodeService extends DruidService
return "DataNodeService{" + return "DataNodeService{" +
"tier='" + tier + '\'' + "tier='" + tier + '\'' +
", maxSize=" + maxSize + ", maxSize=" + maxSize +
", type=" + type + ", serverType=" + serverType +
", priority=" + priority + ", priority=" + priority +
'}'; '}';
} }

View File

@ -19,12 +19,21 @@
package org.apache.druid.discovery; package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import org.apache.druid.jackson.StringObjectPairList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
/** /**
@ -35,22 +44,25 @@ import java.util.Objects;
*/ */
public class DiscoveryDruidNode public class DiscoveryDruidNode
{ {
private static final Logger LOG = new Logger(DiscoveryDruidNode.class);
private final DruidNode druidNode; private final DruidNode druidNode;
private final NodeRole nodeRole; private final NodeRole nodeRole;
/** /**
* Other metadata associated with the node e.g. * Map of service name -> DruidServices.
* if it's a historical node then lookup information, segment loading capacity etc. * This map has only the DruidServices that is understandable.
* It means, if there is some DruidService not understandable found while converting rawServices to services,
* that DruidService will be ignored and not stored in this map.
* *
* @see DruidNodeDiscoveryProvider#SERVICE_TO_NODE_TYPES * @see DruidNodeDiscoveryProvider#SERVICE_TO_NODE_TYPES
*/ */
private final Map<String, DruidService> services = new HashMap<>(); private final Map<String, DruidService> services = new HashMap<>();
@JsonCreator
public DiscoveryDruidNode( public DiscoveryDruidNode(
@JsonProperty("druidNode") DruidNode druidNode, DruidNode druidNode,
@JsonProperty("nodeType") NodeRole nodeRole, NodeRole nodeRole,
@JsonProperty("services") Map<String, DruidService> services Map<String, DruidService> services
) )
{ {
this.druidNode = druidNode; this.druidNode = druidNode;
@ -61,6 +73,75 @@ public class DiscoveryDruidNode
} }
} }
@JsonCreator
private static DiscoveryDruidNode fromJson(
@JsonProperty("druidNode") DruidNode druidNode,
@JsonProperty("nodeType") NodeRole nodeRole,
@JsonProperty("services") Map<String, StringObjectPairList> rawServices,
@JacksonInject ObjectMapper jsonMapper
)
{
Map<String, DruidService> services = new HashMap<>();
if (rawServices != null && !rawServices.isEmpty()) {
for (Entry<String, StringObjectPairList> entry : rawServices.entrySet()) {
List<NonnullPair<String, Object>> val = entry.getValue().getPairs();
try {
services.put(entry.getKey(), jsonMapper.convertValue(toMap(val), DruidService.class));
}
catch (RuntimeException e) {
LOG.warn("Ignore unparseable DruidService for [%s]: %s", druidNode.getHostAndPortToUse(), val);
}
}
}
return new DiscoveryDruidNode(druidNode, nodeRole, services);
}
/**
* A JSON of a {@link DruidService} is deserialized to a Map and then converted to aDruidService
* to ignore any "unknown" DruidServices to the current node. However, directly deserializing a JSON to a Map
* is problematic for {@link DataNodeService} as it has duplicate "type" keys in its serialized form.
* Because of the duplicate key, if we directly deserialize a JSON to a Map, we will lose one of the "type" property.
* This is definitely a bug of DataNodeService, but, since renaming one of those duplicate keys will
* break compatibility, DataNodeService still has the deprecated "type" property.
* See the Javadoc of DataNodeService for more details.
*
* This function catches such duplicate keys and rewrites the deprecated "type" to "serverType",
* so that we don't lose any properties.
*
* This method can be removed together when we entirely remove the deprecated "type" property from DataNodeService.
*/
@Deprecated
private static Map<String, Object> toMap(List<NonnullPair<String, Object>> pairs)
{
final Map<String, Object> map = Maps.newHashMapWithExpectedSize(pairs.size());
for (NonnullPair<String, Object> pair : pairs) {
final Object prevVal = map.put(pair.lhs, pair.rhs);
if (prevVal != null) {
if ("type".equals(pair.lhs)) {
if (DataNodeService.DISCOVERY_SERVICE_KEY.equals(prevVal)) {
map.put("type", prevVal);
// this one is likely serverType.
map.put(DataNodeService.SERVER_TYPE_PROP_KEY, pair.rhs);
continue;
} else if (DataNodeService.DISCOVERY_SERVICE_KEY.equals(pair.rhs)) {
// this one is likely serverType.
map.put(DataNodeService.SERVER_TYPE_PROP_KEY, prevVal);
continue;
}
} else if (DataNodeService.SERVER_TYPE_PROP_KEY.equals(pair.lhs)) {
// Ignore duplicate "serverType" keys since it can happen
// when the JSON has both "type" and "serverType" keys for serverType.
continue;
}
if (!prevVal.equals(pair.rhs)) {
throw new IAE("Duplicate key[%s] with different values: [%s] and [%s]", pair.lhs, prevVal, pair.rhs);
}
}
}
return map;
}
@JsonProperty @JsonProperty
public Map<String, DruidService> getServices() public Map<String, DruidService> getServices()
{ {

View File

@ -19,19 +19,27 @@
package org.apache.druid.guice; package org.apache.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.jackson.DruidServiceSerializerModifier;
import org.apache.druid.jackson.StringObjectPairList;
import org.apache.druid.jackson.ToStringObjectPairListDeserializer;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.initialization.ZkPathsConfig;
import java.util.List;
/** /**
*/ */
public class ServerModule implements Module public class ServerModule implements DruidModule
{ {
public static final String ZK_PATHS_PROPERTY_BASE = "druid.zk.paths"; public static final String ZK_PATHS_PROPERTY_BASE = "druid.zk.paths";
@ -47,4 +55,14 @@ public class ServerModule implements Module
{ {
return ScheduledExecutors.createFactory(lifecycle); return ScheduledExecutors.createFactory(lifecycle);
} }
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule()
.addDeserializer(StringObjectPairList.class, new ToStringObjectPairListDeserializer())
.setSerializerModifier(new DruidServiceSerializerModifier())
);
}
} }

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidService;
import java.io.IOException;
/**
* A custom serializer to handle the bug of duplicate "type" keys in {@link DataNodeService}.
* This class can be removed together when we entirely remove the deprecated "type" property from DataNodeService.
* See the Javadoc of DataNodeService for more details.
*/
public class DruidServiceSerializer extends StdSerializer<DruidService>
{
private final JsonSerializer<Object> defaultSerializer;
public DruidServiceSerializer(JsonSerializer<Object> defaultSerializer)
{
super(DruidService.class);
this.defaultSerializer = defaultSerializer;
}
@Override
public void serialize(DruidService druidService, JsonGenerator gen, SerializerProvider serializers) throws IOException
{
defaultSerializer.serialize(druidService, gen, serializers);
}
@Override
public void serializeWithType(
DruidService druidService,
JsonGenerator gen,
SerializerProvider serializers,
TypeSerializer typeSer
) throws IOException
{
if (druidService instanceof DataNodeService) {
DataNodeService dataNodeService = (DataNodeService) druidService;
gen.writeStartObject();
// Write subtype key first. This is important because Jackson picks up the first "type" field as the subtype key
// for deserialization.
gen.writeStringField("type", DataNodeService.DISCOVERY_SERVICE_KEY);
// Write properties of DataNodeService
gen.writeStringField("tier", dataNodeService.getTier());
gen.writeNumberField("maxSize", dataNodeService.getMaxSize());
// NOTE: the below line writes a duplicate key of "type".
// This is a bug that DataNodeService has a key of the same name as the subtype key.
// To address the bug, a new "serverType" field has been added.
// However, we cannot remove the deprecated "type" entirely yet because it will break rolling upgrade.
// It seems OK to have duplicate keys though because Jackson seems to always pick up the first "type" property
// as the subtype key for deserialization.
// This duplicate key should be removed in a future release.
// See DiscoveryDruidNode.toMap() for deserialization of DruidServices.
gen.writeObjectField("type", dataNodeService.getServerType());
gen.writeObjectField("serverType", dataNodeService.getServerType());
gen.writeNumberField("priority", dataNodeService.getPriority());
gen.writeEndObject();
} else {
defaultSerializer.serializeWithType(druidService, gen, serializers, typeSer);
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
import org.apache.druid.discovery.DruidService;
/**
* A modifier to use a custom serializer for {@link DruidService}.
* See {@link DruidServiceSerializer} for details.
*/
public class DruidServiceSerializerModifier extends BeanSerializerModifier
{
@Override
public JsonSerializer<?> modifySerializer(
SerializationConfig config,
BeanDescription beanDesc,
JsonSerializer<?> serializer
)
{
if (DruidService.class.isAssignableFrom(beanDesc.getBeanClass())) {
return new DruidServiceSerializer((JsonSerializer<Object>) serializer);
}
return serializer;
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.java.util.common.NonnullPair;
import java.util.List;
import java.util.Objects;
/**
* When {@link DiscoveryDruidNode} is deserialized from a JSON, the JSON is first converted to this class,
* and then to a Map. See {@link DiscoveryDruidNode#toMap} for details.
*
* @see ToStringObjectPairListDeserializer
*/
public class StringObjectPairList
{
private final List<NonnullPair<String, Object>> pairs;
public StringObjectPairList(List<NonnullPair<String, Object>> pairs)
{
this.pairs = pairs;
}
public List<NonnullPair<String, Object>> getPairs()
{
return pairs;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StringObjectPairList that = (StringObjectPairList) o;
return Objects.equals(pairs, that.pairs);
}
@Override
public int hashCode()
{
return Objects.hash(pairs);
}
@Override
public String toString()
{
return "StringObjectPairList{" +
"pairs=" + pairs +
'}';
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidService;
import org.apache.druid.java.util.common.NonnullPair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* When {@link DiscoveryDruidNode} is deserialized from a JSON,
* the JSON is first converted to {@link StringObjectPairList}, and then to a Map.
* See {@link DiscoveryDruidNode#toMap} for details.
*/
public class ToStringObjectPairListDeserializer extends StdDeserializer<StringObjectPairList>
{
public ToStringObjectPairListDeserializer()
{
super(StringObjectPairList.class);
}
@Override
public StringObjectPairList deserialize(JsonParser parser, DeserializationContext ctx) throws IOException
{
if (parser.currentToken() != JsonToken.START_OBJECT) {
throw ctx.wrongTokenException(parser, DruidService.class, JsonToken.START_OBJECT, null);
}
final List<NonnullPair<String, Object>> pairs = new ArrayList<>();
parser.nextToken();
while (parser.currentToken() == JsonToken.FIELD_NAME) {
final String key = parser.getText();
parser.nextToken();
final Object val = ctx.readValue(parser, Object.class);
pairs.add(new NonnullPair<>(key, val));
parser.nextToken();
}
if (parser.currentToken() != JsonToken.END_OBJECT) {
throw ctx.wrongTokenException(parser, DruidService.class, JsonToken.END_OBJECT, null);
}
return new StringObjectPairList(pairs);
}
}

View File

@ -64,6 +64,7 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
.addValue(ServerConfig.class, new ServerConfig()) .addValue(ServerConfig.class, new ServerConfig())
.addValue("java.lang.String", "dummy") .addValue("java.lang.String", "dummy")
.addValue("java.lang.Integer", 1234) .addValue("java.lang.Integer", 1234)
.addValue(ObjectMapper.class, objectMapper)
); );
curator.start(); curator.start();

View File

@ -19,8 +19,9 @@
package org.apache.druid.discovery; package org.apache.druid.discovery;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -29,6 +30,8 @@ import org.junit.Test;
*/ */
public class DataNodeServiceTest public class DataNodeServiceTest
{ {
private final ObjectMapper mapper = DruidServiceTestUtils.newJsonMapper();
@Test @Test
public void testSerde() throws Exception public void testSerde() throws Exception
{ {
@ -39,7 +42,6 @@ public class DataNodeServiceTest
1 1
); );
ObjectMapper mapper = TestHelper.makeJsonMapper();
DruidService actual = mapper.readValue( DruidService actual = mapper.readValue(
mapper.writeValueAsString(expected), mapper.writeValueAsString(expected),
DruidService.class DruidService.class
@ -47,4 +49,99 @@ public class DataNodeServiceTest
Assert.assertEquals(expected, actual); Assert.assertEquals(expected, actual);
} }
@Test
public void testDeserializeWithDeprecatedServerTypeProperty() throws Exception
{
String json = "{\n"
+ " \"type\" : \"dataNodeService\",\n"
+ " \"tier\" : \"tier\",\n"
+ " \"maxSize\" : 100,\n"
+ " \"type\" : \"historical\",\n"
+ " \"priority\" : 1\n"
+ "}";
DruidService actual = mapper.readValue(
json,
DruidService.class
);
Assert.assertEquals(
new DataNodeService(
"tier",
100,
ServerType.HISTORICAL,
1
),
actual
);
}
@Test
public void testDeserializeWithServerTypeProperty() throws Exception
{
String json = "{\n"
+ " \"type\" : \"dataNodeService\",\n"
+ " \"tier\" : \"tier\",\n"
+ " \"maxSize\" : 100,\n"
+ " \"serverType\" : \"historical\",\n"
+ " \"priority\" : 1\n"
+ "}";
DruidService actual = mapper.readValue(
json,
DruidService.class
);
Assert.assertEquals(
new DataNodeService(
"tier",
100,
ServerType.HISTORICAL,
1
),
actual
);
}
@Test
public void testSerdeDeserializeWithBothDeprecatedAndNewServerTypes() throws Exception
{
String json = "{\n"
+ " \"type\" : \"dataNodeService\",\n"
+ " \"tier\" : \"tier\",\n"
+ " \"maxSize\" : 100,\n"
+ " \"type\" : \"historical\",\n"
+ " \"serverType\" : \"historical\",\n"
+ " \"priority\" : 1\n"
+ "}";
DruidService actual = mapper.readValue(
json,
DruidService.class
);
Assert.assertEquals(
new DataNodeService(
"tier",
100,
ServerType.HISTORICAL,
1
),
actual
);
}
@Test
public void testSerializeSubtypeKeyShouldAppearFirstInJson() throws JsonProcessingException
{
final DataNodeService dataNodeService = new DataNodeService(
"tier",
100,
ServerType.HISTORICAL,
1
);
final String json = mapper.writeValueAsString(dataNodeService);
Assert.assertTrue(json.startsWith(StringUtils.format("{\"type\":\"%s\"", DataNodeService.DISCOVERY_SERVICE_KEY)));
}
} }

View File

@ -0,0 +1,328 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.discovery;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.guice.ServerModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ServerType;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collection;
public class DiscoveryDruidNodeTest
{
private final DruidNode druidNode;
private final NodeRole nodeRole;
public DiscoveryDruidNodeTest()
{
this.druidNode = new DruidNode(
"testNode",
"host",
true,
8082,
null,
true,
false
);
nodeRole = NodeRole.BROKER;
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(DiscoveryDruidNode.class)
.withNonnullFields("druidNode", "nodeRole", "services")
.usingGetClass()
.verify();
}
@Test
public void testDeserialize() throws JsonProcessingException
{
final ObjectMapper mapper = createObjectMapper(ImmutableList.of(Service1.class, Service2.class));
final DiscoveryDruidNode node = new DiscoveryDruidNode(
druidNode,
nodeRole,
ImmutableMap.of("service1", new Service1(), "service2", new Service2())
);
final String json = mapper.writeValueAsString(node);
final DiscoveryDruidNode fromJson = mapper.readValue(json, DiscoveryDruidNode.class);
Assert.assertEquals(node, fromJson);
}
@Test
public void testDeserializeIgnorUnknownDruidService() throws JsonProcessingException
{
final ObjectMapper mapper = createObjectMapper(ImmutableList.of(Service1.class));
final DiscoveryDruidNode node = new DiscoveryDruidNode(
druidNode,
nodeRole,
ImmutableMap.of("service1", new Service1(), "service2", new Service2())
);
final String json = mapper.writeValueAsString(node);
final DiscoveryDruidNode fromJson = mapper.readValue(json, DiscoveryDruidNode.class);
Assert.assertEquals(
new DiscoveryDruidNode(
druidNode,
nodeRole,
ImmutableMap.of("service1", new Service1())
),
fromJson
);
}
@Test
public void testSerdeWithDataNodeAndLookupNodeServices() throws JsonProcessingException
{
final ObjectMapper mapper = createObjectMapper(ImmutableList.of());
final DiscoveryDruidNode node = new DiscoveryDruidNode(
new DruidNode(
"druid/broker",
"druid-broker",
false,
8082,
-1,
8282,
true,
true
),
NodeRole.BROKER,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY,
new DataNodeService("_default_tier", 1000000000, ServerType.BROKER, 0),
LookupNodeService.DISCOVERY_SERVICE_KEY,
new LookupNodeService("lookup_tier")
)
);
final String json = mapper.writeValueAsString(node);
Assert.assertEquals(
node,
mapper.readValue(json, DiscoveryDruidNode.class)
);
}
@Test
public void testDeserializeWithDataNodeServiceWithAWrongPropertyOrder() throws JsonProcessingException
{
final ObjectMapper mapper = createObjectMapper(ImmutableList.of());
final String json = "{\n"
+ " \"druidNode\" : {\n"
+ " \"service\" : \"druid/broker\",\n"
+ " \"host\" : \"druid-broker\",\n"
+ " \"bindOnHost\" : false,\n"
+ " \"plaintextPort\" : 8082,\n"
+ " \"port\" : -1,\n"
+ " \"tlsPort\" : 8282,\n"
+ " \"enablePlaintextPort\" : true,\n"
+ " \"enableTlsPort\" : true\n"
+ " },\n"
+ " \"nodeType\" : \"broker\",\n"
+ " \"services\" : {\n"
+ " \"dataNodeService\" : {\n"
// In normal case, this proprty must appear after another "type" below.
+ " \"type\" : \"broker\",\n"
+ " \"type\" : \"dataNodeService\",\n"
+ " \"tier\" : \"_default_tier\",\n"
+ " \"maxSize\" : 1000000000,\n"
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
new DruidNode(
"druid/broker",
"druid-broker",
false,
8082,
-1,
8282,
true,
true
),
NodeRole.BROKER,
ImmutableMap.of(
"dataNodeService",
new DataNodeService("_default_tier", 1000000000, ServerType.BROKER, 0)
)
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
}
@Test
public void testDeserialize_duplicateProperties_shouldSucceedToDeserialize() throws JsonProcessingException
{
final ObjectMapper mapper = createObjectMapper(ImmutableList.of());
final String json = "{\n"
+ " \"druidNode\" : {\n"
+ " \"service\" : \"druid/broker\",\n"
+ " \"host\" : \"druid-broker\",\n"
+ " \"bindOnHost\" : false,\n"
+ " \"plaintextPort\" : 8082,\n"
+ " \"port\" : -1,\n"
+ " \"tlsPort\" : 8282,\n"
+ " \"enablePlaintextPort\" : true,\n"
+ " \"enableTlsPort\" : true\n"
+ " },\n"
+ " \"nodeType\" : \"broker\",\n"
+ " \"services\" : {\n"
+ " \"dataNodeService\" : {\n"
+ " \"type\" : \"dataNodeService\",\n"
+ " \"tier\" : \"_default_tier\",\n"
+ " \"maxSize\" : 1000000000,\n"
+ " \"maxSize\" : 1000000000,\n"
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
new DruidNode(
"druid/broker",
"druid-broker",
false,
8082,
-1,
8282,
true,
true
),
NodeRole.BROKER,
ImmutableMap.of(
"dataNodeService",
new DataNodeService("_default_tier", 1000000000, ServerType.BROKER, 0)
)
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
}
@Test
public void testDeserialize_duplicateKeysWithDifferentValus_shouldIgnoreDataNodeService()
throws JsonProcessingException
{
final ObjectMapper mapper = createObjectMapper(ImmutableList.of());
final String json = "{\n"
+ " \"druidNode\" : {\n"
+ " \"service\" : \"druid/broker\",\n"
+ " \"host\" : \"druid-broker\",\n"
+ " \"bindOnHost\" : false,\n"
+ " \"plaintextPort\" : 8082,\n"
+ " \"port\" : -1,\n"
+ " \"tlsPort\" : 8282,\n"
+ " \"enablePlaintextPort\" : true,\n"
+ " \"enableTlsPort\" : true\n"
+ " },\n"
+ " \"nodeType\" : \"broker\",\n"
+ " \"services\" : {\n"
+ " \"dataNodeService\" : {\n"
+ " \"type\" : \"dataNodeService\",\n"
+ " \"tier\" : \"_default_tier\",\n"
+ " \"maxSize\" : 1000000000,\n"
+ " \"maxSize\" : 10,\n"
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
new DruidNode(
"druid/broker",
"druid-broker",
false,
8082,
-1,
8282,
true,
true
),
NodeRole.BROKER,
ImmutableMap.of()
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
}
private static class Service1 extends DruidService
{
@Override
public String getName()
{
return "service1";
}
@Override
public int hashCode()
{
return 0;
}
@Override
public boolean equals(Object obj)
{
return obj instanceof Service1;
}
}
private static class Service2 extends DruidService
{
@Override
public String getName()
{
return "service2";
}
@Override
public int hashCode()
{
return 0;
}
@Override
public boolean equals(Object obj)
{
return obj instanceof Service2;
}
}
private static ObjectMapper createObjectMapper(Collection<Class<? extends DruidService>> druidServicesToRegister)
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerModules(new ServerModule().getJacksonModules());
//noinspection unchecked,rawtypes
mapper.registerSubtypes((Collection) druidServicesToRegister);
mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
return mapper;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.guice.ServerModule;
import org.apache.druid.jackson.DefaultObjectMapper;
public final class DruidServiceTestUtils
{
public static ObjectMapper newJsonMapper()
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerModules(new ServerModule().getJacksonModules());
return mapper;
}
private DruidServiceTestUtils()
{
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.discovery; package org.apache.druid.discovery;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -35,7 +34,7 @@ public class LookupNodeServiceTest
"tier" "tier"
); );
ObjectMapper mapper = TestHelper.makeJsonMapper(); ObjectMapper mapper = DruidServiceTestUtils.newJsonMapper();
DruidService actual = mapper.readValue( DruidService actual = mapper.readValue(
mapper.writeValueAsString(expected), mapper.writeValueAsString(expected),
DruidService.class DruidService.class

View File

@ -20,7 +20,6 @@
package org.apache.druid.discovery; package org.apache.druid.discovery;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -38,7 +37,7 @@ public class WorkerNodeServiceTest
"c1" "c1"
); );
ObjectMapper mapper = TestHelper.makeJsonMapper(); ObjectMapper mapper = DruidServiceTestUtils.newJsonMapper();
DruidService actual = mapper.readValue( DruidService actual = mapper.readValue(
mapper.writeValueAsString(expected), mapper.writeValueAsString(expected),
DruidService.class DruidService.class

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class StringObjectPairListTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(StringObjectPairList.class).usingGetClass().withNonnullFields("pairs").verify();
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.NonnullPair;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class ToStringObjectPairListDeserializerTest
{
private final ObjectMapper objectMapper;
public ToStringObjectPairListDeserializerTest()
{
objectMapper = new DefaultObjectMapper();
objectMapper.registerModule(
new SimpleModule().addDeserializer(
StringObjectPairList.class,
new ToStringObjectPairListDeserializer()
)
);
}
@Test
public void testDeserializeNestedMap() throws JsonProcessingException
{
final Map<String, Object> map = ImmutableMap.of(
"rootKey",
"rootVal",
"innerMap",
ImmutableMap.of(
"innerKey",
"innerVal"
)
);
final String json = objectMapper.writeValueAsString(map);
final StringObjectPairList pairList = objectMapper.readValue(json, StringObjectPairList.class);
Assert.assertEquals(
new StringObjectPairList(
ImmutableList.of(
new NonnullPair<>("rootKey", "rootVal"),
new NonnullPair<>("innerMap", ImmutableMap.of("innerKey", "innerVal"))
)
),
pairList
);
}
}

View File

@ -665,7 +665,7 @@ public class SystemSchema extends AbstractSchema
druidNode.getHostAndPort(), druidNode.getHostAndPort(),
druidNode.getHostAndTlsPort(), druidNode.getHostAndTlsPort(),
dataNodeService.getMaxSize(), dataNodeService.getMaxSize(),
dataNodeService.getType(), dataNodeService.getServerType(),
dataNodeService.getTier(), dataNodeService.getTier(),
dataNodeService.getPriority() dataNodeService.getPriority()
); );