allow registrants to opt out of announcing themselves when registering as a chat handler (#3360)

This commit is contained in:
David Lim 2016-08-15 23:21:28 -06:00 committed by Nishant
parent 362b9266f8
commit ed924bf214
5 changed files with 203 additions and 16 deletions

View File

@ -241,7 +241,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
if (chatHandlerProvider.isPresent()) { if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(getId(), this); chatHandlerProvider.get().register(getId(), this, false);
} else { } else {
log.warn("No chat handler detected"); log.warn("No chat handler detected");
} }
@ -533,6 +533,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
log.info("The task was asked to stop before completing"); log.info("The task was asked to stop before completing");
} }
finally {
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(getId());
}
}
return success(); return success();
} }

View File

@ -25,9 +25,37 @@ import com.google.common.base.Optional;
*/ */
public interface ChatHandlerProvider public interface ChatHandlerProvider
{ {
public void register(final String key, ChatHandler handler); /**
* Registers a chat handler which provides an API for others to talk to objects in the indexing service. Depending
* on the implementation, this method may also announce this node so that it can be discovered by other services.
*
* @param key a unique name identifying this service
* @param handler instance which implements the API to be exposed
*/
void register(final String key, ChatHandler handler);
public void unregister(final String key); /**
* Registers a chat handler which provides an API for others to talk to objects in the indexing service. Setting
* announce to false instructs the implementation to only register the handler to expose the API and skip any
* discovery announcements that might have been broadcast.
*
* @param key a unique name identifying this service
* @param handler instance which implements the API to be exposed
* @param announce for implementations that have a service discovery mechanism, whether this node should be announced
*/
void register(final String key, ChatHandler handler, boolean announce);
public Optional<ChatHandler> get(final String key); /**
* Unregisters a chat handler.
*
* @param key the name of the service
*/
void unregister(final String key);
/**
* Retrieves a chat handler.
*
* @param key the name of the service
*/
Optional<ChatHandler> get(final String key);
} }

View File

@ -31,6 +31,12 @@ public class NoopChatHandlerProvider implements ChatHandlerProvider
// do nothing // do nothing
} }
@Override
public void register(String key, ChatHandler handler, boolean announce)
{
// do nothing
}
@Override @Override
public void unregister(String key) public void unregister(String key)
{ {

View File

@ -29,6 +29,7 @@ import io.druid.server.DruidNode;
import io.druid.guice.annotations.RemoteChatHandler; import io.druid.guice.annotations.RemoteChatHandler;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
/** /**
* Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method * Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method
@ -42,6 +43,7 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider
private final DruidNode node; private final DruidNode node;
private final ServiceAnnouncer serviceAnnouncer; private final ServiceAnnouncer serviceAnnouncer;
private final ConcurrentMap<String, ChatHandler> handlers; private final ConcurrentMap<String, ChatHandler> handlers;
private final ConcurrentSkipListSet<String> announcements;
@Inject @Inject
public ServiceAnnouncingChatHandlerProvider( public ServiceAnnouncingChatHandlerProvider(
@ -52,26 +54,38 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider
this.node = node; this.node = node;
this.serviceAnnouncer = serviceAnnouncer; this.serviceAnnouncer = serviceAnnouncer;
this.handlers = Maps.newConcurrentMap(); this.handlers = Maps.newConcurrentMap();
this.announcements = new ConcurrentSkipListSet<>();
} }
@Override @Override
public void register(final String service, ChatHandler handler) public void register(final String service, ChatHandler handler)
{ {
final DruidNode node = makeDruidNode(service); register(service, handler, true);
}
@Override
public void register(final String service, ChatHandler handler, boolean announce)
{
log.info("Registering Eventhandler[%s]", service); log.info("Registering Eventhandler[%s]", service);
if (handlers.putIfAbsent(service, handler) != null) { if (handlers.putIfAbsent(service, handler) != null) {
throw new ISE("handler already registered for service[%s]", service); throw new ISE("handler already registered for service[%s]", service);
} }
if (announce)
{
try { try {
serviceAnnouncer.announce(node); serviceAnnouncer.announce(makeDruidNode(service));
if (!announcements.add(service)) {
throw new ISE("announcements already has an entry for service[%s]", service);
}
} }
catch (Exception e) { catch (Exception e) {
log.warn(e, "Failed to register service[%s]", service); log.warn(e, "Failed to register service[%s]", service);
handlers.remove(service, handler); handlers.remove(service, handler);
} }
} }
}
@Override @Override
public void unregister(final String service) public void unregister(final String service)
@ -81,8 +95,11 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider
final ChatHandler handler = handlers.get(service); final ChatHandler handler = handlers.get(service);
if (handler == null) { if (handler == null) {
log.warn("handler[%s] not currently registered, ignoring.", service); log.warn("handler[%s] not currently registered, ignoring.", service);
return;
} }
if (announcements.contains(service))
{
try { try {
serviceAnnouncer.unannounce(makeDruidNode(service)); serviceAnnouncer.unannounce(makeDruidNode(service));
} }
@ -90,6 +107,9 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider
log.warn(e, "Failed to unregister service[%s]", service); log.warn(e, "Failed to unregister service[%s]", service);
} }
announcements.remove(service);
}
handlers.remove(service, handler); handlers.remove(service, handler);
} }

View File

@ -0,0 +1,128 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.server.DruidNode;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.IOException;
@RunWith(EasyMockRunner.class)
public class ServiceAnnouncingChatHandlerProviderTest extends EasyMockSupport
{
private class TestChatHandler implements ChatHandler {}
private static final String TEST_SERVICE_NAME = "test-service-name";
private static final String TEST_HOST = "test-host";
private static final int TEST_PORT = 1234;
private ServiceAnnouncingChatHandlerProvider chatHandlerProvider;
@Mock
private DruidNode node;
@Mock
private ServiceAnnouncer serviceAnnouncer;
@Before
public void setUp() throws Exception
{
chatHandlerProvider = new ServiceAnnouncingChatHandlerProvider(node, serviceAnnouncer);
}
@Test
public void testRegistrationDefault() throws IOException
{
testRegistrationWithAnnounce(false);
}
@Test
public void testRegistrationWithAnnounce() throws IOException
{
testRegistrationWithAnnounce(true);
}
@Test
public void testRegistrationWithoutAnnounce() throws IOException
{
ChatHandler testChatHandler = new TestChatHandler();
Assert.assertFalse("bad initial state", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent());
chatHandlerProvider.register(TEST_SERVICE_NAME, testChatHandler, false);
Assert.assertTrue("chatHandler did not register", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent());
Assert.assertEquals(testChatHandler, chatHandlerProvider.get(TEST_SERVICE_NAME).get());
chatHandlerProvider.unregister(TEST_SERVICE_NAME);
Assert.assertFalse("chatHandler did not deregister", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent());
}
private void testRegistrationWithAnnounce(boolean useThreeArgConstructor) throws IOException
{
ChatHandler testChatHandler = new TestChatHandler();
Capture<DruidNode> captured = Capture.newInstance();
EasyMock.expect(node.getHost()).andReturn(TEST_HOST);
EasyMock.expect(node.getPort()).andReturn(TEST_PORT);
serviceAnnouncer.announce(EasyMock.capture(captured));
replayAll();
Assert.assertFalse("bad initial state", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent());
if (useThreeArgConstructor) {
chatHandlerProvider.register(TEST_SERVICE_NAME, testChatHandler, true);
} else {
chatHandlerProvider.register(TEST_SERVICE_NAME, testChatHandler);
}
verifyAll();
DruidNode param = captured.getValues().get(0);
Assert.assertEquals(TEST_SERVICE_NAME, param.getServiceName());
Assert.assertEquals(TEST_HOST, param.getHost());
Assert.assertEquals(TEST_PORT, param.getPort());
Assert.assertTrue("chatHandler did not register", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent());
Assert.assertEquals(testChatHandler, chatHandlerProvider.get(TEST_SERVICE_NAME).get());
captured.reset();
resetAll();
EasyMock.expect(node.getHost()).andReturn(TEST_HOST);
EasyMock.expect(node.getPort()).andReturn(TEST_PORT);
serviceAnnouncer.unannounce(EasyMock.capture(captured));
replayAll();
chatHandlerProvider.unregister(TEST_SERVICE_NAME);
verifyAll();
param = captured.getValues().get(0);
Assert.assertEquals(TEST_SERVICE_NAME, param.getServiceName());
Assert.assertEquals(TEST_HOST, param.getHost());
Assert.assertEquals(TEST_PORT, param.getPort());
Assert.assertFalse("chatHandler did not deregister", chatHandlerProvider.get(TEST_SERVICE_NAME).isPresent());
}
}