remove old unused zookeeper dependent lookups code (#9480)

* remove old unused zookeeper dependent lookups code

* make  intellij inspector happy
This commit is contained in:
Himanshu 2020-03-10 12:12:48 -07:00 committed by GitHub
parent 559c7b64cc
commit 75a5591448
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 4 additions and 1188 deletions

View File

@ -326,7 +326,7 @@ These end points can be used to get the propagation status of configured lookups
### List lookup state of all processes ### List lookup state of all processes
`GET /druid/coordinator/v1/lookups/nodeStatus` with optional query parameter `discover` to discover tiers from zookeeper or configured lookup tiers are listed. `GET /druid/coordinator/v1/lookups/nodeStatus` with optional query parameter `discover` to discover tiers or configured lookup tiers are listed.
### List lookup state of processes in a tier ### List lookup state of processes in a tier
@ -383,7 +383,7 @@ The return value will be the json representation of the factory.
See [Lookups Dynamic Configuration](../configuration/index.md#lookups-dynamic-configuration) for Coordinator configuration. See [Lookups Dynamic Configuration](../configuration/index.md#lookups-dynamic-configuration) for Coordinator configuration.
To configure a Broker / Router / Historical / Peon to announce itself as part of a lookup tier, use the `druid.zk.paths.lookupTier` property. To configure a Broker / Router / Historical / Peon to announce itself as part of a lookup tier, use following properties.
|Property | Description | Default | |Property | Description | Default |
|---------|-------------|---------| |---------|-------------|---------|

View File

@ -24,11 +24,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.listener.announcer.ListeningAnnouncerConfig;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig class LookupListeningAnnouncerConfig
{ {
public static final String DEFAULT_TIER = "__default"; public static final String DEFAULT_TIER = "__default";
private final DataSourceTaskIdHolder dataSourceTaskIdHolder; private final DataSourceTaskIdHolder dataSourceTaskIdHolder;
@ -39,11 +37,9 @@ class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig
@JsonCreator @JsonCreator
public LookupListeningAnnouncerConfig( public LookupListeningAnnouncerConfig(
@JacksonInject ZkPathsConfig zkPathsConfig,
@JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder @JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder
) )
{ {
super(zkPathsConfig);
this.dataSourceTaskIdHolder = dataSourceTaskIdHolder; this.dataSourceTaskIdHolder = dataSourceTaskIdHolder;
} }
@ -61,9 +57,4 @@ class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig
lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
); );
} }
public String getLookupKey()
{
return LookupModule.getTierListenerPath(getLookupTier());
}
} }

View File

@ -25,14 +25,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Provides; import com.google.inject.Provides;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule; import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.dimension.LookupDimensionSpec; import org.apache.druid.query.dimension.LookupDimensionSpec;
import org.apache.druid.query.expression.LookupExprMacro; import org.apache.druid.query.expression.LookupExprMacro;
@ -48,11 +46,6 @@ public class LookupModule implements DruidModule
public static final String FAILED_UPDATES_KEY = "failedUpdates"; public static final String FAILED_UPDATES_KEY = "failedUpdates";
public static final int LOOKUP_LISTENER_QOS_MAX_REQUESTS = 2; public static final int LOOKUP_LISTENER_QOS_MAX_REQUESTS = 2;
public static String getTierListenerPath(String tier)
{
return ZKPaths.makePath(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY, tier);
}
@Override @Override
public List<? extends Module> getJacksonModules() public List<? extends Module> getJacksonModules()
{ {
@ -75,9 +68,6 @@ public class LookupModule implements DruidModule
Jerseys.addResource(binder, LookupListeningResource.class); Jerseys.addResource(binder, LookupListeningResource.class);
Jerseys.addResource(binder, LookupIntrospectionResource.class); Jerseys.addResource(binder, LookupIntrospectionResource.class);
ExpressionModule.addExprMacro(binder, LookupExprMacro.class); ExpressionModule.addExprMacro(binder, LookupExprMacro.class);
LifecycleModule.register(binder, LookupResourceListenerAnnouncer.class);
// Nothing else starts this, so we bind it to get it to start
binder.bind(LookupResourceListenerAnnouncer.class).in(ManageLifecycle.class);
JettyBindings.addQosFilter( JettyBindings.addQosFilter(
binder, binder,
ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY, ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY,

View File

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.lookup;
import com.google.inject.Inject;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.HostAndPortWithScheme;
import org.apache.druid.server.listener.announcer.ListenerResourceAnnouncer;
@Deprecated
class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer
{
@Inject
public LookupResourceListenerAnnouncer(
Announcer announcer,
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
@Self DruidNode node
)
{
super(
announcer,
lookupListeningAnnouncerConfig,
lookupListeningAnnouncerConfig.getLookupKey(),
HostAndPortWithScheme.fromString(node.getServiceScheme(), node.getHostAndPortToUse())
);
}
}

View File

@ -1,111 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.listener.announcer;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.http.HostAndPortWithScheme;
import java.nio.ByteBuffer;
/**
* Starting 0.11.0 Coordinator uses announcements made by {@link org.apache.druid.discovery.DruidNodeAnnouncer} .
*/
@Deprecated
public abstract class ListenerResourceAnnouncer
{
private static final byte[] ANNOUNCE_BYTES = ByteBuffer
.allocate(Long.BYTES)
.putLong(System.currentTimeMillis())
.array();
private static final Logger LOG = new Logger(ListenerResourceAnnouncer.class);
private final Object startStopSync = new Object();
private volatile boolean started = false;
private final Announcer announcer;
private final String announcePath;
public ListenerResourceAnnouncer(
Announcer announcer,
ListeningAnnouncerConfig listeningAnnouncerConfig,
String listener_key,
HostAndPortWithScheme node
)
{
this(
announcer,
ZKPaths.makePath(listeningAnnouncerConfig.getListenersPath(), listener_key),
node
);
}
ListenerResourceAnnouncer(
Announcer announcer,
String announceBasePath,
HostAndPortWithScheme node
)
{
this.announcePath = ZKPaths.makePath(announceBasePath, node.toString());
this.announcer = announcer;
}
@LifecycleStart
public void start()
{
synchronized (startStopSync) {
if (started) {
LOG.debug("Already started, ignoring");
return;
}
try {
// Announcement is based on MS. This is to make sure we don't collide on announcements
Thread.sleep(2);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
announcer.announce(announcePath, ANNOUNCE_BYTES);
LOG.info("Announcing start time on [%s]", announcePath);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (startStopSync) {
if (!started) {
LOG.debug("Already stopped, ignoring");
return;
}
announcer.unannounce(announcePath);
LOG.info("Unannouncing start time on [%s]", announcePath);
started = false;
}
}
public byte[] getAnnounceBytes()
{
return ByteBuffer.allocate(ANNOUNCE_BYTES.length).put(ANNOUNCE_BYTES).array();
}
}

View File

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.listener.announcer;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.initialization.ZkPathsConfig;
/**
* Even though we provide the mechanism to get zk paths here, we do NOT handle announcing and unannouncing in this module.
* The reason is that it is not appropriate to force a global announce/unannounce since individual listeners may have
* different lifecycles.
*/
public class ListeningAnnouncerConfig
{
@JacksonInject
private final ZkPathsConfig zkPathsConfig;
@JsonProperty("listenersPath")
private String listenersPath = null;
@Inject
public ListeningAnnouncerConfig(
ZkPathsConfig zkPathsConfig
)
{
this.zkPathsConfig = zkPathsConfig;
}
@JsonProperty("listenersPath")
public String getListenersPath()
{
return listenersPath == null ? zkPathsConfig.defaultPath("listeners") : listenersPath;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ListeningAnnouncerConfig that = (ListeningAnnouncerConfig) o;
return !(listenersPath != null ? !listenersPath.equals(that.listenersPath) : that.listenersPath != null);
}
@Override
public int hashCode()
{
return listenersPath != null ? listenersPath.hashCode() : 0;
}
@Override
public String toString()
{
return "ListeningAnnouncerConfig{" +
"listenersPath='" + getListenersPath() + '\'' +
'}';
}
/**
* Build a path for the particular named listener. The first implementation of this is used with zookeeper, but
* there is nothing restricting its use in a more general pathing (example: http endpoint proxy for raft)
* @param listenerName The key for the listener.
* @return A path appropriate for use in zookeeper to discover the listeners with the particular listener name
*/
public String getAnnouncementPath(String listenerName)
{
return ZKPaths.makePath(
getListenersPath(),
Preconditions.checkNotNull(
StringUtils.emptyToNullNonDruidDataString(listenerName), "Listener name cannot be null"
)
);
}
}

View File

@ -205,9 +205,7 @@ public abstract class AbstractListenerHandler<ObjType> implements ListenerHandle
* @param inputObject A list of the objects which were POSTed * @param inputObject A list of the objects which were POSTed
* *
* @return An object to be returned in the entity of the response. * @return An object to be returned in the entity of the response.
*
* @throws Exception
*/ */
@Nullable @Nullable
public abstract Object post(Map<String, ObjType> inputObject) throws Exception; public abstract Object post(Map<String, ObjType> inputObject);
} }

View File

@ -53,9 +53,6 @@ import java.io.InputStream;
* *
* Items tagged with a particular ID for an announcement listener are updated by a POST to the announcement listener's * Items tagged with a particular ID for an announcement listener are updated by a POST to the announcement listener's
* path "/{announcement}" * path "/{announcement}"
*
* Discovery of who can listen to particular announcement keys is not part of this class and should be handled
* by ListenerResourceAnnouncer
*/ */
public abstract class ListenerResource public abstract class ListenerResource
{ {

View File

@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.listener.announcer;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.CloserRule;
import org.apache.druid.server.http.HostAndPortWithScheme;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ListenerResourceAnnouncerTest extends CuratorTestBase
{
private final ListeningAnnouncerConfig listeningAnnouncerConfig = new ListeningAnnouncerConfig(new ZkPathsConfig());
private final String listenerKey = "someKey";
private final String announcePath = listeningAnnouncerConfig.getAnnouncementPath(listenerKey);
@Rule
public CloserRule closerRule = new CloserRule(true);
private ExecutorService executorService;
@Before
public void setUp()
{
executorService = Execs.singleThreaded("listener-resource--%d");
}
@After
public void tearDown()
{
executorService.shutdownNow();
}
@Test
public void testAnnouncerBehaves() throws Exception
{
setupServerAndCurator();
closerRule.closeLater(server);
curator.start();
closerRule.closeLater(curator);
Assert.assertNotNull(curator.create().forPath("/druid"));
Assert.assertTrue(curator.blockUntilConnected(10, TimeUnit.SECONDS));
final Announcer announcer = new Announcer(curator, executorService);
final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("localhost");
final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
listeningAnnouncerConfig,
listenerKey,
node
)
{
};
listenerResourceAnnouncer.start();
announcer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close()
{
announcer.stop();
}
});
Assert.assertNotNull(curator.checkExists().forPath(announcePath));
final String nodePath = ZKPaths.makePath(announcePath, StringUtils.format("%s:%s", node.getScheme(), node.getHostText()));
Assert.assertNotNull(curator.checkExists().forPath(nodePath));
Assert.assertEquals(Long.BYTES, curator.getData().decompressed().forPath(nodePath).length);
Assert.assertNull(curator.checkExists()
.forPath(listeningAnnouncerConfig.getAnnouncementPath(listenerKey + "FOO")));
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.stop();
Assert.assertNull(curator.checkExists().forPath(nodePath));
}
@Test
public void testStartCorrect()
{
final Announcer announcer = EasyMock.createStrictMock(Announcer.class);
final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("some_host");
final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
listeningAnnouncerConfig,
listenerKey,
node
)
{
};
announcer.announce(
EasyMock.eq(ZKPaths.makePath(announcePath, StringUtils.format("%s:%s", node.getScheme(), node.getHostText()))),
EasyMock.aryEq(resourceAnnouncer.getAnnounceBytes())
);
EasyMock.expectLastCall().once();
EasyMock.replay(announcer);
resourceAnnouncer.start();
EasyMock.verify(announcer);
}
}

View File

@ -1,277 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.listener.resource;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class AbstractListenerHandlerTest
{
final ObjectMapper mapper = new DefaultObjectMapper();
final AtomicBoolean failPost = new AtomicBoolean(false);
final String error_msg = "err message";
final Object good_object = new Object();
final AtomicBoolean shouldFail = new AtomicBoolean(false);
final AtomicBoolean returnEmpty = new AtomicBoolean(false);
final String error_message = "some error message";
final String good_id = "good id";
final String error_id = "error id";
final Map<String, SomeBeanClass> all = ImmutableMap.of();
final Object obj = new Object();
final String valid_id = "some_id";
final AbstractListenerHandler<SomeBeanClass> abstractListenerHandler =
new AbstractListenerHandler<SomeBeanClass>(SomeBeanClass.TYPE_REFERENCE)
{
@Override
public Response handleUpdates(InputStream inputStream, ObjectMapper mapper)
{
return null;
}
@Nullable
@Override
public Object post(@NotNull Map<String, SomeBeanClass> inputObject) throws Exception
{
if (failPost.get()) {
throw new Exception(error_msg);
}
return inputObject.isEmpty() ? null : inputObject;
}
@Nullable
@Override
protected Object get(@NotNull String id)
{
if (error_id.equals(id)) {
throw new RuntimeException(error_message);
}
return good_id.equals(id) ? good_object : null;
}
@Nullable
@Override
protected Map<String, SomeBeanClass> getAll()
{
if (shouldFail.get()) {
throw new RuntimeException(error_message);
}
return returnEmpty.get() ? null : all;
}
@Nullable
@Override
protected Object delete(@NotNull String id)
{
if (error_id.equals(id)) {
throw new RuntimeException(error_msg);
}
return valid_id.equals(id) ? obj : null;
}
};
@Before
public void setUp()
{
mapper.registerSubtypes(SomeBeanClass.class);
}
@Test
public void testSimple() throws Exception
{
final SomeBeanClass val = new SomeBeanClass("a");
final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(val)));
final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id);
Assert.assertEquals(202, response.getStatus());
Assert.assertEquals(ImmutableMap.of(good_id, val), response.getEntity());
}
@Test
public void testSimpleAll() throws Exception
{
final Map<String, SomeBeanClass> val = ImmutableMap.of("a", new SomeBeanClass("a"));
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
val
)
)
);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(202, response.getStatus());
Assert.assertEquals(val, response.getEntity());
}
@Test
public void testMissingAll() throws Exception
{
final Map<String, SomeBeanClass> val = ImmutableMap.of();
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
val
)
)
);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(404, response.getStatus());
}
@Test
public void testErrorAll() throws Exception
{
final Map<String, SomeBeanClass> val = ImmutableMap.of();
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
val
)
)
);
failPost.set(true);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity());
}
@Test
public void testError() throws Exception
{
final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(new SomeBeanClass(
"a"))));
failPost.set(true);
final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity());
}
@Test
public void testBadInput()
{
final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{0, 0, 0});
final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testBadInnerInput() throws Exception
{
final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{});
final ObjectMapper mapper = EasyMock.createStrictMock(ObjectMapper.class);
EasyMock.expect(mapper.readValue(EasyMock.<InputStream>anyObject(), EasyMock.<TypeReference<Object>>anyObject()))
.andThrow(new IOException());
EasyMock.replay(mapper);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(400, response.getStatus());
EasyMock.verify(mapper);
}
@Test
public void testHandleSimpleDELETE()
{
final Response response = abstractListenerHandler.handleDELETE(valid_id);
Assert.assertEquals(202, response.getStatus());
Assert.assertEquals(obj, response.getEntity());
}
@Test
public void testMissingDELETE()
{
final Response response = abstractListenerHandler.handleDELETE("not going to find it");
Assert.assertEquals(404, response.getStatus());
}
@Test
public void testErrorDELETE()
{
final Response response = abstractListenerHandler.handleDELETE(error_id);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity());
}
@Test
public void testHandle()
{
final Response response = abstractListenerHandler.handleGET(good_id);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(good_object, response.getEntity());
}
@Test
public void testMissingHandle()
{
final Response response = abstractListenerHandler.handleGET("neva gonna get it");
Assert.assertEquals(404, response.getStatus());
}
@Test
public void testExceptionalHandle()
{
final Response response = abstractListenerHandler.handleGET(error_id);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity());
}
@Test
public void testHandleAll()
{
final Response response = abstractListenerHandler.handleGETAll();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(all, response.getEntity());
}
@Test
public void testExceptionalHandleAll()
{
shouldFail.set(true);
final Response response = abstractListenerHandler.handleGETAll();
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity());
}
@Test
public void testMissingHandleAll()
{
returnEmpty.set(true);
final Response response = abstractListenerHandler.handleGETAll();
Assert.assertEquals(404, response.getStatus());
}
}

View File

@ -1,491 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.listener.resource;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class ListenerResourceTest
{
static final String ANN_ID = "announce_id";
HttpServletRequest req;
final ObjectMapper mapper = new DefaultObjectMapper();
private static final Supplier<InputStream> EMPTY_JSON_MAP = () -> new ByteArrayInputStream(StringUtils.toUtf8("{}"));
@Before
public void setUp()
{
mapper.registerSubtypes(SomeBeanClass.class);
req = EasyMock.createNiceMock(HttpServletRequest.class);
EasyMock.expect(req.getContentType()).andReturn(MediaType.APPLICATION_JSON).anyTimes();
EasyMock.replay(req);
}
@After
public void tearDown()
{
}
@Test
public void testServiceAnnouncementPOSTExceptionInHandler()
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handlePOST(
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyString()
)).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementPOST("id", EMPTY_JSON_MAP.get(), req).getStatus()
);
EasyMock.verify(req, handler);
}
@Test
public void testServiceAnnouncementPOSTAllExceptionInHandler()
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handlePOSTAll(EasyMock.anyObject(), EasyMock.anyObject()))
.andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.get(), req).getStatus()
);
EasyMock.verify(req, handler);
}
@Test
public void testServiceAnnouncementPOST()
{
final AtomicInteger c = new AtomicInteger(0);
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
new ExceptionalAbstractListenerHandler()
{
@Override
public Object post(Map<String, SomeBeanClass> l)
{
c.incrementAndGet();
return l;
}
}
)
{
};
Assert.assertEquals(
202,
resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.get(), req).getStatus()
);
Assert.assertEquals(1, c.get());
EasyMock.verify(req);
}
@Test
public void testServiceAnnouncementGET()
{
final AtomicInteger c = new AtomicInteger(0);
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public Object get(String id)
{
c.incrementAndGet();
return ANN_ID.equals(id) ? ANN_ID : null;
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
Response.Status.OK.getStatusCode(),
resource.serviceAnnouncementGET(ANN_ID).getStatus()
);
Assert.assertEquals(1, c.get());
EasyMock.verify(req);
}
@Test
public void testServiceAnnouncementGETNull()
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler();
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
400,
resource.serviceAnnouncementGET(null).getStatus()
);
Assert.assertEquals(
400,
resource.serviceAnnouncementGET("").getStatus()
);
EasyMock.verify(req);
}
@Test
public void testServiceAnnouncementGETExceptionInHandler()
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handleGET(EasyMock.anyString())).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementGET("id").getStatus()
);
EasyMock.verify(handler);
}
@Test
public void testServiceAnnouncementGETAllExceptionInHandler()
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handleGETAll()).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.getAll().getStatus()
);
EasyMock.verify(handler);
}
@Test
public void testServiceAnnouncementDELETENullID()
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler();
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
Response.Status.BAD_REQUEST.getStatusCode(),
resource.serviceAnnouncementDELETE(null).getStatus()
);
}
@Test
public void testServiceAnnouncementDELETEExceptionInHandler()
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handleDELETE(EasyMock.anyString())).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementDELETE("id").getStatus()
);
EasyMock.verify(handler);
}
@Test
public void testServiceAnnouncementDELETE()
{
final AtomicInteger c = new AtomicInteger(0);
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public Object delete(String id)
{
c.incrementAndGet();
return ANN_ID.equals(id) ? ANN_ID : null;
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
202,
resource.serviceAnnouncementDELETE(ANN_ID).getStatus()
);
Assert.assertEquals(1, c.get());
EasyMock.verify(req);
}
@Test
// Take a list of strings wrap them in a JSON POJO and get them back as an array string in the POST function
public void testAbstractPostHandler() throws Exception
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Nullable
@Override
public String post(
@NotNull Map<String, SomeBeanClass> inputObject
) throws Exception
{
return mapper.writeValueAsString(inputObject);
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
final List<String> strings = ImmutableList.of("test1", "test2");
final Map<String, SomeBeanClass> expectedMap = new HashMap<>();
for (final String str : strings) {
expectedMap.put(str, new SomeBeanClass(str));
}
final String expectedString = mapper.writeValueAsString(expectedMap);
final Response response = resource.serviceAnnouncementPOSTAll(
new ByteArrayInputStream(StringUtils.toUtf8(expectedString)),
req
);
Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
Assert.assertEquals(expectedString, response.getEntity());
}
@Test
public void testAbstractPostHandlerEmptyList()
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public String post(Map<String, SomeBeanClass> inputObject) throws Exception
{
return mapper.writeValueAsString(inputObject);
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
final Response response = resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.get(), req);
Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
Assert.assertEquals("{}", response.getEntity());
}
@Test
public void testAbstractPostHandlerException() throws Exception
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public String post(Map<String, SomeBeanClass> inputObject)
{
throw new UnsupportedOperationException("nope!");
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
final Response response = resource.serviceAnnouncementPOSTAll(
new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
ImmutableMap.of("test1", new SomeBeanClass("test1"), "test2", new SomeBeanClass("test2"))
)
)
),
req
);
Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
}
}
@JsonTypeName("someBean")
class SomeBeanClass
{
protected static final TypeReference<SomeBeanClass> TYPE_REFERENCE = new TypeReference<SomeBeanClass>()
{
};
private final String p;
@JsonCreator
public SomeBeanClass(
@JsonProperty("p") String p
)
{
this.p = p;
}
@JsonProperty
public String getP()
{
return this.p;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SomeBeanClass that = (SomeBeanClass) o;
return p != null ? p.equals(that.p) : that.p == null;
}
@Override
public int hashCode()
{
return p != null ? p.hashCode() : 0;
}
@Override
public String toString()
{
return "SomeBeanClass{" +
"p='" + p + '\'' +
'}';
}
}
class ExceptionalAbstractListenerHandler extends AbstractListenerHandler<SomeBeanClass>
{
public ExceptionalAbstractListenerHandler()
{
super(SomeBeanClass.TYPE_REFERENCE);
}
@Nullable
@Override
protected Object delete(@NotNull String id)
{
throw new UnsupportedOperationException("should not have called DELETE");
}
@Nullable
@Override
protected Object get(@NotNull String id)
{
throw new UnsupportedOperationException("should not have called GET");
}
@Nullable
@Override
protected Map<String, SomeBeanClass> getAll()
{
throw new UnsupportedOperationException("should not have called GET all");
}
@Nullable
@Override
public Object post(@NotNull Map<String, SomeBeanClass> inputObject) throws Exception
{
throw new UnsupportedOperationException("should not have called post");
}
@Override
public Response handleUpdates(InputStream inputStream, ObjectMapper mapper)
{
throw new UnsupportedOperationException("should not have called handleUpdates");
}
}