From ed924bf214a373a0ceae4e078157c8b8e7f674b5 Mon Sep 17 00:00:00 2001 From: David Lim Date: Mon, 15 Aug 2016 23:21:28 -0600 Subject: [PATCH] allow registrants to opt out of announcing themselves when registering as a chat handler (#3360) --- .../druid/indexing/kafka/KafkaIndexTask.java | 7 +- .../firehose/ChatHandlerProvider.java | 34 ++++- .../firehose/NoopChatHandlerProvider.java | 6 + .../ServiceAnnouncingChatHandlerProvider.java | 44 ++++-- ...viceAnnouncingChatHandlerProviderTest.java | 128 ++++++++++++++++++ 5 files changed, 203 insertions(+), 16 deletions(-) create mode 100644 server/src/test/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProviderTest.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 3639bebd62a..5462fbdc889 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -241,7 +241,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler if (chatHandlerProvider.isPresent()) { log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); - chatHandlerProvider.get().register(getId(), this); + chatHandlerProvider.get().register(getId(), this, false); } else { log.warn("No chat handler detected"); } @@ -533,6 +533,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler log.info("The task was asked to stop before completing"); } + finally { + if (chatHandlerProvider.isPresent()) { + chatHandlerProvider.get().unregister(getId()); + } + } return success(); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerProvider.java b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerProvider.java index 19214837932..06dce5838fa 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerProvider.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ChatHandlerProvider.java @@ -25,9 +25,37 @@ import com.google.common.base.Optional; */ public interface ChatHandlerProvider { - public void register(final String key, ChatHandler handler); + /** + * Registers a chat handler which provides an API for others to talk to objects in the indexing service. Depending + * on the implementation, this method may also announce this node so that it can be discovered by other services. + * + * @param key a unique name identifying this service + * @param handler instance which implements the API to be exposed + */ + void register(final String key, ChatHandler handler); - public void unregister(final String key); + /** + * Registers a chat handler which provides an API for others to talk to objects in the indexing service. Setting + * announce to false instructs the implementation to only register the handler to expose the API and skip any + * discovery announcements that might have been broadcast. + * + * @param key a unique name identifying this service + * @param handler instance which implements the API to be exposed + * @param announce for implementations that have a service discovery mechanism, whether this node should be announced + */ + void register(final String key, ChatHandler handler, boolean announce); - public Optional get(final String key); + /** + * Unregisters a chat handler. + * + * @param key the name of the service + */ + void unregister(final String key); + + /** + * Retrieves a chat handler. + * + * @param key the name of the service + */ + Optional get(final String key); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/NoopChatHandlerProvider.java b/server/src/main/java/io/druid/segment/realtime/firehose/NoopChatHandlerProvider.java index 686b3d18ff4..7ca01a116f1 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/NoopChatHandlerProvider.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/NoopChatHandlerProvider.java @@ -31,6 +31,12 @@ public class NoopChatHandlerProvider implements ChatHandlerProvider // do nothing } + @Override + public void register(String key, ChatHandler handler, boolean announce) + { + // do nothing + } + @Override public void unregister(String key) { diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java b/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java index a10c828e413..f01698db54d 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java @@ -29,6 +29,7 @@ import io.druid.server.DruidNode; import io.druid.guice.annotations.RemoteChatHandler; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; /** * Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method @@ -42,6 +43,7 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider private final DruidNode node; private final ServiceAnnouncer serviceAnnouncer; private final ConcurrentMap handlers; + private final ConcurrentSkipListSet announcements; @Inject public ServiceAnnouncingChatHandlerProvider( @@ -52,24 +54,36 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider this.node = node; this.serviceAnnouncer = serviceAnnouncer; this.handlers = Maps.newConcurrentMap(); + this.announcements = new ConcurrentSkipListSet<>(); } @Override public void register(final String service, ChatHandler handler) { - final DruidNode node = makeDruidNode(service); + register(service, handler, true); + } + + @Override + public void register(final String service, ChatHandler handler, boolean announce) + { log.info("Registering Eventhandler[%s]", service); if (handlers.putIfAbsent(service, handler) != null) { throw new ISE("handler already registered for service[%s]", service); } - try { - serviceAnnouncer.announce(node); - } - catch (Exception e) { - log.warn(e, "Failed to register service[%s]", service); - handlers.remove(service, handler); + if (announce) + { + try { + serviceAnnouncer.announce(makeDruidNode(service)); + if (!announcements.add(service)) { + throw new ISE("announcements already has an entry for service[%s]", service); + } + } + catch (Exception e) { + log.warn(e, "Failed to register service[%s]", service); + handlers.remove(service, handler); + } } } @@ -81,13 +95,19 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider final ChatHandler handler = handlers.get(service); if (handler == null) { log.warn("handler[%s] not currently registered, ignoring.", service); + return; } - try { - serviceAnnouncer.unannounce(makeDruidNode(service)); - } - catch (Exception e) { - log.warn(e, "Failed to unregister service[%s]", service); + if (announcements.contains(service)) + { + try { + serviceAnnouncer.unannounce(makeDruidNode(service)); + } + catch (Exception e) { + log.warn(e, "Failed to unregister service[%s]", service); + } + + announcements.remove(service); } handlers.remove(service, handler); diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProviderTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProviderTest.java new file mode 100644 index 00000000000..62c2d4defb6 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProviderTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.firehose; + +import io.druid.curator.discovery.ServiceAnnouncer; +import io.druid.server.DruidNode; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.IOException; + +@RunWith(EasyMockRunner.class) +public class ServiceAnnouncingChatHandlerProviderTest extends EasyMockSupport +{ + private class TestChatHandler implements ChatHandler {} + + private static final String TEST_SERVICE_NAME = "test-service-name"; + private static final String TEST_HOST = "test-host"; + private static final int TEST_PORT = 1234; + + private ServiceAnnouncingChatHandlerProvider chatHandlerProvider; + + @Mock + private DruidNode node; + + @Mock + private ServiceAnnouncer serviceAnnouncer; + + @Before + public void setUp() throws Exception + { + chatHandlerProvider = new ServiceAnnouncingChatHandlerProvider(node, serviceAnnouncer); + } + + @Test + public void testRegistrationDefault() throws IOException + { + testRegistrationWithAnnounce(false); + } + + @Test + public void testRegistrationWithAnnounce() throws IOException + { + testRegistrationWithAnnounce(true); + } + + @Test + public void testRegistrationWithoutAnnounce() throws IOException + { + ChatHandler testChatHandler = new TestChatHandler(); + + Assert.assertFalse("bad initial state", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent()); + + chatHandlerProvider.register(TEST_SERVICE_NAME, testChatHandler, false); + Assert.assertTrue("chatHandler did not register", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent()); + Assert.assertEquals(testChatHandler, chatHandlerProvider.get(TEST_SERVICE_NAME).get()); + + chatHandlerProvider.unregister(TEST_SERVICE_NAME); + Assert.assertFalse("chatHandler did not deregister", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent()); + } + + private void testRegistrationWithAnnounce(boolean useThreeArgConstructor) throws IOException + { + ChatHandler testChatHandler = new TestChatHandler(); + Capture captured = Capture.newInstance(); + + EasyMock.expect(node.getHost()).andReturn(TEST_HOST); + EasyMock.expect(node.getPort()).andReturn(TEST_PORT); + serviceAnnouncer.announce(EasyMock.capture(captured)); + replayAll(); + + Assert.assertFalse("bad initial state", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent()); + + if (useThreeArgConstructor) { + chatHandlerProvider.register(TEST_SERVICE_NAME, testChatHandler, true); + } else { + chatHandlerProvider.register(TEST_SERVICE_NAME, testChatHandler); + } + verifyAll(); + + DruidNode param = captured.getValues().get(0); + Assert.assertEquals(TEST_SERVICE_NAME, param.getServiceName()); + Assert.assertEquals(TEST_HOST, param.getHost()); + Assert.assertEquals(TEST_PORT, param.getPort()); + Assert.assertTrue("chatHandler did not register", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent()); + Assert.assertEquals(testChatHandler, chatHandlerProvider.get(TEST_SERVICE_NAME).get()); + + captured.reset(); + resetAll(); + EasyMock.expect(node.getHost()).andReturn(TEST_HOST); + EasyMock.expect(node.getPort()).andReturn(TEST_PORT); + serviceAnnouncer.unannounce(EasyMock.capture(captured)); + replayAll(); + + chatHandlerProvider.unregister(TEST_SERVICE_NAME); + verifyAll(); + + param = captured.getValues().get(0); + Assert.assertEquals(TEST_SERVICE_NAME, param.getServiceName()); + Assert.assertEquals(TEST_HOST, param.getHost()); + Assert.assertEquals(TEST_PORT, param.getPort()); + Assert.assertFalse("chatHandler did not deregister", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent()); + } +}