mirror of https://github.com/apache/druid.git
Add namespace extraction thread config (#4833)
This commit is contained in:
parent
07446ef32c
commit
b56a907145
|
@ -139,6 +139,7 @@ setting namespaces (broker, peon, historical)
|
||||||
|Property|Description|Default|
|
|Property|Description|Default|
|
||||||
|--------|-----------|-------|
|
|--------|-----------|-------|
|
||||||
|`druid.lookup.namespace.cache.type`|Specifies the type of caching to be used by the namespaces. May be one of [`offHeap`, `onHeap`]. `offHeap` uses a temporary file for off-heap storage of the namespace (memory mapped files). `onHeap` stores all cache on the heap in standard java map types.|`onHeap`|
|
|`druid.lookup.namespace.cache.type`|Specifies the type of caching to be used by the namespaces. May be one of [`offHeap`, `onHeap`]. `offHeap` uses a temporary file for off-heap storage of the namespace (memory mapped files). `onHeap` stores all cache on the heap in standard java map types.|`onHeap`|
|
||||||
|
|`druid.lookup.namespace.numExtractionThreads`|The number of threads in the thread pool dedicated for lookup extraction and updates. This number may need to be scaled up, if you have a lot of lookups and they take long time to extract, to avoid timeouts.|2|
|
||||||
|
|
||||||
The cache is populated in different ways depending on the settings below. In general, most namespaces employ
|
The cache is populated in different ways depending on the settings below. In general, most namespaces employ
|
||||||
a `pollPeriod` at the end of which time they poll the remote resource of interest for updates.
|
a `pollPeriod` at the end of which time they poll the remote resource of interest for updates.
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.lookup.namespace;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
public class NamespaceExtractionConfig
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* The default value of two is chosen because the overhead of having an extra idle thread of the minimum priority is
|
||||||
|
* very low, but having more than one thread may save when one namespace extraction is stuck or taking too long time,
|
||||||
|
* so all the others won't queue up and timeout.
|
||||||
|
*/
|
||||||
|
@JsonProperty
|
||||||
|
private int numExtractionThreads = 2;
|
||||||
|
|
||||||
|
public int getNumExtractionThreads()
|
||||||
|
{
|
||||||
|
return numExtractionThreads;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNumExtractionThreads(int numExtractionThreads)
|
||||||
|
{
|
||||||
|
this.numExtractionThreads = numExtractionThreads;
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,6 +26,7 @@ import com.google.inject.Binder;
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
import com.google.inject.TypeLiteral;
|
import com.google.inject.TypeLiteral;
|
||||||
import com.google.inject.multibindings.MapBinder;
|
import com.google.inject.multibindings.MapBinder;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
import io.druid.guice.LazySingleton;
|
import io.druid.guice.LazySingleton;
|
||||||
import io.druid.guice.PolyBind;
|
import io.druid.guice.PolyBind;
|
||||||
import io.druid.initialization.DruidModule;
|
import io.druid.initialization.DruidModule;
|
||||||
|
@ -77,6 +78,8 @@ public class NamespaceExtractionModule implements DruidModule
|
||||||
@Override
|
@Override
|
||||||
public void configure(Binder binder)
|
public void configure(Binder binder)
|
||||||
{
|
{
|
||||||
|
JsonConfigProvider.bind(binder, "druid.lookup.namespace", NamespaceExtractionConfig.class);
|
||||||
|
|
||||||
PolyBind
|
PolyBind
|
||||||
.createChoiceWithDefault(binder, TYPE_PREFIX, Key.get(NamespaceExtractionCacheManager.class), "onHeap")
|
.createChoiceWithDefault(binder, TYPE_PREFIX, Key.get(NamespaceExtractionCacheManager.class), "onHeap")
|
||||||
.in(LazySingleton.class);
|
.in(LazySingleton.class);
|
||||||
|
|
|
@ -26,6 +26,7 @@ import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import io.druid.java.util.common.concurrent.ExecutorServices;
|
import io.druid.java.util.common.concurrent.ExecutorServices;
|
||||||
import io.druid.java.util.common.lifecycle.Lifecycle;
|
import io.druid.java.util.common.lifecycle.Lifecycle;
|
||||||
import io.druid.java.util.common.logger.Logger;
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
|
||||||
|
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -45,10 +46,14 @@ public abstract class NamespaceExtractionCacheManager
|
||||||
|
|
||||||
private final ScheduledThreadPoolExecutor scheduledExecutorService;
|
private final ScheduledThreadPoolExecutor scheduledExecutorService;
|
||||||
|
|
||||||
public NamespaceExtractionCacheManager(final Lifecycle lifecycle, final ServiceEmitter serviceEmitter)
|
public NamespaceExtractionCacheManager(
|
||||||
|
final Lifecycle lifecycle,
|
||||||
|
final ServiceEmitter serviceEmitter,
|
||||||
|
final NamespaceExtractionConfig config
|
||||||
|
)
|
||||||
{
|
{
|
||||||
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
||||||
1,
|
config.getNumExtractionThreads(),
|
||||||
new ThreadFactoryBuilder()
|
new ThreadFactoryBuilder()
|
||||||
.setDaemon(true)
|
.setDaemon(true)
|
||||||
.setNameFormat("NamespaceExtractionCacheManager-%d")
|
.setNameFormat("NamespaceExtractionCacheManager-%d")
|
||||||
|
|
|
@ -25,6 +25,7 @@ import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
import io.druid.java.util.common.lifecycle.Lifecycle;
|
import io.druid.java.util.common.lifecycle.Lifecycle;
|
||||||
import io.druid.java.util.common.logger.Logger;
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
|
||||||
import org.mapdb.DB;
|
import org.mapdb.DB;
|
||||||
import org.mapdb.DBMaker;
|
import org.mapdb.DBMaker;
|
||||||
import org.mapdb.HTreeMap;
|
import org.mapdb.HTreeMap;
|
||||||
|
@ -132,9 +133,13 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC
|
||||||
private AtomicInteger cacheCount = new AtomicInteger(0);
|
private AtomicInteger cacheCount = new AtomicInteger(0);
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public OffHeapNamespaceExtractionCacheManager(Lifecycle lifecycle, ServiceEmitter serviceEmitter)
|
public OffHeapNamespaceExtractionCacheManager(
|
||||||
|
Lifecycle lifecycle,
|
||||||
|
ServiceEmitter serviceEmitter,
|
||||||
|
NamespaceExtractionConfig config
|
||||||
|
)
|
||||||
{
|
{
|
||||||
super(lifecycle, serviceEmitter);
|
super(lifecycle, serviceEmitter, config);
|
||||||
try {
|
try {
|
||||||
tmpFile = File.createTempFile("druidMapDB", getClass().getCanonicalName());
|
tmpFile = File.createTempFile("druidMapDB", getClass().getCanonicalName());
|
||||||
log.info("Using file [%s] for mapDB off heap namespace cache", tmpFile.getAbsolutePath());
|
log.info("Using file [%s] for mapDB off heap namespace cache", tmpFile.getAbsolutePath());
|
||||||
|
|
|
@ -26,6 +26,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
import io.druid.java.util.common.ISE;
|
import io.druid.java.util.common.ISE;
|
||||||
import io.druid.java.util.common.lifecycle.Lifecycle;
|
import io.druid.java.util.common.lifecycle.Lifecycle;
|
||||||
import io.druid.java.util.common.logger.Logger;
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
|
||||||
|
|
||||||
import java.lang.ref.WeakReference;
|
import java.lang.ref.WeakReference;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -55,9 +56,13 @@ public class OnHeapNamespaceExtractionCacheManager extends NamespaceExtractionCa
|
||||||
);
|
);
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public OnHeapNamespaceExtractionCacheManager(Lifecycle lifecycle, ServiceEmitter serviceEmitter)
|
public OnHeapNamespaceExtractionCacheManager(
|
||||||
|
Lifecycle lifecycle,
|
||||||
|
ServiceEmitter serviceEmitter,
|
||||||
|
NamespaceExtractionConfig config
|
||||||
|
)
|
||||||
{
|
{
|
||||||
super(lifecycle, serviceEmitter);
|
super(lifecycle, serviceEmitter, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expungeCollectedCaches()
|
private void expungeCollectedCaches()
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class NamespacedExtractorModuleTest
|
||||||
scheduler = new CacheScheduler(
|
scheduler = new CacheScheduler(
|
||||||
noopServiceEmitter,
|
noopServiceEmitter,
|
||||||
factoryMap,
|
factoryMap,
|
||||||
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter)
|
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter, new NamespaceExtractionConfig())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class StaticMapCacheGeneratorTest
|
||||||
scheduler = new CacheScheduler(
|
scheduler = new CacheScheduler(
|
||||||
noopServiceEmitter,
|
noopServiceEmitter,
|
||||||
Collections.<Class<? extends ExtractionNamespace>, CacheGenerator<?>>emptyMap(),
|
Collections.<Class<? extends ExtractionNamespace>, CacheGenerator<?>>emptyMap(),
|
||||||
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter)
|
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter, new NamespaceExtractionConfig())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -180,7 +180,11 @@ public class UriCacheGeneratorTest
|
||||||
@Override
|
@Override
|
||||||
public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
|
public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
|
||||||
{
|
{
|
||||||
return new OnHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
|
return new OnHeapNamespaceExtractionCacheManager(
|
||||||
|
lifecycle,
|
||||||
|
new NoopServiceEmitter(),
|
||||||
|
new NamespaceExtractionConfig()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new Function<Lifecycle, NamespaceExtractionCacheManager>()
|
new Function<Lifecycle, NamespaceExtractionCacheManager>()
|
||||||
|
@ -188,7 +192,11 @@ public class UriCacheGeneratorTest
|
||||||
@Override
|
@Override
|
||||||
public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
|
public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
|
||||||
{
|
{
|
||||||
return new OffHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
|
return new OffHeapNamespaceExtractionCacheManager(
|
||||||
|
lifecycle,
|
||||||
|
new NoopServiceEmitter(),
|
||||||
|
new NamespaceExtractionConfig()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -33,6 +33,7 @@ import io.druid.query.lookup.namespace.CacheGenerator;
|
||||||
import io.druid.query.lookup.namespace.ExtractionNamespace;
|
import io.druid.query.lookup.namespace.ExtractionNamespace;
|
||||||
import io.druid.query.lookup.namespace.UriExtractionNamespace;
|
import io.druid.query.lookup.namespace.UriExtractionNamespace;
|
||||||
import io.druid.query.lookup.namespace.UriExtractionNamespaceTest;
|
import io.druid.query.lookup.namespace.UriExtractionNamespaceTest;
|
||||||
|
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
|
||||||
import io.druid.server.metrics.NoopServiceEmitter;
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -76,7 +77,11 @@ public class CacheSchedulerTest
|
||||||
@Override
|
@Override
|
||||||
public NamespaceExtractionCacheManager apply(@Nullable Lifecycle lifecycle)
|
public NamespaceExtractionCacheManager apply(@Nullable Lifecycle lifecycle)
|
||||||
{
|
{
|
||||||
return new OnHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
|
return new OnHeapNamespaceExtractionCacheManager(
|
||||||
|
lifecycle,
|
||||||
|
new NoopServiceEmitter(),
|
||||||
|
new NamespaceExtractionConfig()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
public static final Function<Lifecycle, NamespaceExtractionCacheManager> CREATE_OFF_HEAP_CACHE_MANAGER =
|
public static final Function<Lifecycle, NamespaceExtractionCacheManager> CREATE_OFF_HEAP_CACHE_MANAGER =
|
||||||
|
@ -86,7 +91,11 @@ public class CacheSchedulerTest
|
||||||
@Override
|
@Override
|
||||||
public NamespaceExtractionCacheManager apply(@Nullable Lifecycle lifecycle)
|
public NamespaceExtractionCacheManager apply(@Nullable Lifecycle lifecycle)
|
||||||
{
|
{
|
||||||
return new OffHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
|
return new OffHeapNamespaceExtractionCacheManager(
|
||||||
|
lifecycle,
|
||||||
|
new NoopServiceEmitter(),
|
||||||
|
new NamespaceExtractionConfig()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ import io.druid.query.lookup.namespace.CacheGenerator;
|
||||||
import io.druid.query.lookup.namespace.ExtractionNamespace;
|
import io.druid.query.lookup.namespace.ExtractionNamespace;
|
||||||
import io.druid.query.lookup.namespace.JdbcExtractionNamespace;
|
import io.druid.query.lookup.namespace.JdbcExtractionNamespace;
|
||||||
import io.druid.server.lookup.namespace.JdbcCacheGenerator;
|
import io.druid.server.lookup.namespace.JdbcCacheGenerator;
|
||||||
|
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
|
||||||
import io.druid.server.metrics.NoopServiceEmitter;
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
import org.joda.time.Period;
|
import org.joda.time.Period;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -224,7 +225,7 @@ public class JdbcExtractionNamespaceTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter)
|
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter, new NamespaceExtractionConfig())
|
||||||
);
|
);
|
||||||
try {
|
try {
|
||||||
lifecycle.start();
|
lifecycle.start();
|
||||||
|
|
Loading…
Reference in New Issue