Async lookups-cached-global by default (#3074)

* Async lookups-cached-global by default
* Also better lookup docs

* Fix test timeouts

* Fix timing of deserialized test

* Fix problem with 0 wait failing immediately
This commit is contained in:
Charles Allen 2016-06-03 13:58:10 -07:00 committed by Slim
parent a2290a8f05
commit 8cac710546
4 changed files with 85 additions and 9 deletions

View File

@@ -70,9 +70,11 @@ The parameters are as follows
|Property|Description|Required|Default| |Property|Description|Required|Default|
|--------|-----------|--------|-------| |--------|-----------|--------|-------|
|`extractionNamespace`|Specifies how to populate the local cache. See below|Yes|-| |`extractionNamespace`|Specifies how to populate the local cache. See below|Yes|-|
|`firstCacheTimeout`|How long to wait (in ms) for the first run of the cache to populate. 0 indicates to not wait|No|`60000` (1 minute)| |`firstCacheTimeout`|How long to wait (in ms) for the first run of the cache to populate. 0 indicates to not wait|No|`0` (do not wait)|
|`injective`|If the underlying map is injective (keys and values are unique) then optimizations can occur internally by setting this to `true`|No|`false`| |`injective`|If the underlying map is injective (keys and values are unique) then optimizations can occur internally by setting this to `true`|No|`false`|
If `firstCacheTimeout` is set to a non-zero value, it should be less than `druid.manager.lookups.hostUpdateTimeout`. If `firstCacheTimeout` is NOT set (or is `0`, the default), then management is essentially asynchronous and does not know if a lookup succeeded or failed to start. In such a case, the logs from the lookup nodes should be monitored for repeated failures.
Proper functionality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes: Proper functionality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes:
`druid-lookups-cached-global` `druid-lookups-cached-global`

View File

@@ -294,6 +294,16 @@ To configure a Broker / Router / Historical / Peon to announce itself as part of
|`druid.lookup.lookupTier`| The tier for **lookups** for this node. This is independent of other tiers.|`__default`| |`druid.lookup.lookupTier`| The tier for **lookups** for this node. This is independent of other tiers.|`__default`|
|`druid.lookup.lookupTierIsDatasource`|For some things like indexing service tasks, the datasource is passed in the runtime properties of a task. This option fetches the tierName from the same value as the datasource for the task. It is suggested to only use this as peon options for the indexing service, if at all. If true, `druid.lookup.lookupTier` MUST NOT be specified|`"false"`| |`druid.lookup.lookupTierIsDatasource`|For some things like indexing service tasks, the datasource is passed in the runtime properties of a task. This option fetches the tierName from the same value as the datasource for the task. It is suggested to only use this as peon options for the indexing service, if at all. If true, `druid.lookup.lookupTier` MUST NOT be specified|`"false"`|
To configure the behavior of the dynamic configuration manager, use the following properties on the coordinator:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.manager.lookups.hostDeleteTimeout`|Timeout (in ms) PER HOST for processing DELETE requests for dropping lookups|`1000`(1 second)|
|`druid.manager.lookups.hostUpdateTimeout`|Timeout (in ms) PER HOST for processing an update/add (POST) for new or updated lookups|`10000`(10 seconds)|
|`druid.manager.lookups.updateAllTimeout`|Timeout (in ms) TOTAL for processing update/adds on ALL hosts. Safety valve in case too many hosts timeout on their update|`60000`(1 minute)|
|`druid.manager.lookups.period`|How long to pause between management cycles|`30000`(30 seconds)|
|`druid.manager.lookups.threadPoolSize`|Number of service nodes that can be managed concurrently|`10`|
## Saving configuration across restarts ## Saving configuration across restarts
It is possible to save the configuration across restarts such that a node will not have to wait for coordinator action to re-populate its lookups. To do this the following property is set: It is possible to save the configuration across restarts such that a node will not have to wait for coordinator action to re-populate its lookups. To do this the following property is set:

View File

@@ -52,7 +52,6 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
{ {
private static final Logger LOG = new Logger(NamespaceLookupExtractorFactory.class); private static final Logger LOG = new Logger(NamespaceLookupExtractorFactory.class);
private static final long DEFAULT_SCHEDULE_TIMEOUT = 60_000;
private static final byte[] CLASS_CACHE_KEY; private static final byte[] CLASS_CACHE_KEY;
static { static {
@@ -73,7 +72,7 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
@JsonCreator @JsonCreator
public NamespaceLookupExtractorFactory( public NamespaceLookupExtractorFactory(
@JsonProperty("extractionNamespace") ExtractionNamespace extractionNamespace, @JsonProperty("extractionNamespace") ExtractionNamespace extractionNamespace,
@JsonProperty("firstCacheTimeout") Long firstCacheTimeout, @JsonProperty("firstCacheTimeout") long firstCacheTimeout,
@JsonProperty("injective") boolean injective, @JsonProperty("injective") boolean injective,
@JacksonInject final NamespaceExtractionCacheManager manager @JacksonInject final NamespaceExtractionCacheManager manager
) )
@@ -82,7 +81,7 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
extractionNamespace, extractionNamespace,
"extractionNamespace should be specified" "extractionNamespace should be specified"
); );
this.firstCacheTimeout = firstCacheTimeout == null ? DEFAULT_SCHEDULE_TIMEOUT : firstCacheTimeout; this.firstCacheTimeout = firstCacheTimeout;
Preconditions.checkArgument(this.firstCacheTimeout >= 0); Preconditions.checkArgument(this.firstCacheTimeout >= 0);
this.injective = injective; this.injective = injective;
this.manager = manager; this.manager = manager;
@@ -154,7 +153,7 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
NamespaceExtractionCacheManager manager NamespaceExtractionCacheManager manager
) )
{ {
this(extractionNamespace, null, false, manager); this(extractionNamespace, 60000, false, manager);
} }
@Override @Override
@@ -167,9 +166,16 @@ public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
LOG.warn("Already started! [%s]", extractorID); LOG.warn("Already started! [%s]", extractorID);
return true; return true;
} }
if (!manager.scheduleAndWait(extractorID, extractionNamespace, firstCacheTimeout)) { if(firstCacheTimeout > 0) {
LOG.error("Failed to schedule lookup [%s]", extractorID); if (!manager.scheduleAndWait(extractorID, extractionNamespace, firstCacheTimeout)) {
return false; LOG.error("Failed to schedule and wait for lookup [%s]", extractorID);
return false;
}
} else {
if(!manager.scheduleOrUpdate(extractorID, extractionNamespace)) {
LOG.error("Failed to schedule lookup [%s]", extractorID);
return false;
}
} }
LOG.debug("NamespaceLookupExtractorFactory[%s] started", extractorID); LOG.debug("NamespaceLookupExtractorFactory[%s] started", extractorID);
started = true; started = true;

View File

@@ -147,6 +147,64 @@ public class NamespaceLookupExtractorFactoryTest
EasyMock.verify(cacheManager); EasyMock.verify(cacheManager);
} }
@Test
public void testStartReturnsImmediately()
{
  // A namespace stub; the poll period value itself is irrelevant to this test.
  final ExtractionNamespace namespace = new ExtractionNamespace()
  {
    @Override
    public long getPollMs()
    {
      return 0;
    }
  };
  // With firstCacheTimeout == 0, start() takes the asynchronous path and calls
  // scheduleOrUpdate rather than scheduleAndWait (no blocking on first cache fill).
  EasyMock
      .expect(cacheManager.scheduleOrUpdate(EasyMock.anyString(), EasyMock.eq(namespace)))
      .andReturn(true)
      .once();
  // close() is expected to tear the scheduled entry down again.
  EasyMock
      .expect(cacheManager.checkedDelete(EasyMock.anyString()))
      .andReturn(true)
      .once();
  EasyMock.replay(cacheManager);

  final NamespaceLookupExtractorFactory factory =
      new NamespaceLookupExtractorFactory(namespace, 0, false, cacheManager);
  Assert.assertTrue(factory.start());
  Assert.assertTrue(factory.close());

  EasyMock.verify(cacheManager);
}
@Test
public void testStartReturnsImmediatelyAndFails()
{
  // A namespace stub; the poll period value itself is irrelevant to this test.
  final ExtractionNamespace namespace = new ExtractionNamespace()
  {
    @Override
    public long getPollMs()
    {
      return 0;
    }
  };
  // Asynchronous scheduling fails: scheduleOrUpdate returns false, and start()
  // must surface that failure to the caller instead of reporting success.
  EasyMock
      .expect(cacheManager.scheduleOrUpdate(EasyMock.anyString(), EasyMock.eq(namespace)))
      .andReturn(false)
      .once();
  EasyMock.replay(cacheManager);

  final NamespaceLookupExtractorFactory factory =
      new NamespaceLookupExtractorFactory(namespace, 0, false, cacheManager);
  Assert.assertFalse(factory.start());

  EasyMock.verify(cacheManager);
}
@Test @Test
public void testSimpleStartStopStop() public void testSimpleStartStopStop()
{ {
@@ -499,7 +557,7 @@ public class NamespaceLookupExtractorFactoryTest
); );
final ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); final ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
mapper.registerSubtypes(NamespaceLookupExtractorFactory.class); mapper.registerSubtypes(NamespaceLookupExtractorFactory.class);
final String str = "{ \"type\": \"cachedNamespace\", \"extractionNamespace\": { \"type\": \"staticMap\", \"map\": {\"foo\":\"bar\"} } }"; final String str = "{ \"type\": \"cachedNamespace\", \"extractionNamespace\": { \"type\": \"staticMap\", \"map\": {\"foo\":\"bar\"} }, \"firstCacheTimeout\":10000 }";
final LookupExtractorFactory lookupExtractorFactory = mapper.readValue(str, LookupExtractorFactory.class); final LookupExtractorFactory lookupExtractorFactory = mapper.readValue(str, LookupExtractorFactory.class);
Assert.assertTrue(lookupExtractorFactory.start()); Assert.assertTrue(lookupExtractorFactory.start());
try { try {