From b91a16943b03abbaea6215e9517f8bf62a442c71 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Mon, 29 Jun 2020 20:57:33 -0700 Subject: [PATCH] Make 0.19 brokers compatible with 0.18 router (#10091) * Make brokers backwards compatible In 0.19, Brokers gained the ability to serve segments. To support this change, a `BROKER` ServerType was added to `druid.server.coordination`. Druid nodes prior to this change do not know of this new server type and so they would fail to deserialize this node's announcement. This change makes it so that the broker only announces itself if the segment cache is configured on the broker. It is expected that a Druid admin will only configure the segment cache on the broker once the cluster has been upgraded to a version that supports a broker using the segment cache. * make code nicer * Add tests * Ignore icode coverage for nitialization classes * Revert "Ignore icode coverage for nitialization classes" This reverts commit aeec0c2ac2b07c1b9262e32201913c7194167271. * code review --- .../druid/discovery/DataNodeService.java | 21 ++ .../apache/druid/discovery/DruidService.java | 14 ++ .../apache/druid/guice/StorageNodeModule.java | 36 +++- .../coordination/SegmentLoadDropHandler.java | 9 - .../druid/guice/StorageNodeModuleTest.java | 196 ++++++++++++++++++ .../SegmentLoadDropHandlerTest.java | 35 ---- services/pom.xml | 5 + .../org/apache/druid/cli/ServerRunnable.java | 43 +++- .../cli/DiscoverySideEffectsProviderTest.java | 142 +++++++++++++ 9 files changed, 450 insertions(+), 51 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/guice/StorageNodeModuleTest.java create mode 100644 services/src/test/java/org/apache/druid/cli/DiscoverySideEffectsProviderTest.java diff --git a/server/src/main/java/org/apache/druid/discovery/DataNodeService.java b/server/src/main/java/org/apache/druid/discovery/DataNodeService.java index 8a6f9679486..0414684346b 100644 --- a/server/src/main/java/org/apache/druid/discovery/DataNodeService.java +++ b/server/src/main/java/org/apache/druid/discovery/DataNodeService.java @@ -20,6 +20,7 @@ package org.apache.druid.discovery; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.server.coordination.ServerType; @@ -36,6 +37,7 @@ public class DataNodeService extends DruidService private final long maxSize; private final ServerType type; private final int priority; + private final boolean isDiscoverable; @JsonCreator public DataNodeService( @@ -44,11 +46,23 @@ public class DataNodeService extends DruidService @JsonProperty("type") ServerType type, @JsonProperty("priority") int priority ) + { + this(tier, maxSize, type, priority, true); + } + + public DataNodeService( + String tier, + long maxSize, + ServerType type, + int priority, + boolean isDiscoverable + ) { this.tier = tier; this.maxSize = maxSize; this.type = type; this.priority = priority; + this.isDiscoverable = isDiscoverable; } @Override @@ -81,6 +95,13 @@ public class DataNodeService extends DruidService return priority; } + @Override + @JsonIgnore + public boolean isDiscoverable() + { + return isDiscoverable; + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/discovery/DruidService.java b/server/src/main/java/org/apache/druid/discovery/DruidService.java index d23c411680f..729593f40c1 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidService.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidService.java @@ -19,6 +19,7 @@ package org.apache.druid.discovery; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; @@ -34,4 +35,17 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; public abstract class DruidService { public abstract String getName(); + + /** + * @return Whether the service should be discoverable. The default implementation returns true. + * + * Some implementations may choose to override this so that the service is not discoverable if it has not been + * configured. This will not throw a fatal exception, but instead will just skip binding and log a message. This could + * be useful for optional configuration for the service. + */ + @JsonIgnore + public boolean isDiscoverable() + { + return true; + } } diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java index a1770a95c71..d3750a735a0 100644 --- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java @@ -19,19 +19,23 @@ package org.apache.druid.guice; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.ProvisionException; +import com.google.inject.name.Named; import com.google.inject.util.Providers; import org.apache.druid.client.DruidServerConfig; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; import javax.annotation.Nullable; @@ -39,6 +43,10 @@ import javax.annotation.Nullable; */ public class StorageNodeModule implements Module { + private static final EmittingLogger log = new EmittingLogger(StorageNodeModule.class); + @VisibleForTesting + static final String IS_SEGMENT_CACHE_CONFIGURED = "IS_SEGMENT_CACHE_CONFIGURED"; + @Override public void configure(Binder binder) { @@ -74,17 +82,39 @@ public class StorageNodeModule implements Module @Provides @LazySingleton - public DataNodeService getDataNodeService(@Nullable ServerTypeConfig serverTypeConfig, DruidServerConfig config) + public DataNodeService getDataNodeService( + @Nullable ServerTypeConfig serverTypeConfig, + DruidServerConfig config, + @Named(IS_SEGMENT_CACHE_CONFIGURED) Boolean isSegmentCacheConfigured + ) { if (serverTypeConfig == null) { - throw new ProvisionException("Must override the binding for ServerTypeConfig if you want a DruidServerMetadata."); + throw new ProvisionException("Must override the binding for ServerTypeConfig if you want a DataNodeService."); + } + if (!isSegmentCacheConfigured) { + log.info( + "Segment cache not configured on ServerType [%s]. It will not be assignable for segment placement", + serverTypeConfig.getServerType() + ); + if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) { + throw new ProvisionException("Segment cache locations must be set on historicals."); + } } return new DataNodeService( config.getTier(), config.getMaxSize(), serverTypeConfig.getServerType(), - config.getPriority() + config.getPriority(), + isSegmentCacheConfigured ); } + + @Provides + @LazySingleton + @Named(IS_SEGMENT_CACHE_CONFIGURED) + public Boolean isSegmentCacheConfigured(SegmentLoaderConfig segmentLoaderConfig) + { + return !segmentLoaderConfig.getLocations().isEmpty(); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 87a19365e6f..e9d2f075c34 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -35,7 +35,6 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ServerTypeConfig; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -143,14 +142,6 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler this.exec = exec; this.segmentsToDelete = new ConcurrentSkipListSet<>(); - - if (config.getLocations().isEmpty()) { - if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) { - throw new IAE("Segment cache locations must be set on historicals."); - } else { - log.info("Not starting SegmentLoadDropHandler with empty segment cache locations."); - } - } requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build(); } diff --git a/server/src/test/java/org/apache/druid/guice/StorageNodeModuleTest.java b/server/src/test/java/org/apache/druid/guice/StorageNodeModuleTest.java new file mode 100644 index 00000000000..c844cdd1e80 --- /dev/null +++ b/server/src/test/java/org/apache/druid/guice/StorageNodeModuleTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.ProvisionException; +import com.google.inject.Scopes; +import com.google.inject.name.Names; +import com.google.inject.util.Modules; +import org.apache.druid.discovery.DataNodeService; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.validation.Validation; +import javax.validation.Validator; +import java.util.Collections; + +@RunWith(MockitoJUnitRunner.class) +public class StorageNodeModuleTest +{ + private static final boolean INJECT_SERVER_TYPE_CONFIG = true; + + @Rule + public ExpectedException exceptionRule = ExpectedException.none(); + + @Mock(answer = Answers.RETURNS_MOCKS) + private ObjectMapper mapper; + @Mock(answer = Answers.RETURNS_MOCKS) + private DruidNode self; + @Mock(answer = Answers.RETURNS_MOCKS) + private ServerTypeConfig serverTypeConfig; + @Mock + private DruidProcessingConfig druidProcessingConfig; + @Mock + private SegmentLoaderConfig segmentLoaderConfig; + @Mock + private StorageLocationConfig storageLocation; + + private Injector injector; + private StorageNodeModule target; + + @Before + public void setUp() + { + Mockito.when(segmentLoaderConfig.getLocations()).thenReturn(Collections.singletonList(storageLocation)); + + target = new StorageNodeModule(); + injector = makeInjector(INJECT_SERVER_TYPE_CONFIG); + } + + @Test + public void testIsSegmentCacheConfiguredIsInjected() + { + Boolean isSegmentCacheConfigured = injector.getInstance( + Key.get(Boolean.class, Names.named(StorageNodeModule.IS_SEGMENT_CACHE_CONFIGURED)) + ); + Assert.assertNotNull(isSegmentCacheConfigured); + Assert.assertTrue(isSegmentCacheConfigured); + } + + @Test + public void testIsSegmentCacheConfiguredWithNoLocationsConfiguredIsInjected() + { + mockSegmentCacheNotConfigured(); + Boolean isSegmentCacheConfigured = injector.getInstance( + Key.get(Boolean.class, Names.named(StorageNodeModule.IS_SEGMENT_CACHE_CONFIGURED)) + ); + Assert.assertNotNull(isSegmentCacheConfigured); + Assert.assertFalse(isSegmentCacheConfigured); + } + + @Test + public void getDataNodeServiceWithNoServerTypeConfigShouldThrowProvisionException() + { + exceptionRule.expect(ProvisionException.class); + exceptionRule.expectMessage("Must override the binding for ServerTypeConfig if you want a DataNodeService."); + injector = makeInjector(!INJECT_SERVER_TYPE_CONFIG); + injector.getInstance(DataNodeService.class); + } + + @Test + public void getDataNodeServiceWithNoSegmentCacheConfiguredThrowProvisionException() + { + exceptionRule.expect(ProvisionException.class); + exceptionRule.expectMessage("Segment cache locations must be set on historicals."); + Mockito.doReturn(ServerType.HISTORICAL).when(serverTypeConfig).getServerType(); + mockSegmentCacheNotConfigured(); + injector.getInstance(DataNodeService.class); + } + + @Test + public void getDataNodeServiceIsInjectedAsSingleton() + { + DataNodeService dataNodeService = injector.getInstance(DataNodeService.class); + Assert.assertNotNull(dataNodeService); + DataNodeService other = injector.getInstance(DataNodeService.class); + Assert.assertSame(dataNodeService, other); + } + + @Test + public void getDataNodeServiceIsInjectedAndDiscoverable() + { + DataNodeService dataNodeService = injector.getInstance(DataNodeService.class); + Assert.assertNotNull(dataNodeService); + Assert.assertTrue(dataNodeService.isDiscoverable()); + } + + @Test + public void getDataNodeServiceWithSegmentCacheNotConfiguredIsInjectedAndDiscoverable() + { + mockSegmentCacheNotConfigured(); + DataNodeService dataNodeService = injector.getInstance(DataNodeService.class); + Assert.assertNotNull(dataNodeService); + Assert.assertFalse(dataNodeService.isDiscoverable()); + } + + @Test + public void testDruidServerMetadataIsInjectedAsSingleton() + { + DruidServerMetadata druidServerMetadata = injector.getInstance(DruidServerMetadata.class); + Assert.assertNotNull(druidServerMetadata); + DruidServerMetadata other = injector.getInstance(DruidServerMetadata.class); + Assert.assertSame(druidServerMetadata, other); + } + + @Test + public void testDruidServerMetadataWithNoServerTypeConfigShouldThrowProvisionException() + { + exceptionRule.expect(ProvisionException.class); + exceptionRule.expectMessage("Must override the binding for ServerTypeConfig if you want a DruidServerMetadata."); + injector = makeInjector(!INJECT_SERVER_TYPE_CONFIG); + injector.getInstance(DruidServerMetadata.class); + } + + private Injector makeInjector(boolean withServerTypeConfig) + { + return Guice.createInjector( + Modules.override( + (binder) -> { + binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(self); + binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); + binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig); + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + }, + target + ).with( + (binder) -> { + binder.bind(SegmentLoaderConfig.class).toInstance(segmentLoaderConfig); + if (withServerTypeConfig) { + binder.bind(ServerTypeConfig.class).toInstance(serverTypeConfig); + } + } + ) + ); + } + + private void mockSegmentCacheNotConfigured() + { + Mockito.doReturn(Collections.emptyList()).when(segmentLoaderConfig).getLocations(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 6d8ef0a8cc4..32368b700c6 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.guice.ServerTypeConfig; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -276,40 +275,6 @@ public class SegmentLoadDropHandlerTest segmentLoadDropHandler.stop(); } - @Test - public void testSegmentLoading1BrokerWithNoLocations() throws Exception - { - SegmentLoadDropHandler segmentLoadDropHandlerBrokerWithNoLocations = new SegmentLoadDropHandler( - jsonMapper, - segmentLoaderConfigNoLocations, - announcer, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), - segmentManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-brokerNoLocations-[%d]"), - new ServerTypeConfig(ServerType.BROKER) - ); - - segmentLoadDropHandlerBrokerWithNoLocations.start(); - segmentLoadDropHandler.stop(); - } - - @Test - public void testSegmentLoading1HistoricalWithNoLocations() - { - expectedException.expect(IAE.class); - expectedException.expectMessage("Segment cache locations must be set on historicals."); - - new SegmentLoadDropHandler( - jsonMapper, - segmentLoaderConfigNoLocations, - announcer, - EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), - segmentManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"), - new ServerTypeConfig(ServerType.HISTORICAL) - ); - } - /** * Steps: * 1. addSegment() succesfully loads the segment and annouces it diff --git a/services/pom.xml b/services/pom.xml index abf820e2a82..2d27d30fa67 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -166,6 +166,11 @@ junit test + + org.mockito + mockito-core + test + org.hamcrest hamcrest-core diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java index 4beeab8f9fe..70b93824f2d 100644 --- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java +++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java @@ -19,6 +19,7 @@ package org.apache.druid.cli; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; @@ -37,6 +38,7 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.DruidNode; import java.lang.annotation.Annotation; @@ -47,6 +49,8 @@ import java.util.List; */ public abstract class ServerRunnable extends GuiceRunnable { + private static final EmittingLogger log = new EmittingLogger(ServerRunnable.class); + public ServerRunnable(Logger log) { super(log); @@ -154,7 +158,11 @@ public abstract class ServerRunnable extends GuiceRunnable public DiscoverySideEffectsProvider build() { - return new DiscoverySideEffectsProvider(nodeRole, serviceClasses, useLegacyAnnouncer); + return new DiscoverySideEffectsProvider( + nodeRole, + serviceClasses, + useLegacyAnnouncer + ); } } @@ -194,15 +202,43 @@ public abstract class ServerRunnable extends GuiceRunnable this.useLegacyAnnouncer = useLegacyAnnouncer; } + @VisibleForTesting + DiscoverySideEffectsProvider( + final NodeRole nodeRole, + final List> serviceClasses, + final boolean useLegacyAnnouncer, + final DruidNode druidNode, + final DruidNodeAnnouncer announcer, + final ServiceAnnouncer legacyAnnouncer, + final Lifecycle lifecycle, + final Injector injector + ) + { + this.nodeRole = nodeRole; + this.serviceClasses = serviceClasses; + this.useLegacyAnnouncer = useLegacyAnnouncer; + this.druidNode = druidNode; + this.announcer = announcer; + this.legacyAnnouncer = legacyAnnouncer; + this.lifecycle = lifecycle; + this.injector = injector; + } + @Override public Child get() { ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); for (Class clazz : serviceClasses) { DruidService service = injector.getInstance(clazz); - builder.put(service.getName(), service); + if (service.isDiscoverable()) { + builder.put(service.getName(), service); + } else { + log.info( + "Service[%s] is not discoverable. This will not be listed as a service provided by this node.", + service.getName() + ); + } } - DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeRole, builder.build()); lifecycle.addHandler( @@ -232,7 +268,6 @@ public abstract class ServerRunnable extends GuiceRunnable }, Lifecycle.Stage.ANNOUNCEMENTS ); - return new Child(); } } diff --git a/services/src/test/java/org/apache/druid/cli/DiscoverySideEffectsProviderTest.java b/services/src/test/java/org/apache/druid/cli/DiscoverySideEffectsProviderTest.java new file mode 100644 index 00000000000..8832da28673 --- /dev/null +++ b/services/src/test/java/org/apache/druid/cli/DiscoverySideEffectsProviderTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.cli; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.curator.discovery.ServiceAnnouncer; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeAnnouncer; +import org.apache.druid.discovery.DruidService; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.server.DruidNode; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class DiscoverySideEffectsProviderTest +{ + private static final boolean USE_LEGACY_ANNOUNCER = true; + + private NodeRole nodeRole; + @Mock + private DruidNode druidNode; + /** + * This announcer is mocked to fail if it tries to announce a Druid service that is not discoverable. + */ + @Mock + private DruidNodeAnnouncer discoverableOnlyAnnouncer; + @Mock + private ServiceAnnouncer legacyAnnouncer; + @Mock + private Lifecycle lifecycle; + @Mock + private Injector injector; + private List lifecycleHandlers; + + private ServerRunnable.DiscoverySideEffectsProvider target; + + @Before + public void setUp() + { + nodeRole = NodeRole.HISTORICAL; + lifecycleHandlers = new ArrayList<>(); + Mockito.when(injector.getInstance(DiscoverableDruidService.class)).thenReturn(new DiscoverableDruidService()); + Mockito.when(injector.getInstance(UnDiscoverableDruidService.class)).thenReturn(new UnDiscoverableDruidService()); + Mockito.doAnswer((invocation) -> { + DiscoveryDruidNode discoveryDruidNode = invocation.getArgument(0); + boolean isAllServicesDiscoverable = + discoveryDruidNode.getServices().values().stream().allMatch(DruidService::isDiscoverable); + Assert.assertTrue(isAllServicesDiscoverable); + return null; + }).when(discoverableOnlyAnnouncer).announce(ArgumentMatchers.any(DiscoveryDruidNode.class)); + Mockito.doAnswer((invocation) -> lifecycleHandlers.add(invocation.getArgument(0))) + .when(lifecycle).addHandler( + ArgumentMatchers.any(Lifecycle.Handler.class), + ArgumentMatchers.eq(Lifecycle.Stage.ANNOUNCEMENTS) + ); + target = new ServerRunnable.DiscoverySideEffectsProvider( + nodeRole, + ImmutableList.of(DiscoverableDruidService.class, UnDiscoverableDruidService.class), + USE_LEGACY_ANNOUNCER, + druidNode, + discoverableOnlyAnnouncer, + legacyAnnouncer, + lifecycle, + injector + ); + } + + @Test + public void testGetShouldAddAnnouncementsForDiscoverableServices() throws Exception + { + ServerRunnable.DiscoverySideEffectsProvider.Child child = target.get(); + Assert.assertNotNull(child); + Assert.assertEquals(1, lifecycleHandlers.size()); + // Start the lifecycle handler. This will make announcements via the announcer + lifecycleHandlers.get(0).start(); + } + + /** + * Dummy service which is discoverable. + */ + private static class DiscoverableDruidService extends DruidService + { + @Override + public String getName() + { + return "DiscoverableDruidService"; + } + + @Override + public boolean isDiscoverable() + { + return true; + } + } + + /** + * Dummy service which is not discoverable. + */ + private static class UnDiscoverableDruidService extends DruidService + { + @Override + public String getName() + { + return "UnDiscoverableDruidService"; + } + + @Override + public boolean isDiscoverable() + { + return false; + } + } +}