mirror of https://github.com/apache/druid.git
Merge pull request #564 from metamx/cacheable-objectstrategy
Cacheable ObjectStrategy
This commit is contained in:
commit
10e925aa67
|
@ -17,7 +17,7 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server;
|
||||
package io.druid.common.utils;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
|
@ -44,7 +44,9 @@ import io.druid.guice.JsonConfigProvider;
|
|||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.indexer.partitions.PartitionsSpec;
|
||||
import io.druid.indexer.path.PathSpec;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
@ -81,7 +83,7 @@ public class HadoopDruidIndexerConfig
|
|||
|
||||
static {
|
||||
injector = Initialization.makeInjectorWithModules(
|
||||
Initialization.makeStartupInjector(),
|
||||
GuiceInjectors.makeStartupInjector(),
|
||||
ImmutableList.<Object>of(
|
||||
new Module()
|
||||
{
|
||||
|
@ -166,12 +168,14 @@ public class HadoopDruidIndexerConfig
|
|||
|
||||
private volatile HadoopIngestionSpec schema;
|
||||
private volatile PathSpec pathSpec;
|
||||
private volatile ColumnConfig columnConfig;
|
||||
|
||||
@JsonCreator
|
||||
public HadoopDruidIndexerConfig(
|
||||
final @JsonProperty("schema") HadoopIngestionSpec schema
|
||||
)
|
||||
{
|
||||
this.columnConfig = columnConfig;
|
||||
this.schema = schema;
|
||||
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
|
||||
}
|
||||
|
@ -182,6 +186,11 @@ public class HadoopDruidIndexerConfig
|
|||
return schema;
|
||||
}
|
||||
|
||||
public ColumnConfig getColumnConfig()
|
||||
{
|
||||
return columnConfig;
|
||||
}
|
||||
|
||||
public String getDataSource()
|
||||
{
|
||||
return schema.getDataSchema().getDataSource();
|
||||
|
|
|
@ -45,7 +45,6 @@ public class HadoopDruidIndexerConfigTest
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void shouldMakeHDFSCompliantSegmentOutputPath()
|
||||
{
|
||||
|
|
|
@ -49,7 +49,6 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
|
@ -187,7 +186,8 @@ public class TaskToolbox
|
|||
return retVal;
|
||||
}
|
||||
|
||||
public void pushSegments(Iterable<DataSegment> segments) throws IOException {
|
||||
public void pushSegments(Iterable<DataSegment> segments) throws IOException
|
||||
{
|
||||
// Request segment pushes for each set
|
||||
final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
|
||||
segments,
|
||||
|
|
|
@ -29,6 +29,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
|
|||
import io.druid.indexing.common.config.TaskConfig;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.loading.DataSegmentArchiver;
|
||||
import io.druid.segment.loading.DataSegmentKiller;
|
||||
import io.druid.segment.loading.DataSegmentMover;
|
||||
|
|
|
@ -42,8 +42,9 @@ import io.druid.indexing.common.TaskToolbox;
|
|||
import io.druid.indexing.common.actions.LockAcquireAction;
|
||||
import io.druid.indexing.common.actions.LockTryAcquireAction;
|
||||
import io.druid.indexing.common.actions.TaskActionClient;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.server.initialization.ExtensionsConfig;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.tesla.aether.internal.DefaultTeslaAether;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -63,7 +64,7 @@ public class HadoopIndexTask extends AbstractTask
|
|||
private static final ExtensionsConfig extensionsConfig;
|
||||
|
||||
static {
|
||||
extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
|
||||
extensionsConfig = GuiceInjectors.makeStartupInjector().getInstance(ExtensionsConfig.class);
|
||||
}
|
||||
|
||||
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.indexing.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
@ -43,6 +44,7 @@ import io.druid.query.QueryRunner;
|
|||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeIOConfig;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
|
|
@ -48,6 +48,8 @@ import java.io.File;
|
|||
|
||||
public class TaskSerdeTest
|
||||
{
|
||||
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
|
||||
@Test
|
||||
public void testIndexTaskSerde() throws Exception
|
||||
{
|
||||
|
@ -68,7 +70,6 @@ public class TaskSerdeTest
|
|||
-1
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
|
||||
jsonMapper.registerModule(jacksonModule);
|
||||
}
|
||||
|
@ -102,7 +103,6 @@ public class TaskSerdeTest
|
|||
)
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -131,7 +131,6 @@ public class TaskSerdeTest
|
|||
new Interval("2010-01-01/P1D")
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -153,7 +152,6 @@ public class TaskSerdeTest
|
|||
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -177,7 +175,6 @@ public class TaskSerdeTest
|
|||
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -208,7 +205,6 @@ public class TaskSerdeTest
|
|||
null
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -251,7 +247,6 @@ public class TaskSerdeTest
|
|||
new Interval("2010-01-01/P1D")
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -269,7 +264,6 @@ public class TaskSerdeTest
|
|||
@Test
|
||||
public void testDeleteTaskFromJson() throws Exception
|
||||
{
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final DeleteTask task = (DeleteTask) jsonMapper.readValue(
|
||||
"{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
|
||||
Task.class
|
||||
|
@ -300,7 +294,6 @@ public class TaskSerdeTest
|
|||
)
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -325,7 +318,6 @@ public class TaskSerdeTest
|
|||
new Interval("2010-01-01/P1D")
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -350,7 +342,6 @@ public class TaskSerdeTest
|
|||
new Interval("2010-01-01/P1D")
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -375,7 +366,6 @@ public class TaskSerdeTest
|
|||
ImmutableMap.<String, Object>of("bucket", "hey", "baseKey", "what")
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
|
@ -434,7 +424,6 @@ public class TaskSerdeTest
|
|||
null
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class);
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import io.druid.data.input.InputRow;
|
|||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.indexing.common.SegmentLoaderFactory;
|
||||
import io.druid.indexing.common.TaskLock;
|
||||
|
|
|
@ -20,11 +20,17 @@
|
|||
package io.druid.indexing.worker;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
|
||||
import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector;
|
||||
import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues;
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
||||
import io.druid.indexing.common.IndexingServiceCondition;
|
||||
import io.druid.indexing.common.SegmentLoaderFactory;
|
||||
|
@ -38,6 +44,7 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
|
|||
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
|
||||
import io.druid.indexing.worker.config.WorkerConfig;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.loading.DataSegmentPuller;
|
||||
import io.druid.segment.loading.LocalDataSegmentPuller;
|
||||
import io.druid.segment.loading.OmniSegmentLoader;
|
||||
|
@ -61,6 +68,40 @@ import java.util.List;
|
|||
public class WorkerTaskMonitorTest
|
||||
{
|
||||
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||
private static final Injector injector = Guice.createInjector(
|
||||
new com.google.inject.Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.bind(ColumnConfig.class).toInstance(
|
||||
new ColumnConfig()
|
||||
{
|
||||
@Override
|
||||
public int columnCacheSizeBytes()
|
||||
{
|
||||
return 1024 * 1024;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
static {
|
||||
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
|
||||
|
||||
jsonMapper.setInjectableValues(new GuiceInjectableValues(injector));
|
||||
jsonMapper.setAnnotationIntrospectors(
|
||||
new AnnotationIntrospectorPair(
|
||||
guiceIntrospector, jsonMapper.getSerializationConfig().getAnnotationIntrospector()
|
||||
),
|
||||
new AnnotationIntrospectorPair(
|
||||
guiceIntrospector, jsonMapper.getDeserializationConfig().getAnnotationIntrospector()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private static final Joiner joiner = Joiner.on("/");
|
||||
private static final String basePath = "/test/druid";
|
||||
private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.initialization;
|
||||
package io.druid.guice;
|
||||
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
|
@ -17,7 +17,7 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.initialization;
|
||||
package io.druid.guice;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.guice;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.util.Modules;
|
||||
import io.druid.guice.ConfigModule;
|
||||
import io.druid.guice.DruidGuiceExtensions;
|
||||
import io.druid.guice.DruidSecondaryModule;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.PropertiesModule;
|
||||
import io.druid.jackson.JacksonModule;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class GuiceInjectors
|
||||
{
|
||||
public static Injector makeStartupInjector()
|
||||
{
|
||||
return Guice.createInjector(
|
||||
new DruidGuiceExtensions(),
|
||||
new JacksonModule(),
|
||||
new PropertiesModule("runtime.properties"),
|
||||
new ConfigModule(),
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.bind(DruidSecondaryModule.class);
|
||||
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public static Injector makeStartupInjectorWithModules(Iterable<Module> modules)
|
||||
{
|
||||
List<Module> theModules = Lists.newArrayList();
|
||||
theModules.add(new DruidGuiceExtensions());
|
||||
theModules.add(new JacksonModule());
|
||||
theModules.add(new PropertiesModule("runtime.properties"));
|
||||
theModules.add(new ConfigModule());
|
||||
theModules.add(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.bind(DruidSecondaryModule.class);
|
||||
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
|
||||
}
|
||||
}
|
||||
);
|
||||
for (Module theModule : modules) {
|
||||
theModules.add(theModule);
|
||||
}
|
||||
|
||||
|
||||
return Guice.createInjector(theModules);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.guice;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Module;
|
||||
import com.metamx.common.ISE;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.initialization.DruidModule;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ModuleList
|
||||
{
|
||||
private final Injector baseInjector;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final List<Module> modules;
|
||||
|
||||
public ModuleList(Injector baseInjector)
|
||||
{
|
||||
this.baseInjector = baseInjector;
|
||||
this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class));
|
||||
this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class));
|
||||
this.modules = Lists.newArrayList();
|
||||
}
|
||||
|
||||
public List<Module> getModules()
|
||||
{
|
||||
return Collections.unmodifiableList(modules);
|
||||
}
|
||||
|
||||
public void addModule(Object input)
|
||||
{
|
||||
if (input instanceof DruidModule) {
|
||||
baseInjector.injectMembers(input);
|
||||
modules.add(registerJacksonModules(((DruidModule) input)));
|
||||
} else if (input instanceof Module) {
|
||||
baseInjector.injectMembers(input);
|
||||
modules.add((Module) input);
|
||||
} else if (input instanceof Class) {
|
||||
if (DruidModule.class.isAssignableFrom((Class) input)) {
|
||||
modules.add(registerJacksonModules(baseInjector.getInstance((Class<? extends DruidModule>) input)));
|
||||
} else if (Module.class.isAssignableFrom((Class) input)) {
|
||||
modules.add(baseInjector.getInstance((Class<? extends Module>) input));
|
||||
return;
|
||||
} else {
|
||||
throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class);
|
||||
}
|
||||
} else {
|
||||
throw new ISE("Unknown module type[%s]", input.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
public void addModules(Object... object)
|
||||
{
|
||||
for (Object o : object) {
|
||||
addModule(o);
|
||||
}
|
||||
}
|
||||
|
||||
private DruidModule registerJacksonModules(DruidModule module)
|
||||
{
|
||||
for (com.fasterxml.jackson.databind.Module jacksonModule : module.getJacksonModules()) {
|
||||
jsonMapper.registerModule(jacksonModule);
|
||||
smileMapper.registerModule(jacksonModule);
|
||||
}
|
||||
return module;
|
||||
}
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.initialization;
|
||||
package io.druid.guice;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
|
@ -17,14 +17,13 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server;
|
||||
package io.druid.query;
|
||||
|
||||
import com.metamx.common.concurrent.ExecutorServiceConfig;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import org.skife.config.Config;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class DruidProcessingConfig extends ExecutorServiceConfig
|
||||
public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig
|
||||
{
|
||||
@Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"})
|
||||
public int intermediateComputeSizeBytes()
|
||||
|
@ -39,4 +38,10 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig
|
|||
final int processors = Runtime.getRuntime().availableProcessors();
|
||||
return processors > 1 ? processors - 1 : processors;
|
||||
}
|
||||
|
||||
@Config(value = "${base_path}.columnCache.sizeBytes")
|
||||
public int columnCacheSizeBytes()
|
||||
{
|
||||
return 1024 * 1024;
|
||||
}
|
||||
}
|
|
@ -128,8 +128,9 @@ public class HyperUniquesSerde extends ComplexMetricSerde
|
|||
@Override
|
||||
public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
{
|
||||
buffer.limit(buffer.position() + numBytes);
|
||||
return HyperLogLogCollector.makeCollector(buffer);
|
||||
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
|
||||
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
|
||||
return HyperLogLogCollector.makeCollector(readOnlyBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.segment;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -31,6 +32,9 @@ import com.google.common.io.ByteStreams;
|
|||
import com.google.common.io.Closeables;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.metamx.collections.spatial.ImmutableRTree;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
|
@ -41,9 +45,13 @@ import com.metamx.common.io.smoosh.SmooshedWriter;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.common.utils.SerializerUtils;
|
||||
import io.druid.guice.ConfigProvider;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.DruidProcessingConfig;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.column.ColumnDescriptor;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.ArrayIndexed;
|
||||
|
@ -90,6 +98,9 @@ public class IndexIO
|
|||
{
|
||||
public static final byte V8_VERSION = 0x8;
|
||||
public static final byte V9_VERSION = 0x9;
|
||||
public static final int CURRENT_VERSION_ID = V9_VERSION;
|
||||
|
||||
public static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
|
||||
|
||||
private static final Map<Integer, IndexLoader> indexLoaders =
|
||||
ImmutableMap.<Integer, IndexLoader>builder()
|
||||
|
@ -107,13 +118,33 @@ public class IndexIO
|
|||
|
||||
private static final EmittingLogger log = new EmittingLogger(IndexIO.class);
|
||||
private static final SerializerUtils serializerUtils = new SerializerUtils();
|
||||
public static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
|
||||
|
||||
// This should really be provided by DI, should be changed once we switch around to using a DI framework
|
||||
private static final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
private static final ObjectMapper mapper;
|
||||
protected static final ColumnConfig columnConfig;
|
||||
|
||||
static {
|
||||
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
|
||||
ImmutableList.<Module>of(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
ConfigProvider.bind(
|
||||
binder,
|
||||
DruidProcessingConfig.class,
|
||||
ImmutableMap.of("base_path", "druid.processing")
|
||||
);
|
||||
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
mapper = injector.getInstance(ObjectMapper.class);
|
||||
columnConfig = injector.getInstance(ColumnConfig.class);
|
||||
}
|
||||
|
||||
private static volatile IndexIOHandler handler = null;
|
||||
public static final int CURRENT_VERSION_ID = V9_VERSION;
|
||||
|
||||
@Deprecated
|
||||
public static MMappedIndex mapDir(final File inDir) throws IOException
|
||||
|
@ -626,7 +657,10 @@ public class IndexIO
|
|||
.setHasMultipleValues(true)
|
||||
.setDictionaryEncodedColumn(
|
||||
new DictionaryEncodedColumnSupplier(
|
||||
index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension))
|
||||
index.getDimValueLookup(dimension),
|
||||
null,
|
||||
index.getDimColumn(dimension),
|
||||
columnConfig.columnCacheSizeBytes()
|
||||
)
|
||||
)
|
||||
.setBitmapIndex(
|
||||
|
@ -738,7 +772,7 @@ public class IndexIO
|
|||
ColumnDescriptor serde = mapper.readValue(
|
||||
serializerUtils.readString(byteBuffer), ColumnDescriptor.class
|
||||
);
|
||||
return serde.read(byteBuffer);
|
||||
return serde.read(byteBuffer, columnConfig);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ import io.druid.common.utils.JodaUtils;
|
|||
import io.druid.common.utils.SerializerUtils;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.ToLowerCaseAggregatorFactory;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.data.ByteBufferWriter;
|
||||
import io.druid.segment.data.CompressedLongsSupplierSerializer;
|
||||
import io.druid.segment.data.ConciseCompressedIndexedInts;
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.segment;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
|
@ -38,12 +39,12 @@ import io.druid.segment.column.ValueType;
|
|||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.data.Offset;
|
||||
import io.druid.segment.data.SingleIndexedInts;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -186,6 +187,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
{
|
||||
final Offset baseOffset = offset.clone();
|
||||
|
||||
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
|
||||
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
|
||||
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
|
||||
final Map<String, Object> objectColumnCache = Maps.newHashMap();
|
||||
|
@ -270,12 +272,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
public DimensionSelector makeDimensionSelector(String dimension)
|
||||
{
|
||||
final String dimensionName = dimension.toLowerCase();
|
||||
|
||||
DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimensionName);
|
||||
final Column columnDesc = index.getColumn(dimensionName);
|
||||
if (columnDesc == null) {
|
||||
return null;
|
||||
|
||||
if (cachedColumn == null && columnDesc != null) {
|
||||
cachedColumn = columnDesc.getDictionaryEncoding();
|
||||
dictionaryColumnCache.put(dimensionName, cachedColumn);
|
||||
}
|
||||
|
||||
final DictionaryEncodedColumn column = columnDesc.getDictionaryEncoding();
|
||||
final DictionaryEncodedColumn column = cachedColumn;
|
||||
|
||||
if (column == null) {
|
||||
return null;
|
||||
|
@ -313,7 +319,27 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
return new SingleIndexedInts(column.getSingleValueRow(cursorOffset.getOffset()));
|
||||
// using an anonymous class is faster than creating a class that stores a copy of the value
|
||||
return new IndexedInts()
|
||||
{
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
return column.getSingleValueRow(cursorOffset.getOffset());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Integer> iterator()
|
||||
{
|
||||
return Iterators.singletonIterator(column.getSingleValueRow(cursorOffset.getOffset()));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -536,6 +562,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
public void close() throws IOException
|
||||
{
|
||||
CloseQuietly.close(timestamps);
|
||||
for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) {
|
||||
CloseQuietly.close(column);
|
||||
}
|
||||
for (GenericColumn column : genericColumnCache.values()) {
|
||||
CloseQuietly.close(column);
|
||||
}
|
||||
|
@ -620,6 +649,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
*/
|
||||
public Sequence<Cursor> build()
|
||||
{
|
||||
final Map<String, DictionaryEncodedColumn> dictionaryColumnCache = Maps.newHashMap();
|
||||
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
|
||||
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
|
||||
final Map<String, Object> objectColumnCache = Maps.newHashMap();
|
||||
|
@ -697,41 +727,45 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
public DimensionSelector makeDimensionSelector(String dimension)
|
||||
{
|
||||
final String dimensionName = dimension.toLowerCase();
|
||||
final Column column = index.getColumn(dimensionName);
|
||||
if (column == null) {
|
||||
return null;
|
||||
|
||||
DictionaryEncodedColumn cachedColumn = dictionaryColumnCache.get(dimensionName);
|
||||
final Column columnDesc = index.getColumn(dimensionName);
|
||||
|
||||
if (cachedColumn == null && columnDesc != null) {
|
||||
cachedColumn = columnDesc.getDictionaryEncoding();
|
||||
dictionaryColumnCache.put(dimensionName, cachedColumn);
|
||||
}
|
||||
|
||||
final DictionaryEncodedColumn dict = column.getDictionaryEncoding();
|
||||
final DictionaryEncodedColumn column = cachedColumn;
|
||||
|
||||
if (dict == null) {
|
||||
if (column == null) {
|
||||
return null;
|
||||
} else if (column.getCapabilities().hasMultipleValues()) {
|
||||
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
return dict.getMultiValueRow(currRow);
|
||||
return column.getMultiValueRow(currRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return dict.getCardinality();
|
||||
return column.getCardinality();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
final String retVal = dict.lookupName(id);
|
||||
final String retVal = column.lookupName(id);
|
||||
return retVal == null ? "" : retVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
return dict.lookupId(name);
|
||||
return column.lookupId(name);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
|
@ -740,25 +774,45 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
return new SingleIndexedInts(dict.getSingleValueRow(currRow));
|
||||
// using an anonymous class is faster than creating a class that stores a copy of the value
|
||||
return new IndexedInts()
|
||||
{
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
return column.getSingleValueRow(currRow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Integer> iterator()
|
||||
{
|
||||
return Iterators.singletonIterator(column.getSingleValueRow(currRow));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return dict.getCardinality();
|
||||
return column.getCardinality();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
return dict.lookupName(id);
|
||||
return column.lookupName(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
return dict.lookupId(name);
|
||||
return column.lookupId(name);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -963,6 +1017,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
public void close() throws IOException
|
||||
{
|
||||
CloseQuietly.close(timestamps);
|
||||
for (DictionaryEncodedColumn column : dictionaryColumnCache.values()) {
|
||||
CloseQuietly.close(column);
|
||||
}
|
||||
for (GenericColumn column : genericColumnCache.values()) {
|
||||
CloseQuietly.close(column);
|
||||
}
|
||||
|
@ -979,31 +1036,4 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static class NullDimensionSelector implements DimensionSelector
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
return new SingleIndexedInts(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.segment.column;
|
||||
|
||||
public interface ColumnConfig
|
||||
{
|
||||
public int columnCacheSizeBytes();
|
||||
}
|
|
@ -92,14 +92,14 @@ public class ColumnDescriptor
|
|||
}
|
||||
}
|
||||
|
||||
public Column read(ByteBuffer buffer)
|
||||
public Column read(ByteBuffer buffer, ColumnConfig columnConfig)
|
||||
{
|
||||
final ColumnBuilder builder = new ColumnBuilder()
|
||||
.setType(valueType)
|
||||
.setHasMultipleValues(hasMultipleValues);
|
||||
|
||||
for (ColumnPartSerde part : parts) {
|
||||
part.read(buffer, builder);
|
||||
part.read(buffer, builder, columnConfig);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
|
|
|
@ -21,9 +21,11 @@ package io.druid.segment.column;
|
|||
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface DictionaryEncodedColumn
|
||||
public interface DictionaryEncodedColumn extends Closeable
|
||||
{
|
||||
public int length();
|
||||
public boolean hasMultipleValues();
|
||||
|
|
|
@ -24,6 +24,8 @@ import io.druid.segment.data.IndexedInts;
|
|||
import io.druid.segment.data.VSizeIndexed;
|
||||
import io.druid.segment.data.VSizeIndexedInts;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
|
||||
|
@ -84,4 +86,10 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
|
|||
{
|
||||
return lookups.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
lookups.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
|
@ -19,35 +19,8 @@
|
|||
|
||||
package io.druid.segment.data;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SingleIndexedInts implements IndexedInts
|
||||
{
|
||||
private final int value;
|
||||
|
||||
public SingleIndexedInts(int value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Integer> iterator()
|
||||
{
|
||||
return Iterators.singletonIterator(value);
|
||||
}
|
||||
}
|
||||
* Implementing CacheableObjectStrategy instead of ObjectSrategy indicates
|
||||
* that a column scan may cache the results of fromByteBuffer
|
||||
*/
|
||||
public interface CacheableObjectStrategy<T> extends ObjectStrategy<T> {}
|
|
@ -123,8 +123,9 @@ public class ConciseCompressedIndexedInts implements IndexedInts, Comparable<Con
|
|||
@Override
|
||||
public ImmutableConciseSet fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
{
|
||||
buffer.limit(buffer.position() + numBytes);
|
||||
return new ImmutableConciseSet(buffer.asReadOnlyBuffer());
|
||||
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
|
||||
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
|
||||
return new ImmutableConciseSet(readOnlyBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.segment.data;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.common.IAE;
|
||||
|
@ -32,6 +33,8 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.WritableByteChannel;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. If input
|
||||
|
@ -46,10 +49,14 @@ import java.util.Iterator;
|
|||
* bytes 10-((numElements * 4) + 10): integers representing *end* offsets of byte serialized values
|
||||
* bytes ((numElements * 4) + 10)-(numBytesUsed + 2): 4-byte integer representing length of value, followed by bytes for value
|
||||
*/
|
||||
public class GenericIndexed<T> implements Indexed<T>
|
||||
public class GenericIndexed<T> implements Indexed<T>, Closeable
|
||||
{
|
||||
private static final byte version = 0x1;
|
||||
|
||||
public static final int INITIAL_CACHE_CAPACITY = 16384;
|
||||
|
||||
private int indexOffset;
|
||||
|
||||
public static <T> GenericIndexed<T> fromArray(T[] objects, ObjectStrategy<T> strategy)
|
||||
{
|
||||
return fromIterable(Arrays.asList(objects), strategy);
|
||||
|
@ -114,11 +121,44 @@ public class GenericIndexed<T> implements Indexed<T>
|
|||
return new GenericIndexed<T>(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup);
|
||||
}
|
||||
|
||||
private static class SizedLRUMap<K, V> extends LinkedHashMap<K, V>
|
||||
{
|
||||
final Map<K, Integer> sizes = Maps.newHashMap();
|
||||
int numBytes = 0;
|
||||
int maxBytes = 0;
|
||||
|
||||
public SizedLRUMap(int initialCapacity, int maxBytes)
|
||||
{
|
||||
super(initialCapacity, 0.75f, true);
|
||||
this.maxBytes = maxBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
|
||||
{
|
||||
if (numBytes > maxBytes) {
|
||||
numBytes -= sizes.remove(eldest.getKey());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public V put(K key, V value, int size)
|
||||
{
|
||||
numBytes += size;
|
||||
sizes.put(key, size);
|
||||
return super.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
private final ByteBuffer theBuffer;
|
||||
private final ObjectStrategy<T> strategy;
|
||||
private final boolean allowReverseLookup;
|
||||
private final int size;
|
||||
|
||||
private final boolean cacheable;
|
||||
private final ThreadLocal<ByteBuffer> cachedBuffer;
|
||||
private final ThreadLocal<SizedLRUMap<Integer, T>> cachedValues;
|
||||
private final int valuesOffset;
|
||||
|
||||
GenericIndexed(
|
||||
|
@ -132,7 +172,45 @@ public class GenericIndexed<T> implements Indexed<T>
|
|||
this.allowReverseLookup = allowReverseLookup;
|
||||
|
||||
size = theBuffer.getInt();
|
||||
indexOffset = theBuffer.position();
|
||||
valuesOffset = theBuffer.position() + (size << 2);
|
||||
|
||||
this.cachedBuffer = new ThreadLocal<ByteBuffer>()
|
||||
{
|
||||
@Override
|
||||
protected ByteBuffer initialValue()
|
||||
{
|
||||
return theBuffer.asReadOnlyBuffer();
|
||||
}
|
||||
};
|
||||
|
||||
this.cacheable = false;
|
||||
this.cachedValues = new ThreadLocal<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a copy of the given indexed with the given cache size
|
||||
* The resulting copy must be closed to release resources used by the cache
|
||||
*/
|
||||
GenericIndexed(GenericIndexed<T> other, final int maxBytes)
|
||||
{
|
||||
this.theBuffer = other.theBuffer;
|
||||
this.strategy = other.strategy;
|
||||
this.allowReverseLookup = other.allowReverseLookup;
|
||||
this.size = other.size;
|
||||
this.indexOffset = other.indexOffset;
|
||||
this.valuesOffset = other.valuesOffset;
|
||||
this.cachedBuffer = other.cachedBuffer;
|
||||
|
||||
this.cachedValues = new ThreadLocal<SizedLRUMap<Integer, T>>() {
|
||||
@Override
|
||||
protected SizedLRUMap<Integer, T> initialValue()
|
||||
{
|
||||
return new SizedLRUMap<>(INITIAL_CACHE_CAPACITY, maxBytes);
|
||||
}
|
||||
};
|
||||
|
||||
this.cacheable = strategy instanceof CacheableObjectStrategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -157,24 +235,41 @@ public class GenericIndexed<T> implements Indexed<T>
|
|||
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
|
||||
}
|
||||
|
||||
ByteBuffer myBuffer = theBuffer.asReadOnlyBuffer();
|
||||
int startOffset = 4;
|
||||
int endOffset;
|
||||
if(cacheable) {
|
||||
final T cached = cachedValues.get().get(index);
|
||||
if (cached != null) {
|
||||
return cached;
|
||||
}
|
||||
}
|
||||
|
||||
// using a cached copy of the buffer instead of making a read-only copy every time get() is called is faster
|
||||
final ByteBuffer copyBuffer = this.cachedBuffer.get();
|
||||
|
||||
final int startOffset;
|
||||
final int endOffset;
|
||||
|
||||
if (index == 0) {
|
||||
endOffset = myBuffer.getInt();
|
||||
startOffset = 4;
|
||||
endOffset = copyBuffer.getInt(indexOffset);
|
||||
} else {
|
||||
myBuffer.position(myBuffer.position() + ((index - 1) * 4));
|
||||
startOffset = myBuffer.getInt() + 4;
|
||||
endOffset = myBuffer.getInt();
|
||||
copyBuffer.position(indexOffset + ((index - 1) * 4));
|
||||
startOffset = copyBuffer.getInt() + 4;
|
||||
endOffset = copyBuffer.getInt();
|
||||
}
|
||||
|
||||
if (startOffset == endOffset) {
|
||||
return null;
|
||||
}
|
||||
|
||||
myBuffer.position(valuesOffset + startOffset);
|
||||
return strategy.fromByteBuffer(myBuffer, endOffset - startOffset);
|
||||
copyBuffer.position(valuesOffset + startOffset);
|
||||
final int size = endOffset - startOffset;
|
||||
// fromByteBuffer must not modify the buffer limit
|
||||
final T value = strategy.fromByteBuffer(copyBuffer, size);
|
||||
|
||||
if(cacheable) {
|
||||
cachedValues.get().put(index, value, size);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -220,6 +315,25 @@ public class GenericIndexed<T> implements Indexed<T>
|
|||
channel.write(theBuffer.asReadOnlyBuffer());
|
||||
}
|
||||
|
||||
/**
|
||||
* The returned GenericIndexed must be closed to release the underlying memory
|
||||
* @param maxBytes
|
||||
* @return
|
||||
*/
|
||||
public GenericIndexed<T> withCache(int maxBytes)
|
||||
{
|
||||
return new GenericIndexed<>(this, maxBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
if(cacheable) {
|
||||
cachedValues.get().clear();
|
||||
cachedValues.remove();
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy)
|
||||
{
|
||||
byte versionFromBuffer = buffer.get();
|
||||
|
@ -241,7 +355,7 @@ public class GenericIndexed<T> implements Indexed<T>
|
|||
throw new IAE("Unknown version[%s]", versionFromBuffer);
|
||||
}
|
||||
|
||||
public static ObjectStrategy<String> stringStrategy = new ObjectStrategy<String>()
|
||||
public static ObjectStrategy<String> stringStrategy = new CacheableObjectStrategy<String>()
|
||||
{
|
||||
@Override
|
||||
public Class<? extends String> getClazz()
|
||||
|
@ -250,9 +364,9 @@ public class GenericIndexed<T> implements Indexed<T>
|
|||
}
|
||||
|
||||
@Override
|
||||
public String fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
public String fromByteBuffer(final ByteBuffer buffer, final int numBytes)
|
||||
{
|
||||
byte[] bytes = new byte[numBytes];
|
||||
final byte[] bytes = new byte[numBytes];
|
||||
buffer.get(bytes);
|
||||
return new String(bytes, Charsets.UTF_8);
|
||||
}
|
||||
|
|
|
@ -81,8 +81,10 @@ public class IndexedRTree implements Comparable<IndexedRTree>
|
|||
@Override
|
||||
public ImmutableRTree fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
{
|
||||
buffer.limit(buffer.position() + numBytes);
|
||||
return new ImmutableRTree(buffer.asReadOnlyBuffer());
|
||||
|
||||
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
|
||||
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
|
||||
return new ImmutableRTree(readOnlyBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -101,8 +101,9 @@ public class IntBufferIndexedInts implements IndexedInts, Comparable<IntBufferIn
|
|||
@Override
|
||||
public IntBufferIndexedInts fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
{
|
||||
buffer.limit(buffer.position() + numBytes);
|
||||
return new IntBufferIndexedInts(buffer);
|
||||
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
|
||||
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
|
||||
return new IntBufferIndexedInts(readOnlyBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,11 +22,20 @@ package io.druid.segment.data;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Comparator;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface ObjectStrategy<T> extends Comparator<T>
|
||||
{
|
||||
public Class<? extends T> getClazz();
|
||||
|
||||
/**
|
||||
* Convert values from their underlying byte representation.
|
||||
*
|
||||
* Implementations of this method must not change the given buffer mark, or limit, but may modify its position.
|
||||
* Use buffer.asReadOnlyBuffer() or buffer.duplicate() if mark or limit need to be set.
|
||||
*
|
||||
* @param buffer buffer to read value from
|
||||
* @param numBytes number of bytes used to store the value, starting at buffer.position()
|
||||
* @return
|
||||
*/
|
||||
public T fromByteBuffer(ByteBuffer buffer, int numBytes);
|
||||
public byte[] toBytes(T val);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.segment.serde;
|
|||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -40,5 +41,5 @@ public interface ColumnPartSerde
|
|||
{
|
||||
public long numBytes();
|
||||
public void write(WritableByteChannel channel) throws IOException;
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder);
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.segment.serde;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.data.GenericIndexed;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -71,7 +72,7 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder)
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
|
||||
{
|
||||
return serde == null ? this : serde.deserializeColumn(buffer, builder);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.common.primitives.Ints;
|
|||
import com.metamx.collections.spatial.ImmutableRTree;
|
||||
import com.metamx.common.IAE;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.ByteBufferSerializer;
|
||||
import io.druid.segment.data.ConciseCompressedIndexedInts;
|
||||
|
@ -43,7 +44,9 @@ import java.nio.channels.WritableByteChannel;
|
|||
public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
|
||||
{
|
||||
@JsonCreator
|
||||
public static DictionaryEncodedColumnPartSerde createDeserializer(boolean singleValued)
|
||||
public static DictionaryEncodedColumnPartSerde createDeserializer(
|
||||
boolean singleValued
|
||||
)
|
||||
{
|
||||
return new DictionaryEncodedColumnPartSerde();
|
||||
}
|
||||
|
@ -125,7 +128,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder)
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
|
||||
{
|
||||
final boolean isSingleValued = buffer.get() == 0x0;
|
||||
final GenericIndexed<String> dictionary = GenericIndexed.read(buffer, GenericIndexed.stringStrategy);
|
||||
|
@ -138,12 +141,12 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
|
|||
singleValuedColumn = VSizeIndexedInts.readFromByteBuffer(buffer);
|
||||
multiValuedColumn = null;
|
||||
builder.setHasMultipleValues(false)
|
||||
.setDictionaryEncodedColumn(new DictionaryEncodedColumnSupplier(dictionary, singleValuedColumn, null));
|
||||
.setDictionaryEncodedColumn(new DictionaryEncodedColumnSupplier(dictionary, singleValuedColumn, null, columnConfig.columnCacheSizeBytes()));
|
||||
} else {
|
||||
singleValuedColumn = null;
|
||||
multiValuedColumn = VSizeIndexed.readFromByteBuffer(buffer);
|
||||
builder.setHasMultipleValues(true)
|
||||
.setDictionaryEncodedColumn(new DictionaryEncodedColumnSupplier(dictionary, null, multiValuedColumn));
|
||||
.setDictionaryEncodedColumn(new DictionaryEncodedColumnSupplier(dictionary, null, multiValuedColumn, columnConfig.columnCacheSizeBytes()));
|
||||
}
|
||||
|
||||
GenericIndexed<ImmutableConciseSet> bitmaps = GenericIndexed.read(
|
||||
|
|
|
@ -33,21 +33,28 @@ public class DictionaryEncodedColumnSupplier implements Supplier<DictionaryEncod
|
|||
private final GenericIndexed<String> dictionary;
|
||||
private final VSizeIndexedInts singleValuedColumn;
|
||||
private final VSizeIndexed multiValuedColumn;
|
||||
private final int lookupCacheSize;
|
||||
|
||||
public DictionaryEncodedColumnSupplier(
|
||||
GenericIndexed<String> dictionary,
|
||||
VSizeIndexedInts singleValuedColumn,
|
||||
VSizeIndexed multiValuedColumn
|
||||
VSizeIndexed multiValuedColumn,
|
||||
int lookupCacheSize
|
||||
)
|
||||
{
|
||||
this.dictionary = dictionary;
|
||||
this.singleValuedColumn = singleValuedColumn;
|
||||
this.multiValuedColumn = multiValuedColumn;
|
||||
this.lookupCacheSize = lookupCacheSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DictionaryEncodedColumn get()
|
||||
{
|
||||
return new SimpleDictionaryEncodedColumn(singleValuedColumn, multiValuedColumn, dictionary);
|
||||
return new SimpleDictionaryEncodedColumn(
|
||||
singleValuedColumn,
|
||||
multiValuedColumn,
|
||||
lookupCacheSize > 0 ? dictionary.withCache(lookupCacheSize) : dictionary
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.segment.serde;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.CompressedFloatsIndexedSupplier;
|
||||
|
||||
|
@ -70,7 +71,7 @@ public class FloatGenericColumnPartSerde implements ColumnPartSerde
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder)
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
|
||||
{
|
||||
final CompressedFloatsIndexedSupplier column = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, byteOrder);
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.segment.serde;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.segment.column.ColumnBuilder;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.CompressedLongsIndexedSupplier;
|
||||
|
||||
|
@ -70,7 +71,7 @@ public class LongGenericColumnPartSerde implements ColumnPartSerde
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder)
|
||||
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
|
||||
{
|
||||
final CompressedLongsIndexedSupplier column = CompressedLongsIndexedSupplier.fromByteBuffer(buffer, byteOrder);
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.collect.Iterables;
|
|||
import com.google.common.collect.Lists;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexAdapter;
|
||||
import org.joda.time.Interval;
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.segment;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class IndexIOTest
|
||||
{
|
||||
@Test @Ignore // this test depends on static fields, so it has to be tested independently
|
||||
public void testInjector() throws Exception
|
||||
{
|
||||
System.setProperty("druid.processing.columnCache.sizeBytes", "1234");
|
||||
Assert.assertEquals(1234, IndexIO.columnConfig.columnCacheSizeBytes());
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import com.google.common.io.Files;
|
|||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.data.IncrementalIndexTest;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import junit.framework.Assert;
|
||||
|
|
|
@ -35,6 +35,7 @@ import io.druid.query.aggregation.AggregatorFactory;
|
|||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
import org.joda.time.DateTime;
|
||||
|
|
|
@ -49,6 +49,7 @@ import io.druid.segment.QueryableIndex;
|
|||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.joda.time.DateTime;
|
||||
|
|
|
@ -49,6 +49,7 @@ import io.druid.segment.QueryableIndex;
|
|||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.joda.time.DateTime;
|
||||
|
|
|
@ -33,10 +33,10 @@ import com.metamx.emitter.service.ServiceMetricEvent;
|
|||
import io.druid.collections.StupidPool;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.query.DruidProcessingConfig;
|
||||
import io.druid.query.MetricsEmittingExecutorService;
|
||||
import io.druid.query.PrioritizedExecutorService;
|
||||
import io.druid.server.DruidProcessingConfig;
|
||||
import io.druid.server.VMUtils;
|
||||
import io.druid.common.utils.VMUtils;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
|
|
@ -28,10 +28,12 @@ import io.druid.client.DruidServerConfig;
|
|||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.loading.MMappedQueryableIndexFactory;
|
||||
import io.druid.segment.loading.QueryableIndexFactory;
|
||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.query.DruidProcessingConfig;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -48,6 +50,7 @@ public class StorageNodeModule implements Module
|
|||
|
||||
binder.bind(NodeTypeConfig.class).toProvider(Providers.<NodeTypeConfig>of(null));
|
||||
binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
|
||||
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
|
||||
|
||||
binder.bind(QueryRunnerFactoryConglomerate.class)
|
||||
.to(DefaultQueryRunnerFactoryConglomerate.class)
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.google.common.base.Throwables;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
|
@ -37,14 +36,13 @@ import io.druid.curator.discovery.DiscoveryModule;
|
|||
import io.druid.guice.AWSModule;
|
||||
import io.druid.guice.AnnouncerModule;
|
||||
import io.druid.guice.DbConnectorModule;
|
||||
import io.druid.guice.DruidGuiceExtensions;
|
||||
import io.druid.guice.DruidProcessingModule;
|
||||
import io.druid.guice.DruidSecondaryModule;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.guice.FirehoseModule;
|
||||
import io.druid.guice.HttpClientModule;
|
||||
import io.druid.guice.IndexingServiceDiscoveryModule;
|
||||
import io.druid.guice.JacksonConfigManagerModule;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LifecycleModule;
|
||||
import io.druid.guice.LocalDataStorageDruidModule;
|
||||
import io.druid.guice.ParsersModule;
|
||||
|
@ -56,12 +54,8 @@ import io.druid.guice.StorageNodeModule;
|
|||
import io.druid.guice.annotations.Client;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.jackson.JacksonModule;
|
||||
import io.druid.server.initialization.ConfigModule;
|
||||
import io.druid.server.initialization.EmitterModule;
|
||||
import io.druid.server.initialization.ExtensionsConfig;
|
||||
import io.druid.server.initialization.JettyServerModule;
|
||||
import io.druid.server.initialization.PropertiesModule;
|
||||
import io.druid.server.metrics.MetricsModule;
|
||||
import io.tesla.aether.Repository;
|
||||
import io.tesla.aether.TeslaAether;
|
||||
|
@ -85,6 +79,7 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -354,25 +349,6 @@ public class Initialization
|
|||
return Guice.createInjector(Modules.override(defaultModules.getModules()).with(actualModules.getModules()));
|
||||
}
|
||||
|
||||
public static Injector makeStartupInjector()
|
||||
{
|
||||
return Guice.createInjector(
|
||||
new DruidGuiceExtensions(),
|
||||
new JacksonModule(),
|
||||
new PropertiesModule("runtime.properties"),
|
||||
new ConfigModule(),
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
binder.bind(DruidSecondaryModule.class);
|
||||
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private static class ModuleList
|
||||
{
|
||||
private final Injector baseInjector;
|
||||
|
|
|
@ -27,6 +27,7 @@ import com.metamx.emitter.EmittingLogger;
|
|||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.common.guava.ThreadRenamingCallable;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
import io.druid.segment.realtime.FireDepartmentMetrics;
|
||||
|
|
|
@ -27,6 +27,7 @@ import com.metamx.common.Granularity;
|
|||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
import io.druid.segment.realtime.FireDepartmentMetrics;
|
||||
|
|
|
@ -29,6 +29,7 @@ import io.druid.client.FilteredServerView;
|
|||
import io.druid.client.ServerView;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
import io.druid.segment.loading.DataSegmentPusher;
|
||||
|
|
|
@ -27,10 +27,11 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.server.initialization.ExtensionsConfig;
|
||||
import org.junit.Assert;
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
|
@ -58,7 +59,7 @@ public class InitializationTest
|
|||
@Test
|
||||
public void test02MakeStartupInjector() throws Exception
|
||||
{
|
||||
Injector startupInjector = Initialization.makeStartupInjector();
|
||||
Injector startupInjector = GuiceInjectors.makeStartupInjector();
|
||||
Assert.assertNotNull(startupInjector);
|
||||
Assert.assertNotNull(startupInjector.getInstance(ObjectMapper.class));
|
||||
}
|
||||
|
@ -66,7 +67,7 @@ public class InitializationTest
|
|||
@Test
|
||||
public void test03ClassLoaderExtensionsLoading()
|
||||
{
|
||||
Injector startupInjector = Initialization.makeStartupInjector();
|
||||
Injector startupInjector = GuiceInjectors.makeStartupInjector();
|
||||
|
||||
Function<DruidModule, String> fnClassName = new Function<DruidModule, String>()
|
||||
{
|
||||
|
@ -99,7 +100,7 @@ public class InitializationTest
|
|||
@Test
|
||||
public void test04MakeInjectorWithModules() throws Exception
|
||||
{
|
||||
Injector startupInjector = Initialization.makeStartupInjector();
|
||||
Injector startupInjector = GuiceInjectors.makeStartupInjector();
|
||||
Injector injector = Initialization.makeInjectorWithModules(
|
||||
startupInjector, ImmutableList.<Object>of(
|
||||
new com.google.inject.Module()
|
||||
|
|
|
@ -41,6 +41,7 @@ import io.druid.query.Query;
|
|||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.RealtimeTuningConfig;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
|
|
|
@ -35,6 +35,7 @@ import com.metamx.http.client.response.StatusResponseHolder;
|
|||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.initialization.Initialization;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
|
@ -90,7 +91,7 @@ public class JettyTest
|
|||
{
|
||||
setProperties();
|
||||
Injector injector = Initialization.makeInjectorWithModules(
|
||||
Initialization.makeStartupInjector(), Lists.<Object>newArrayList(
|
||||
GuiceInjectors.makeStartupInjector(), Lists.<Object>newArrayList(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,7 @@ import io.airlift.command.Arguments;
|
|||
import io.airlift.command.Command;
|
||||
import io.airlift.command.Option;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.server.initialization.ExtensionsConfig;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.tesla.aether.internal.DefaultTeslaAether;
|
||||
|
||||
import java.io.File;
|
||||
|
|
|
@ -25,13 +25,11 @@ import io.airlift.command.Help;
|
|||
import io.airlift.command.ParseException;
|
||||
import io.druid.cli.convert.ConvertProperties;
|
||||
import io.druid.cli.validate.DruidJsonValidator;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.server.initialization.ExtensionsConfig;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -75,7 +73,7 @@ public class Main
|
|||
.withDefaultCommand(Help.class)
|
||||
.withCommands(CliPeon.class, CliInternalHadoopIndexer.class);
|
||||
|
||||
final Injector injector = Initialization.makeStartupInjector();
|
||||
final Injector injector = GuiceInjectors.makeStartupInjector();
|
||||
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
|
||||
final Collection<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class);
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import io.airlift.command.Option;
|
|||
import io.druid.indexing.common.config.TaskConfig;
|
||||
import io.druid.indexing.common.task.HadoopIndexTask;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.server.initialization.ExtensionsConfig;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.tesla.aether.internal.DefaultTeslaAether;
|
||||
|
||||
import java.util.List;
|
||||
|
|
Loading…
Reference in New Issue