Query Time Lookup - Dynamic Configuration

This commit is contained in:
Charles Allen 2016-03-17 17:11:00 -07:00
parent 45c413af7e
commit 5da9a280b6
37 changed files with 6001 additions and 62 deletions

View File

@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.common.utils;
import com.google.common.collect.ImmutableMap;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Map;
public class ServletResourceUtils
{
/**
* Sanitize the exception as a map of "error" to information about the exception.
*
* This method explicitly suppresses the stack trace and any other logging. Any logging should be handled by the caller.
* @param t The exception to sanitize
* @return An immutable Map with a single entry which maps "error" to information about the error suitable for passing as an entity in a servlet error response.
*/
public static Map<String, String> sanitizeException(@Nullable Throwable t)
{
return ImmutableMap.of(
"error",
t == null ? "null" : (t.getMessage() == null ? t.toString() : t.getMessage())
);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.common.utils;
import org.junit.Assert;
import org.junit.Test;
public class ServletResourceUtilsTest
{
@Test
public void testSanitizeException() throws Exception
{
final String message = "some message";
Assert.assertEquals(message, ServletResourceUtils.sanitizeException(new Throwable(message)).get("error"));
Assert.assertEquals("null", ServletResourceUtils.sanitizeException(null).get("error"));
Assert.assertEquals(message, ServletResourceUtils.sanitizeException(new Throwable()
{
@Override
public String toString()
{
return message;
}
}).get("error"));
}
}

View File

@ -102,3 +102,16 @@ To view last <n> entries of the audit history of coordinator dynamic config issu
```
http://<COORDINATOR_IP>:<PORT>/druid/coordinator/v1/config/history?count=<n>
```
# Lookups Dynamic Config (EXPERIMENTAL)
These configuration options control the behavior of the Lookup dynamic configuration described in the [lookups page](../querying/lookups.html)
|Property|Description|Default|
|--------|-----------|-------|
|`druid.manager.lookups.hostDeleteTimeout`|How long to wait for a `DELETE` request to a particular node before considering the `DELETE` a failure|PT1s|
|`druid.manager.lookups.hostUpdateTimeout`|How long to wait for a `POST` request to a particular node before considering the `POST` a failure|PT10s|
|`druid.manager.lookups.deleteAllTimeout`|How long to wait for all `DELETE` requests to finish before considering the delete attempt a failure|PT10s|
|`druid.manager.lookups.updateAllTimeout`|How long to wait for all `POST` requests to finish before considering the attempt a failure|PT60s|
|`druid.manager.lookups.threadPoolSize`|How many nodes can be managed concurrently (concurrent POST and DELETE requests). Requests this limit will wait in a queue until a slot becomes available.|10|
|`druid.manager.lookups.period`|How many milliseconds between checks for configuration changes|30_000|

View File

@ -302,3 +302,262 @@ To test this setup, you can send key/value pairs to a kafka stream via the follo
```
Renames can then be published as `OLD_VAL->NEW_VAL` followed by newline (enter or return)
Dynamic configuration (EXPERIMENTAL)
------------------------------------
The following documents the behavior of the cluster-wide config which is accessible through the coordinator.
The configuration is propagated through the concept of "tier" of servers.
A "tier" is defined as a group of services which should receive a set of lookups.
For example, you might have all historicals be part of `__default`, and Peons be part of individual tiers for the datasources they are tasked with.
The tiers for lookups are completely independent of historical tiers.
These configs are accessed using JSON through the following URI template
```
http://<COORDINATOR_IP>:<PORT>/druid/coordinator/v1/lookups/{tier}/{id}
```
All URIs below are assumed to have `http://<COORDINATOR_IP>:<PORT>` prepended.
If you have NEVER configured lookups before, you MUST post an empty json object `{}` to `/druid/coordinator/v1/lookups` to initialize the configuration.
These endpoints will return one of the following results:
* 404 if the resource is not found
* 400 if there is a problem in the formatting of the request
* 202 if the request was accepted asynchronously (`POST` and `DELETE`)
* 200 if the request succeeded (`GET` only)
## Configuration propagation behavior
The configuration is propagated to the query serving nodes (broker / router / peon / historical) by the coordinator.
The query serving nodes have an internal API for managing `POST`/`GET`/`DELETE` of lookups.
The coordinator periodically checks the dynamic configuration for changes and, when it detects a change it does the following:
1. Post all lookups for a tier to all Druid nodes within that tier.
2. Delete lookups from a tier which were dropped between the prior configuration values and this one.
If there is no configuration change, the coordinator checks for any nodes which might be new since the last time it propagated lookups and adds all lookups for that node (assuming that node's tier has lookups).
If there are errors while trying to add or update configuration on a node, that node is temporarily skipped until the next management period. The next management period the update will attempt to be propagated again.
If there is an error while trying to delete a lookup from a node (or if a node is down when the coordinator is propagating the config), the delete is not attempted again. In such a case it is possible that a node has lookups that are no longer managed by the coordinator.
## Bulk update
Lookups can be updated in bulk by posting a JSON object to `/druid/coordinator/v1/lookups`. The format of the json object is as follows:
```json
{
"tierName": {
"lookupExtractorFactoryName": {
"someExtractorField": "someExtractorValue"
}
}
}
```
So a config might look something like:
```json
{
"__default": {
"country_code": {
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
},
"site_id": {
"type": "confidential_jdbc",
"auth": "/etc/jdbc.internal",
"table": "sites",
"key": "site_id",
"value": "site_name"
},
"site_id_customer1": {
"type": "confidential_jdbc",
"auth": "/etc/jdbc.customer1",
"table": "sites",
"key": "site_id",
"value": "site_name"
},
"site_id_customer2": {
"type": "confidential_jdbc",
"auth": "/etc/jdbc.customer2",
"table": "sites",
"key": "site_id",
"value": "site_name"
}
},
"realtime_customer1": {
"country_code": {
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
},
"site_id_customer1": {
"type": "confidential_jdbc",
"auth": "/etc/jdbc.customer1",
"table": "sites",
"key": "site_id",
"value": "site_name"
}
},
"realtime_customer2": {
"country_code": {
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
},
"site_id_customer2": {
"type": "confidential_jdbc",
"auth": "/etc/jdbc.customer2",
"table": "sites",
"key": "site_id",
"value": "site_name"
}
}
}
```
All entries in the map will UPDATE existing entries. No entries will be deleted.
## Update Lookup
A `POST` to a particular lookup extractor factory via `/druid/coordinator/v1/lookups/{tier}/{id}` will update that specific extractor factory.
For example, a post to `/druid/coordinator/v1/lookups/realtime_customer1/site_id_customer1` might contain the following:
```json
{
"type": "confidential_jdbc",
"auth": "/etc/jdbc.customer1",
"table": "sites_updated",
"key": "site_id",
"value": "site_name"
}
```
This will replace the `site_id_customer1` lookup in the `realtime_customer1` with the definition above.
## Get Lookup
A `GET` to a particular lookup extractor factory is accomplished via `/druid/coordinator/v1/lookups/{tier}/{id}`
Using the prior example, a `GET` to `/druid/coordinator/v1/lookups/realtime_customer2/site_id_customer2` should return
```json
{
"type": "confidential_jdbc",
"auth": "/etc/jdbc.customer2",
"table": "sites",
"key": "site_id",
"value": "site_name"
}
```
## Delete Lookup
A `DELETE` to `/druid/coordinator/v1/lookups/{tier}/{id}` will remove that lookup from the cluster.
## List tier names
A `GET` to `/druid/coordinator/v1/lookups` will return a list of known tier names in the dynamic configuration.
To discover a list of tiers currently active in the cluster **instead of** ones known in the dynamic configuration, the parameter `discover=true` can be added as per `/druid/coordinator/v1/lookups?discover=true`.
## List lookup names
A `GET` to `/druid/coordinator/v1/lookups/{tier}` will return a list of known lookup names for that tier.
# Internal API
The Peon, Router, Broker, and Historical nodes all have the ability to consume lookup configuration.
There is an internal API these nodes use to list/load/drop their lookups starting at `/druid/listen/v1/lookups`.
These follow the same convention for return values as the cluster wide dynamic configuration.
Usage of these endpoints is quite advanced and not recommended for most users.
The endpoints are as follows:
## Get Lookups
A `GET` to the node at `/druid/listen/v1/lookups` will return a json map of all the lookups currently active on the node.
The return value will be a json map of the lookups to their extractor factories.
```json
{
"some_lookup_name": {
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
}
}
```
## Get Lookup
A `GET` to the node at `/druid/listen/v1/lookups/some_lookup_name` will return the LookupExtractorFactory for the lookup identified by `some_lookup_name`.
The return value will be the json representation of the factory.
```json
{
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
}
```
## Bulk Add or Update Lookups
A `POST` to the node at `/druid/listen/v1/lookups` of a JSON map of lookup names to LookupExtractorFactory will cause the service to add or update its lookups.
The return value will be a JSON map in the following format:
```json
{
"status": "accepted",
"failedUpdates": {}
}
```
If a lookup cannot be started, or is left in an undefined state, the lookup in error will be returned in the `failedUpdates` field as per:
```json
{
"status": "accepted",
"failedUpdates": {
"country_code": {
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
}
}
}
```
The `failedUpdates` field of the return value should be checked if a user is wanting to assure that every update succeeded.
## Add or Update Lookup
A `POST` to the node at `/druid/listen/v1/lookups/some_lookup_name` will behave very similarly to a bulk update.
If `some_lookup_name` is desired to have the LookupExtractorFactory definition of
```json
{
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
}
```
Then a post to `/druid/listen/v1/lookups/some_lookup_name` will behave the same as a `POST` to `/druid/listen/v1/lookups` of
```json
{
"some_lookup_name": {
"type": "simple_json",
"uri": "http://some.host.com/codes.json"
}
}
```
## Remove a Lookup
A `DELETE` to `/druid/listen/v1/lookups/some_lookup_name` will remove that lookup from the node. Success will reflect the ID.
# Configuration
See the [coordinator configuration guilde](../configuration/coordinator.html) for coordinator configuration
To configure a Broker / Router / Historical / Peon to announce itself as part of a lookup tier, use the `druid.zk.paths.lookupTier` property.
|Property | Description | Default |
|---------|-------------|---------|
|`druid.lookup.tierName`| The tier for **lookups** for this node. This is independent of other tiers.|`__default`|

View File

@ -0,0 +1,104 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.query.lookup.LookupExtractor;
import io.druid.query.lookup.LookupExtractorFactory;
import javax.annotation.Nullable;
import java.util.Map;
public class MapLookupExtractorFactory implements LookupExtractorFactory
{
private final Map<String, String> map;
private final boolean isOneToOne;
private final MapLookupExtractor lookupExtractor;
@JsonCreator
public MapLookupExtractorFactory(
@JsonProperty("map") Map<String, String> map,
@JsonProperty("isOneToOne") boolean isOneToOne
)
{
this.map = Preconditions.checkNotNull(map, "map cannot be null");
this.isOneToOne = isOneToOne;
this.lookupExtractor = new MapLookupExtractor(map, isOneToOne);
}
@Override
public boolean start()
{
return true;
}
@Override
public boolean close()
{
return true;
}
/**
* For MapLookups, the replaces consideration is very easy, it simply considers if the other is the same as this one
*
* @param other Some other LookupExtractorFactory which might need replaced
* @return true - should replace, false - should not replace
*/
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return !equals(other);
}
@Override
public LookupExtractor get()
{
return lookupExtractor;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MapLookupExtractorFactory that = (MapLookupExtractorFactory) o;
if (isOneToOne != that.isOneToOne) {
return false;
}
return map.equals(that.map);
}
@Override
public int hashCode()
{
int result = map.hashCode();
result = 31 * result + (isOneToOne ? 1 : 0);
return result;
}
}

View File

@ -19,14 +19,22 @@
package io.druid.query.lookup;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Supplier;
import io.druid.query.extraction.MapLookupExtractorFactory;
import javax.annotation.Nullable;
/**
* Users of Lookup Extraction need to implement a {@link LookupExtractorFactory} supplier of type {@link LookupExtractor}.
* Such factory will manage the state and life cycle of an given lookup.
* If a LookupExtractorFactory wishes to support idempotent updates, it needs to implement the `replaces` method
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "map", value = MapLookupExtractorFactory.class)
})
public interface LookupExtractorFactory extends Supplier<LookupExtractor>
{
/**
@ -47,4 +55,12 @@ public interface LookupExtractorFactory extends Supplier<LookupExtractor>
* @return true if successfully closed the {@link LookupExtractor}
*/
public boolean close();
/**
* Determine if this LookupExtractorFactory should replace some other LookupExtractorFactory.
* This is used to implement no-down-time
* @param other Some other LookupExtractorFactory which might need replaced
* @return `true` if the other should be replaced by this one. `false` if this one should not replace the other factory
*/
boolean replaces(@Nullable LookupExtractorFactory other);
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.lookup;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LifecycleModule;
import io.druid.initialization.DruidModule;
import java.util.Collections;
import java.util.List;
public class LookupModule implements DruidModule
{
private static final String PROPERTY_BASE = "druid.lookup";
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.emptyList();
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, PROPERTY_BASE, LookupConfig.class);
LifecycleModule.register(binder, LookupReferencesManager.class);
}
}

View File

@ -121,12 +121,17 @@ public class LookupReferencesManager
if (!lookupExtractorFactory.start()) {
throw new ISE("start method returned false for lookup [%s]", lookupName);
}
boolean isAdded = (null == lookupMap.putIfAbsent(lookupName, lookupExtractorFactory));
if (isAdded && lookupSnapshotTaker != null) {
lookupSnapshotTaker.takeSnapshot(getAllAsList());
final boolean noPrior = null == lookupMap.putIfAbsent(lookupName, lookupExtractorFactory);
if (noPrior) {
if(lookupSnapshotTaker != null) {
lookupSnapshotTaker.takeSnapshot(getAllAsList());
}
} else {
if (!lookupExtractorFactory.close()) {
throw new ISE("Error closing [%s] on race condition", lookupName);
}
}
return isAdded;
return noPrior;
}
}
@ -165,6 +170,38 @@ public class LookupReferencesManager
}
}
/**
* Add or update a lookup factory
*
* @param lookupName The name of the lookup
* @param lookupExtractorFactory The factory of the lookup
*
* @return True if the lookup was updated, false otherwise
*
* @throws IllegalStateException if start of the factory fails
*/
public boolean updateIfNew(String lookupName, final LookupExtractorFactory lookupExtractorFactory)
{
final boolean update;
synchronized (lock) {
assertStarted();
final LookupExtractorFactory prior = lookupMap.get(lookupName);
update = lookupExtractorFactory.replaces(prior);
if (update) {
if (!lookupExtractorFactory.start()) {
throw new ISE("Could not start [%s]", lookupName);
}
lookupMap.put(lookupName, lookupExtractorFactory);
if (prior != null) {
if (!prior.close()) {
LOGGER.error("Error closing [%s]:[%s]", lookupName, lookupExtractorFactory);
}
}
}
}
return update;
}
/**
* @param lookupName name of {@link LookupExtractorFactory} to delete from the reference registry.
* this function does call the cleaning method {@link LookupExtractorFactory#close()}

View File

@ -36,6 +36,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@ -64,6 +65,12 @@ public class LookupDimensionSpecTest
return true;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return true;
}
@Override
public LookupExtractor get()
{

View File

@ -0,0 +1,49 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;
public class MapLookupExtractorFactoryTest
{
private static final String KEY = "foo";
private static final String VALUE = "bar";
private static final MapLookupExtractorFactory factory = new MapLookupExtractorFactory(ImmutableMap.of(KEY, VALUE), true);
@Test
public void testSimpleExtraction()
{
Assert.assertEquals(factory.get().apply(KEY), VALUE);
Assert.assertTrue(factory.get().isOneToOne());
}
@Test
public void testReplaces()
{
Assert.assertFalse(factory.replaces(factory));
Assert.assertFalse(factory.replaces(new MapLookupExtractorFactory(ImmutableMap.of(KEY, VALUE), true)));
Assert.assertTrue(factory.replaces(new MapLookupExtractorFactory(ImmutableMap.of(KEY, VALUE), false)));
Assert.assertTrue(factory.replaces(new MapLookupExtractorFactory(ImmutableMap.of(KEY + "1", VALUE), true)));
Assert.assertTrue(factory.replaces(new MapLookupExtractorFactory(ImmutableMap.of(KEY, VALUE + "1"), true)));
Assert.assertTrue(factory.replaces(null));
}
}

View File

@ -35,6 +35,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException;
public class LookupReferencesManagerTest
@ -182,6 +183,77 @@ public class LookupReferencesManagerTest
lookupReferencesManager.put(extractorImmutableMap);
}
@Test
public void testUpdateIfNewOnlyIfIsNew()
{
final String lookupName = "some lookup";
LookupExtractorFactory oldFactory = EasyMock.createStrictMock(LookupExtractorFactory.class);
LookupExtractorFactory newFactory = EasyMock.createStrictMock(LookupExtractorFactory.class);
EasyMock.expect(oldFactory.replaces(EasyMock.<LookupExtractorFactory>isNull())).andReturn(true).once();
EasyMock.expect(oldFactory.start()).andReturn(true).once();
EasyMock.expect(oldFactory.replaces(EasyMock.eq(oldFactory))).andReturn(false).once();
// Add new
EasyMock.expect(newFactory.replaces(EasyMock.eq(oldFactory))).andReturn(true).once();
EasyMock.expect(newFactory.start()).andReturn(true).once();
EasyMock.expect(oldFactory.close()).andReturn(true).once();
EasyMock.expect(newFactory.close()).andReturn(true).once();
EasyMock.replay(oldFactory, newFactory);
Assert.assertTrue(lookupReferencesManager.updateIfNew(lookupName, oldFactory));
Assert.assertFalse(lookupReferencesManager.updateIfNew(lookupName, oldFactory));
Assert.assertTrue(lookupReferencesManager.updateIfNew(lookupName, newFactory));
// Remove now or else EasyMock gets confused on lazy lookup manager stop handling
lookupReferencesManager.remove(lookupName);
EasyMock.verify(oldFactory, newFactory);
}
@Test(expected = ISE.class)
public void testUpdateIfNewExceptional()
{
final String lookupName = "some lookup";
LookupExtractorFactory newFactory = EasyMock.createStrictMock(LookupExtractorFactory.class);
EasyMock.expect(newFactory.replaces(EasyMock.<LookupExtractorFactory>isNull())).andReturn(true).once();
EasyMock.expect(newFactory.start()).andReturn(false).once();
EasyMock.replay(newFactory);
try {
lookupReferencesManager.updateIfNew(lookupName, newFactory);
}
finally {
EasyMock.verify(newFactory);
}
}
@Test
public void testUpdateIfNewSuppressOldCloseProblem()
{
final String lookupName = "some lookup";
LookupExtractorFactory oldFactory = EasyMock.createStrictMock(LookupExtractorFactory.class);
LookupExtractorFactory newFactory = EasyMock.createStrictMock(LookupExtractorFactory.class);
EasyMock.expect(oldFactory.replaces(EasyMock.<LookupExtractorFactory>isNull())).andReturn(true).once();
EasyMock.expect(oldFactory.start()).andReturn(true).once();
// Add new
EasyMock.expect(newFactory.replaces(EasyMock.eq(oldFactory))).andReturn(true).once();
EasyMock.expect(newFactory.start()).andReturn(true).once();
EasyMock.expect(oldFactory.close()).andReturn(false).once();
EasyMock.expect(newFactory.close()).andReturn(true).once();
EasyMock.replay(oldFactory, newFactory);
lookupReferencesManager.updateIfNew(lookupName, oldFactory);
lookupReferencesManager.updateIfNew(lookupName, newFactory);
// Remove now or else EasyMock gets confused on lazy lookup manager stop handling
lookupReferencesManager.remove(lookupName);
EasyMock.verify(oldFactory, newFactory);
}
@Test
public void testBootstrapFromFile() throws IOException
{
@ -223,6 +295,12 @@ public class LookupReferencesManagerTest
return true;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return false;
}
@Override
public LookupExtractor get()
{

View File

@ -33,6 +33,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@ -134,6 +135,12 @@ public class LookupSnapshotTakerTest
return true;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return false;
}
@Override
public LookupExtractor get()
{

View File

@ -42,10 +42,12 @@ import java.util.List;
*/
public class ServerModule implements DruidModule
{
public static final String ZK_PATHS_PROPERTY_BASE = "druid.zk.paths";
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.zk.paths", ZkPathsConfig.class);
JsonConfigProvider.bind(binder, ZK_PATHS_PROPERTY_BASE, ZkPathsConfig.class);
JsonConfigProvider.bind(binder, "druid", DruidNode.class, Self.class);
}

View File

@ -0,0 +1,210 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.lookup;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.RE;
import com.metamx.common.logger.Logger;
import io.druid.curator.announcement.Announcer;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Self;
import io.druid.guice.annotations.Smile;
import io.druid.initialization.DruidModule;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.listener.announcer.ListenerResourceAnnouncer;
import io.druid.server.listener.announcer.ListeningAnnouncerConfig;
import io.druid.server.listener.resource.AbstractListenerHandler;
import io.druid.server.listener.resource.ListenerResource;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.curator.utils.ZKPaths;
import javax.ws.rs.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class LookupModule implements DruidModule
{
private static final String PROPERTY_BASE = "druid.lookup";
public static final String FAILED_UPDATES_KEY = "failedUpdates";
public static String getTierListenerPath(String tier)
{
return ZKPaths.makePath(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY, tier);
}
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.emptyList();
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, PROPERTY_BASE, LookupConfig.class);
LifecycleModule.register(binder, LookupReferencesManager.class);
JsonConfigProvider.bind(binder, PROPERTY_BASE, LookupListeningAnnouncerConfig.class);
Jerseys.addResource(binder, LookupListeningResource.class);
LifecycleModule.register(binder, LookupResourceListenerAnnouncer.class);
}
}
@Path(ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)
class LookupListeningResource extends ListenerResource
{
private static final Logger LOG = new Logger(LookupListeningResource.class);
@Inject
public LookupListeningResource(
final @Json ObjectMapper jsonMapper,
final @Smile ObjectMapper smileMapper,
final LookupReferencesManager manager
)
{
super(
jsonMapper,
smileMapper,
new AbstractListenerHandler<LookupExtractorFactory>(new TypeReference<LookupExtractorFactory>()
{
})
{
private final Object deleteLock = new Object();
@Override
public synchronized Object post(final Map<String, LookupExtractorFactory> lookups)
throws Exception
{
final Map<String, LookupExtractorFactory> failedUpdates = new HashMap<>();
for (final String name : lookups.keySet()) {
final LookupExtractorFactory factory = lookups.get(name);
try {
if (!manager.updateIfNew(name, factory)) {
failedUpdates.put(name, factory);
}
}
catch (ISE ise) {
LOG.error(ise, "Error starting [%s]: [%s]", name, factory);
failedUpdates.put(name, factory);
}
}
return ImmutableMap.of("status", "accepted", LookupModule.FAILED_UPDATES_KEY, failedUpdates);
}
@Override
public Object get(String id)
{
return manager.get(id);
}
@Override
public Map<String, LookupExtractorFactory> getAll()
{
return manager.getAll();
}
@Override
public Object delete(String id)
{
// Prevent races to 404 vs 500 between concurrent delete requests
synchronized (deleteLock) {
if (manager.get(id) == null) {
return null;
}
if (!manager.remove(id)) {
// We don't have more information at this point.
throw new RE("Could not remove lookup [%s]", id);
}
return id;
}
}
}
);
}
}
class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer
{
@Inject
public LookupResourceListenerAnnouncer(
Announcer announcer,
LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig,
@Self DruidNode node
)
{
super(
announcer,
lookupListeningAnnouncerConfig,
lookupListeningAnnouncerConfig.getLookupKey(),
HostAndPort.fromString(node.getHostAndPort())
);
}
}
class LookupListeningAnnouncerConfig extends ListeningAnnouncerConfig
{
public static final String DEFAULT_TIER = "__default";
@JsonProperty("lookupTier")
private String lookupTier = null;
@JsonCreator
public static LookupListeningAnnouncerConfig createLookupListeningAnnouncerConfig(
@JacksonInject ZkPathsConfig zkPathsConfig,
@JsonProperty("lookupTier") String lookupTier
)
{
final LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig = new LookupListeningAnnouncerConfig(
zkPathsConfig);
lookupListeningAnnouncerConfig.lookupTier = lookupTier;
return lookupListeningAnnouncerConfig;
}
@Inject
public LookupListeningAnnouncerConfig(ZkPathsConfig zkPathsConfig)
{
super(zkPathsConfig);
}
public String getLookupTier()
{
return lookupTier == null ? DEFAULT_TIER : lookupTier;
}
public String getLookupKey()
{
return LookupModule.getTierListenerPath(getLookupTier());
}
}

View File

@ -0,0 +1,288 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.http;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.RE;
import com.metamx.common.logger.Logger;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.common.utils.ServletResourceUtils;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
/**
* Contains information about lookups exposed through the coordinator
*/
@Path("/druid/coordinator/v1/lookups")
public class LookupCoordinatorResource
{
private static final Logger LOG = new Logger(LookupCoordinatorResource.class);
private final LookupCoordinatorManager lookupCoordinatorManager;
private final ObjectMapper smileMapper;
private final ObjectMapper jsonMapper;
@Inject
public LookupCoordinatorResource(
final LookupCoordinatorManager lookupCoordinatorManager,
final @Smile ObjectMapper smileMapper,
final @Json ObjectMapper jsonMapper
)
{
this.smileMapper = smileMapper;
this.jsonMapper = jsonMapper;
this.lookupCoordinatorManager = lookupCoordinatorManager;
}
@GET
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response getTiers(
@DefaultValue("false") @QueryParam("discover") boolean discover
)
{
try {
if (discover) {
return Response.ok().entity(lookupCoordinatorManager.discoverTiers()).build();
}
final Map<String, Map<String, Map<String, Object>>> knownLookups = lookupCoordinatorManager.getKnownLookups();
if (knownLookups == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok().entity(knownLookups.keySet()).build();
}
}
catch (Exception e) {
LOG.error(e, "Error getting list of lookups");
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@POST
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response updateAllLookups(
InputStream in,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
@Context HttpServletRequest req
)
{
try {
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(req.getContentType());
final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper;
final Map<String, Map<String, Map<String, Object>>> map;
try {
map = mapper.readValue(in, new TypeReference<Map<String, Map<String, Map<String, Object>>>>()
{
});
}
catch (IOException e) {
return Response.status(Response.Status.BAD_REQUEST).entity(ServletResourceUtils.sanitizeException(e)).build();
}
if (lookupCoordinatorManager.updateLookups(map, new AuditInfo(author, comment, req.getRemoteAddr()))) {
return Response.status(Response.Status.ACCEPTED).entity(map).build();
} else {
throw new RuntimeException("Unknown error updating configuration");
}
}
catch (Exception e) {
LOG.error(e, "Error creating new lookups");
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@DELETE
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Path("/{tier}/{lookup}")
public Response deleteLookup(
@PathParam("tier") String tier,
@PathParam("lookup") String lookup,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
@Context HttpServletRequest req
)
{
try {
if (Strings.isNullOrEmpty(tier)) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new NullPointerException("`tier` required")))
.build();
}
if (Strings.isNullOrEmpty(lookup)) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new IAE("`lookup` required")))
.build();
}
if (lookupCoordinatorManager.deleteLookup(tier, lookup, new AuditInfo(author, comment, req.getRemoteAddr()))) {
return Response.status(Response.Status.ACCEPTED).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
catch (Exception e) {
LOG.error(e, "Error deleting lookup [%s]", lookup);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@POST
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Path("/{tier}/{lookup}")
public Response createOrUpdateLookup(
@PathParam("tier") String tier,
@PathParam("lookup") String lookup,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
InputStream in,
@Context HttpServletRequest req
)
{
try {
if (Strings.isNullOrEmpty(tier)) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new NullPointerException("`tier` required")))
.build();
}
if (Strings.isNullOrEmpty(lookup)) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new IAE("`lookup` required")))
.build();
}
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(req.getContentType());
final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper;
final Map<String, Object> lookupSpec;
try {
lookupSpec = mapper.readValue(in, new TypeReference<Map<String, Object>>()
{
});
}
catch (IOException e) {
return Response.status(Response.Status.BAD_REQUEST).entity(ServletResourceUtils.sanitizeException(e)).build();
}
if (lookupCoordinatorManager.updateLookup(
tier,
lookup,
lookupSpec,
new AuditInfo(author, comment, req.getRemoteAddr())
)) {
return Response.status(Response.Status.ACCEPTED).build();
} else {
throw new RuntimeException("Unknown error updating configuration");
}
}
catch (Exception e) {
LOG.error(e, "Error updating tier [%s] lookup [%s]", tier, lookup);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@GET
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Path("/{tier}/{lookup}")
public Response getSpecificLookup(
@PathParam("tier") String tier,
@PathParam("lookup") String lookup
)
{
try {
if (Strings.isNullOrEmpty(tier)) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new NullPointerException("`tier` required")))
.build();
}
if (Strings.isNullOrEmpty(lookup)) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new NullPointerException("`lookup` required")))
.build();
}
final Map<String, Object> map = lookupCoordinatorManager.getLookup(tier, lookup);
if (map == null) {
return Response.status(Response.Status.NOT_FOUND)
.entity(ServletResourceUtils.sanitizeException(new RE("lookup [%s] not found", lookup)))
.build();
}
return Response.ok().entity(map).build();
}
catch (Exception e) {
LOG.error(e, "Error getting lookup [%s]", lookup);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@GET
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Path("/{tier}")
public Response getSpecificTier(
@PathParam("tier") String tier
)
{
try {
if (Strings.isNullOrEmpty(tier)) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new NullPointerException("`tier` required")))
.build();
}
final Map<String, Map<String, Map<String, Object>>> map = lookupCoordinatorManager.getKnownLookups();
if (map == null) {
return Response.status(Response.Status.NOT_FOUND)
.entity(ServletResourceUtils.sanitizeException(new RE("No lookups found")))
.build();
}
final Map<String, Map<String, Object>> tierLookups = map.get(tier);
if (tierLookups == null) {
return Response.status(Response.Status.NOT_FOUND)
.entity(ServletResourceUtils.sanitizeException(new RE("Tier [%s] not found", tier)))
.build();
}
return Response.ok().entity(tierLookups.keySet()).build();
}
catch (Exception e) {
LOG.error(e, "Error getting tier [%s]", tier);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
}

View File

@ -90,7 +90,7 @@ public class ZkPathsConfig
return (null == connectorPath) ? defaultPath("connector") : connectorPath;
}
protected String defaultPath(final String subPath)
public String defaultPath(final String subPath)
{
return ZKPaths.makePath(getBase(), subPath);
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.announcer;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class ListenerDiscoverer
{
private static final Logger LOG = new Logger(ListenerDiscoverer.class);
private volatile Map<HostAndPort, Long> lastSeenMap = ImmutableMap.of();
private final CuratorFramework cf;
private final ListeningAnnouncerConfig listeningAnnouncerConfig;
private final Object startStopSync = new Object();
private volatile boolean started = false;
@Inject
public ListenerDiscoverer(
CuratorFramework cf,
ListeningAnnouncerConfig listeningAnnouncerConfig
)
{
this.cf = cf;
this.listeningAnnouncerConfig = listeningAnnouncerConfig;
}
@LifecycleStart
public void start()
{
synchronized (startStopSync) {
if (started) {
LOG.debug("Already started");
return;
}
started = true;
LOG.info("Started");
}
}
@LifecycleStop
public void stop()
{
synchronized (startStopSync) {
if (!started) {
LOG.debug("Already stopped");
return;
}
LOG.info("Stopped");
started = false;
}
}
/**
* Get nodes at a particular listener.
* This method lazily adds service discovery
*
* @param listener_key The Listener's service key
*
* @return A collection of druid nodes as established by the service discovery
*
* @throws IOException if there was an error refreshing the zookeeper cache
*/
public Collection<HostAndPort> getNodes(final String listener_key) throws IOException
{
return getCurrentNodes(listener_key).keySet();
}
Map<HostAndPort, Long> getCurrentNodes(final String listener_key) throws IOException
{
final HashMap<HostAndPort, Long> retVal = new HashMap<>();
final String zkPath = listeningAnnouncerConfig.getAnnouncementPath(listener_key);
final Collection<String> children;
try {
children = cf.getChildren().forPath(zkPath);
}
catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
LOG.debug(e, "No path found at [%s]", zkPath);
return ImmutableMap.of();
}
catch (Exception e) {
throw new IOException("Error getting children for " + zkPath, e);
}
for (String child : children) {
final String childPath = ZKPaths.makePath(zkPath, child);
try {
final byte[] data;
try {
data = cf.getData().decompressed().forPath(childPath);
}
catch (Exception e) {
throw new IOException("Error getting data for " + childPath, e);
}
if (data == null) {
LOG.debug("Lost data at path [%s]", childPath);
continue;
}
final HostAndPort hostAndPort = HostAndPort.fromString(child);
final Long l = ByteBuffer.wrap(data).getLong();
retVal.put(hostAndPort, l);
}
catch (IllegalArgumentException iae) {
LOG.warn(iae, "Error parsing [%s]", childPath);
}
}
return ImmutableMap.copyOf(retVal);
}
/**
* Get only nodes that are new since the last time getNewNodes was called (or all nodes if it has never been called)
*
* @param listener_key The listener key to look for
*
* @return A collection of nodes that are new
*
* @throws IOException If there was an error in refreshing the Zookeeper cache
*/
public synchronized Collection<HostAndPort> getNewNodes(final String listener_key) throws IOException
{
final Map<HostAndPort, Long> priorSeenMap = lastSeenMap;
final Map<HostAndPort, Long> currentMap = getCurrentNodes(listener_key);
final Collection<HostAndPort> retVal = Collections2.filter(
currentMap.keySet(),
new Predicate<HostAndPort>()
{
@Override
public boolean apply(HostAndPort input)
{
final Long l = priorSeenMap.get(input);
return l == null || l < currentMap.get(input);
}
}
);
lastSeenMap = priorSeenMap;
return retVal;
}
/**
* Discovers children of the listener key
*
* @param key_base The base of the listener key, or null or empty string to get all immediate children of the listener path
*
* @return A collection of the names of the children, or empty list on NoNodeException from Curator
*
* @throws IOException from Curator
* @throws RuntimeException for other exceptions from Curator.
*/
public Collection<String> discoverChildren(@Nullable final String key_base) throws IOException
{
final String zkPath = Strings.isNullOrEmpty(key_base)
? listeningAnnouncerConfig.getListenersPath()
: listeningAnnouncerConfig.getAnnouncementPath(key_base);
try {
return cf.getChildren().forPath(zkPath);
}
catch (KeeperException.NoNodeException | KeeperException.NoChildrenForEphemeralsException e) {
LOG.warn(e, "Path [%s] not discoverable", zkPath);
return ImmutableList.of();
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, IOException.class);
throw Throwables.propagate(e);
}
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.announcer;
import com.google.common.base.Throwables;
import com.google.common.net.HostAndPort;
import com.google.common.primitives.Longs;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.curator.announcement.Announcer;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
/**
* Announces that there is a particular ListenerResource at the listener_key.
*/
public abstract class ListenerResourceAnnouncer
{
private static final byte[] ANNOUNCE_BYTES = ByteBuffer
.allocate(Longs.BYTES)
.putLong(DateTime.now().getMillis())
.array();
private static final Logger LOG = new Logger(ListenerResourceAnnouncer.class);
private final Object startStopSync = new Object();
private volatile boolean started = false;
private final Announcer announcer;
private final String announcePath;
public ListenerResourceAnnouncer(
Announcer announcer,
ListeningAnnouncerConfig listeningAnnouncerConfig,
String listener_key,
HostAndPort node
)
{
this(
announcer,
ZKPaths.makePath(listeningAnnouncerConfig.getListenersPath(), listener_key),
node
);
}
ListenerResourceAnnouncer(
Announcer announcer,
String announceBasePath,
HostAndPort node
)
{
this.announcePath = ZKPaths.makePath(announceBasePath, node.toString());
this.announcer = announcer;
}
@LifecycleStart
public void start()
{
synchronized (startStopSync) {
if (started) {
LOG.debug("Already started, ignoring");
return;
}
try {
// Announcement is based on MS. This is to make sure we don't collide on announcements
Thread.sleep(2);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
announcer.announce(announcePath, ANNOUNCE_BYTES);
LOG.info("Announcing start time on [%s]", announcePath);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (startStopSync) {
if (!started) {
LOG.debug("Already stopped, ignoring");
return;
}
announcer.unannounce(announcePath);
LOG.info("Unannouncing start time on [%s]", announcePath);
started = false;
}
}
public byte[] getAnnounceBytes()
{
return ByteBuffer.allocate(ANNOUNCE_BYTES.length).put(ANNOUNCE_BYTES).array();
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.announcer;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
/**
* Even though we provide the mechanism to get zk paths here, we do NOT handle announcing and unannouncing in this module.
* The reason is that it is not appropriate to force a global announce/unannounce since individual listeners may have
* different lifecycles.
*/
public class ListeningAnnouncerConfig
{
@JacksonInject
private final ZkPathsConfig zkPathsConfig;
@JsonProperty("listenersPath")
private String listenersPath = null;
@Inject
public ListeningAnnouncerConfig(
ZkPathsConfig zkPathsConfig
)
{
this.zkPathsConfig = zkPathsConfig;
}
@JsonProperty("listenersPath")
public String getListenersPath()
{
return listenersPath == null ? zkPathsConfig.defaultPath("listeners") : listenersPath;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ListeningAnnouncerConfig that = (ListeningAnnouncerConfig) o;
return !(listenersPath != null ? !listenersPath.equals(that.listenersPath) : that.listenersPath != null);
}
@Override
public int hashCode()
{
return listenersPath != null ? listenersPath.hashCode() : 0;
}
@Override
public String toString()
{
return "ListeningAnnouncerConfig{" +
"listenersPath='" + listenersPath + '\'' +
'}';
}
/**
* Build a path for the particular named listener. The first implementation of this is used with zookeeper, but
* there is nothing restricting its use in a more general pathing (example: http endpoint proxy for raft)
* @param listenerName The key for the listener.
* @return A path appropriate for use in zookeeper to discover the listeners with the particular listener name
*/
public String getAnnouncementPath(String listenerName)
{
return ZKPaths.makePath(
getListenersPath(), Preconditions.checkNotNull(
Strings.emptyToNull(listenerName), "Listener name cannot be null"
)
);
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.resource;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.ServletResourceUtils;
import javax.annotation.Nullable;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
/**
* This is a simplified handler for announcement listeners. The input is expected to be a JSON list objects.
*
* Empty maps `{}` are taken care of at this level and never passed down to the subclass's handle method.
*
* @param <ObjType> A List of this type is expected in the input stream as JSON. Must be able to be converted to/from Map<String, Object>
*/
public abstract class AbstractListenerHandler<ObjType> implements ListenerHandler
{
private static final Logger LOG = new Logger(AbstractListenerHandler.class);
private final TypeReference<ObjType> inObjTypeRef;
/**
* The standard constructor takes in a type reference for the object and for a list of the object.
* This is to work around some limitations in Java with type erasure.
*
* @param inObjTypeRef The TypeReference for the input object type
*/
public AbstractListenerHandler(TypeReference<ObjType> inObjTypeRef)
{
this.inObjTypeRef = inObjTypeRef;
}
@Override
public final Response handlePOST(final InputStream inputStream, final ObjectMapper mapper, final String id)
{
try {
final Object o = post(ImmutableMap.of(id, mapper.<ObjType>readValue(inputStream, inObjTypeRef)));
return Response.status(Response.Status.ACCEPTED).entity(o).build();
}
catch (JsonParseException | JsonMappingException e) {
LOG.debug(e, "Bad request");
return Response.status(Response.Status.BAD_REQUEST).entity(ServletResourceUtils.sanitizeException(e)).build();
}
catch (Exception e) {
LOG.error(e, "Error handling request");
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@Override
public final Response handlePOSTAll(final InputStream inputStream, final ObjectMapper mapper)
{
final Map<String, ObjType> inObjMap;
try {
// This actually fails to properly convert due to type erasure. We'll try again in a second
// This effectively just parses
final Map<String, Object> tempMap = mapper.readValue(inputStream, new TypeReference<Map<String, Object>>()
{
});
// Now do the ACTUAL conversion
inObjMap = ImmutableMap.copyOf(Maps.transformValues(
tempMap,
new Function<Object, ObjType>()
{
@Override
public ObjType apply(Object input)
{
return mapper.convertValue(input, inObjTypeRef);
}
}
));
}
catch (final IOException ex) {
LOG.debug(ex, "Bad request");
return Response.status(Response.Status.BAD_REQUEST).entity(ServletResourceUtils.sanitizeException(ex)).build();
}
final Object returnObj;
try {
returnObj = post(inObjMap);
}
catch (Exception e) {
LOG.error(e, "Error handling request");
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
if (returnObj == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.status(Response.Status.ACCEPTED).entity(returnObj).build();
}
}
@Override
public final Response handleGET(String id)
{
try {
final Object returnObj = get(id);
if (returnObj == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(returnObj).build();
}
}
catch (Exception e) {
LOG.error(e, "Error handling get request for [%s]", id);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@Override
public final Response handleGETAll()
{
final Map<String, ObjType> all;
try {
all = getAll();
if (all == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(all).build();
}
}
catch (Exception e) {
LOG.error(e, "Error getting all");
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@Override
public final Response handleDELETE(String id)
{
try {
final Object returnObj = delete(id);
if (returnObj == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.status(Response.Status.ACCEPTED).entity(returnObj).build();
}
}
catch (Exception e) {
LOG.error(e, "Error in processing delete request for [%s]", id);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@Override
public final void use_AbstractListenerHandler_instead()
{
// NOOP
}
/**
* Delete the object for a particular id
*
* @param id A string id of the object to be deleted. This id is never null or empty.
*
* @return The object to be returned in the entity. A NULL return will cause a 404 response. A non-null return will cause a 202 response. An Exception thrown will cause a 500 response.
*/
protected abstract
@Nullable
Object delete(String id);
/**
* Get the object for a particular id
*
* @param id A string id of the object desired. This id is never null or empty.
*
* @return The object to be returned in the entity. A NULL return will cause a 404 response. A non-null return will cause a 200 response. An Exception thrown will cause a 500 response.
*/
protected abstract
@Nullable
Object get(String id);
protected abstract
@Nullable
Map<String, ObjType> getAll();
/**
* Process a POST request of the input items
*
* @param inputObject A list of the objects which were POSTed
*
* @return An object to be returned in the entity of the response.
*
* @throws Exception
*/
public abstract
@Nullable
Object post(Map<String, ObjType> inputObject) throws Exception;
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.resource;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.Response;
import java.io.InputStream;
/**
* A handler for events related to the listening-announcer.
* Developers are *STRONGLY* encouraged to use AbstractListenerHandler instead to adhere to return codes.
*/
public interface ListenerHandler
{
Response handlePOST(InputStream inputStream, ObjectMapper mapper, String id);
Response handlePOSTAll(InputStream inputStream, ObjectMapper mapper);
Response handleGET(String id);
Response handleGETAll();
Response handleDELETE(String id);
void use_AbstractListenerHandler_instead();
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.resource;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.ServletResourceUtils;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.InputStream;
/**
* This is a simple announcement resource that handles simple items that have a POST to an announcement endpoint, a
* GET of something in that endpoint with an ID, and a DELETE to that endpoint with an ID.
*
* The idea of this resource is simply to have a simple endpoint for basic POJO handling assuming the POJO has an ID
* which distinguishes it from others of its kind.
*
* This resource is expected to NOT block for POSTs, and is instead expected to make a best effort at returning
* as quickly as possible. Thus, returning ACCEPTED instead of OK is normal for POST methods here.
*
* Items tagged with a particular ID for an announcement listener are updated by a POST to the announcement listener's
* path "/{announcement}"
*
* Discovery of who can listen to particular announcement keys is not part of this class and should be handled
* by ListenerResourceAnnouncer
*/
public abstract class ListenerResource
{
public static final String BASE_PATH = "/druid/listen/v1";
private static final Logger LOG = new Logger(ListenerResource.class);
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final ListenerHandler handler;
public ListenerResource(
final @Json ObjectMapper jsonMapper,
final @Smile ObjectMapper smileMapper,
final ListenerHandler handler
)
{
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.smileMapper = Preconditions.checkNotNull(smileMapper, "smileMapper");
this.handler = Preconditions.checkNotNull(handler, "listener handler");
}
@POST
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response serviceAnnouncementPOSTAll(
final InputStream inputStream,
final @Context HttpServletRequest req // used only to get request content-type
)
{
final String reqContentType = req.getContentType();
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper;
try {
return handler.handlePOSTAll(inputStream, mapper);
}
catch (Exception e) {
LOG.error(e, "Exception in handling POSTAll request");
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@GET
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response getAll()
{
try {
return handler.handleGETAll();
}
catch (Exception e) {
LOG.error(e, "Exception in handling GETAll request");
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@Path("/{id}")
@GET
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response serviceAnnouncementGET(
final @PathParam("id") String id
)
{
if (Strings.isNullOrEmpty(id)) {
return makeNullIdResponse();
}
try {
return handler.handleGET(id);
}
catch (Exception e) {
LOG.error(e, "Exception in handling GET request for [%s]", id);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@Path("/{id}")
@DELETE
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response serviceAnnouncementDELETE(
final @PathParam("id") String id
)
{
if (Strings.isNullOrEmpty(id)) {
return makeNullIdResponse();
}
try {
return handler.handleDELETE(id);
}
catch (Exception e) {
LOG.error(e, "Exception in handling DELETE request for [%s]", id);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
@Path("/{id}")
@POST
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response serviceAnnouncementPOST(
final @PathParam("id") String id,
final InputStream inputStream,
final @Context HttpServletRequest req // used only to get request content-type
)
{
if (Strings.isNullOrEmpty(id)) {
return makeNullIdResponse();
}
final String reqContentType = req.getContentType();
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper;
try {
return handler.handlePOST(inputStream, mapper, id);
}
catch (Exception e) {
LOG.error(e, "Exception in handling POST request for ID [%s]", id);
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
}
}
public static Response makeNullIdResponse()
{
return Response
.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(new IllegalArgumentException("Cannot have null or empty id")))
.build();
}
}

View File

@ -0,0 +1,617 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.lookup.cache;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.api.client.http.HttpStatusCodes;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.StreamUtils;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.SequenceInputStreamResponseHandler;
import io.druid.audit.AuditInfo;
import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Smile;
import io.druid.query.lookup.LookupModule;
import io.druid.server.listener.announcer.ListenerDiscoverer;
import io.druid.server.listener.resource.ListenerResource;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class LookupCoordinatorManager
{
public static final String LOOKUP_CONFIG_KEY = "lookups";
// Doesn't have to be the same, but it makes things easy to look at
public static final String LOOKUP_LISTEN_ANNOUNCE_KEY = LOOKUP_CONFIG_KEY;
private static final EmittingLogger LOG = new EmittingLogger(LookupCoordinatorManager.class);
private static final TypeReference<Map<String, Object>> MAP_STRING_OBJ_TYPE = new TypeReference<Map<String, Object>>()
{
};
private final static Function<HostAndPort, URL> HOST_TO_URL = new Function<HostAndPort, URL>()
{
@Nullable
@Override
public URL apply(HostAndPort input)
{
if (input == null) {
LOG.warn("null entry in lookups");
return null;
}
try {
return getLookupsURL(input);
}
catch (MalformedURLException e) {
LOG.warn(e, "Skipping node. Malformed URL from `%s`", input);
return null;
}
}
};
private final ListeningScheduledExecutorService executorService;
private final ListenerDiscoverer listenerDiscoverer;
private final HttpClient httpClient;
private final ObjectMapper smileMapper;
private final JacksonConfigManager configManager;
private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig;
private final Object startStopSync = new Object();
// Updated by config watching service
private AtomicReference<Map<String, Map<String, Map<String, Object>>>> lookupMapConfigRef;
private volatile Map<String, Map<String, Map<String, Object>>> prior_update = ImmutableMap.of();
private volatile boolean started = false;
@Inject
public LookupCoordinatorManager(
final @Global HttpClient httpClient,
final ListenerDiscoverer listenerDiscoverer,
final @Smile ObjectMapper smileMapper,
final JacksonConfigManager configManager,
final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
)
{
this.listenerDiscoverer = listenerDiscoverer;
this.configManager = configManager;
this.httpClient = httpClient;
this.smileMapper = smileMapper;
this.lookupCoordinatorManagerConfig = lookupCoordinatorManagerConfig;
executorService = MoreExecutors.listeningDecorator(
Executors.newScheduledThreadPool(
lookupCoordinatorManagerConfig.getThreadPoolSize(),
Execs.makeThreadFactory("LookupCoordinatorManager--%s")
)
);
}
void deleteOnHost(final URL url)
throws ExecutionException, InterruptedException, IOException
{
final AtomicInteger returnCode = new AtomicInteger(0);
final AtomicReference<String> reasonString = new AtomicReference<>(null);
LOG.debug("Dropping %s", url);
try (final InputStream result = httpClient.go(
new Request(HttpMethod.DELETE, url)
.addHeader(HttpHeaders.Names.ACCEPT, SmileMediaTypes.APPLICATION_JACKSON_SMILE),
makeResponseHandler(returnCode, reasonString),
lookupCoordinatorManagerConfig.getHostDeleteTimeout()
).get()) {
// 404 is ok here, that means it was already deleted
if (!HttpStatusCodes.isSuccess(returnCode.get()) || HttpStatusCodes.STATUS_CODE_NOT_FOUND != returnCode.get()) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
StreamUtils.copyAndClose(result, baos);
}
catch (IOException e2) {
LOG.warn(e2, "Error reading response from [%s]", url);
}
throw new IOException(
String.format(
"Bad lookup delete request to [%s] : [%d] : [%s] Response: [%s]",
url,
returnCode.get(),
reasonString.get(),
StringUtils.fromUtf8(baos.toByteArray())
)
);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Delete to [%s] : Status: %s reason: [%s]", url, returnCode.get(), reasonString.get());
}
}
}
}
void updateAllOnHost(final URL url, Map<String, Map<String, Object>> knownLookups)
throws IOException, InterruptedException, ExecutionException
{
final AtomicInteger returnCode = new AtomicInteger(0);
final AtomicReference<String> reasonString = new AtomicReference<>(null);
final byte[] bytes;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading up %d lookups to %s", knownLookups.size(), url);
}
bytes = smileMapper.writeValueAsBytes(knownLookups);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
try (final InputStream result = httpClient.go(
new Request(HttpMethod.POST, url)
.addHeader(HttpHeaders.Names.ACCEPT, SmileMediaTypes.APPLICATION_JACKSON_SMILE)
.addHeader(HttpHeaders.Names.CONTENT_TYPE, SmileMediaTypes.APPLICATION_JACKSON_SMILE)
.setContent(bytes),
makeResponseHandler(returnCode, reasonString),
lookupCoordinatorManagerConfig.getHostUpdateTimeout()
).get()) {
if (!HttpStatusCodes.isSuccess(returnCode.get())) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
StreamUtils.copyAndClose(result, baos);
}
catch (IOException e2) {
LOG.warn(e2, "Error reading response");
}
throw new IOException(
String.format(
"Bad update request to [%s] : [%d] : [%s] Response: [%s]",
url,
returnCode.get(),
reasonString.get(),
StringUtils.fromUtf8(baos.toByteArray())
)
);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Update on [%s], Status: %s reason: [%s]", url, returnCode.get(), reasonString.get());
}
final Map<String, Object> resultMap = smileMapper.readValue(result, MAP_STRING_OBJ_TYPE);
final Object missingValuesObject = resultMap.get(LookupModule.FAILED_UPDATES_KEY);
if (null == missingValuesObject) {
throw new IAE("Update result did not have field for [%s]", LookupModule.FAILED_UPDATES_KEY);
}
final Map<String, Object> missingValues = smileMapper.convertValue(missingValuesObject, MAP_STRING_OBJ_TYPE);
if (!missingValues.isEmpty()) {
throw new IAE("Lookups failed to update: %s", smileMapper.writeValueAsString(missingValues.keySet()));
} else {
LOG.debug("Updated all lookups on [%s]", url);
}
}
}
}
// Overridden in unit tests
HttpResponseHandler<InputStream, InputStream> makeResponseHandler(
final AtomicInteger returnCode,
final AtomicReference<String> reasonString
)
{
return new SequenceInputStreamResponseHandler()
{
@Override
public ClientResponse<InputStream> handleResponse(HttpResponse response)
{
returnCode.set(response.getStatus().getCode());
reasonString.set(response.getStatus().getReasonPhrase());
return super.handleResponse(response);
}
};
}
void deleteAllOnTier(final String tier, final Collection<String> dropLookups)
throws ExecutionException, InterruptedException, IOException
{
if (dropLookups.isEmpty()) {
LOG.debug("Nothing to drop");
return;
}
final Collection<URL> urls = getAllHostsAnnounceEndpoint(tier);
final List<ListenableFuture<?>> futures = new ArrayList<>(urls.size());
for (final URL url : urls) {
futures.add(executorService.submit(new Runnable()
{
@Override
public void run()
{
for (final String drop : dropLookups) {
final URL lookupURL;
try {
lookupURL = new URL(
url.getProtocol(),
url.getHost(),
url.getPort(),
String.format("%s/%s", url.getFile(), drop)
);
}
catch (MalformedURLException e) {
throw new ISE(e, "Error creating url for [%s]/[%s]", url, drop);
}
try {
deleteOnHost(lookupURL);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Delete [%s] interrupted", lookupURL);
throw Throwables.propagate(e);
}
catch (IOException | ExecutionException e) {
// Don't raise as ExecutionException. Just log and continue
LOG.makeAlert(e, "Error deleting [%s]", lookupURL).emit();
}
}
}
}));
}
final ListenableFuture allFuture = Futures.allAsList(futures);
try {
allFuture.get(lookupCoordinatorManagerConfig.getUpdateAllTimeout().getMillis(), TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
// This should cause Interrupted exceptions on the offending ones
allFuture.cancel(true);
throw new ExecutionException("Timeout in updating hosts! Attempting to cancel", e);
}
}
void updateAllNewOnTier(final String tier, final Map<String, Map<String, Object>> knownLookups)
throws InterruptedException, ExecutionException, IOException
{
final Collection<URL> urls = Collections2.transform(
listenerDiscoverer.getNewNodes(LookupModule.getTierListenerPath(tier)),
HOST_TO_URL
);
if (urls.isEmpty() || knownLookups.isEmpty()) {
LOG.debug("Nothing new to report");
return;
}
updateNodes(urls, knownLookups);
}
void updateAllOnTier(final String tier, final Map<String, Map<String, Object>> knownLookups)
throws InterruptedException, ExecutionException, IOException
{
updateNodes(getAllHostsAnnounceEndpoint(tier), knownLookups);
}
void updateNodes(Collection<URL> urls, final Map<String, Map<String, Object>> knownLookups)
throws IOException, InterruptedException, ExecutionException
{
if (knownLookups == null) {
LOG.debug("No config for lookups found");
return;
}
if (knownLookups.isEmpty()) {
LOG.debug("No known lookups. Skipping update");
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating %d lookups on %d nodes", knownLookups.size(), urls.size());
}
final List<ListenableFuture<?>> futures = new ArrayList<>(urls.size());
for (final URL url : urls) {
futures.add(executorService.submit(new Runnable()
{
@Override
public void run()
{
try {
updateAllOnHost(url, knownLookups);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Update on [%s] interrupted", url);
throw Throwables.propagate(e);
}
catch (IOException | ExecutionException e) {
// Don't raise as ExecutionException. Just log and continue
LOG.makeAlert(e, "Error submitting to [%s]", url).emit();
}
}
}));
}
final ListenableFuture allFuture = Futures.allAsList(futures);
try {
allFuture.get(lookupCoordinatorManagerConfig.getUpdateAllTimeout().getMillis(), TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
LOG.warn("Timeout in updating hosts! Attempting to cancel");
// This should cause Interrupted exceptions on the offending ones
allFuture.cancel(true);
}
}
Collection<URL> getAllHostsAnnounceEndpoint(final String tier) throws IOException
{
return ImmutableList.copyOf(
Collections2.filter(
Collections2.transform(
listenerDiscoverer.getNodes(LookupModule.getTierListenerPath(tier)),
HOST_TO_URL
),
Predicates.notNull()
)
);
}
public boolean updateLookup(
final String tier,
final String lookupName,
Map<String, Object> spec,
final AuditInfo auditInfo
)
{
return updateLookups(
ImmutableMap.<String, Map<String, Map<String, Object>>>of(tier, ImmutableMap.of(lookupName, spec)),
auditInfo
);
}
public boolean updateLookups(final Map<String, Map<String, Map<String, Object>>> updateSpec, AuditInfo auditInfo)
{
synchronized (startStopSync) {
final Map<String, Map<String, Map<String, Object>>> priorSpec = getKnownLookups();
if (priorSpec == null && !updateSpec.isEmpty()) {
// To prevent accidentally erasing configs if we haven't updated our cache of the values
throw new ISE("Not initialized. If this is the first lookup, post an empty map to initialize");
}
final Map<String, Map<String, Map<String, Object>>> updatedSpec;
// Only add or update here, don't delete.
if (priorSpec == null) {
// all new
updatedSpec = updateSpec;
} else {
// Needs update
updatedSpec = new HashMap<>(priorSpec);
for (final String tier : updateSpec.keySet()) {
final Map<String, Map<String, Object>> priorTierSpec = priorSpec.get(tier);
final Map<String, Map<String, Object>> updateTierSpec = updateSpec.get(tier);
if (priorTierSpec == null) {
// New tier
updatedSpec.put(tier, updateTierSpec);
} else {
// Update existing tier
final Map<String, Map<String, Object>> updatedTierSpec = new HashMap<>(priorTierSpec);
updatedTierSpec.putAll(updateTierSpec);
updatedSpec.put(tier, updatedTierSpec);
}
}
}
return configManager.set(LOOKUP_CONFIG_KEY, updatedSpec, auditInfo);
}
}
public Map<String, Map<String, Map<String, Object>>> getKnownLookups()
{
if (!started) {
throw new ISE("Not started");
}
return lookupMapConfigRef.get();
}
public boolean deleteLookup(final String tier, final String lookup, AuditInfo auditInfo)
{
synchronized (startStopSync) {
final Map<String, Map<String, Map<String, Object>>> priorSpec = getKnownLookups();
if (priorSpec == null) {
LOG.warn("Requested delete lookup [%s]/[%s]. But no lookups exist!", tier, lookup);
return false;
}
final Map<String, Map<String, Map<String, Object>>> updateSpec = new HashMap<>(priorSpec);
final Map<String, Map<String, Object>> priorTierSpec = updateSpec.get(tier);
if (priorTierSpec == null) {
LOG.warn("Requested delete of lookup [%s]/[%s] but tier does not exist!", tier, lookup);
return false;
}
if (!priorTierSpec.containsKey(lookup)) {
LOG.warn("Requested delete of lookup [%s]/[%s] but lookup does not exist!", tier, lookup);
return false;
}
final Map<String, Map<String, Object>> updateTierSpec = new HashMap<>(priorTierSpec);
updateTierSpec.remove(lookup);
updateSpec.put(tier, updateTierSpec);
return configManager.set(LOOKUP_CONFIG_KEY, updateSpec, auditInfo);
}
}
public Collection<String> discoverTiers()
{
try {
return listenerDiscoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
/**
* Try to find a lookupName spec for the specified lookupName.
*
* @param lookupName The lookupName to look for
*
* @return The lookupName spec if found or null if not found or if no lookups at all are found
*/
public
@Nullable
Map<String, Object> getLookup(final String tier, final String lookupName)
{
final Map<String, Map<String, Map<String, Object>>> prior = getKnownLookups();
if (prior == null) {
LOG.warn("Requested tier [%s] lookupName [%s]. But no lookups exist!", tier, lookupName);
return null;
}
final Map<String, Map<String, Object>> tierLookups = prior.get(tier);
if (tierLookups == null) {
LOG.warn("Tier [%s] does not exist", tier);
return null;
}
return tierLookups.get(lookupName);
}
@LifecycleStart
public void start()
{
synchronized (startStopSync) {
if (started) {
return;
}
if (executorService.isShutdown()) {
throw new ISE("Cannot restart after stop!");
}
lookupMapConfigRef = configManager.watch(
LOOKUP_CONFIG_KEY,
new TypeReference<Map<String, Map<String, Map<String, Object>>>>()
{
},
null
);
executorService.scheduleWithFixedDelay(
new Runnable()
{
@Override
public void run()
{
final Map<String, Map<String, Map<String, Object>>> allLookupTiers = lookupMapConfigRef.get();
// Sanity check for if we are shutting down
if (Thread.currentThread().isInterrupted()) {
LOG.info("Not updating lookups because process was interrupted");
return;
}
if (!started) {
LOG.info("Not started. Returning");
return;
}
for (final String tier : allLookupTiers.keySet()) {
try {
final Map<String, Map<String, Object>> allLookups = allLookupTiers.get(tier);
final Map<String, Map<String, Object>> oldLookups = prior_update.get(tier);
final Collection<String> drops;
if (oldLookups == null) {
drops = ImmutableList.of();
} else {
drops = Sets.difference(oldLookups.keySet(), allLookups.keySet());
}
if (allLookupTiers == prior_update) {
LOG.debug("No updates");
updateAllNewOnTier(tier, allLookups);
} else {
updateAllOnTier(tier, allLookups);
deleteAllOnTier(tier, drops);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (Exception e) {
LOG.error(e, "Error updating lookups for tier [%s]. Will try again soon", tier);
}
}
prior_update = allLookupTiers;
}
},
0,
lookupCoordinatorManagerConfig.getPeriod(),
TimeUnit.MILLISECONDS
);
started = true;
LOG.debug("Started");
}
}
@LifecycleStop
public void stop()
{
synchronized (startStopSync) {
if (!started) {
LOG.warn("Not started, ignoring stop request");
return;
}
started = false;
executorService.shutdownNow();
// NOTE: we can't un-watch the configuration key
LOG.debug("Stopped");
}
}
static URL getLookupsURL(HostAndPort druidNode) throws MalformedURLException
{
return new URL(
"http",
druidNode.getHostText(),
druidNode.getPortOrDefault(-1),
ListenerResource.BASE_PATH + "/" + LOOKUP_LISTEN_ANNOUNCE_KEY
);
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.lookup.cache;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import javax.validation.constraints.Min;
public class LookupCoordinatorManagerConfig
{
public static final Duration DEFAULT_HOST_DELETE_TIMEOUT = Duration.millis(1_000L);
public static final Duration DEFAULT_HOST_UPDATE_TIMEOUT = Duration.millis(10_000L);
public static final Duration DEFAULT_DELETE_ALL_TIMEOUT = Duration.millis(10_000L);
public static final Duration DEFAULT_UPDATE_ALL_TIMEOUT = Duration.millis(60_000L);
@JsonProperty
private Duration hostDeleteTimeout = null;
@JsonProperty
private Duration hostUpdateTimeout = null;
@JsonProperty
private Duration deleteAllTimeout = null;
@JsonProperty
private Duration updateAllTimeout = null;
@JsonProperty
@Min(1)
private int threadPoolSize = 10;
@JsonProperty
@Min(1)
private long period = 30_000L;
public Duration getHostDeleteTimeout()
{
return hostDeleteTimeout == null ? DEFAULT_HOST_DELETE_TIMEOUT : hostDeleteTimeout;
}
public void setHostDeleteTimeout(Duration hostDeleteTimeout)
{
this.hostDeleteTimeout = hostDeleteTimeout;
}
public Duration getHostUpdateTimeout()
{
return hostUpdateTimeout == null ? DEFAULT_HOST_UPDATE_TIMEOUT : hostUpdateTimeout;
}
public void setHostUpdateTimeout(Duration hostUpdateTimeout)
{
this.hostUpdateTimeout = hostUpdateTimeout;
}
public Duration getDeleteAllTimeout()
{
return deleteAllTimeout == null ? DEFAULT_DELETE_ALL_TIMEOUT : deleteAllTimeout;
}
public void setDeleteAllTimeout(Duration deleteAllTimeout)
{
this.deleteAllTimeout = deleteAllTimeout;
}
public Duration getUpdateAllTimeout()
{
return updateAllTimeout == null ? DEFAULT_UPDATE_ALL_TIMEOUT : updateAllTimeout;
}
public void setUpdateAllTimeout(Duration updateAllTimeout)
{
this.updateAllTimeout = updateAllTimeout;
}
public int getThreadPoolSize()
{
return threadPoolSize;
}
public void setThreadPoolSize(int threadPoolSize)
{
this.threadPoolSize = threadPoolSize;
}
public long getPeriod()
{
return period;
}
public void setPeriod(long period)
{
this.period = period;
}
}

View File

@ -0,0 +1,832 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.metamx.common.StringUtils;
import io.druid.audit.AuditInfo;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class LookupCoordinatorResourceTest
{
private static final ObjectMapper mapper = new DefaultObjectMapper();
private static final String LOOKUP_TIER = "lookupTier";
private static final String LOOKUP_NAME = "lookupName";
private static final Map<String, Map<String, Object>> SINGLE_LOOKUP_MAP = ImmutableMap.<String, Map<String, Object>>of(
LOOKUP_NAME,
ImmutableMap.<String, Object>of()
);
private static final Map<String, Map<String, Map<String, Object>>> SINGLE_TIER_MAP = ImmutableMap.<String, Map<String, Map<String, Object>>>of(
LOOKUP_TIER,
SINGLE_LOOKUP_MAP
);
private static final ByteSource SINGLE_TIER_MAP_SOURCE = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(SINGLE_TIER_MAP)));
}
};
private static final ByteSource EMPTY_MAP_SOURCE = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(ImmutableMap.of())));
}
};
@Test
public void testSimpleGet()
{
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
final Map<String, Map<String, Map<String, Object>>> retVal = new HashMap<>();
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(retVal).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getTiers(false);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(retVal.keySet(), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testMissingGet()
{
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(null).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getTiers(false);
Assert.assertEquals(404, response.getStatus());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testExceptionalGet()
{
final String errMsg = "some error";
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andThrow(new RuntimeException(errMsg)).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getTiers(false);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testDiscoveryGet()
{
final List<String> tiers = ImmutableList.of();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.discoverTiers()).andReturn(tiers).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getTiers(true);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(tiers, response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testDiscoveryExceptionalGet()
{
final List<String> tiers = ImmutableList.of();
final String errMsg = "some error";
final RuntimeException ex = new RuntimeException(errMsg);
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.discoverTiers()).andThrow(ex).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getTiers(true);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testSimpleGetLookup()
{
final Map<String, Object> map = new HashMap<>();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.getLookup(EasyMock.eq(LOOKUP_TIER), EasyMock.eq(LOOKUP_NAME)))
.andReturn(map)
.once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificLookup(LOOKUP_TIER, LOOKUP_NAME);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(map, response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testMissingGetLookup()
{
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.getLookup(EasyMock.eq(LOOKUP_TIER), EasyMock.eq(LOOKUP_NAME)))
.andReturn(null)
.once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificLookup(LOOKUP_TIER, LOOKUP_NAME);
Assert.assertEquals(404, response.getStatus());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testInvalidGetLookup()
{
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
Assert.assertEquals(400, lookupCoordinatorResource.getSpecificLookup("foo", null).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.getSpecificLookup("foo", "").getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.getSpecificLookup("", "foo").getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.getSpecificLookup(null, "foo").getStatus());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testExceptionalGetLookup()
{
final String errMsg = "some message";
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.getLookup(EasyMock.eq(LOOKUP_TIER), EasyMock.eq(LOOKUP_NAME)))
.andThrow(new RuntimeException(errMsg))
.once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificLookup(LOOKUP_TIER, LOOKUP_NAME);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testSimpleDelete()
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.deleteLookup(
EasyMock.eq(LOOKUP_TIER),
EasyMock.eq(LOOKUP_NAME),
EasyMock.capture(auditInfoCapture)
)).andReturn(true).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.deleteLookup(
LOOKUP_TIER,
LOOKUP_NAME,
author,
comment,
request
);
Assert.assertEquals(202, response.getStatus());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testMissingDelete()
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.deleteLookup(
EasyMock.eq(LOOKUP_TIER),
EasyMock.eq(LOOKUP_NAME),
EasyMock.capture(auditInfoCapture)
)).andReturn(false).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.deleteLookup(
LOOKUP_TIER,
LOOKUP_NAME,
author,
comment,
request
);
Assert.assertEquals(404, response.getStatus());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testExceptionalDelete()
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final String errMsg = "some error";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.deleteLookup(
EasyMock.eq(LOOKUP_TIER),
EasyMock.eq(LOOKUP_NAME),
EasyMock.capture(auditInfoCapture)
)).andThrow(new RuntimeException(errMsg)).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.deleteLookup(
LOOKUP_TIER,
LOOKUP_NAME,
author,
comment,
request
);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testInvalidDelete()
{
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
Assert.assertEquals(400, lookupCoordinatorResource.deleteLookup("foo", null, null, null, null).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.deleteLookup(null, null, null, null, null).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.deleteLookup(null, "foo", null, null, null).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.deleteLookup("foo", "", null, null, null).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.deleteLookup("", "foo", null, null, null).getStatus());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testSimpleNew() throws Exception
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getContentType()).andReturn(MediaType.APPLICATION_JSON).once();
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.updateLookups(
EasyMock.eq(SINGLE_TIER_MAP),
EasyMock.capture(auditInfoCapture)
)).andReturn(true).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.updateAllLookups(
SINGLE_TIER_MAP_SOURCE.openStream(),
author,
comment,
request
);
Assert.assertEquals(202, response.getStatus());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testExceptionalNew() throws Exception
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final String errMsg = "some error";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getContentType()).andReturn(MediaType.APPLICATION_JSON).once();
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.updateLookups(
EasyMock.eq(SINGLE_TIER_MAP),
EasyMock.capture(auditInfoCapture)
)).andThrow(new RuntimeException(errMsg)).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.updateAllLookups(
SINGLE_TIER_MAP_SOURCE.openStream(),
author,
comment,
request
);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testFailedNew() throws Exception
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getContentType()).andReturn(MediaType.APPLICATION_JSON).once();
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.updateLookups(
EasyMock.eq(SINGLE_TIER_MAP),
EasyMock.capture(auditInfoCapture)
)).andReturn(false).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.updateAllLookups(
SINGLE_TIER_MAP_SOURCE.openStream(),
author,
comment,
request
);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", "Unknown error updating configuration"), response.getEntity());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testSimpleNewLookup() throws Exception
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getContentType()).andReturn(MediaType.APPLICATION_JSON).once();
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.updateLookup(
EasyMock.eq(LOOKUP_TIER),
EasyMock.eq(LOOKUP_NAME),
EasyMock.eq(ImmutableMap.<String, Object>of()),
EasyMock.capture(auditInfoCapture)
)).andReturn(true).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.createOrUpdateLookup(
LOOKUP_TIER,
LOOKUP_NAME,
author,
comment,
EMPTY_MAP_SOURCE.openStream(),
request
);
Assert.assertEquals(202, response.getStatus());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testDBErrNewLookup() throws Exception
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getContentType()).andReturn(MediaType.APPLICATION_JSON).once();
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.updateLookup(
EasyMock.eq(LOOKUP_TIER),
EasyMock.eq(LOOKUP_NAME),
EasyMock.eq(ImmutableMap.<String, Object>of()),
EasyMock.capture(auditInfoCapture)
)).andReturn(false).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.createOrUpdateLookup(
LOOKUP_TIER,
LOOKUP_NAME,
author,
comment,
EMPTY_MAP_SOURCE.openStream(),
request
);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", "Unknown error updating configuration"), response.getEntity());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testExceptionalNewLookup() throws Exception
{
final String errMsg = "error message";
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
EasyMock.expect(request.getContentType()).andReturn(MediaType.APPLICATION_JSON).once();
EasyMock.expect(request.getRemoteAddr()).andReturn(ip).once();
final Capture<AuditInfo> auditInfoCapture = Capture.newInstance();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.updateLookup(
EasyMock.eq(LOOKUP_TIER),
EasyMock.eq(LOOKUP_NAME),
EasyMock.eq(ImmutableMap.<String, Object>of()),
EasyMock.capture(auditInfoCapture)
)).andThrow(new RuntimeException(errMsg)).once();
EasyMock.replay(lookupCoordinatorManager, request);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.createOrUpdateLookup(
LOOKUP_TIER,
LOOKUP_NAME,
author,
comment,
EMPTY_MAP_SOURCE.openStream(),
request
);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
Assert.assertTrue(auditInfoCapture.hasCaptured());
final AuditInfo auditInfo = auditInfoCapture.getValue();
Assert.assertEquals(author, auditInfo.getAuthor());
Assert.assertEquals(comment, auditInfo.getComment());
Assert.assertEquals(ip, auditInfo.getIp());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testNullValsNewLookup() throws Exception
{
final String author = "some author";
final String comment = "some comment";
final String ip = "127.0.0.1";
final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
EasyMock.replay(lookupCoordinatorManager, request);
Assert.assertEquals(400, lookupCoordinatorResource.createOrUpdateLookup(
null,
LOOKUP_NAME,
author,
comment,
EMPTY_MAP_SOURCE.openStream(),
request
).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.createOrUpdateLookup(
LOOKUP_TIER,
null,
author,
comment,
EMPTY_MAP_SOURCE.openStream(),
request
).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.createOrUpdateLookup(
LOOKUP_TIER,
"",
author,
comment,
EMPTY_MAP_SOURCE.openStream(),
request
).getStatus());
Assert.assertEquals(400, lookupCoordinatorResource.createOrUpdateLookup(
"",
LOOKUP_NAME,
author,
comment,
EMPTY_MAP_SOURCE.openStream(),
request
).getStatus());
EasyMock.verify(lookupCoordinatorManager, request);
}
@Test
public void testSimpleGetTier()
{
final String tier = "some tier";
final String lookup = "some lookup";
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(LookupCoordinatorManager.class);
final Map<String, Map<String, Map<String, Object>>> retVal =
ImmutableMap.<String, Map<String, Map<String, Object>>>of(
tier, ImmutableMap.<String, Map<String, Object>>of(lookup, ImmutableMap.<String, Object>of())
);
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(retVal).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(retVal.get(tier).keySet(), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testMissingGetTier()
{
final String tier = "some tier";
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(LookupCoordinatorManager.class);
final Map<String, Map<String, Map<String, Object>>> retVal =
ImmutableMap.<String, Map<String, Map<String, Object>>>of();
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(retVal).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
Assert.assertEquals(404, response.getStatus());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testNullGetTier()
{
final String tier = null;
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(LookupCoordinatorManager.class);
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
Assert.assertEquals(400, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", "`tier` required"), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testNullLookupsGetTier()
{
final String tier = "some tier";
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(null).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
Assert.assertEquals(404, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", "No lookups found"), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
@Test
public void testExceptionalGetTier()
{
final String tier = "some tier";
final String errMsg = "some error";
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andThrow(new RuntimeException(errMsg)).once();
EasyMock.replay(lookupCoordinatorManager);
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
lookupCoordinatorManager,
mapper,
mapper
);
final Response response = lookupCoordinatorResource.getSpecificTier(tier);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
EasyMock.verify(lookupCoordinatorManager);
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.announcer;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
import io.druid.segment.CloserRule;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ListenerDiscovererTest extends CuratorTestBase
{
@Rule
public CloserRule closerRule = new CloserRule(true);
@Test(timeout = 60_000L)
public void testFullService() throws Exception
{
final String listenerKey = "listenerKey";
final String listenerTier = "listenerTier";
final String listenerTierChild = "tierChild";
final String tierZkPath = ZKPaths.makePath(listenerTier, listenerTierChild);
setupServerAndCurator();
final ExecutorService executorService = Execs.singleThreaded("listenerDiscovererTest--%s");
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
executorService.shutdownNow();
}
});
closerRule.closeLater(server);
closerRule.closeLater(curator);
curator.start();
curator.blockUntilConnected(10, TimeUnit.SECONDS);
Assert.assertEquals("/druid", curator.create().forPath("/druid"));
final Announcer announcer = new Announcer(curator, executorService);
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
announcer.stop();
}
});
final ListeningAnnouncerConfig config = new ListeningAnnouncerConfig(new ZkPathsConfig());
final ListenerDiscoverer listenerDiscoverer = new ListenerDiscoverer(curator, config);
listenerDiscoverer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
listenerDiscoverer.stop();
}
});
Assert.assertTrue(listenerDiscoverer.getNodes(listenerKey).isEmpty());
final HostAndPort node = HostAndPort.fromParts("someHost", 8888);
final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
config,
listenerKey,
node
)
{
};
listenerResourceAnnouncer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
listenerResourceAnnouncer.stop();
}
});
final ListenerResourceAnnouncer tieredListenerResourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
config,
tierZkPath,
node
)
{
};
tieredListenerResourceAnnouncer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
tieredListenerResourceAnnouncer.stop();
}
});
announcer.start();
Assert.assertNotNull(curator.checkExists().forPath(config.getAnnouncementPath(listenerKey)));
// Have to wait for background syncing
while (listenerDiscoverer.getNodes(listenerKey).isEmpty()) {
// Will timeout at test's timeout setting
Thread.sleep(1);
}
Assert.assertEquals(
ImmutableSet.of(HostAndPort.fromString(node.toString())),
listenerDiscoverer.getNodes(listenerKey)
);
Assert.assertEquals(
ImmutableSet.of(listenerKey, listenerTier),
ImmutableSet.copyOf(listenerDiscoverer.discoverChildren(null))
);
Assert.assertEquals(
ImmutableSet.of(listenerTierChild),
ImmutableSet.copyOf(listenerDiscoverer.discoverChildren(listenerTier))
);
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.announcer;
import com.google.common.net.HostAndPort;
import com.google.common.primitives.Longs;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
import io.druid.segment.CloserRule;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ListenerResourceAnnouncerTest extends CuratorTestBase
{
private final ListeningAnnouncerConfig listeningAnnouncerConfig = new ListeningAnnouncerConfig(new ZkPathsConfig());
private final String listenerKey = "someKey";
private final String announcePath = listeningAnnouncerConfig.getAnnouncementPath(listenerKey);
@Rule
public CloserRule closerRule = new CloserRule(true);
private ExecutorService executorService;
@Before
public void setUp()
{
executorService = Execs.singleThreaded("listener-resource--%d");
}
@After
public void tearDown()
{
executorService.shutdownNow();
}
@Test
public void testAnnouncerBehaves() throws Exception
{
setupServerAndCurator();
closerRule.closeLater(server);
curator.start();
closerRule.closeLater(curator);
Assert.assertNotNull(curator.create().forPath("/druid"));
Assert.assertTrue(curator.blockUntilConnected(10, TimeUnit.SECONDS));
final Announcer announcer = new Announcer(curator, executorService);
final HostAndPort node = HostAndPort.fromString("localhost");
final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
listeningAnnouncerConfig,
listenerKey,
node
)
{
};
listenerResourceAnnouncer.start();
announcer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
announcer.stop();
}
});
Assert.assertNotNull(curator.checkExists().forPath(announcePath));
final String nodePath = ZKPaths.makePath(announcePath, node.getHostText());
Assert.assertNotNull(curator.checkExists().forPath(nodePath));
Assert.assertEquals(Longs.BYTES, curator.getData().decompressed().forPath(nodePath).length);
Assert.assertNull(curator.checkExists()
.forPath(listeningAnnouncerConfig.getAnnouncementPath(listenerKey + "FOO")));
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.stop();
listenerResourceAnnouncer.start();
listenerResourceAnnouncer.stop();
Assert.assertNull(curator.checkExists().forPath(nodePath));
}
@Test
public void testStartCorrect() throws Exception
{
final Announcer announcer = EasyMock.createStrictMock(Announcer.class);
final HostAndPort node = HostAndPort.fromString("some_host");
final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
listeningAnnouncerConfig,
listenerKey,
node
)
{
};
announcer.announce(
EasyMock.eq(ZKPaths.makePath(announcePath, node.getHostText())),
EasyMock.aryEq(resourceAnnouncer.getAnnounceBytes())
);
EasyMock.expectLastCall().once();
EasyMock.replay(announcer);
resourceAnnouncer.start();
EasyMock.verify(announcer);
}
}

View File

@ -0,0 +1,271 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.resource;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.StringUtils;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class AbstractListenerHandlerTest
{
final ObjectMapper mapper = new DefaultObjectMapper();
final AtomicBoolean failPost = new AtomicBoolean(false);
final String error_msg = "err message";
final Object good_object = new Object();
final AtomicBoolean shouldFail = new AtomicBoolean(false);
final AtomicBoolean returnEmpty = new AtomicBoolean(false);
final String error_message = "some error message";
final String good_id = "good id";
final String error_id = "error id";
final Map<String, SomeBeanClass> all = ImmutableMap.of();
final Object obj = new Object();
final String valid_id = "some_id";
final AbstractListenerHandler<SomeBeanClass> abstractListenerHandler =
new AbstractListenerHandler<SomeBeanClass>(SomeBeanClass.TYPE_REFERENCE)
{
@Nullable
@Override
public Object post(@NotNull Map<String, SomeBeanClass> inputObject) throws Exception
{
if (failPost.get()) {
throw new Exception(error_msg);
}
return inputObject.isEmpty() ? null : inputObject;
}
@Nullable
@Override
protected Object get(@NotNull String id)
{
if (error_id.equals(id)) {
throw new RuntimeException(error_message);
}
return good_id.equals(id) ? good_object : null;
}
@Nullable
@Override
protected Map<String, SomeBeanClass> getAll()
{
if (shouldFail.get()) {
throw new RuntimeException(error_message);
}
return returnEmpty.get() ? null : all;
}
@Nullable
@Override
protected Object delete(@NotNull String id)
{
if (error_id.equals(id)) {
throw new RuntimeException(error_msg);
}
return valid_id.equals(id) ? obj : null;
}
};
@Before
public void setUp()
{
mapper.registerSubtypes(SomeBeanClass.class);
}
@Test
public void testSimple() throws Exception
{
final SomeBeanClass val = new SomeBeanClass("a");
final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(val)));
final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id);
Assert.assertEquals(202, response.getStatus());
Assert.assertEquals(ImmutableMap.of(good_id, val), response.getEntity());
}
@Test
public void testSimpleAll() throws Exception
{
final Map<String, SomeBeanClass> val = ImmutableMap.of("a", new SomeBeanClass("a"));
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
val
)
)
);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(202, response.getStatus());
Assert.assertEquals(val, response.getEntity());
}
@Test
public void testMissingAll() throws Exception
{
final Map<String, SomeBeanClass> val = ImmutableMap.of();
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
val
)
)
);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(404, response.getStatus());
}
@Test
public void testErrorAll() throws Exception
{
final Map<String, SomeBeanClass> val = ImmutableMap.of();
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
val
)
)
);
failPost.set(true);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity());
}
@Test
public void testError() throws Exception
{
final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(new SomeBeanClass(
"a"))));
failPost.set(true);
final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity());
}
@Test
public void testBadInput() throws Exception
{
final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{0, 0, 0});
final Response response = abstractListenerHandler.handlePOST(bais, mapper, good_id);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testBadInnerInput() throws Exception
{
final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{});
final ObjectMapper mapper = EasyMock.createStrictMock(ObjectMapper.class);
EasyMock.expect(mapper.readValue(EasyMock.<InputStream>anyObject(), EasyMock.<TypeReference<Object>>anyObject()))
.andThrow(new IOException());
EasyMock.replay(mapper);
final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper);
Assert.assertEquals(400, response.getStatus());
EasyMock.verify(mapper);
}
@Test
public void testHandleSimpleDELETE() throws Exception
{
final Response response = abstractListenerHandler.handleDELETE(valid_id);
Assert.assertEquals(202, response.getStatus());
Assert.assertEquals(obj, response.getEntity());
}
@Test
public void testMissingDELETE() throws Exception
{
final Response response = abstractListenerHandler.handleDELETE("not going to find it");
Assert.assertEquals(404, response.getStatus());
}
@Test
public void testErrorDELETE() throws Exception
{
final Response response = abstractListenerHandler.handleDELETE(error_id);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity());
}
@Test
public void testHandle() throws Exception
{
final Response response = abstractListenerHandler.handleGET(good_id);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(good_object, response.getEntity());
}
@Test
public void testMissingHandle() throws Exception
{
final Response response = abstractListenerHandler.handleGET("neva gonna get it");
Assert.assertEquals(404, response.getStatus());
}
@Test
public void testExceptionalHandle() throws Exception
{
final Response response = abstractListenerHandler.handleGET(error_id);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity());
}
@Test
public void testHandleAll() throws Exception
{
final Response response = abstractListenerHandler.handleGETAll();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(all, response.getEntity());
}
@Test
public void testExceptionalHandleAll() throws Exception
{
shouldFail.set(true);
final Response response = abstractListenerHandler.handleGETAll();
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity());
}
@Test
public void testMissingHandleAll() throws Exception
{
returnEmpty.set(true);
final Response response = abstractListenerHandler.handleGETAll();
Assert.assertEquals(404, response.getStatus());
}
}

View File

@ -0,0 +1,496 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.resource;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.metamx.common.StringUtils;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public class ListenerResourceTest
{
static final String ANN_ID = "announce_id";
HttpServletRequest req;
final ObjectMapper mapper = new DefaultObjectMapper();
private static final ByteSource EMPTY_JSON_MAP = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return new ByteArrayInputStream(StringUtils.toUtf8("{}"));
}
};
@Before
public void setUp() throws Exception
{
mapper.registerSubtypes(SomeBeanClass.class);
req = EasyMock.createNiceMock(HttpServletRequest.class);
EasyMock.expect(req.getContentType()).andReturn(MediaType.APPLICATION_JSON).anyTimes();
EasyMock.replay(req);
}
@After
public void tearDown() throws Exception
{
}
@Test
public void testServiceAnnouncementPOSTExceptionInHandler() throws Exception
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handlePOST(
EasyMock.<InputStream>anyObject(),
EasyMock.<ObjectMapper>anyObject(),
EasyMock.anyString()
)).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementPOST("id", EMPTY_JSON_MAP.openStream(), req).getStatus()
);
EasyMock.verify(req, handler);
}
@Test
public void testServiceAnnouncementPOSTAllExceptionInHandler() throws Exception
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handlePOSTAll(EasyMock.<InputStream>anyObject(), EasyMock.<ObjectMapper>anyObject()))
.andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.openStream(), req).getStatus()
);
EasyMock.verify(req, handler);
}
@Test
public void testServiceAnnouncementPOST() throws Exception
{
final AtomicInteger c = new AtomicInteger(0);
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
new ExceptionalAbstractListenerHandler()
{
@Override
public Object post(Map<String, SomeBeanClass> l)
{
c.incrementAndGet();
return l;
}
}
)
{
};
Assert.assertEquals(
202,
resource.serviceAnnouncementPOSTAll(EMPTY_JSON_MAP.openStream(), req).getStatus()
);
Assert.assertEquals(1, c.get());
EasyMock.verify(req);
}
@Test
public void testServiceAnnouncementGET() throws Exception
{
final AtomicInteger c = new AtomicInteger(0);
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public Object get(String id)
{
c.incrementAndGet();
return ANN_ID.equals(id) ? ANN_ID : null;
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
Response.Status.OK.getStatusCode(),
resource.serviceAnnouncementGET(ANN_ID).getStatus()
);
Assert.assertEquals(1, c.get());
EasyMock.verify(req);
}
@Test
public void testServiceAnnouncementGETNull() throws Exception
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler();
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
400,
resource.serviceAnnouncementGET(null).getStatus()
);
Assert.assertEquals(
400,
resource.serviceAnnouncementGET("").getStatus()
);
EasyMock.verify(req);
}
@Test
public void testServiceAnnouncementGETExceptionInHandler() throws Exception
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handleGET(EasyMock.anyString())).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementGET("id").getStatus()
);
EasyMock.verify(handler);
}
@Test
public void testServiceAnnouncementGETAllExceptionInHandler() throws Exception
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handleGETAll()).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.getAll().getStatus()
);
EasyMock.verify(handler);
}
@Test
public void testServiceAnnouncementDELETENullID() throws Exception
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler();
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
Response.Status.BAD_REQUEST.getStatusCode(),
resource.serviceAnnouncementDELETE(null).getStatus()
);
}
@Test
public void testServiceAnnouncementDELETEExceptionInHandler() throws Exception
{
final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class);
EasyMock.expect(handler.handleDELETE(EasyMock.anyString())).andThrow(new RuntimeException("test"));
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
EasyMock.replay(handler);
Assert.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resource.serviceAnnouncementDELETE("id").getStatus()
);
EasyMock.verify(handler);
}
@Test
public void testServiceAnnouncementDELETE() throws Exception
{
final AtomicInteger c = new AtomicInteger(0);
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public Object delete(String id)
{
c.incrementAndGet();
return ANN_ID.equals(id) ? ANN_ID : null;
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
Assert.assertEquals(
202,
resource.serviceAnnouncementDELETE(ANN_ID).getStatus()
);
Assert.assertEquals(1, c.get());
EasyMock.verify(req);
}
@Test
// Take a list of strings wrap them in a JSON POJO and get them back as an array string in the POST function
public void testAbstractPostHandler() throws Exception
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Nullable
@Override
public String post(
@NotNull Map<String, SomeBeanClass> inputObject
) throws Exception
{
return mapper.writeValueAsString(inputObject);
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
final List<String> strings = ImmutableList.of("test1", "test2");
final Map<String, SomeBeanClass> expectedMap = new HashMap<>();
for (final String str : strings) {
expectedMap.put(str, new SomeBeanClass(str));
}
final String expectedString = mapper.writeValueAsString(expectedMap);
final Response response = resource.serviceAnnouncementPOSTAll(
new ByteArrayInputStream(StringUtils.toUtf8(expectedString)),
req
);
Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
Assert.assertEquals(expectedString, response.getEntity());
}
@Test
public void testAbstractPostHandlerEmptyList() throws Exception
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public String post(Map<String, SomeBeanClass> inputObject) throws Exception
{
return mapper.writeValueAsString(inputObject);
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
final Response response = resource.serviceAnnouncementPOSTAll(
EMPTY_JSON_MAP.openStream(),
req
);
Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
Assert.assertEquals("{}", response.getEntity());
}
@Test
public void testAbstractPostHandlerException() throws Exception
{
final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler()
{
@Override
public String post(Map<String, SomeBeanClass> inputObject) throws Exception
{
throw new UnsupportedOperationException("nope!");
}
};
final ListenerResource resource = new ListenerResource(
mapper,
mapper,
handler
)
{
};
final Response response = resource.serviceAnnouncementPOSTAll(
new ByteArrayInputStream(
StringUtils.toUtf8(
mapper.writeValueAsString(
ImmutableMap.of("test1", new SomeBeanClass("test1"), "test2", new SomeBeanClass("test2"))
)
)
),
req
);
Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
}
}
@JsonTypeName("someBean")
class SomeBeanClass
{
protected static final TypeReference<SomeBeanClass> TYPE_REFERENCE = new TypeReference<SomeBeanClass>()
{
};
private final String p;
@JsonCreator
public SomeBeanClass(
@JsonProperty("p") String p
)
{
this.p = p;
}
@JsonProperty
public String getP()
{
return this.p;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SomeBeanClass that = (SomeBeanClass) o;
return p != null ? p.equals(that.p) : that.p == null;
}
@Override
public int hashCode()
{
return p != null ? p.hashCode() : 0;
}
@Override
public String toString()
{
return "SomeBeanClass{" +
"p='" + p + '\'' +
'}';
}
}
class ExceptionalAbstractListenerHandler extends AbstractListenerHandler<SomeBeanClass>
{
public ExceptionalAbstractListenerHandler()
{
super(SomeBeanClass.TYPE_REFERENCE);
}
@Nullable
@Override
protected Object delete(@NotNull String id)
{
throw new UnsupportedOperationException("should not have called DELETE");
}
@Nullable
@Override
protected Object get(@NotNull String id)
{
throw new UnsupportedOperationException("should not have called GET");
}
@Nullable
@Override
protected Map<String, SomeBeanClass> getAll()
{
throw new UnsupportedOperationException("should not have called GET all");
}
@Nullable
@Override
public Object post(@NotNull Map<String, SomeBeanClass> inputObject) throws Exception
{
throw new UnsupportedOperationException("should not have called post");
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.lookup.cache;
import io.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
public class LookupCoordinatorManagerConfigTest
{
@Test
public void testConfigsTakeOverrides()
{
final Duration funnyDuration = Duration.standardDays(100);
final LookupCoordinatorManagerConfig config = new LookupCoordinatorManagerConfig();
config.setDeleteAllTimeout(funnyDuration);
config.setHostDeleteTimeout(funnyDuration);
config.setHostUpdateTimeout(funnyDuration);
config.setUpdateAllTimeout(funnyDuration);
config.setPeriod(funnyDuration.getMillis());
config.setThreadPoolSize(1200);
Assert.assertEquals(funnyDuration, config.getDeleteAllTimeout());
Assert.assertEquals(funnyDuration, config.getHostDeleteTimeout());
Assert.assertEquals(funnyDuration, config.getHostUpdateTimeout());
Assert.assertEquals(funnyDuration, config.getUpdateAllTimeout());
Assert.assertEquals(funnyDuration.getMillis(), config.getPeriod());
Assert.assertEquals(1200, config.getThreadPoolSize());
}
@Test
public void testSimpleConfigDefaults()
{
final LookupCoordinatorManagerConfig config = new LookupCoordinatorManagerConfig();
Assert.assertEquals(LookupCoordinatorManagerConfig.DEFAULT_DELETE_ALL_TIMEOUT, config.getDeleteAllTimeout());
Assert.assertEquals(LookupCoordinatorManagerConfig.DEFAULT_HOST_DELETE_TIMEOUT, config.getHostDeleteTimeout());
Assert.assertEquals(LookupCoordinatorManagerConfig.DEFAULT_HOST_UPDATE_TIMEOUT, config.getHostUpdateTimeout());
Assert.assertEquals(LookupCoordinatorManagerConfig.DEFAULT_UPDATE_ALL_TIMEOUT, config.getUpdateAllTimeout());
Assert.assertEquals(10, config.getThreadPoolSize());
Assert.assertEquals(30_000, config.getPeriod());
}
}

File diff suppressed because it is too large Load Diff

View File

@ -74,7 +74,7 @@ public class CliBroker extends ServerRunnable
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
return ImmutableList.of(
new Module()
{
@Override

View File

@ -54,6 +54,7 @@ import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.CoordinatorResource;
import io.druid.server.http.DatasourcesResource;
import io.druid.server.http.IntervalsResource;
import io.druid.server.http.LookupCoordinatorResource;
import io.druid.server.http.MetadataResource;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
@ -61,6 +62,9 @@ import io.druid.server.http.RulesResource;
import io.druid.server.http.ServersResource;
import io.druid.server.http.TiersResource;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.listener.announcer.ListenerDiscoverer;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
import io.druid.server.router.TieredBrokerConfig;
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
@ -92,7 +96,9 @@ public class CliCoordinator extends ServerRunnable
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to(TieredBrokerConfig.DEFAULT_COORDINATOR_SERVICE_NAME);
binder.bindConstant()
.annotatedWith(Names.named("serviceName"))
.to(TieredBrokerConfig.DEFAULT_COORDINATOR_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8081);
ConfigProvider.bind(binder, DruidCoordinatorConfig.class);
@ -103,6 +109,7 @@ public class CliCoordinator extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.lookups", LookupCoordinatorManagerConfig.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
@ -124,8 +131,13 @@ public class CliCoordinator extends ServerRunnable
binder.bind(DruidCoordinator.class);
binder.bind(LookupCoordinatorManager.class).in(ManageLifecycle.class);
binder.bind(ListenerDiscoverer.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, ListenerDiscoverer.class);
LifecycleModule.register(binder, MetadataStorage.class);
LifecycleModule.register(binder, DruidCoordinator.class);
LifecycleModule.register(binder, LookupCoordinatorManager.class);
binder.bind(JettyServerInitializer.class)
.to(CoordinatorJettyServerInitializer.class);
@ -138,6 +150,7 @@ public class CliCoordinator extends ServerRunnable
Jerseys.addResource(binder, DatasourcesResource.class);
Jerseys.addResource(binder, MetadataResource.class);
Jerseys.addResource(binder, IntervalsResource.class);
Jerseys.addResource(binder, LookupCoordinatorResource.class);
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DatasourcesResource.class);

View File

@ -64,7 +64,7 @@ public class CliHistorical extends ServerRunnable
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
return ImmutableList.of(
new Module()
{
@Override

View File

@ -49,7 +49,7 @@ public class CliRealtime extends ServerRunnable
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
return ImmutableList.of(
new RealtimeModule(),
new Module()
{

View File

@ -30,6 +30,7 @@ import io.druid.client.InventoryView;
import io.druid.client.ServerView;
import io.druid.guice.LazySingleton;
import io.druid.guice.RealtimeModule;
import io.druid.query.lookup.LookupModule;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
@ -58,7 +59,7 @@ public class CliRealtimeExample extends ServerRunnable
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
return ImmutableList.of(
new RealtimeModule(),
new Module()
{
@ -74,7 +75,8 @@ public class CliRealtimeExample extends ServerRunnable
binder.bind(ServerView.class).to(NoopServerView.class).in(LazySingleton.class);
}
},
new ChatHandlerServerModule()
new ChatHandlerServerModule(),
new LookupModule()
);
}

View File

@ -67,7 +67,7 @@ public class CliRouter extends ServerRunnable
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
return ImmutableList.of(
new JettyHttpClientModule("druid.router.http", Router.class),
new Module()
{