diff --git a/docs/querying/lookups.md b/docs/querying/lookups.md index e70bb329917..bd35493d0b3 100644 --- a/docs/querying/lookups.md +++ b/docs/querying/lookups.md @@ -326,7 +326,7 @@ These end points can be used to get the propagation status of configured lookups ### List lookup state of all processes -`GET /druid/coordinator/v1/lookups/nodeStatus` with optional query parameter `discover` to discover tiers from zookeeper or configured lookup tiers are listed. +`GET /druid/coordinator/v1/lookups/nodeStatus` with optional query parameter `discover` to discover tiers or configured lookup tiers are listed. ### List lookup state of processes in a tier @@ -383,7 +383,7 @@ The return value will be the json representation of the factory. See [Lookups Dynamic Configuration](../configuration/index.md#lookups-dynamic-configuration) for Coordinator configuration. -To configure a Broker / Router / Historical / Peon to announce itself as part of a lookup tier, use the `druid.zk.paths.lookupTier` property. +To configure a Broker / Router / Historical / Peon to announce itself as part of a lookup tier, use following properties. |Property | Description | Default | |---------|-------------|---------| diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java index 73b69f79d2f..74319d374a7 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java @@ -24,11 +24,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.server.initialization.ZkPathsConfig; -import org.apache.druid.server.listener.announcer.ListeningAnnouncerConfig; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; -class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig +class LookupListeningAnnouncerConfig { public static final String DEFAULT_TIER = "__default"; private final DataSourceTaskIdHolder dataSourceTaskIdHolder; @@ -39,11 +37,9 @@ class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig @JsonCreator public LookupListeningAnnouncerConfig( - @JacksonInject ZkPathsConfig zkPathsConfig, @JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder ) { - super(zkPathsConfig); this.dataSourceTaskIdHolder = dataSourceTaskIdHolder; } @@ -61,9 +57,4 @@ class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE ); } - - public String getLookupKey() - { - return LookupModule.getTierListenerPath(getLookupTier()); - } } diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java index 1459663d93f..c3e4c8e7e8e 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java @@ -25,14 +25,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; -import org.apache.curator.utils.ZKPaths; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; -import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.initialization.DruidModule; import org.apache.druid.query.dimension.LookupDimensionSpec; import org.apache.druid.query.expression.LookupExprMacro; @@ -48,11 +46,6 @@ public class LookupModule implements DruidModule public static final String FAILED_UPDATES_KEY = "failedUpdates"; public static final int LOOKUP_LISTENER_QOS_MAX_REQUESTS = 2; - public static String getTierListenerPath(String tier) - { - return ZKPaths.makePath(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY, tier); - } - @Override public List getJacksonModules() { @@ -75,9 +68,6 @@ public class LookupModule implements DruidModule Jerseys.addResource(binder, LookupListeningResource.class); Jerseys.addResource(binder, LookupIntrospectionResource.class); ExpressionModule.addExprMacro(binder, LookupExprMacro.class); - LifecycleModule.register(binder, LookupResourceListenerAnnouncer.class); - // Nothing else starts this, so we bind it to get it to start - binder.bind(LookupResourceListenerAnnouncer.class).in(ManageLifecycle.class); JettyBindings.addQosFilter( binder, ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY, diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java b/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java deleted file mode 100644 index d58e2b69d7b..00000000000 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.lookup; - -import com.google.inject.Inject; -import org.apache.druid.curator.announcement.Announcer; -import org.apache.druid.guice.annotations.Self; -import org.apache.druid.server.DruidNode; -import org.apache.druid.server.http.HostAndPortWithScheme; -import org.apache.druid.server.listener.announcer.ListenerResourceAnnouncer; - -@Deprecated -class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer -{ - @Inject - public LookupResourceListenerAnnouncer( - Announcer announcer, - LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig, - @Self DruidNode node - ) - { - super( - announcer, - lookupListeningAnnouncerConfig, - lookupListeningAnnouncerConfig.getLookupKey(), - HostAndPortWithScheme.fromString(node.getServiceScheme(), node.getHostAndPortToUse()) - ); - } -} diff --git a/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java b/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java deleted file mode 100644 index 04d51df0b41..00000000000 --- a/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.listener.announcer; - -import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.announcement.Announcer; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.server.http.HostAndPortWithScheme; - -import java.nio.ByteBuffer; - -/** - * Starting 0.11.0 Coordinator uses announcements made by {@link org.apache.druid.discovery.DruidNodeAnnouncer} . - */ -@Deprecated -public abstract class ListenerResourceAnnouncer -{ - private static final byte[] ANNOUNCE_BYTES = ByteBuffer - .allocate(Long.BYTES) - .putLong(System.currentTimeMillis()) - .array(); - private static final Logger LOG = new Logger(ListenerResourceAnnouncer.class); - private final Object startStopSync = new Object(); - private volatile boolean started = false; - private final Announcer announcer; - private final String announcePath; - - public ListenerResourceAnnouncer( - Announcer announcer, - ListeningAnnouncerConfig listeningAnnouncerConfig, - String listener_key, - HostAndPortWithScheme node - ) - { - this( - announcer, - ZKPaths.makePath(listeningAnnouncerConfig.getListenersPath(), listener_key), - node - ); - } - - ListenerResourceAnnouncer( - Announcer announcer, - String announceBasePath, - HostAndPortWithScheme node - ) - { - this.announcePath = ZKPaths.makePath(announceBasePath, node.toString()); - this.announcer = announcer; - } - - @LifecycleStart - public void start() - { - synchronized (startStopSync) { - if (started) { - LOG.debug("Already started, ignoring"); - return; - } - try { - // Announcement is based on MS. This is to make sure we don't collide on announcements - Thread.sleep(2); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - announcer.announce(announcePath, ANNOUNCE_BYTES); - LOG.info("Announcing start time on [%s]", announcePath); - started = true; - } - } - - @LifecycleStop - public void stop() - { - synchronized (startStopSync) { - if (!started) { - LOG.debug("Already stopped, ignoring"); - return; - } - announcer.unannounce(announcePath); - LOG.info("Unannouncing start time on [%s]", announcePath); - started = false; - } - } - - public byte[] getAnnounceBytes() - { - return ByteBuffer.allocate(ANNOUNCE_BYTES.length).put(ANNOUNCE_BYTES).array(); - } -} diff --git a/server/src/main/java/org/apache/druid/server/listener/announcer/ListeningAnnouncerConfig.java b/server/src/main/java/org/apache/druid/server/listener/announcer/ListeningAnnouncerConfig.java deleted file mode 100644 index 4f81c406ddb..00000000000 --- a/server/src/main/java/org/apache/druid/server/listener/announcer/ListeningAnnouncerConfig.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.listener.announcer; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import org.apache.curator.utils.ZKPaths; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.server.initialization.ZkPathsConfig; - -/** - * Even though we provide the mechanism to get zk paths here, we do NOT handle announcing and unannouncing in this module. - * The reason is that it is not appropriate to force a global announce/unannounce since individual listeners may have - * different lifecycles. - */ -public class ListeningAnnouncerConfig -{ - @JacksonInject - private final ZkPathsConfig zkPathsConfig; - @JsonProperty("listenersPath") - private String listenersPath = null; - - @Inject - public ListeningAnnouncerConfig( - ZkPathsConfig zkPathsConfig - ) - { - this.zkPathsConfig = zkPathsConfig; - } - - @JsonProperty("listenersPath") - public String getListenersPath() - { - return listenersPath == null ? zkPathsConfig.defaultPath("listeners") : listenersPath; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ListeningAnnouncerConfig that = (ListeningAnnouncerConfig) o; - - return !(listenersPath != null ? !listenersPath.equals(that.listenersPath) : that.listenersPath != null); - - } - - @Override - public int hashCode() - { - return listenersPath != null ? listenersPath.hashCode() : 0; - } - - @Override - public String toString() - { - return "ListeningAnnouncerConfig{" + - "listenersPath='" + getListenersPath() + '\'' + - '}'; - } - - /** - * Build a path for the particular named listener. The first implementation of this is used with zookeeper, but - * there is nothing restricting its use in a more general pathing (example: http endpoint proxy for raft) - * @param listenerName The key for the listener. - * @return A path appropriate for use in zookeeper to discover the listeners with the particular listener name - */ - public String getAnnouncementPath(String listenerName) - { - return ZKPaths.makePath( - getListenersPath(), - Preconditions.checkNotNull( - StringUtils.emptyToNullNonDruidDataString(listenerName), "Listener name cannot be null" - ) - ); - } -} diff --git a/server/src/main/java/org/apache/druid/server/listener/resource/AbstractListenerHandler.java b/server/src/main/java/org/apache/druid/server/listener/resource/AbstractListenerHandler.java index c2b9d094d79..d6b608ad0a1 100644 --- a/server/src/main/java/org/apache/druid/server/listener/resource/AbstractListenerHandler.java +++ b/server/src/main/java/org/apache/druid/server/listener/resource/AbstractListenerHandler.java @@ -205,9 +205,7 @@ public abstract class AbstractListenerHandler implements ListenerHandle * @param inputObject A list of the objects which were POSTed * * @return An object to be returned in the entity of the response. - * - * @throws Exception */ @Nullable - public abstract Object post(Map inputObject) throws Exception; + public abstract Object post(Map inputObject); } diff --git a/server/src/main/java/org/apache/druid/server/listener/resource/ListenerResource.java b/server/src/main/java/org/apache/druid/server/listener/resource/ListenerResource.java index 7e28f08caca..569f20da775 100644 --- a/server/src/main/java/org/apache/druid/server/listener/resource/ListenerResource.java +++ b/server/src/main/java/org/apache/druid/server/listener/resource/ListenerResource.java @@ -53,9 +53,6 @@ import java.io.InputStream; * * Items tagged with a particular ID for an announcement listener are updated by a POST to the announcement listener's * path "/{announcement}" - * - * Discovery of who can listen to particular announcement keys is not part of this class and should be handled - * by ListenerResourceAnnouncer */ public abstract class ListenerResource { diff --git a/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java b/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java deleted file mode 100644 index 96e2a18bde7..00000000000 --- a/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.listener.announcer; - -import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.announcement.Announcer; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.segment.CloserRule; -import org.apache.druid.server.http.HostAndPortWithScheme; -import org.apache.druid.server.initialization.ZkPathsConfig; -import org.easymock.EasyMock; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import java.io.Closeable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -public class ListenerResourceAnnouncerTest extends CuratorTestBase -{ - private final ListeningAnnouncerConfig listeningAnnouncerConfig = new ListeningAnnouncerConfig(new ZkPathsConfig()); - private final String listenerKey = "someKey"; - private final String announcePath = listeningAnnouncerConfig.getAnnouncementPath(listenerKey); - @Rule - public CloserRule closerRule = new CloserRule(true); - private ExecutorService executorService; - - @Before - public void setUp() - { - executorService = Execs.singleThreaded("listener-resource--%d"); - } - - @After - public void tearDown() - { - executorService.shutdownNow(); - } - - @Test - public void testAnnouncerBehaves() throws Exception - { - setupServerAndCurator(); - closerRule.closeLater(server); - curator.start(); - closerRule.closeLater(curator); - Assert.assertNotNull(curator.create().forPath("/druid")); - Assert.assertTrue(curator.blockUntilConnected(10, TimeUnit.SECONDS)); - final Announcer announcer = new Announcer(curator, executorService); - final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("localhost"); - final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer( - announcer, - listeningAnnouncerConfig, - listenerKey, - node - ) - { - }; - listenerResourceAnnouncer.start(); - announcer.start(); - closerRule.closeLater(new Closeable() - { - @Override - public void close() - { - announcer.stop(); - } - }); - Assert.assertNotNull(curator.checkExists().forPath(announcePath)); - final String nodePath = ZKPaths.makePath(announcePath, StringUtils.format("%s:%s", node.getScheme(), node.getHostText())); - Assert.assertNotNull(curator.checkExists().forPath(nodePath)); - Assert.assertEquals(Long.BYTES, curator.getData().decompressed().forPath(nodePath).length); - Assert.assertNull(curator.checkExists() - .forPath(listeningAnnouncerConfig.getAnnouncementPath(listenerKey + "FOO"))); - listenerResourceAnnouncer.stop(); - listenerResourceAnnouncer.start(); - listenerResourceAnnouncer.start(); - listenerResourceAnnouncer.stop(); - listenerResourceAnnouncer.stop(); - listenerResourceAnnouncer.start(); - listenerResourceAnnouncer.stop(); - listenerResourceAnnouncer.start(); - listenerResourceAnnouncer.stop(); - Assert.assertNull(curator.checkExists().forPath(nodePath)); - } - - @Test - public void testStartCorrect() - { - final Announcer announcer = EasyMock.createStrictMock(Announcer.class); - final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("some_host"); - - final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer( - announcer, - listeningAnnouncerConfig, - listenerKey, - node - ) - { - }; - - - announcer.announce( - EasyMock.eq(ZKPaths.makePath(announcePath, StringUtils.format("%s:%s", node.getScheme(), node.getHostText()))), - EasyMock.aryEq(resourceAnnouncer.getAnnounceBytes()) - ); - EasyMock.expectLastCall().once(); - EasyMock.replay(announcer); - resourceAnnouncer.start(); - EasyMock.verify(announcer); - } -} diff --git a/server/src/test/java/org/apache/druid/server/listener/resource/AbstractListenerHandlerTest.java b/server/src/test/java/org/apache/druid/server/listener/resource/AbstractListenerHandlerTest.java deleted file mode 100644 index b4e38958ccf..00000000000 --- a/server/src/test/java/org/apache/druid/server/listener/resource/AbstractListenerHandlerTest.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.listener.resource; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.StringUtils; -import org.easymock.EasyMock; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import javax.annotation.Nullable; -import javax.validation.constraints.NotNull; -import javax.ws.rs.core.Response; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -public class AbstractListenerHandlerTest -{ - final ObjectMapper mapper = new DefaultObjectMapper(); - final AtomicBoolean failPost = new AtomicBoolean(false); - final String error_msg = "err message"; - - final Object good_object = new Object(); - final AtomicBoolean shouldFail = new AtomicBoolean(false); - final AtomicBoolean returnEmpty = new AtomicBoolean(false); - final String error_message = "some error message"; - final String good_id = "good id"; - final String error_id = "error id"; - final Map all = ImmutableMap.of(); - - - final Object obj = new Object(); - final String valid_id = "some_id"; - - final AbstractListenerHandler abstractListenerHandler = - new AbstractListenerHandler(SomeBeanClass.TYPE_REFERENCE) - { - @Override - public Response handleUpdates(InputStream inputStream, ObjectMapper mapper) - { - return null; - } - - @Nullable - @Override - public Object post(@NotNull Map inputObject) throws Exception - { - if (failPost.get()) { - throw new Exception(error_msg); - } - return inputObject.isEmpty() ? null : inputObject; - } - - @Nullable - @Override - protected Object get(@NotNull String id) - { - if (error_id.equals(id)) { - throw new RuntimeException(error_message); - } - return good_id.equals(id) ? good_object : null; - } - - @Nullable - @Override - protected Map getAll() - { - if (shouldFail.get()) { - throw new RuntimeException(error_message); - } - return returnEmpty.get() ? null : all; - } - - @Nullable - @Override - protected Object delete(@NotNull String id) - { - if (error_id.equals(id)) { - throw new RuntimeException(error_msg); - } - return valid_id.equals(id) ? obj : null; - } - }; - - @Before - public void setUp() - { - mapper.registerSubtypes(SomeBeanClass.class); - } - - @Test - public void testSimple() throws Exception - { - final SomeBeanClass val = new SomeBeanClass("a"); - final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(val))); - final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id); - Assert.assertEquals(202, response.getStatus()); - Assert.assertEquals(ImmutableMap.of(good_id, val), response.getEntity()); - } - - - @Test - public void testSimpleAll() throws Exception - { - final Map val = ImmutableMap.of("a", new SomeBeanClass("a")); - final ByteArrayInputStream bais = new ByteArrayInputStream( - StringUtils.toUtf8( - mapper.writeValueAsString( - val - ) - ) - ); - final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); - Assert.assertEquals(202, response.getStatus()); - Assert.assertEquals(val, response.getEntity()); - } - - @Test - public void testMissingAll() throws Exception - { - final Map val = ImmutableMap.of(); - final ByteArrayInputStream bais = new ByteArrayInputStream( - StringUtils.toUtf8( - mapper.writeValueAsString( - val - ) - ) - ); - final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); - Assert.assertEquals(404, response.getStatus()); - } - - @Test - public void testErrorAll() throws Exception - { - final Map val = ImmutableMap.of(); - final ByteArrayInputStream bais = new ByteArrayInputStream( - StringUtils.toUtf8( - mapper.writeValueAsString( - val - ) - ) - ); - failPost.set(true); - final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); - Assert.assertEquals(500, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity()); - } - - @Test - public void testError() throws Exception - { - final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(new SomeBeanClass( - "a")))); - failPost.set(true); - final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id); - Assert.assertEquals(500, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity()); - } - - @Test - public void testBadInput() - { - final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{0, 0, 0}); - final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id); - Assert.assertEquals(400, response.getStatus()); - } - - @Test - public void testBadInnerInput() throws Exception - { - final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{}); - final ObjectMapper mapper = EasyMock.createStrictMock(ObjectMapper.class); - EasyMock.expect(mapper.readValue(EasyMock.anyObject(), EasyMock.>anyObject())) - .andThrow(new IOException()); - EasyMock.replay(mapper); - final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); - Assert.assertEquals(400, response.getStatus()); - EasyMock.verify(mapper); - } - - - @Test - public void testHandleSimpleDELETE() - { - final Response response = abstractListenerHandler.handleDELETE(valid_id); - Assert.assertEquals(202, response.getStatus()); - Assert.assertEquals(obj, response.getEntity()); - } - - @Test - public void testMissingDELETE() - { - final Response response = abstractListenerHandler.handleDELETE("not going to find it"); - Assert.assertEquals(404, response.getStatus()); - } - - @Test - public void testErrorDELETE() - { - final Response response = abstractListenerHandler.handleDELETE(error_id); - Assert.assertEquals(500, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity()); - } - - @Test - public void testHandle() - { - final Response response = abstractListenerHandler.handleGET(good_id); - Assert.assertEquals(200, response.getStatus()); - Assert.assertEquals(good_object, response.getEntity()); - } - - @Test - public void testMissingHandle() - { - final Response response = abstractListenerHandler.handleGET("neva gonna get it"); - Assert.assertEquals(404, response.getStatus()); - } - - @Test - public void testExceptionalHandle() - { - final Response response = abstractListenerHandler.handleGET(error_id); - Assert.assertEquals(500, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity()); - } - - @Test - public void testHandleAll() - { - final Response response = abstractListenerHandler.handleGETAll(); - Assert.assertEquals(200, response.getStatus()); - Assert.assertEquals(all, response.getEntity()); - } - - @Test - public void testExceptionalHandleAll() - { - shouldFail.set(true); - final Response response = abstractListenerHandler.handleGETAll(); - Assert.assertEquals(500, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity()); - } - - @Test - public void testMissingHandleAll() - { - returnEmpty.set(true); - final Response response = abstractListenerHandler.handleGETAll(); - Assert.assertEquals(404, response.getStatus()); - } -} diff --git a/server/src/test/java/org/apache/druid/server/listener/resource/ListenerResourceTest.java b/server/src/test/java/org/apache/druid/server/listener/resource/ListenerResourceTest.java deleted file mode 100644 index 13873ff8cc3..00000000000 --- a/server/src/test/java/org/apache/druid/server/listener/resource/ListenerResourceTest.java +++ /dev/null @@ -1,491 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.listener.resource; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.StringUtils; -import org.easymock.EasyMock; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import javax.annotation.Nullable; -import javax.servlet.http.HttpServletRequest; -import javax.validation.constraints.NotNull; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - - -public class ListenerResourceTest -{ - static final String ANN_ID = "announce_id"; - HttpServletRequest req; - final ObjectMapper mapper = new DefaultObjectMapper(); - private static final Supplier EMPTY_JSON_MAP = () -> new ByteArrayInputStream(StringUtils.toUtf8("{}")); - - @Before - public void setUp() - { - mapper.registerSubtypes(SomeBeanClass.class); - req = EasyMock.createNiceMock(HttpServletRequest.class); - EasyMock.expect(req.getContentType()).andReturn(MediaType.APPLICATION_JSON).anyTimes(); - EasyMock.replay(req); - } - - @After - public void tearDown() - { - - } - - @Test - public void testServiceAnnouncementPOSTExceptionInHandler() - { - final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); - EasyMock.expect(handler.handlePOST( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.anyString() - )).andThrow(new RuntimeException("test")); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - EasyMock.replay(handler); - Assert.assertEquals( - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), - resource.serviceAnnouncementPOST("id", EMPTY_JSON_MAP.get(), req).getStatus() - ); - EasyMock.verify(req, handler); - } - - @Test - public void testServiceAnnouncementPOSTAllExceptionInHandler() - { - final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); - EasyMock.expect(handler.handlePOSTAll(EasyMock.anyObject(), EasyMock.anyObject())) - .andThrow(new RuntimeException("test")); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - EasyMock.replay(handler); - Assert.assertEquals( - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), - resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.get(), req).getStatus() - ); - EasyMock.verify(req, handler); - } - - @Test - public void testServiceAnnouncementPOST() - { - final AtomicInteger c = new AtomicInteger(0); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - new ExceptionalAbstractListenerHandler() - { - @Override - public Object post(Map l) - { - c.incrementAndGet(); - return l; - } - } - ) - { - }; - Assert.assertEquals( - 202, - resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.get(), req).getStatus() - ); - Assert.assertEquals(1, c.get()); - EasyMock.verify(req); - } - - @Test - public void testServiceAnnouncementGET() - { - final AtomicInteger c = new AtomicInteger(0); - final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() - { - @Override - public Object get(String id) - { - c.incrementAndGet(); - return ANN_ID.equals(id) ? ANN_ID : null; - } - }; - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - Assert.assertEquals( - Response.Status.OK.getStatusCode(), - resource.serviceAnnouncementGET(ANN_ID).getStatus() - ); - Assert.assertEquals(1, c.get()); - EasyMock.verify(req); - } - - - @Test - public void testServiceAnnouncementGETNull() - { - final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler(); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - Assert.assertEquals( - 400, - resource.serviceAnnouncementGET(null).getStatus() - ); - Assert.assertEquals( - 400, - resource.serviceAnnouncementGET("").getStatus() - ); - EasyMock.verify(req); - } - - @Test - public void testServiceAnnouncementGETExceptionInHandler() - { - final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); - EasyMock.expect(handler.handleGET(EasyMock.anyString())).andThrow(new RuntimeException("test")); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - EasyMock.replay(handler); - Assert.assertEquals( - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), - resource.serviceAnnouncementGET("id").getStatus() - ); - EasyMock.verify(handler); - } - - @Test - public void testServiceAnnouncementGETAllExceptionInHandler() - { - final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); - EasyMock.expect(handler.handleGETAll()).andThrow(new RuntimeException("test")); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - EasyMock.replay(handler); - Assert.assertEquals( - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), - resource.getAll().getStatus() - ); - EasyMock.verify(handler); - } - - @Test - public void testServiceAnnouncementDELETENullID() - { - final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler(); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - - Assert.assertEquals( - Response.Status.BAD_REQUEST.getStatusCode(), - resource.serviceAnnouncementDELETE(null).getStatus() - ); - } - - @Test - public void testServiceAnnouncementDELETEExceptionInHandler() - { - - final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); - EasyMock.expect(handler.handleDELETE(EasyMock.anyString())).andThrow(new RuntimeException("test")); - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - EasyMock.replay(handler); - Assert.assertEquals( - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), - resource.serviceAnnouncementDELETE("id").getStatus() - ); - EasyMock.verify(handler); - } - - @Test - public void testServiceAnnouncementDELETE() - { - final AtomicInteger c = new AtomicInteger(0); - final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() - { - @Override - public Object delete(String id) - { - c.incrementAndGet(); - return ANN_ID.equals(id) ? ANN_ID : null; - } - }; - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - Assert.assertEquals( - 202, - resource.serviceAnnouncementDELETE(ANN_ID).getStatus() - ); - Assert.assertEquals(1, c.get()); - EasyMock.verify(req); - } - - @Test - // Take a list of strings wrap them in a JSON POJO and get them back as an array string in the POST function - public void testAbstractPostHandler() throws Exception - { - final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() - { - - @Nullable - @Override - public String post( - @NotNull Map inputObject - ) throws Exception - { - return mapper.writeValueAsString(inputObject); - } - }; - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - final List strings = ImmutableList.of("test1", "test2"); - final Map expectedMap = new HashMap<>(); - for (final String str : strings) { - expectedMap.put(str, new SomeBeanClass(str)); - } - final String expectedString = mapper.writeValueAsString(expectedMap); - final Response response = resource.serviceAnnouncementPOSTAll( - new ByteArrayInputStream(StringUtils.toUtf8(expectedString)), - req - ); - Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus()); - Assert.assertEquals(expectedString, response.getEntity()); - } - - - @Test - public void testAbstractPostHandlerEmptyList() - { - final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() - { - @Override - public String post(Map inputObject) throws Exception - { - return mapper.writeValueAsString(inputObject); - } - }; - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - final Response response = resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.get(), req); - Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus()); - Assert.assertEquals("{}", response.getEntity()); - } - - - @Test - public void testAbstractPostHandlerException() throws Exception - { - final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() - { - @Override - public String post(Map inputObject) - { - throw new UnsupportedOperationException("nope!"); - } - }; - final ListenerResource resource = new ListenerResource( - mapper, - mapper, - handler - ) - { - }; - final Response response = resource.serviceAnnouncementPOSTAll( - new ByteArrayInputStream( - StringUtils.toUtf8( - mapper.writeValueAsString( - ImmutableMap.of("test1", new SomeBeanClass("test1"), "test2", new SomeBeanClass("test2")) - ) - ) - ), - req - ); - Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); - } -} - -@JsonTypeName("someBean") -class SomeBeanClass -{ - protected static final TypeReference TYPE_REFERENCE = new TypeReference() - { - }; - - private final String p; - - @JsonCreator - public SomeBeanClass( - @JsonProperty("p") String p - ) - { - this.p = p; - } - - @JsonProperty - public String getP() - { - return this.p; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SomeBeanClass that = (SomeBeanClass) o; - - return p != null ? p.equals(that.p) : that.p == null; - - } - - @Override - public int hashCode() - { - return p != null ? p.hashCode() : 0; - } - - @Override - public String toString() - { - return "SomeBeanClass{" + - "p='" + p + '\'' + - '}'; - } -} - -class ExceptionalAbstractListenerHandler extends AbstractListenerHandler -{ - public ExceptionalAbstractListenerHandler() - { - super(SomeBeanClass.TYPE_REFERENCE); - } - - @Nullable - @Override - protected Object delete(@NotNull String id) - { - throw new UnsupportedOperationException("should not have called DELETE"); - } - - @Nullable - @Override - protected Object get(@NotNull String id) - { - throw new UnsupportedOperationException("should not have called GET"); - } - - @Nullable - @Override - protected Map getAll() - { - throw new UnsupportedOperationException("should not have called GET all"); - } - - @Nullable - @Override - public Object post(@NotNull Map inputObject) throws Exception - { - throw new UnsupportedOperationException("should not have called post"); - } - - @Override - public Response handleUpdates(InputStream inputStream, ObjectMapper mapper) - { - throw new UnsupportedOperationException("should not have called handleUpdates"); - } -}