Make 0.19 brokers compatible with 0.18 router (#10091)

* Make brokers backwards compatible

In 0.19, Brokers gained the ability to serve segments. To support this change,
a `BROKER` ServerType was added to `druid.server.coordination`.

Druid nodes prior to this change do not know of this new server type and so
they would fail to deserialize this node's announcement.

This change makes it so that the broker only announces itself if the segment
cache is configured on the broker. It is expected that a Druid admin will only
configure the segment cache on the broker once the cluster has been upgraded
to a version that supports a broker using the segment cache.

* make code nicer

* Add tests

* Ignore icode coverage for nitialization classes

* Revert "Ignore icode coverage for nitialization classes"

This reverts commit aeec0c2ac2.

* code review
This commit is contained in:
Suneet Saldanha 2020-06-29 20:57:33 -07:00 committed by GitHub
parent 0841c89df6
commit b91a16943b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 450 additions and 51 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordination.ServerType;
@ -36,6 +37,7 @@ public class DataNodeService extends DruidService
private final long maxSize;
private final ServerType type;
private final int priority;
private final boolean isDiscoverable;
@JsonCreator
public DataNodeService(
@ -44,11 +46,23 @@ public class DataNodeService extends DruidService
@JsonProperty("type") ServerType type,
@JsonProperty("priority") int priority
)
{
this(tier, maxSize, type, priority, true);
}
public DataNodeService(
String tier,
long maxSize,
ServerType type,
int priority,
boolean isDiscoverable
)
{
this.tier = tier;
this.maxSize = maxSize;
this.type = type;
this.priority = priority;
this.isDiscoverable = isDiscoverable;
}
@Override
@ -81,6 +95,13 @@ public class DataNodeService extends DruidService
return priority;
}
@Override
@JsonIgnore
public boolean isDiscoverable()
{
return isDiscoverable;
}
@Override
public boolean equals(Object o)
{

View File

@ -19,6 +19,7 @@
package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@ -34,4 +35,17 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
public abstract class DruidService
{
public abstract String getName();
/**
* @return Whether the service should be discoverable. The default implementation returns true.
*
* Some implementations may choose to override this so that the service is not discoverable if it has not been
* configured. This will not throw a fatal exception, but instead will just skip binding and log a message. This could
* be useful for optional configuration for the service.
*/
@JsonIgnore
public boolean isDiscoverable()
{
return true;
}
}

View File

@ -19,19 +19,23 @@
package org.apache.druid.guice;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.name.Named;
import com.google.inject.util.Providers;
import org.apache.druid.client.DruidServerConfig;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import javax.annotation.Nullable;
@ -39,6 +43,10 @@ import javax.annotation.Nullable;
*/
public class StorageNodeModule implements Module
{
private static final EmittingLogger log = new EmittingLogger(StorageNodeModule.class);
@VisibleForTesting
static final String IS_SEGMENT_CACHE_CONFIGURED = "IS_SEGMENT_CACHE_CONFIGURED";
@Override
public void configure(Binder binder)
{
@ -74,17 +82,39 @@ public class StorageNodeModule implements Module
@Provides
@LazySingleton
public DataNodeService getDataNodeService(@Nullable ServerTypeConfig serverTypeConfig, DruidServerConfig config)
public DataNodeService getDataNodeService(
@Nullable ServerTypeConfig serverTypeConfig,
DruidServerConfig config,
@Named(IS_SEGMENT_CACHE_CONFIGURED) Boolean isSegmentCacheConfigured
)
{
if (serverTypeConfig == null) {
throw new ProvisionException("Must override the binding for ServerTypeConfig if you want a DruidServerMetadata.");
throw new ProvisionException("Must override the binding for ServerTypeConfig if you want a DataNodeService.");
}
if (!isSegmentCacheConfigured) {
log.info(
"Segment cache not configured on ServerType [%s]. It will not be assignable for segment placement",
serverTypeConfig.getServerType()
);
if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) {
throw new ProvisionException("Segment cache locations must be set on historicals.");
}
}
return new DataNodeService(
config.getTier(),
config.getMaxSize(),
serverTypeConfig.getServerType(),
config.getPriority()
config.getPriority(),
isSegmentCacheConfigured
);
}
@Provides
@LazySingleton
@Named(IS_SEGMENT_CACHE_CONFIGURED)
public Boolean isSegmentCacheConfigured(SegmentLoaderConfig segmentLoaderConfig)
{
return !segmentLoaderConfig.getLocations().isEmpty();
}
}

View File

@ -35,7 +35,6 @@ import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@ -143,14 +142,6 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
this.exec = exec;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
if (config.getLocations().isEmpty()) {
if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) {
throw new IAE("Segment cache locations must be set on historicals.");
} else {
log.info("Not starting SegmentLoadDropHandler with empty segment cache locations.");
}
}
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.google.inject.name.Names;
import com.google.inject.util.Modules;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Collections;
@RunWith(MockitoJUnitRunner.class)
public class StorageNodeModuleTest
{
private static final boolean INJECT_SERVER_TYPE_CONFIG = true;
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
@Mock(answer = Answers.RETURNS_MOCKS)
private ObjectMapper mapper;
@Mock(answer = Answers.RETURNS_MOCKS)
private DruidNode self;
@Mock(answer = Answers.RETURNS_MOCKS)
private ServerTypeConfig serverTypeConfig;
@Mock
private DruidProcessingConfig druidProcessingConfig;
@Mock
private SegmentLoaderConfig segmentLoaderConfig;
@Mock
private StorageLocationConfig storageLocation;
private Injector injector;
private StorageNodeModule target;
@Before
public void setUp()
{
Mockito.when(segmentLoaderConfig.getLocations()).thenReturn(Collections.singletonList(storageLocation));
target = new StorageNodeModule();
injector = makeInjector(INJECT_SERVER_TYPE_CONFIG);
}
@Test
public void testIsSegmentCacheConfiguredIsInjected()
{
Boolean isSegmentCacheConfigured = injector.getInstance(
Key.get(Boolean.class, Names.named(StorageNodeModule.IS_SEGMENT_CACHE_CONFIGURED))
);
Assert.assertNotNull(isSegmentCacheConfigured);
Assert.assertTrue(isSegmentCacheConfigured);
}
@Test
public void testIsSegmentCacheConfiguredWithNoLocationsConfiguredIsInjected()
{
mockSegmentCacheNotConfigured();
Boolean isSegmentCacheConfigured = injector.getInstance(
Key.get(Boolean.class, Names.named(StorageNodeModule.IS_SEGMENT_CACHE_CONFIGURED))
);
Assert.assertNotNull(isSegmentCacheConfigured);
Assert.assertFalse(isSegmentCacheConfigured);
}
@Test
public void getDataNodeServiceWithNoServerTypeConfigShouldThrowProvisionException()
{
exceptionRule.expect(ProvisionException.class);
exceptionRule.expectMessage("Must override the binding for ServerTypeConfig if you want a DataNodeService.");
injector = makeInjector(!INJECT_SERVER_TYPE_CONFIG);
injector.getInstance(DataNodeService.class);
}
@Test
public void getDataNodeServiceWithNoSegmentCacheConfiguredThrowProvisionException()
{
exceptionRule.expect(ProvisionException.class);
exceptionRule.expectMessage("Segment cache locations must be set on historicals.");
Mockito.doReturn(ServerType.HISTORICAL).when(serverTypeConfig).getServerType();
mockSegmentCacheNotConfigured();
injector.getInstance(DataNodeService.class);
}
@Test
public void getDataNodeServiceIsInjectedAsSingleton()
{
DataNodeService dataNodeService = injector.getInstance(DataNodeService.class);
Assert.assertNotNull(dataNodeService);
DataNodeService other = injector.getInstance(DataNodeService.class);
Assert.assertSame(dataNodeService, other);
}
@Test
public void getDataNodeServiceIsInjectedAndDiscoverable()
{
DataNodeService dataNodeService = injector.getInstance(DataNodeService.class);
Assert.assertNotNull(dataNodeService);
Assert.assertTrue(dataNodeService.isDiscoverable());
}
@Test
public void getDataNodeServiceWithSegmentCacheNotConfiguredIsInjectedAndDiscoverable()
{
mockSegmentCacheNotConfigured();
DataNodeService dataNodeService = injector.getInstance(DataNodeService.class);
Assert.assertNotNull(dataNodeService);
Assert.assertFalse(dataNodeService.isDiscoverable());
}
@Test
public void testDruidServerMetadataIsInjectedAsSingleton()
{
DruidServerMetadata druidServerMetadata = injector.getInstance(DruidServerMetadata.class);
Assert.assertNotNull(druidServerMetadata);
DruidServerMetadata other = injector.getInstance(DruidServerMetadata.class);
Assert.assertSame(druidServerMetadata, other);
}
@Test
public void testDruidServerMetadataWithNoServerTypeConfigShouldThrowProvisionException()
{
exceptionRule.expect(ProvisionException.class);
exceptionRule.expectMessage("Must override the binding for ServerTypeConfig if you want a DruidServerMetadata.");
injector = makeInjector(!INJECT_SERVER_TYPE_CONFIG);
injector.getInstance(DruidServerMetadata.class);
}
private Injector makeInjector(boolean withServerTypeConfig)
{
return Guice.createInjector(
Modules.override(
(binder) -> {
binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(self);
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig);
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
},
target
).with(
(binder) -> {
binder.bind(SegmentLoaderConfig.class).toInstance(segmentLoaderConfig);
if (withServerTypeConfig) {
binder.bind(ServerTypeConfig.class).toInstance(serverTypeConfig);
}
}
)
);
}
private void mockSegmentCacheNotConfigured()
{
Mockito.doReturn(Collections.emptyList()).when(segmentLoaderConfig).getLocations();
}
}

View File

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -276,40 +275,6 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.stop();
}
@Test
public void testSegmentLoading1BrokerWithNoLocations() throws Exception
{
SegmentLoadDropHandler segmentLoadDropHandlerBrokerWithNoLocations = new SegmentLoadDropHandler(
jsonMapper,
segmentLoaderConfigNoLocations,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-brokerNoLocations-[%d]"),
new ServerTypeConfig(ServerType.BROKER)
);
segmentLoadDropHandlerBrokerWithNoLocations.start();
segmentLoadDropHandler.stop();
}
@Test
public void testSegmentLoading1HistoricalWithNoLocations()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Segment cache locations must be set on historicals.");
new SegmentLoadDropHandler(
jsonMapper,
segmentLoaderConfigNoLocations,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL)
);
}
/**
* Steps:
* 1. addSegment() succesfully loads the segment and annouces it

View File

@ -166,6 +166,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>

View File

@ -19,6 +19,7 @@
package org.apache.druid.cli;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
@ -37,6 +38,7 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.DruidNode;
import java.lang.annotation.Annotation;
@ -47,6 +49,8 @@ import java.util.List;
*/
public abstract class ServerRunnable extends GuiceRunnable
{
private static final EmittingLogger log = new EmittingLogger(ServerRunnable.class);
public ServerRunnable(Logger log)
{
super(log);
@ -154,7 +158,11 @@ public abstract class ServerRunnable extends GuiceRunnable
public DiscoverySideEffectsProvider build()
{
return new DiscoverySideEffectsProvider(nodeRole, serviceClasses, useLegacyAnnouncer);
return new DiscoverySideEffectsProvider(
nodeRole,
serviceClasses,
useLegacyAnnouncer
);
}
}
@ -194,15 +202,43 @@ public abstract class ServerRunnable extends GuiceRunnable
this.useLegacyAnnouncer = useLegacyAnnouncer;
}
@VisibleForTesting
DiscoverySideEffectsProvider(
final NodeRole nodeRole,
final List<Class<? extends DruidService>> serviceClasses,
final boolean useLegacyAnnouncer,
final DruidNode druidNode,
final DruidNodeAnnouncer announcer,
final ServiceAnnouncer legacyAnnouncer,
final Lifecycle lifecycle,
final Injector injector
)
{
this.nodeRole = nodeRole;
this.serviceClasses = serviceClasses;
this.useLegacyAnnouncer = useLegacyAnnouncer;
this.druidNode = druidNode;
this.announcer = announcer;
this.legacyAnnouncer = legacyAnnouncer;
this.lifecycle = lifecycle;
this.injector = injector;
}
@Override
public Child get()
{
ImmutableMap.Builder<String, DruidService> builder = new ImmutableMap.Builder<>();
for (Class<? extends DruidService> clazz : serviceClasses) {
DruidService service = injector.getInstance(clazz);
builder.put(service.getName(), service);
if (service.isDiscoverable()) {
builder.put(service.getName(), service);
} else {
log.info(
"Service[%s] is not discoverable. This will not be listed as a service provided by this node.",
service.getName()
);
}
}
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeRole, builder.build());
lifecycle.addHandler(
@ -232,7 +268,6 @@ public abstract class ServerRunnable extends GuiceRunnable
},
Lifecycle.Stage.ANNOUNCEMENTS
);
return new Child();
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.DruidService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.server.DruidNode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(MockitoJUnitRunner.class)
public class DiscoverySideEffectsProviderTest
{
private static final boolean USE_LEGACY_ANNOUNCER = true;
private NodeRole nodeRole;
@Mock
private DruidNode druidNode;
/**
* This announcer is mocked to fail if it tries to announce a Druid service that is not discoverable.
*/
@Mock
private DruidNodeAnnouncer discoverableOnlyAnnouncer;
@Mock
private ServiceAnnouncer legacyAnnouncer;
@Mock
private Lifecycle lifecycle;
@Mock
private Injector injector;
private List<Lifecycle.Handler> lifecycleHandlers;
private ServerRunnable.DiscoverySideEffectsProvider target;
@Before
public void setUp()
{
nodeRole = NodeRole.HISTORICAL;
lifecycleHandlers = new ArrayList<>();
Mockito.when(injector.getInstance(DiscoverableDruidService.class)).thenReturn(new DiscoverableDruidService());
Mockito.when(injector.getInstance(UnDiscoverableDruidService.class)).thenReturn(new UnDiscoverableDruidService());
Mockito.doAnswer((invocation) -> {
DiscoveryDruidNode discoveryDruidNode = invocation.getArgument(0);
boolean isAllServicesDiscoverable =
discoveryDruidNode.getServices().values().stream().allMatch(DruidService::isDiscoverable);
Assert.assertTrue(isAllServicesDiscoverable);
return null;
}).when(discoverableOnlyAnnouncer).announce(ArgumentMatchers.any(DiscoveryDruidNode.class));
Mockito.doAnswer((invocation) -> lifecycleHandlers.add(invocation.getArgument(0)))
.when(lifecycle).addHandler(
ArgumentMatchers.any(Lifecycle.Handler.class),
ArgumentMatchers.eq(Lifecycle.Stage.ANNOUNCEMENTS)
);
target = new ServerRunnable.DiscoverySideEffectsProvider(
nodeRole,
ImmutableList.of(DiscoverableDruidService.class, UnDiscoverableDruidService.class),
USE_LEGACY_ANNOUNCER,
druidNode,
discoverableOnlyAnnouncer,
legacyAnnouncer,
lifecycle,
injector
);
}
@Test
public void testGetShouldAddAnnouncementsForDiscoverableServices() throws Exception
{
ServerRunnable.DiscoverySideEffectsProvider.Child child = target.get();
Assert.assertNotNull(child);
Assert.assertEquals(1, lifecycleHandlers.size());
// Start the lifecycle handler. This will make announcements via the announcer
lifecycleHandlers.get(0).start();
}
/**
* Dummy service which is discoverable.
*/
private static class DiscoverableDruidService extends DruidService
{
@Override
public String getName()
{
return "DiscoverableDruidService";
}
@Override
public boolean isDiscoverable()
{
return true;
}
}
/**
* Dummy service which is not discoverable.
*/
private static class UnDiscoverableDruidService extends DruidService
{
@Override
public String getName()
{
return "UnDiscoverableDruidService";
}
@Override
public boolean isDiscoverable()
{
return false;
}
}
}