internal-discovery: interfaces for announcement/discovery, curator based impls (#4634)

* internal-discovery: interfaces for announcement/discovery, curator impls

* more tests

* address some review comments

* more fixes

* address more review comments

* simplify ObjectMapper setup in CuratorDruidNodeAnnouncerAndDiscoveryTest

* fix KafkaIndexTaskTest

* make lookupTier overridable via RealtimeIndexTask and KafkaIndexTask context

* make teamcity build happy
This commit is contained in:
Himanshu 2017-08-16 15:07:16 -05:00 committed by cheddar
parent dba7c7d3cd
commit 74a64c88ab
45 changed files with 2247 additions and 234 deletions

View File

@ -42,6 +42,9 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
@ -50,6 +53,7 @@ import io.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
@ -289,12 +293,25 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
)
);
LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ?
toolbox.getLookupNodeService() :
new LookupNodeService((String) getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
)
);
try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
final AppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
appenderator = appenderator0;
@ -597,6 +614,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
toolbox.getDataSegmentServerAnnouncer().unannounce();
return success();

View File

@ -53,6 +53,9 @@ import io.druid.data.input.impl.JSONPathFieldSpec;
import io.druid.data.input.impl.JSONPathSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@ -117,7 +120,9 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -1637,7 +1642,11 @@ public class KafkaIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
testUtils.getTestIndexMergerV9()
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0)
);
}

View File

@ -31,6 +31,9 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.config.TaskConfig;
@ -44,6 +47,7 @@ import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.timeline.DataSegment;
@ -87,6 +91,11 @@ public class TaskToolbox
private final CacheConfig cacheConfig;
private final IndexMergerV9 indexMergerV9;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
private final LookupNodeService lookupNodeService;
private final DataNodeService dataNodeService;
public TaskToolbox(
TaskConfig config,
TaskActionClient taskActionClient,
@ -107,7 +116,11 @@ public class TaskToolbox
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
IndexMergerV9 indexMergerV9
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
DruidNode druidNode,
LookupNodeService lookupNodeService,
DataNodeService dataNodeService
)
{
this.config = config;
@ -130,6 +143,10 @@ public class TaskToolbox
this.cache = cache;
this.cacheConfig = cacheConfig;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
this.dataNodeService = dataNodeService;
}
public TaskConfig getConfig()
@ -271,4 +288,24 @@ public class TaskToolbox
{
return new File(taskWorkDir, "persist");
}
public DruidNodeAnnouncer getDruidNodeAnnouncer()
{
return druidNodeAnnouncer;
}
public LookupNodeService getLookupNodeService()
{
return lookupNodeService;
}
public DataNodeService getDataNodeService()
{
return dataNodeService;
}
public DruidNode getDruidNode()
{
return druidNode;
}
}

View File

@ -27,7 +27,11 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.RemoteChatHandler;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
@ -39,6 +43,7 @@ import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
@ -69,6 +74,10 @@ public class TaskToolboxFactory
private final Cache cache;
private final CacheConfig cacheConfig;
private final IndexMergerV9 indexMergerV9;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
private final LookupNodeService lookupNodeService;
private final DataNodeService dataNodeService;
@Inject
public TaskToolboxFactory(
@ -90,7 +99,11 @@ public class TaskToolboxFactory
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
IndexMergerV9 indexMergerV9
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
@RemoteChatHandler DruidNode druidNode,
LookupNodeService lookupNodeService,
DataNodeService dataNodeService
)
{
this.config = config;
@ -112,6 +125,10 @@ public class TaskToolboxFactory
this.cache = cache;
this.cacheConfig = cacheConfig;
this.indexMergerV9 = indexMergerV9;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
this.dataNodeService = dataNodeService;
}
public TaskToolbox build(Task task)
@ -137,7 +154,11 @@ public class TaskToolboxFactory
indexIO,
cache,
cacheConfig,
indexMergerV9
indexMergerV9,
druidNodeAnnouncer,
druidNode,
lookupNodeService,
dataNodeService
);
}
}

View File

@ -31,6 +31,9 @@ import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@ -78,6 +81,8 @@ import java.util.concurrent.CountDownLatch;
public class RealtimeIndexTask extends AbstractTask
{
public static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
private final static Random random = new Random();
@ -324,8 +329,22 @@ public class RealtimeIndexTask extends AbstractTask
Supplier<Committer> committerSupplier = null;
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ?
toolbox.getLookupNodeService() :
new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
)
);
try {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
plumber.startJob();
@ -431,6 +450,7 @@ public class RealtimeIndexTask extends AbstractTask
}
toolbox.getDataSegmentServerAnnouncer().unannounce();
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
}
log.info("Job done!");

View File

@ -110,7 +110,11 @@ public class TaskToolboxTest
mockIndexIO,
mockCache,
mockCacheConfig,
mockIndexMergerV9
mockIndexMergerV9,
null,
null,
null,
null
);
}

View File

@ -950,7 +950,7 @@ public class IndexTaskTest
throw new UnsupportedOperationException();
}
}, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
indexIO, null, null, indexMergerV9
indexIO, null, null, indexMergerV9, null, null, null, null
)
);

View File

@ -46,6 +46,9 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@ -104,7 +107,9 @@ import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
@ -1048,7 +1053,11 @@ public class RealtimeIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
testUtils.getTestIndexMergerV9()
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
);
return toolboxFactory.build(task);

View File

@ -233,7 +233,8 @@ public class SameIntervalMergeTaskTest
{
}
}, jsonMapper, temporaryFolder.newFolder(),
indexIO, null, null, EasyMock.createMock(IndexMergerV9.class)
indexIO, null, null, EasyMock.createMock(IndexMergerV9.class),
null, null, null, null
)
);

View File

@ -292,7 +292,11 @@ public class IngestSegmentFirehoseFactoryTest
INDEX_IO,
null,
null,
INDEX_MERGER_V9
INDEX_MERGER_V9,
null,
null,
null,
null
);
Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList(

View File

@ -333,7 +333,11 @@ public class IngestSegmentFirehoseFactoryTimelineTest
INDEX_IO,
null,
null,
INDEX_MERGER_V9
INDEX_MERGER_V9,
null,
null,
null,
null
);
final Injector injector = Guice.createInjector(
new Module()

View File

@ -45,6 +45,9 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@ -107,6 +110,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
@ -600,7 +604,11 @@ public class TaskLifecycleTest
INDEX_IO,
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
INDEX_MERGER_V9
INDEX_MERGER_V9,
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
);
}

View File

@ -187,7 +187,11 @@ public class WorkerTaskMonitorTest
indexIO,
null,
null,
indexMergerV9
indexMergerV9,
null,
null,
null,
null
),
taskConfig,
new NoopServiceEmitter(),

View File

@ -1,174 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.curator.inventory.CuratorInventoryManager;
import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.java.util.common.ISE;
import org.apache.curator.framework.CuratorFramework;
import java.io.IOException;
/**
* Discovers DruidServer instances that serve segments using CuratorInventoryManager.
*/
public class DruidServerDiscovery
{
private final EmittingLogger log = new EmittingLogger(DruidServerDiscovery.class);
private final CuratorInventoryManager curatorInventoryManager;
private volatile Listener listener;
DruidServerDiscovery(
final CuratorFramework curatorFramework,
final String announcementsPath,
final ObjectMapper jsonMapper
)
{
curatorInventoryManager = initCuratorInventoryManager(curatorFramework, announcementsPath, jsonMapper);
}
public void start() throws Exception
{
Preconditions.checkNotNull(listener, "listener is not configured yet");
curatorInventoryManager.start();
}
public void stop() throws IOException
{
curatorInventoryManager.stop();
}
private CuratorInventoryManager initCuratorInventoryManager(
final CuratorFramework curator,
final String announcementsPath,
final ObjectMapper jsonMapper
)
{
return new CuratorInventoryManager<>(
curator,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return announcementsPath;
}
@Override
public String getInventoryPath()
{
return "/NON_EXISTENT_DUMMY_INVENTORY_PATH";
}
},
Execs.singleThreaded("CuratorInventoryManagerBasedServerDiscovery-%s"),
new CuratorInventoryManagerStrategy<DruidServer, Object>()
{
@Override
public DruidServer deserializeContainer(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, DruidServer.class);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Override
public void newContainer(DruidServer container)
{
log.info("New Server[%s]", container.getName());
listener.serverAdded(container);
}
@Override
public void deadContainer(DruidServer container)
{
log.info("Server Disappeared[%s]", container.getName());
listener.serverRemoved(container);
}
@Override
public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
{
log.info("Server updated[%s]", oldContainer.getName());
return listener.serverUpdated(oldContainer, newContainer);
}
@Override
public Object deserializeInventory(byte[] bytes)
{
throw new ISE("no inventory should exist.");
}
@Override
public DruidServer addInventory(
final DruidServer container,
String inventoryKey,
final Object inventory
)
{
throw new ISE("no inventory should exist.");
}
@Override
public DruidServer updateInventory(
DruidServer container, String inventoryKey, Object inventory
)
{
throw new ISE("no inventory should exist.");
}
@Override
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
{
throw new ISE("no inventory should exist.");
}
@Override
public void inventoryInitialized()
{
log.info("Server inventory initialized.");
listener.initialized();
}
}
);
}
public void registerListener(Listener listener)
{
Preconditions.checkArgument(this.listener == null, "listener registered already.");
this.listener = listener;
}
public interface Listener
{
void serverAdded(DruidServer server);
DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer);
void serverRemoved(DruidServer server);
void initialized();
}
}

View File

@ -23,14 +23,13 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.http.client.HttpClient;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
@ -59,18 +58,14 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = null;
@Override
public HttpServerInventoryView get()
{
return new HttpServerInventoryView(
jsonMapper, smileMapper, httpClient,
new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper),
druidNodeDiscoveryProvider,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue(),
config
);

View File

@ -40,6 +40,10 @@ import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.concurrent.LifecycleLock;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
@ -83,7 +87,7 @@ import java.util.concurrent.TimeUnit;
public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
{
private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class);
private final DruidServerDiscovery serverDiscovery;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final LifecycleLock lifecycleLock = new LifecycleLock();
@ -105,8 +109,6 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
// to this queue again for next update.
private final BlockingQueue<String> queue = new LinkedBlockingDeque<>();
private final HttpClient httpClient;
private final ObjectMapper smileMapper;
private final HttpServerInventoryViewConfig config;
@ -116,14 +118,14 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
final @Json ObjectMapper jsonMapper,
final @Smile ObjectMapper smileMapper,
final @Global HttpClient httpClient,
final DruidServerDiscovery serverDiscovery,
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
final HttpServerInventoryViewConfig config
)
{
this.httpClient = httpClient;
this.smileMapper = smileMapper;
this.serverDiscovery = serverDiscovery;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.defaultFilter = defaultFilter;
this.finalPredicate = defaultFilter;
this.config = config;
@ -178,36 +180,38 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
);
serverDiscovery.registerListener(
new DruidServerDiscovery.Listener()
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
@Override
public void serverAdded(DruidServer server)
public void nodeAdded(DiscoveryDruidNode node)
{
serverAddedOrUpdated(server);
serverAddedOrUpdated(toDruidServer(node));
}
@Override
public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer)
public void nodeRemoved(DiscoveryDruidNode node)
{
return serverAddedOrUpdated(newServer);
serverRemoved(toDruidServer(node));
}
@Override
public void serverRemoved(DruidServer server)
private DruidServer toDruidServer(DiscoveryDruidNode node)
{
HttpServerInventoryView.this.serverRemoved(server);
runServerCallbacks(server);
}
@Override
public void initialized()
{
serverInventoryInitialized();
return new DruidServer(
node.getDruidNode().getHostAndPortToUse(),
node.getDruidNode().getHostAndPort(),
node.getDruidNode().getHostAndTlsPort(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getType(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(),
((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority()
);
}
}
);
serverDiscovery.start();
log.info("Started HttpServerInventoryView.");
lifecycleLock.started();
@ -228,8 +232,6 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
log.info("Stopping HttpServerInventoryView.");
serverDiscovery.stop();
if (executor != null) {
executor.shutdownNow();
executor = null;
@ -373,11 +375,6 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
servers.remove(server.getName());
}
public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer)
{
return serverAddedOrUpdated(newServer);
}
@Override
public boolean isStarted()
{

View File

@ -23,14 +23,13 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.http.client.HttpClient;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
@ -59,11 +58,7 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = null;
@Override
public HttpServerInventoryView get()
@ -72,7 +67,7 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
jsonMapper,
smileMapper,
httpClient,
new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper),
druidNodeDiscoveryProvider,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue(),
config
);

View File

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator.discovery;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.curator.announcement.Announcer;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
/**
*/
public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
{
private static final Logger log = new Logger(CuratorDruidNodeAnnouncer.class);
private final Announcer announcer;
private final ZkPathsConfig config;
private final ObjectMapper jsonMapper;
@Inject
public CuratorDruidNodeAnnouncer(
Announcer announcer,
ZkPathsConfig config,
@Json ObjectMapper jsonMapper
)
{
this.announcer = announcer;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public void announce(DiscoveryDruidNode discoveryDruidNode)
{
try {
log.info("Announcing [%s].", discoveryDruidNode);
announcer.announce(
ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
),
jsonMapper.writeValueAsBytes(discoveryDruidNode)
);
log.info("Announced [%s].", discoveryDruidNode);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}
@Override
public void unannounce(DiscoveryDruidNode discoveryDruidNode)
{
log.info("Unannouncing [%s].", discoveryDruidNode);
announcer.unannounce(
ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
)
);
log.info("Unannounced [%s].", discoveryDruidNode);
}
}

View File

@ -0,0 +1,371 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.concurrent.Execs;
import io.druid.concurrent.LifecycleLock;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
*/
@ManageLifecycle
public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
{
private static final Logger log = new Logger(CuratorDruidNodeDiscoveryProvider.class);
private final CuratorFramework curatorFramework;
private final ZkPathsConfig config;
private final ObjectMapper jsonMapper;
private ExecutorService listenerExecutor;
private final Map<String, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
private final LifecycleLock lifecycleLock = new LifecycleLock();
@Inject
public CuratorDruidNodeDiscoveryProvider(
CuratorFramework curatorFramework,
ZkPathsConfig config,
@Json ObjectMapper jsonMapper
)
{
this.curatorFramework = curatorFramework;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public DruidNodeDiscovery getForNodeType(String nodeType)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return nodeTypeWatchers.compute(
nodeType,
(k, v) -> {
if (v != null) {
return v;
}
log.info("Creating NodeTypeWatcher for nodeType [%s].", nodeType);
NodeTypeWatcher nodeTypeWatcher = new NodeTypeWatcher(
listenerExecutor,
curatorFramework,
config.getInternalDiscoveryPath(),
jsonMapper,
nodeType
);
nodeTypeWatcher.start();
log.info("Created NodeTypeWatcher for nodeType [%s].", nodeType);
return nodeTypeWatcher;
}
);
}
@LifecycleStart
public void start()
{
if (!lifecycleLock.canStart()) {
throw new ISE("can't start.");
}
try {
log.info("starting");
// This is single-threaded to ensure that all listener calls are executed precisely in the oder of add/remove
// event occurences.
listenerExecutor = Execs.singleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
log.info("started");
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}
@LifecycleStop
public void stop()
{
if (!lifecycleLock.canStop()) {
throw new ISE("can't stop.");
}
log.info("stopping");
for (NodeTypeWatcher watcher : nodeTypeWatchers.values()) {
watcher.stop();
}
listenerExecutor.shutdownNow();
log.info("stopped");
}
private static class NodeTypeWatcher implements DruidNodeDiscovery
{
private static final Logger log = new Logger(NodeTypeWatcher.class);
private final CuratorFramework curatorFramework;
private final String nodeType;
private final ObjectMapper jsonMapper;
// hostAndPort -> DiscoveryDruidNode
private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final PathChildrenCache cache;
private final ExecutorService cacheExecutor;
private final ExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList();
private final Object lock = new Object();
NodeTypeWatcher(
ExecutorService listenerExecutor,
CuratorFramework curatorFramework,
String basePath,
ObjectMapper jsonMapper,
String nodeType
)
{
this.listenerExecutor = listenerExecutor;
this.curatorFramework = curatorFramework;
this.nodeType = nodeType;
this.jsonMapper = jsonMapper;
// This is required to be single threaded from Docs in PathChildrenCache;
this.cacheExecutor = Execs.singleThreaded(String.format("NodeTypeWatcher[%s]", nodeType));
this.cache = new PathChildrenCache(
curatorFramework,
ZKPaths.makePath(basePath, nodeType),
true,
true,
cacheExecutor
);
}
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
return Collections.unmodifiableCollection(nodes.values());
}
@Override
public void registerListener(DruidNodeDiscovery.Listener listener)
{
synchronized (lock) {
for (DiscoveryDruidNode node : nodes.values()) {
listenerExecutor.submit(() -> {
try {
listener.nodeAdded(node);
}
catch (Exception ex) {
log.error(
ex,
"Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].",
node,
listener
);
}
});
}
nodeListeners.add(listener);
}
}
public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
synchronized (lock) {
try {
switch (event.getType()) {
case CHILD_ADDED: {
final byte[] data;
try {
data = curatorFramework.getData().decompressed().forPath(event.getData().getPath());
}
catch (Exception ex) {
log.error(
ex,
"Failed to get data for path [%s]. Ignoring event [%s].",
event.getData().getPath(),
event.getType()
);
return;
}
DiscoveryDruidNode druidNode = jsonMapper.readValue(
data,
DiscoveryDruidNode.class
);
if (!nodeType.equals(druidNode.getNodeType())) {
log.warn(
"Node[%s:%s] add is discovered by node watcher of nodeType [%s]. Ignored.",
druidNode.getNodeType(),
druidNode,
nodeType
);
return;
}
log.info("Received event [%s] for Node[%s:%s].", event.getType(), druidNode.getNodeType(), druidNode);
addNode(druidNode);
break;
}
case CHILD_REMOVED: {
DiscoveryDruidNode druidNode = jsonMapper.readValue(event.getData().getData(), DiscoveryDruidNode.class);
if (!nodeType.equals(druidNode.getNodeType())) {
log.warn(
"Node[%s:%s] removal is discovered by node watcher of nodeType [%s]. Ignored.",
druidNode.getNodeType(),
druidNode,
nodeType
);
return;
}
log.info("Node[%s:%s] disappeared.", druidNode.getNodeType(), druidNode);
removeNode(druidNode);
break;
}
default: {
log.error("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType);
}
}
}
catch (Exception ex) {
log.error(ex, "unknown error in node watcher for type [%s].", nodeType);
}
}
}
private void addNode(DiscoveryDruidNode druidNode)
{
synchronized (lock) {
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
for (DruidNodeDiscovery.Listener l : nodeListeners) {
listenerExecutor.submit(() -> {
try {
l.nodeAdded(druidNode);
}
catch (Exception ex) {
log.error(
ex,
"Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].",
druidNode,
l
);
}
});
}
} else {
log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev);
}
}
}
private void removeNode(DiscoveryDruidNode druidNode)
{
synchronized (lock) {
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
if (prev == null) {
log.warn("Noticed disappearance of unknown druid node [%s:%s].", druidNode.getNodeType(), druidNode);
return;
}
for (DruidNodeDiscovery.Listener l : nodeListeners) {
listenerExecutor.submit(() -> {
try {
l.nodeRemoved(druidNode);
}
catch (Exception ex) {
log.error(
ex,
"Exception occured in DiscoveryDruidNode.nodeRemoved(node=[%s]) in listener [%s].",
druidNode,
l
);
}
});
}
}
}
public void start()
{
try {
cache.getListenable().addListener(
(client, event) -> handleChildEvent(client, event)
);
cache.start();
}
catch (Exception ex) {
throw Throwables.propagate(ex);
}
}
public void stop()
{
try {
cache.close();
cacheExecutor.shutdownNow();
}
catch (Exception ex) {
log.error(ex, "Failed to stop node watcher for type [%s].", nodeType);
}
}
}
}

View File

@ -30,12 +30,14 @@ import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.KeyHolder;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.PolyBind;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.server.DruidNode;
import io.druid.server.initialization.CuratorDiscoveryConfig;
@ -74,6 +76,9 @@ public class DiscoveryModule implements Module
{
private static final String NAME = "DiscoveryModule:internal";
private static final String INTERNAL_DISCOVERY_PROP = "druid.discovery.type";
private static final String CURATOR_KEY = "curator";
/**
* Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle.
*
@ -146,6 +151,25 @@ public class DiscoveryModule implements Module
binder.bind(ServiceAnnouncer.class)
.to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME)))
.in(LazySingleton.class);
// internal discovery bindings.
PolyBind.createChoiceWithDefault(
binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeAnnouncer.class), CURATOR_KEY
);
PolyBind.createChoiceWithDefault(
binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeDiscoveryProvider.class), CURATOR_KEY
);
PolyBind.optionBinder(binder, Key.get(DruidNodeDiscoveryProvider.class))
.addBinding(CURATOR_KEY)
.to(CuratorDruidNodeDiscoveryProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DruidNodeAnnouncer.class))
.addBinding(CURATOR_KEY)
.to(CuratorDruidNodeAnnouncer.class)
.in(LazySingleton.class);
}
@Provides

View File

@ -0,0 +1,116 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.server.coordination.ServerType;
import java.util.Objects;
/**
* Metadata announced by any node that serves segments.
*/
public class DataNodeService extends DruidService
{
public static final String DISCOVERY_SERVICE_KEY = "dataNodeService";
private final String tier;
private final long maxSize;
private final ServerType type;
private final int priority;
@JsonCreator
public DataNodeService(
@JsonProperty("tier") String tier,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") ServerType type,
@JsonProperty("priority") int priority
)
{
this.tier = tier;
this.maxSize = maxSize;
this.type = type;
this.priority = priority;
}
@Override
public String getName()
{
return DISCOVERY_SERVICE_KEY;
}
@JsonProperty
public String getTier()
{
return tier;
}
@JsonProperty
public long getMaxSize()
{
return maxSize;
}
@JsonProperty
public ServerType getType()
{
return type;
}
@JsonProperty
public int getPriority()
{
return priority;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DataNodeService that = (DataNodeService) o;
return maxSize == that.maxSize &&
priority == that.priority &&
Objects.equals(tier, that.tier) &&
type == that.type;
}
@Override
public int hashCode()
{
return Objects.hash(tier, maxSize, type, priority);
}
@Override
public String toString()
{
return "DataNodeService{" +
"tier='" + tier + '\'' +
", maxSize=" + maxSize +
", type=" + type +
", priority=" + priority +
'}';
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.server.DruidNode;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Representation of all information related to discovery of a node and all the other metadata associated with
* the node per nodeType such as broker, historical etc.
* Note that one Druid process might announce multiple DiscoveryDruidNode if it acts as multiple nodeTypes e.g.
* coordinator would announce DiscoveryDruidNode for overlord nodeType as well when acting as overlord.
*/
public class DiscoveryDruidNode
{
private final DruidNode druidNode;
private final String nodeType;
// Other metadata associated with the node e.g.
// if its a historical node then lookup information, segment loading capacity etc.
private final Map<String, DruidService> services = new HashMap<>();
@JsonCreator
public DiscoveryDruidNode(
@JsonProperty("druidNode") DruidNode druidNode,
@JsonProperty("nodeType") String nodeType,
@JsonProperty("services") Map<String, DruidService> services
)
{
this.druidNode = druidNode;
this.nodeType = nodeType;
if (services != null && !services.isEmpty()) {
this.services.putAll(services);
}
}
@JsonProperty
public Map<String, DruidService> getServices()
{
return services;
}
@JsonProperty
public String getNodeType()
{
return nodeType;
}
@JsonProperty
public DruidNode getDruidNode()
{
return druidNode;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DiscoveryDruidNode that = (DiscoveryDruidNode) o;
return Objects.equals(druidNode, that.druidNode) &&
Objects.equals(nodeType, that.nodeType) &&
Objects.equals(services, that.services);
}
@Override
public int hashCode()
{
return Objects.hash(druidNode, nodeType, services);
}
@Override
public String toString()
{
return "DiscoveryDruidNode{" +
"druidNode=" + druidNode +
", nodeType='" + nodeType + '\'' +
", services=" + services +
'}';
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
/**
* DiscoveryDruidNode announcer for internal discovery.
*/
public interface DruidNodeAnnouncer
{
void announce(DiscoveryDruidNode discoveryDruidNode);
void unannounce(DiscoveryDruidNode discoveryDruidNode);
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import java.util.Collection;
/**
* Interface for discovering Druid Nodes announced by DruidNodeAnnouncer.
*/
public interface DruidNodeDiscovery
{
Collection<DiscoveryDruidNode> getAllNodes();
void registerListener(Listener listener);
interface Listener
{
void nodeAdded(DiscoveryDruidNode node);
void nodeRemoved(DiscoveryDruidNode node);
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Provider of DruidNodeDiscovery instances.
*/
public abstract class DruidNodeDiscoveryProvider
{
private static final Logger log = new Logger(DruidNodeDiscoveryProvider.class);
public static final String NODE_TYPE_COORDINATOR = "coordinator";
public static final String NODE_TYPE_HISTORICAL = "historical";
public static final String NODE_TYPE_BROKER = "broker";
public static final String NODE_TYPE_OVERLORD = "overlord";
public static final String NODE_TYPE_PEON = "peon";
public static final String NODE_TYPE_ROUTER = "router";
public static final String NODE_TYPE_MM = "middleManager";
public static final Set<String> ALL_NODE_TYPES = ImmutableSet.of(
NODE_TYPE_COORDINATOR,
NODE_TYPE_HISTORICAL,
NODE_TYPE_BROKER,
NODE_TYPE_OVERLORD,
NODE_TYPE_PEON,
NODE_TYPE_ROUTER,
NODE_TYPE_MM
);
private static final Map<String, Set<String>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_BROKER, NODE_TYPE_HISTORICAL, NODE_TYPE_PEON),
DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_HISTORICAL, NODE_TYPE_PEON),
WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM)
);
private final Map<String, ServiceListener> serviceDiscoveryMap = new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size());
/**
* Get DruidNodeDiscovery instance to discover nodes of given nodeType.
*/
public abstract DruidNodeDiscovery getForNodeType(String nodeType);
/**
* Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata.
*/
public synchronized DruidNodeDiscovery getForService(String serviceName)
{
ServiceListener nodeDiscovery = serviceDiscoveryMap.get(serviceName);
if (nodeDiscovery == null) {
Set<String> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName);
if (nodeTypesToWatch == null) {
throw new IAE("Unknown service [%s].", serviceName);
}
nodeDiscovery = new ServiceListener(serviceName);
for (String nodeType : nodeTypesToWatch) {
getForNodeType(nodeType).registerListener(nodeDiscovery);
}
serviceDiscoveryMap.put(serviceName, nodeDiscovery);
}
return nodeDiscovery;
}
private static class ServiceListener implements DruidNodeDiscovery, DruidNodeDiscovery.Listener
{
private final String service;
private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final List<Listener> listeners = new ArrayList<>();
ServiceListener(String service)
{
this.service = service;
}
@Override
public synchronized void nodeAdded(DiscoveryDruidNode node)
{
if (node.getServices().containsKey(service)) {
DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node);
if (prev == null) {
for (Listener listener : listeners) {
listener.nodeAdded(node);
}
} else {
log.warn("Node[%s] discovered but already exists [%s].", node, prev);
}
} else {
log.warn("Node[%s] discovered but doesn't have service[%s]. Ignored.", node, service);
}
}
@Override
public synchronized void nodeRemoved(DiscoveryDruidNode node)
{
DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse());
if (prev != null) {
for (Listener listener : listeners) {
listener.nodeRemoved(node);
}
} else {
log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service);
}
}
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
return Collections.unmodifiableCollection(nodes.values());
}
@Override
public synchronized void registerListener(Listener listener)
{
for (DiscoveryDruidNode node : nodes.values()) {
listener.nodeAdded(node);
}
listeners.add(listener);
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* Metadata of a service announced by node. See DataNodeService and LookupNodeService for examples.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = DataNodeService.DISCOVERY_SERVICE_KEY, value = DataNodeService.class),
@JsonSubTypes.Type(name = LookupNodeService.DISCOVERY_SERVICE_KEY, value = LookupNodeService.class),
@JsonSubTypes.Type(name = WorkerNodeService.DISCOVERY_SERVICE_KEY, value = WorkerNodeService.class)
})
public abstract class DruidService
{
public abstract String getName();
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
* Metadata announced by any node that serves queries and hence applies lookups.
*/
public class LookupNodeService extends DruidService
{
public static final String DISCOVERY_SERVICE_KEY = "lookupNodeService";
private final String lookupTier;
public LookupNodeService(
@JsonProperty("lookupTier") String lookupTier
)
{
this.lookupTier = lookupTier;
}
@Override
public String getName()
{
return DISCOVERY_SERVICE_KEY;
}
@JsonProperty
public String getLookupTier()
{
return lookupTier;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LookupNodeService that = (LookupNodeService) o;
return Objects.equals(lookupTier, that.lookupTier);
}
@Override
public int hashCode()
{
return Objects.hash(lookupTier);
}
@Override
public String toString()
{
return "LookupNodeService{" +
"lookupTier='" + lookupTier + '\'' +
'}';
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
* Worker metadata announced by Middle Manager.
*/
public class WorkerNodeService extends DruidService
{
public static final String DISCOVERY_SERVICE_KEY = "workerNodeService";
private final String ip;
private final int capacity;
private final String version;
public WorkerNodeService(
@JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity,
@JsonProperty("version") String version
)
{
this.ip = ip;
this.capacity = capacity;
this.version = version;
}
@Override
public String getName()
{
return DISCOVERY_SERVICE_KEY;
}
@JsonProperty
public String getIp()
{
return ip;
}
@JsonProperty
public int getCapacity()
{
return capacity;
}
@JsonProperty
public String getVersion()
{
return version;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerNodeService that = (WorkerNodeService) o;
return capacity == that.capacity &&
Objects.equals(ip, that.ip) &&
Objects.equals(version, that.version);
}
@Override
public int hashCode()
{
return Objects.hash(ip, capacity, version);
}
@Override
public String toString()
{
return "WorkerNodeService{" +
"ip='" + ip + '\'' +
", capacity=" + capacity +
", version='" + version + '\'' +
'}';
}
}

View File

@ -25,6 +25,7 @@ import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.util.Providers;
import io.druid.client.DruidServerConfig;
import io.druid.discovery.DataNodeService;
import io.druid.guice.annotations.Self;
import io.druid.query.DruidProcessingConfig;
import io.druid.segment.column.ColumnConfig;
@ -66,4 +67,23 @@ public class StorageNodeModule implements Module
config.getPriority()
);
}
@Provides
@LazySingleton
public DataNodeService getDataNodeService(
@Nullable NodeTypeConfig nodeType,
DruidServerConfig config
)
{
if (nodeType == null) {
throw new ProvisionException("Must override the binding for NodeTypeConfig if you want a DruidServerMetadata.");
}
return new DataNodeService(
config.getTier(),
config.getMaxSize(),
nodeType.getNodeType(),
config.getPriority()
);
}
}

View File

@ -32,11 +32,14 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provides;
import io.druid.common.utils.ServletResourceUtils;
import io.druid.curator.announcement.Announcer;
import io.druid.guice.ExpressionModule;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
@ -101,6 +104,13 @@ public class LookupModule implements DruidModule
2 // 1 for "normal" operation and 1 for "emergency" or other
);
}
@Provides
@LazySingleton
public LookupNodeService getLookupNodeService(LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig)
{
return new LookupNodeService(lookupListeningAnnouncerConfig.getLookupTier());
}
}
@Path(ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)

View File

@ -0,0 +1,113 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.http;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.LazySingleton;
import io.druid.server.http.security.StateResourceFilter;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Collection;
/**
*/
@Path("/druid/coordinator/v1/cluster")
@LazySingleton
@ResourceFilters(StateResourceFilter.class)
public class ClusterResource
{
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
@Inject
public ClusterResource(DruidNodeDiscoveryProvider discoveryProvider)
{
this.druidNodeDiscoveryProvider = discoveryProvider;
}
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterServers()
{
ImmutableMap.Builder<String, Object> entityBuilder = new ImmutableMap.Builder<>();
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR)
.getAllNodes()
);
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD)
.getAllNodes()
);
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER)
.getAllNodes()
);
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL)
.getAllNodes()
);
Collection<DiscoveryDruidNode> mmNodes = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)
.getAllNodes();
if (!mmNodes.isEmpty()) {
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_MM, mmNodes);
}
Collection<DiscoveryDruidNode> routerNodes = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER)
.getAllNodes();
if (!routerNodes.isEmpty()) {
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, routerNodes);
}
return Response.status(Response.Status.OK).entity(entityBuilder.build()).build();
}
@GET
@Produces({MediaType.APPLICATION_JSON})
@Path("/{nodeType}")
public Response getClusterServers(
@PathParam("nodeType") String nodeType
)
{
if (nodeType == null || !DruidNodeDiscoveryProvider.ALL_NODE_TYPES.contains(nodeType)) {
return Response.serverError()
.status(Response.Status.BAD_REQUEST)
.entity(String.format(
"Invalid nodeType [%s]. Valid node types are %s .",
nodeType,
DruidNodeDiscoveryProvider.ALL_NODE_TYPES
))
.build();
} else {
return Response.status(Response.Status.OK).entity(
druidNodeDiscoveryProvider.getForNodeType(nodeType).getAllNodes()
).build();
}
}
}

View File

@ -24,6 +24,7 @@ import org.joda.time.Period;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.Objects;
/**
*/
@ -81,6 +82,30 @@ public class ServerConfig
return tls;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ServerConfig that = (ServerConfig) o;
return numThreads == that.numThreads &&
defaultQueryTimeout == that.defaultQueryTimeout &&
maxScatterGatherBytes == that.maxScatterGatherBytes &&
plaintext == that.plaintext &&
tls == that.tls &&
Objects.equals(maxIdleTime, that.maxIdleTime);
}
@Override
public int hashCode()
{
return Objects.hash(numThreads, maxIdleTime, defaultQueryTimeout, maxScatterGatherBytes, plaintext, tls);
}
@Override
public String toString()
{

View File

@ -90,6 +90,11 @@ public class ZkPathsConfig
return (null == connectorPath) ? defaultPath("connector") : connectorPath;
}
public String getInternalDiscoveryPath()
{
return defaultPath("internal-discovery");
}
public String defaultPath(final String subPath)
{
return ZKPaths.makePath(getBase(), subPath);

View File

@ -0,0 +1,224 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator.discovery;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/**
*/
public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
{
@Before
public void setUp() throws Exception
{
setupServerAndCurator();
}
@Test(timeout = 5000)
public void testAnnouncementAndDiscovery() throws Exception
{
ObjectMapper objectMapper = new DefaultObjectMapper();
//additional setup to serde DruidNode
objectMapper.setInjectableValues(new InjectableValues.Std()
.addValue(ServerConfig.class, new ServerConfig())
.addValue("java.lang.String", "dummy")
.addValue("java.lang.Integer", 1234)
);
curator.start();
curator.blockUntilConnected();
Announcer announcer = new Announcer(
curator,
MoreExecutors.sameThreadExecutor()
);
announcer.start();
CuratorDruidNodeAnnouncer druidNodeAnnouncer = new CuratorDruidNodeAnnouncer(
announcer,
new ZkPathsConfig(),
objectMapper
);
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
ImmutableMap.of()
);
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
ImmutableMap.of()
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
ImmutableMap.of()
);
DiscoveryDruidNode node4 = new DiscoveryDruidNode(
new DruidNode("s4", "h4", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
ImmutableMap.of()
);
druidNodeAnnouncer.announce(node1);
druidNodeAnnouncer.announce(node3);
CuratorDruidNodeDiscoveryProvider druidNodeDiscoveryProvider = new CuratorDruidNodeDiscoveryProvider(
curator,
new ZkPathsConfig(),
objectMapper
);
druidNodeDiscoveryProvider.start();
DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR);
DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD);
while (!checkNodes(ImmutableSet.of(node1), coordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
while (!checkNodes(ImmutableSet.of(node3), overlordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
HashSet<DiscoveryDruidNode> coordNodes = new HashSet<>();
coordDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
@Override
public void nodeAdded(DiscoveryDruidNode node)
{
coordNodes.add(node);
}
@Override
public void nodeRemoved(DiscoveryDruidNode node)
{
coordNodes.remove(node);
}
}
);
HashSet<DiscoveryDruidNode> overlordNodes = new HashSet<>();
overlordDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
@Override
public void nodeAdded(DiscoveryDruidNode node)
{
overlordNodes.add(node);
}
@Override
public void nodeRemoved(DiscoveryDruidNode node)
{
overlordNodes.remove(node);
}
}
);
while (!checkNodes(ImmutableSet.of(node1), coordNodes)) {
Thread.sleep(100);
}
while (!checkNodes(ImmutableSet.of(node3), overlordNodes)) {
Thread.sleep(100);
}
druidNodeAnnouncer.announce(node2);
druidNodeAnnouncer.announce(node4);
while (!checkNodes(ImmutableSet.of(node1, node2), coordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
while (!checkNodes(ImmutableSet.of(node3, node4), overlordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
while (!checkNodes(ImmutableSet.of(node1, node2), coordNodes)) {
Thread.sleep(100);
}
while (!checkNodes(ImmutableSet.of(node3, node4), overlordNodes)) {
Thread.sleep(100);
}
druidNodeAnnouncer.unannounce(node1);
druidNodeAnnouncer.unannounce(node2);
druidNodeAnnouncer.unannounce(node3);
druidNodeAnnouncer.unannounce(node4);
while (!checkNodes(ImmutableSet.of(), coordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
while (!checkNodes(ImmutableSet.of(), overlordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
while (!coordNodes.isEmpty()) {
Thread.sleep(100);
}
while (!overlordNodes.isEmpty()) {
Thread.sleep(100);
}
druidNodeDiscoveryProvider.stop();
announcer.stop();
}
private boolean checkNodes(Set<DiscoveryDruidNode> expected, Collection<DiscoveryDruidNode> actual)
{
return expected.equals(ImmutableSet.copyOf(actual));
}
@After
public void tearDown()
{
tearDownServerAndCurator();
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.TestHelper;
import io.druid.server.coordination.ServerType;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class DataNodeServiceTest
{
@Test
public void testSerde() throws Exception
{
DruidService expected = new DataNodeService(
"tier",
100,
ServerType.HISTORICAL,
1
);
ObjectMapper mapper = TestHelper.getJsonMapper();
DruidService actual = mapper.readValue(
mapper.writeValueAsString(expected),
DruidService.class
);
Assert.assertEquals(expected, actual);
}
}

View File

@ -0,0 +1,218 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.server.DruidNode;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ServerConfig;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*/
public class DruidNodeDiscoveryProviderTest
{
@Test
public void testGetForService()
{
TestDruidNodeDiscoveryProvider provider = new TestDruidNodeDiscoveryProvider();
DruidNodeDiscovery dataNodeDiscovery = provider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY);
Set<DiscoveryDruidNode> dataNodes = new HashSet<>();
dataNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
@Override
public void nodeAdded(DiscoveryDruidNode node)
{
dataNodes.add(node);
}
@Override
public void nodeRemoved(DiscoveryDruidNode node)
{
dataNodes.remove(node);
}
}
);
DruidNodeDiscovery lookupNodeDiscovery = provider.getForService(LookupNodeService.DISCOVERY_SERVICE_KEY);
Set<DiscoveryDruidNode> lookupNodes = new HashSet<>();
lookupNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
@Override
public void nodeAdded(DiscoveryDruidNode node)
{
lookupNodes.add(node);
}
@Override
public void nodeRemoved(DiscoveryDruidNode node)
{
lookupNodes.remove(node);
}
}
);
Assert.assertTrue(dataNodes.isEmpty());
Assert.assertTrue(dataNodes.isEmpty());
Assert.assertTrue(dataNodeDiscovery.getAllNodes().isEmpty());
Assert.assertTrue(lookupNodes.isEmpty());
Assert.assertTrue(lookupNodeDiscovery.getAllNodes().isEmpty());
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0),
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node4 = new DiscoveryDruidNode(
new DruidNode("s4", "h4", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0),
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node5 = new DiscoveryDruidNode(
new DruidNode("s5", "h5", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
DiscoveryDruidNode node6 = new DiscoveryDruidNode(
new DruidNode("s6", "h6", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node7 = new DiscoveryDruidNode(
new DruidNode("s7", "h7", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node7Clone = new DiscoveryDruidNode(
new DruidNode("s7", "h7", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node8 = new DiscoveryDruidNode(
new DruidNode("s8", "h8", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
ImmutableMap.of()
);
provider.add(node1);
provider.add(node2);
provider.add(node3);
provider.add(node4);
provider.add(node5);
provider.add(node6);
provider.add(node7);
provider.add(node7Clone);
provider.add(node8);
Assert.assertEquals(ImmutableSet.of(node1, node2, node4, node5), ImmutableSet.copyOf(dataNodeDiscovery.getAllNodes()));
Assert.assertEquals(ImmutableSet.of(node1, node2, node4, node5), dataNodes);
Assert.assertEquals(ImmutableSet.of(node1, node3, node4, node6, node7), ImmutableSet.copyOf(lookupNodeDiscovery.getAllNodes()));
Assert.assertEquals(ImmutableSet.of(node1, node3, node4, node6, node7), lookupNodes);
provider.remove(node8);
provider.remove(node7Clone);
provider.remove(node6);
provider.remove(node5);
provider.remove(node4);
Assert.assertEquals(ImmutableSet.of(node1, node2), ImmutableSet.copyOf(dataNodeDiscovery.getAllNodes()));
Assert.assertEquals(ImmutableSet.of(node1, node2), dataNodes);
Assert.assertEquals(ImmutableSet.of(node1, node3), ImmutableSet.copyOf(lookupNodeDiscovery.getAllNodes()));
Assert.assertEquals(ImmutableSet.of(node1, node3), lookupNodes);
}
private static class TestDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
{
private List<DruidNodeDiscovery.Listener> listeners = new ArrayList<>();
@Override
public DruidNodeDiscovery getForNodeType(String nodeType)
{
return new DruidNodeDiscovery()
{
@Override
public Set<DiscoveryDruidNode> getAllNodes()
{
throw new UnsupportedOperationException();
}
@Override
public void registerListener(Listener listener)
{
TestDruidNodeDiscoveryProvider.this.listeners.add(listener);
}
};
}
void add(DiscoveryDruidNode node)
{
for (DruidNodeDiscovery.Listener listener : listeners) {
listener.nodeAdded(node);
}
}
void remove(DiscoveryDruidNode node)
{
for (DruidNodeDiscovery.Listener listener : listeners) {
listener.nodeRemoved(node);
}
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class LookupNodeServiceTest
{
@Test
public void testSerde() throws Exception
{
DruidService expected = new LookupNodeService(
"tier"
);
ObjectMapper mapper = TestHelper.getJsonMapper();
DruidService actual = mapper.readValue(
mapper.writeValueAsString(expected),
DruidService.class
);
Assert.assertEquals(expected, actual);
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class WorkerNodeServiceTest
{
@Test
public void testSerde() throws Exception
{
DruidService expected = new WorkerNodeService(
"1.1.1.1",
100,
"v1"
);
ObjectMapper mapper = TestHelper.getJsonMapper();
DruidService actual = mapper.readValue(
mapper.writeValueAsString(expected),
DruidService.class
);
Assert.assertEquals(expected, actual);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
@ -33,6 +34,8 @@ import io.druid.client.cache.CacheMonitor;
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.client.selector.TierSelectorStrategy;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.CacheModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.Jerseys;
@ -120,6 +123,14 @@ public class CliBroker extends ServerRunnable
MetricsModule.register(binder, CacheMonitor.class);
LifecycleModule.register(binder, Server.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableList.of(LookupNodeService.class)
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}
},
new LookupModule(),

View File

@ -21,16 +21,19 @@ package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import io.druid.audit.AuditManager;
import io.druid.client.CoordinatorServerView;
import io.druid.client.coordinator.Coordinator;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.ConditionalMultibind;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
@ -58,6 +61,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.coordinator.helper.DruidCoordinatorVersionConverter;
import io.druid.server.http.ClusterResource;
import io.druid.server.http.CoordinatorDynamicConfigsResource;
import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.CoordinatorResource;
@ -182,6 +186,7 @@ public class CliCoordinator extends ServerRunnable
Jerseys.addResource(binder, MetadataResource.class);
Jerseys.addResource(binder, IntervalsResource.class);
Jerseys.addResource(binder, LookupCoordinatorResource.class);
Jerseys.addResource(binder, ClusterResource.class);
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DatasourcesResource.class);
@ -204,6 +209,14 @@ public class CliCoordinator extends ServerRunnable
Predicates.equalTo("true"),
DruidCoordinatorSegmentKiller.class
);
binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(Coordinator.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
ImmutableList.of()
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class));
}
@Provides

View File

@ -21,11 +21,15 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CacheMonitor;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.CacheModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.Jerseys;
@ -103,6 +107,14 @@ public class CliHistorical extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
binder.install(new CacheModule());
MetricsModule.register(binder, CacheMonitor.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
ImmutableList.of(DataNodeService.class, LookupNodeService.class)
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}
},
new LookupModule()

View File

@ -21,12 +21,14 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.WorkerNodeService;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule;
@ -98,6 +100,14 @@ public class CliMiddleManager extends ServerRunnable
Jerseys.addResource(binder, WorkerResource.class);
LifecycleModule.register(binder, Server.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
ImmutableList.of(WorkerNodeService.class)
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}
@Provides
@ -112,6 +122,17 @@ public class CliMiddleManager extends ServerRunnable
config.getVersion()
);
}
@Provides
@LazySingleton
public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig)
{
return new WorkerNodeService(
workerConfig.getIp(),
workerConfig.getCapacity(),
workerConfig.getVersion()
);
}
},
new IndexingServiceFirehoseModule(),
new IndexingServiceTaskLogsModule()

View File

@ -32,7 +32,9 @@ import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import io.druid.audit.AuditManager;
import io.druid.client.indexing.IndexingService;
import io.druid.client.indexing.IndexingServiceSelectorConfig;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule;
@ -182,6 +184,14 @@ public class CliOverlord extends ServerRunnable
if (standalone) {
LifecycleModule.register(binder, Server.class);
}
binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(IndexingService.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
ImmutableList.of()
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class));
}
private void configureTaskStorage(Binder binder)

View File

@ -21,6 +21,7 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
@ -29,6 +30,7 @@ import io.airlift.airline.Command;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
@ -109,6 +111,14 @@ public class CliRouter extends ServerRunnable
LifecycleModule.register(binder, RouterResource.class);
LifecycleModule.register(binder, Server.class);
DiscoveryModule.register(binder, Self.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER,
ImmutableList.of()
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}
@Provides

View File

@ -20,10 +20,19 @@
package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Provider;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.DruidService;
import io.druid.guice.annotations.Self;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.DruidNode;
import java.util.List;
/**
*/
@ -47,4 +56,69 @@ public abstract class ServerRunnable extends GuiceRunnable
throw Throwables.propagate(e);
}
}
/**
* This is a helper class used by CliXXX classes to announce DiscoveryDruidNode
* as part of lifecycle Stage.LAST .
*/
protected static class DiscoverySideEffectsProvider implements Provider<DiscoverySideEffectsProvider.Child>
{
public static class Child {}
@Inject @Self
private DruidNode druidNode;
@Inject
private DruidNodeAnnouncer announcer;
@Inject
private Lifecycle lifecycle;
@Inject
private Injector injector;
private final String nodeType;
private final List<Class<? extends DruidService>> serviceClasses;
public DiscoverySideEffectsProvider(String nodeType, List<Class<? extends DruidService>> serviceClasses)
{
this.nodeType = nodeType;
this.serviceClasses = serviceClasses;
}
@Override
public Child get()
{
ImmutableMap.Builder<String, DruidService> builder = new ImmutableMap.Builder<>();
for (Class<? extends DruidService> clazz : serviceClasses) {
DruidService service = injector.getInstance(clazz);
builder.put(service.getName(), service);
}
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode,
nodeType,
builder.build()
);
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
announcer.announce(discoveryDruidNode);
}
@Override
public void stop()
{
announcer.unannounce(discoveryDruidNode);
}
},
Lifecycle.Stage.LAST
);
return new Child();
}
}
}