inject column config

This commit is contained in:
fjy 2014-06-18 13:17:48 -07:00 committed by Xavier Léauté
parent c2e2391e3a
commit a870fe5cbe
48 changed files with 302 additions and 314 deletions

View File

@ -19,7 +19,6 @@
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
@ -45,6 +44,7 @@ import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
@ -83,7 +83,7 @@ public class HadoopDruidIndexerConfig
static {
injector = Initialization.makeInjectorWithModules(
Initialization.makeStartupInjector(),
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Object>of(
new Module()
{
@ -107,7 +107,7 @@ public class HadoopDruidIndexerConfig
public static HadoopDruidIndexerConfig fromSchema(HadoopIngestionSpec schema)
{
return new HadoopDruidIndexerConfig(injector.getInstance(ColumnConfig.class), schema);
return new HadoopDruidIndexerConfig(schema);
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
@ -117,7 +117,6 @@ public class HadoopDruidIndexerConfig
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
} else {
return new HadoopDruidIndexerConfig(
injector.getInstance(ColumnConfig.class),
HadoopDruidIndexerConfig.jsonMapper.convertValue(
argSpec,
HadoopIngestionSpec.class
@ -173,7 +172,6 @@ public class HadoopDruidIndexerConfig
@JsonCreator
public HadoopDruidIndexerConfig(
@JacksonInject final ColumnConfig columnConfig,
final @JsonProperty("schema") HadoopIngestionSpec schema
)
{

View File

@ -40,7 +40,6 @@ import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.timeline.DataSegment;
@ -282,8 +281,6 @@ public class IndexGeneratorJob implements Jobby
//final DataRollupSpec rollupSpec = config.getRollupSpec();
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
final ColumnConfig columnConfig = config.getColumnConfig();
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
File baseFlushFile = File.createTempFile("base", "flush");
@ -374,7 +371,7 @@ public class IndexGeneratorJob implements Jobby
}
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file, columnConfig));
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = IndexMerger.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new IndexMerger.ProgressIndicator()

View File

@ -22,7 +22,6 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.column.ColumnConfig;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -46,16 +45,6 @@ public class HadoopDruidIndexerConfigTest
}
}
private static final ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
@ -80,7 +69,6 @@ public class HadoopDruidIndexerConfigTest
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
columnConfig,
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(
@ -121,7 +109,6 @@ public class HadoopDruidIndexerConfigTest
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
columnConfig,
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(

View File

@ -34,7 +34,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -50,7 +49,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
@ -74,7 +72,6 @@ public class TaskToolbox
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
private final File taskWorkDir;
private final ColumnConfig columnConfig;
public TaskToolbox(
TaskConfig config,
@ -92,8 +89,7 @@ public class TaskToolbox
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper objectMapper,
final File taskWorkDir,
ColumnConfig columnConfig
final File taskWorkDir
)
{
this.config = config;
@ -112,7 +108,6 @@ public class TaskToolbox
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
this.taskWorkDir = taskWorkDir;
this.columnConfig = columnConfig;
}
public TaskConfig getConfig()
@ -180,11 +175,6 @@ public class TaskToolbox
return objectMapper;
}
public ColumnConfig getColumnConfig()
{
return columnConfig;
}
public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
@ -196,7 +186,8 @@ public class TaskToolbox
return retVal;
}
public void pushSegments(Iterable<DataSegment> segments) throws IOException {
public void pushSegments(Iterable<DataSegment> segments) throws IOException
{
// Request segment pushes for each set
final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
segments,

View File

@ -58,7 +58,6 @@ public class TaskToolboxFactory
private final MonitorScheduler monitorScheduler;
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper objectMapper;
private final ColumnConfig columnConfig;
@Inject
public TaskToolboxFactory(
@ -75,8 +74,7 @@ public class TaskToolboxFactory
@Processing ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
SegmentLoaderFactory segmentLoaderFactory,
ObjectMapper objectMapper,
ColumnConfig columnConfig
ObjectMapper objectMapper
)
{
this.config = config;
@ -93,7 +91,6 @@ public class TaskToolboxFactory
this.monitorScheduler = monitorScheduler;
this.segmentLoaderFactory = segmentLoaderFactory;
this.objectMapper = objectMapper;
this.columnConfig = columnConfig;
}
public TaskToolbox build(Task task)
@ -116,8 +113,7 @@ public class TaskToolboxFactory
monitorScheduler,
segmentLoaderFactory.manufacturate(taskWorkDir),
objectMapper,
taskWorkDir,
columnConfig
taskWorkDir
);
}
}

View File

@ -38,7 +38,6 @@ import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -66,7 +65,6 @@ public class YeOldePlumberSchool implements PlumberSchool
private final String version;
private final DataSegmentPusher dataSegmentPusher;
private final File tmpSegmentDir;
private final ColumnConfig columnConfig;
private static final Logger log = new Logger(YeOldePlumberSchool.class);
@ -75,15 +73,13 @@ public class YeOldePlumberSchool implements PlumberSchool
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
@JacksonInject("tmpSegmentDir") File tmpSegmentDir,
@JacksonInject ColumnConfig columnConfig
@JacksonInject("tmpSegmentDir") File tmpSegmentDir
)
{
this.interval = interval;
this.version = version;
this.dataSegmentPusher = dataSegmentPusher;
this.tmpSegmentDir = tmpSegmentDir;
this.columnConfig = columnConfig;
}
@Override
@ -166,7 +162,7 @@ public class YeOldePlumberSchool implements PlumberSchool
} else {
List<QueryableIndex> indexes = Lists.newArrayList();
for (final File oneSpill : spilled) {
indexes.add(IndexIO.loadIndex(oneSpill, columnConfig));
indexes.add(IndexIO.loadIndex(oneSpill));
}
fileToUpload = new File(tmpSegmentDir, "merged");
@ -174,7 +170,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
// Map merged segment so we can extract dimensions
final QueryableIndex mappedSegment = IndexIO.loadIndex(fileToUpload, columnConfig);
final QueryableIndex mappedSegment = IndexIO.loadIndex(fileToUpload);
final DataSegment segmentToUpload = theSink.getSegment()
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))

View File

@ -19,7 +19,6 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
@ -33,7 +32,6 @@ import io.druid.segment.IndexableAdapter;
import io.druid.segment.QueryableIndexIndexableAdapter;
import io.druid.segment.Rowboat;
import io.druid.segment.RowboatFilteringIndexAdapter;
import io.druid.segment.column.ColumnConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -48,18 +46,14 @@ import java.util.Map;
*/
public class AppendTask extends MergeTaskBase
{
private final ColumnConfig columnConfig;
@JsonCreator
public AppendTask(
@JacksonInject ColumnConfig columnConfig,
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments
)
{
super(id, dataSource, segments);
this.columnConfig = columnConfig;
}
@Override
@ -98,7 +92,7 @@ public class AppendTask extends MergeTaskBase
adapters.add(
new RowboatFilteringIndexAdapter(
new QueryableIndexIndexableAdapter(
IndexIO.loadIndex(holder.getFile(), columnConfig)
IndexIO.loadIndex(holder.getFile())
),
new Predicate<Rowboat>()
{

View File

@ -42,8 +42,9 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.initialization.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.guice.ExtensionsConfig;
import io.druid.timeline.DataSegment;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
@ -63,7 +64,7 @@ public class HadoopIndexTask extends AbstractTask
private static final ExtensionsConfig extensionsConfig;
static {
extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
extensionsConfig = GuiceInjectors.makeStartupInjector().getInstance(ExtensionsConfig.class);
}
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)

View File

@ -382,8 +382,7 @@ public class IndexTask extends AbstractFixedIntervalTask
interval,
version,
wrappedDataSegmentPusher,
tmpDir,
toolbox.getColumnConfig()
tmpDir
).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
// rowFlushBoundary for this job

View File

@ -19,7 +19,6 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -31,7 +30,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnConfig;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@ -45,20 +43,17 @@ public class MergeTask extends MergeTaskBase
{
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final ColumnConfig columnConfig;
@JsonCreator
public MergeTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JacksonInject ColumnConfig columnConfig
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(id, dataSource, segments);
this.aggregators = aggregators;
this.columnConfig = columnConfig;
}
@Override
@ -74,7 +69,7 @@ public class MergeTask extends MergeTaskBase
public QueryableIndex apply(@Nullable File input)
{
try {
return IndexIO.loadIndex(input, columnConfig);
return IndexIO.loadIndex(input);
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -96,7 +96,6 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private final FireDepartment spec;
private final ColumnConfig columnConfig;
@JsonIgnore
private volatile Plumber plumber = null;
@ -116,8 +115,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
@JacksonInject ColumnConfig columnConfig
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
{
super(
@ -151,7 +149,6 @@ public class RealtimeIndexTask extends AbstractTask
null, null, null, null
);
}
this.columnConfig = columnConfig;
}
@Override
@ -313,7 +310,6 @@ public class RealtimeIndexTask extends AbstractTask
segmentPublisher,
toolbox.getNewSegmentServerView(),
toolbox.getQueryExecutorService(),
columnConfig,
null,
null,
null,

View File

@ -252,7 +252,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");
if (IndexIO.convertSegment(location, outLocation, toolbox.getColumnConfig())) {
if (IndexIO.convertSegment(location, outLocation)) {
final int outVersion = IndexIO.getVersionFromDir(outLocation);
// Appending to the version makes a new version that inherits most comparability parameters of the original

View File

@ -19,14 +19,12 @@
package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Lists;
import io.druid.indexing.common.task.MergeTask;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -56,8 +54,7 @@ public class TestMergeTask extends MergeTask
0
)
),
Lists.<AggregatorFactory>newArrayList(),
null
Lists.<AggregatorFactory>newArrayList()
);
}
@ -68,11 +65,10 @@ public class TestMergeTask extends MergeTask
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JacksonInject ColumnConfig columnConfig
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(id, dataSource, segments, aggregators, columnConfig);
super(id, dataSource, segments, aggregators);
this.id = id;
}

View File

@ -54,7 +54,6 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
1,
null,
null,
null
);
this.status = status;

View File

@ -21,15 +21,8 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector;
import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.JSONDataSpec;
import io.druid.data.input.impl.TimestampSpec;
@ -41,7 +34,6 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
@ -56,40 +48,7 @@ import java.io.File;
public class TaskSerdeTest
{
private static final ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final Injector injector = Guice.createInjector(
new com.google.inject.Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(ColumnConfig.class).toInstance(columnConfig);
}
}
);
static {
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
jsonMapper.setInjectableValues(new GuiceInjectableValues(injector));
jsonMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, jsonMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, jsonMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
}
@Test
public void testIndexTaskSerde() throws Exception
@ -141,8 +100,7 @@ public class TaskSerdeTest
),
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("cnt")
),
null
)
);
final String json = jsonMapper.writeValueAsString(task);
@ -244,7 +202,6 @@ public class TaskSerdeTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);
@ -330,7 +287,6 @@ public class TaskSerdeTest
public void testAppendTaskSerde() throws Exception
{
final AppendTask task = new AppendTask(
columnConfig,
null,
"foo",
ImmutableList.of(

View File

@ -206,15 +206,7 @@ public class TaskLifecycleTest
}
)
),
new DefaultObjectMapper(),
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
}
new DefaultObjectMapper()
);
tr = new ThreadPoolTaskRunner(tb);
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);

View File

@ -87,8 +87,7 @@ public class SimpleResourceManagementStrategyTest
0
)
),
Lists.<AggregatorFactory>newArrayList(),
null
Lists.<AggregatorFactory>newArrayList()
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy,

View File

@ -49,7 +49,6 @@ public class TaskAnnouncementTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);
final TaskStatus status = TaskStatus.running(task.getId());

View File

@ -179,7 +179,7 @@ public class WorkerTaskMonitorTest
}
}
)
), jsonMapper, null
), jsonMapper
)
),
new WorkerConfig().setCapacity(1)

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.initialization;
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.initialization;
package io.druid.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.initialization;
package io.druid.guice;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;

View File

@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.initialization;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import io.druid.guice.ConfigModule;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.ExtensionsConfig;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.PropertiesModule;
import io.druid.jackson.JacksonModule;
import java.util.List;
/**
*/
public class GuiceInjectors
{
public static Injector makeStartupInjector()
{
return Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new ConfigModule(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(DruidSecondaryModule.class);
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
}
}
);
}
public static Injector makeStartupInjectorWithModules(Iterable<Module> modules)
{
List<Module> theModules = Lists.newArrayList();
theModules.add(new DruidGuiceExtensions());
theModules.add(new JacksonModule());
theModules.add(new PropertiesModule("runtime.properties"));
theModules.add(new ConfigModule());
theModules.add(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(DruidSecondaryModule.class);
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
}
}
);
for (Module theModule : modules) {
theModules.add(theModule);
}
return Guice.createInjector(theModules);
}
}

View File

@ -0,0 +1,93 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.ISE;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import java.util.Collections;
import java.util.List;
/**
*/
public class ModuleList
{
private final Injector baseInjector;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final List<Module> modules;
public ModuleList(Injector baseInjector)
{
this.baseInjector = baseInjector;
this.jsonMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Json.class));
this.smileMapper = baseInjector.getInstance(Key.get(ObjectMapper.class, Smile.class));
this.modules = Lists.newArrayList();
}
public List<Module> getModules()
{
return Collections.unmodifiableList(modules);
}
public void addModule(Object input)
{
if (input instanceof DruidModule) {
baseInjector.injectMembers(input);
modules.add(registerJacksonModules(((DruidModule) input)));
} else if (input instanceof Module) {
baseInjector.injectMembers(input);
modules.add((Module) input);
} else if (input instanceof Class) {
if (DruidModule.class.isAssignableFrom((Class) input)) {
modules.add(registerJacksonModules(baseInjector.getInstance((Class<? extends DruidModule>) input)));
} else if (Module.class.isAssignableFrom((Class) input)) {
modules.add(baseInjector.getInstance((Class<? extends Module>) input));
return;
} else {
throw new ISE("Class[%s] does not implement %s", input.getClass(), Module.class);
}
} else {
throw new ISE("Unknown module type[%s]", input.getClass());
}
}
public void addModules(Object... object)
{
for (Object o : object) {
addModule(o);
}
}
private DruidModule registerJacksonModules(DruidModule module)
{
for (com.fasterxml.jackson.databind.Module jacksonModule : module.getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
smileMapper.registerModule(jacksonModule);
}
return module;
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
package io.druid.query;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import io.druid.segment.column.ColumnConfig;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
package io.druid.query;
import java.lang.reflect.InvocationTargetException;

View File

@ -22,6 +22,7 @@ package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -31,6 +32,9 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
@ -41,7 +45,10 @@ import com.metamx.common.io.smoosh.SmooshedWriter;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.EmittingLogger;
import io.druid.common.utils.SerializerUtils;
import io.druid.guice.ConfigProvider;
import io.druid.initialization.GuiceInjectors;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DruidProcessingConfig;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.column.ColumnConfig;
@ -91,6 +98,9 @@ public class IndexIO
{
public static final byte V8_VERSION = 0x8;
public static final byte V9_VERSION = 0x9;
public static final int CURRENT_VERSION_ID = V9_VERSION;
public static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
private static final Map<Integer, IndexLoader> indexLoaders =
ImmutableMap.<Integer, IndexLoader>builder()
@ -108,13 +118,32 @@ public class IndexIO
private static final EmittingLogger log = new EmittingLogger(IndexIO.class);
private static final SerializerUtils serializerUtils = new SerializerUtils();
public static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
// This should really be provided by DI, should be changed once we switch around to using a DI framework
private static final ObjectMapper mapper = new DefaultObjectMapper();
private static final ObjectMapper mapper;
private static final ColumnConfig columnConfig;
static {
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(
binder,
DruidProcessingConfig.class,
ImmutableMap.of("base_path", "druid.processing")
);
}
}
)
);
mapper = injector.getInstance(ObjectMapper.class);
columnConfig = injector.getInstance(DruidProcessingConfig.class);
}
private static volatile IndexIOHandler handler = null;
public static final int CURRENT_VERSION_ID = V9_VERSION;
@Deprecated
public static MMappedIndex mapDir(final File inDir) throws IOException
@ -123,7 +152,7 @@ public class IndexIO
return handler.mapDir(inDir);
}
public static QueryableIndex loadIndex(File inDir, ColumnConfig columnConfig) throws IOException
public static QueryableIndex loadIndex(File inDir) throws IOException
{
init();
final int version = SegmentUtils.getVersionFromDir(inDir);
@ -131,7 +160,7 @@ public class IndexIO
final IndexLoader loader = indexLoaders.get(version);
if (loader != null) {
return loader.load(inDir, columnConfig);
return loader.load(inDir);
} else {
throw new ISE("Unknown index version[%s]", version);
}
@ -181,7 +210,7 @@ public class IndexIO
}
}
public static boolean convertSegment(File toConvert, File converted, ColumnConfig columnConfig) throws IOException
public static boolean convertSegment(File toConvert, File converted) throws IOException
{
final int version = SegmentUtils.getVersionFromDir(toConvert);
@ -199,7 +228,7 @@ public class IndexIO
case 7:
log.info("Old version, re-persisting.");
IndexMerger.append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert, columnConfig))),
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))),
converted
);
return true;
@ -609,13 +638,13 @@ public class IndexIO
static interface IndexLoader
{
public QueryableIndex load(File inDir, ColumnConfig columnConfig) throws IOException;
public QueryableIndex load(File inDir) throws IOException;
}
static class LegacyIndexLoader implements IndexLoader
{
@Override
public QueryableIndex load(File inDir, ColumnConfig columnConfig) throws IOException
public QueryableIndex load(File inDir) throws IOException
{
MMappedIndex index = IndexIO.mapDir(inDir);
@ -627,7 +656,10 @@ public class IndexIO
.setHasMultipleValues(true)
.setDictionaryEncodedColumn(
new DictionaryEncodedColumnSupplier(
index.getDimValueLookup(dimension), null, index.getDimColumn(dimension), columnConfig.columnCacheSizeBytes()
index.getDimValueLookup(dimension),
null,
index.getDimColumn(dimension),
columnConfig.columnCacheSizeBytes()
)
)
.setBitmapIndex(
@ -700,7 +732,7 @@ public class IndexIO
static class V9IndexLoader implements IndexLoader
{
@Override
public QueryableIndex load(File inDir, ColumnConfig columnConfig) throws IOException
public QueryableIndex load(File inDir) throws IOException
{
log.debug("Mapping v9 index[%s]", inDir);
long startTime = System.currentTimeMillis();
@ -722,11 +754,11 @@ public class IndexIO
ObjectMapper mapper = new DefaultObjectMapper();
for (String columnName : cols) {
columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), columnConfig));
columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName)));
}
final QueryableIndex index = new SimpleQueryableIndex(
dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), columnConfig), columns, smooshedFiles
dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns, smooshedFiles
);
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
@ -734,7 +766,7 @@ public class IndexIO
return index;
}
private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer, ColumnConfig columnConfig) throws IOException
private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException
{
ColumnDescriptor serde = mapper.readValue(
serializerUtils.readString(byteBuffer), ColumnDescriptor.class

View File

@ -34,15 +34,6 @@ import java.io.File;
public class EmptyIndexTest
{
final ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
@Test
public void testEmptyIndex() throws Exception
{
@ -59,7 +50,7 @@ public class EmptyIndexTest
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(new Interval("2012-08-01/P3D"), emptyIndex);
IndexMerger.merge(Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir);
QueryableIndex emptyQueryableIndex = IndexIO.loadIndex(tmpDir, columnConfig);
QueryableIndex emptyQueryableIndex = IndexIO.loadIndex(tmpDir);
Assert.assertEquals("getAvailableDimensions", 0, Iterables.size(emptyQueryableIndex.getAvailableDimensions()));
Assert.assertEquals("getAvailableMetrics", 0, Iterables.size(emptyQueryableIndex.getColumnNames()));

View File

@ -40,15 +40,6 @@ import java.util.Arrays;
*/
public class IndexMergerTest
{
private final ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
@Test
public void testPersistCaseInsensitive() throws Exception
{
@ -58,7 +49,7 @@ public class IndexMergerTest
final File tempDir = Files.createTempDir();
try {
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir), columnConfig);
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir));
Assert.assertEquals(2, index.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
@ -97,21 +88,20 @@ public class IndexMergerTest
final File tempDir2 = Files.createTempDir();
final File mergedDir = Files.createTempDir();
try {
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1), columnConfig);
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1));
Assert.assertEquals(2, index1.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(2, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2), columnConfig);
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
Assert.assertEquals(2, index2.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(2, index2.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, mergedDir),
columnConfig
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, mergedDir)
);
Assert.assertEquals(3, merged.getTimeColumn().getLength());
@ -151,11 +141,10 @@ public class IndexMergerTest
)
);
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1), columnConfig);
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2), columnConfig);
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2));
final QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3),
columnConfig
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
);
Assert.assertEquals(1, index1.getTimeColumn().getLength());

View File

@ -52,15 +52,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class TestIndex
{
private static final ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
private static final Logger log = new Logger(TestIndex.class);
private static IncrementalIndex realtimeIndex = null;
@ -145,11 +136,10 @@ public class TestIndex
mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(topFile, columnConfig), IndexIO.loadIndex(bottomFile, columnConfig)),
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)),
METRIC_AGGS,
mergedFile
),
columnConfig
)
);
return mergedRealtime;
@ -240,7 +230,7 @@ public class TestIndex
someTmpFile.deleteOnExit();
IndexMerger.persist(index, someTmpFile);
return IndexIO.loadIndex(someTmpFile, columnConfig);
return IndexIO.loadIndex(someTmpFile);
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -70,15 +70,6 @@ import java.util.Random;
@RunWith(Parameterized.class)
public class SpatialFilterBonusTest
{
private static final ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
@ -230,7 +221,7 @@ public class SpatialFilterBonusTest
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile, columnConfig);
return IndexIO.loadIndex(tmpFile);
}
private static QueryableIndex makeMergedQueryableIndex()
@ -395,11 +386,10 @@ public class SpatialFilterBonusTest
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile, columnConfig), IndexIO.loadIndex(secondFile, columnConfig), IndexIO.loadIndex(thirdFile, columnConfig)),
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
),
columnConfig
)
);
return mergedRealtime;

View File

@ -70,16 +70,6 @@ import java.util.Random;
@RunWith(Parameterized.class)
public class SpatialFilterTest
{
private static ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
@ -246,7 +236,7 @@ public class SpatialFilterTest
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile, columnConfig);
return IndexIO.loadIndex(tmpFile);
}
private static QueryableIndex makeMergedQueryableIndex()
@ -426,11 +416,10 @@ public class SpatialFilterTest
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile, columnConfig), IndexIO.loadIndex(secondFile, columnConfig), IndexIO.loadIndex(thirdFile, columnConfig)),
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
),
columnConfig
)
);
return mergedRealtime;

View File

@ -33,10 +33,10 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.MetricsEmittingExecutorService;
import io.druid.query.PrioritizedExecutorService;
import io.druid.server.DruidProcessingConfig;
import io.druid.server.VMUtils;
import io.druid.query.VMUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;

View File

@ -33,7 +33,7 @@ import io.druid.segment.loading.MMappedQueryableIndexFactory;
import io.druid.segment.loading.QueryableIndexFactory;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.DruidNode;
import io.druid.server.DruidProcessingConfig;
import io.druid.query.DruidProcessingConfig;
import io.druid.server.coordination.DruidServerMetadata;
import javax.annotation.Nullable;

View File

@ -24,7 +24,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -37,14 +36,13 @@ import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.ExtensionsConfig;
import io.druid.guice.FirehoseModule;
import io.druid.guice.HttpClientModule;
import io.druid.guice.IndexingServiceDiscoveryModule;
import io.druid.guice.JacksonConfigManagerModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LifecycleModule;
import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.guice.ParsersModule;
@ -56,12 +54,8 @@ import io.druid.guice.StorageNodeModule;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.jackson.JacksonModule;
import io.druid.server.initialization.ConfigModule;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.server.initialization.JettyServerModule;
import io.druid.server.initialization.PropertiesModule;
import io.druid.server.metrics.MetricsModule;
import io.tesla.aether.Repository;
import io.tesla.aether.TeslaAether;
@ -85,6 +79,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -354,25 +349,6 @@ public class Initialization
return Guice.createInjector(Modules.override(defaultModules.getModules()).with(actualModules.getModules()));
}
public static Injector makeStartupInjector()
{
return Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new ConfigModule(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(DruidSecondaryModule.class);
JsonConfigProvider.bind(binder, "druid.extensions", ExtensionsConfig.class);
}
}
);
}
private static class ModuleList
{
private final Injector baseInjector;

View File

@ -19,11 +19,9 @@
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnConfig;
import org.apache.commons.io.FileUtils;
import java.io.File;
@ -35,21 +33,11 @@ public class MMappedQueryableIndexFactory implements QueryableIndexFactory
{
private static final Logger log = new Logger(MMappedQueryableIndexFactory.class);
private final ColumnConfig columnConfig;
@Inject
public MMappedQueryableIndexFactory(
ColumnConfig columnConfig
)
{
this.columnConfig = columnConfig;
}
@Override
public QueryableIndex factorize(File parentDir) throws SegmentLoadingException
{
try {
return IndexIO.loadIndex(parentDir, columnConfig);
return IndexIO.loadIndex(parentDir);
}
catch (IOException e) {
log.warn(e, "Got exception!!!! Going to delete parentDir[%s]", parentDir);

View File

@ -63,8 +63,7 @@ public class FlushingPlumber extends RealtimePlumber
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
ColumnConfig columnConfig
ExecutorService queryExecutorService
)
{
super(
@ -77,8 +76,7 @@ public class FlushingPlumber extends RealtimePlumber
queryExecutorService,
null,
null,
null,
columnConfig
null
);
this.flushDuration = flushDuration;

View File

@ -52,7 +52,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final ColumnConfig columnConfig;
@JsonCreator
public FlushingPlumberSchool(
@ -61,7 +60,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject ColumnConfig columnConfig,
// Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -79,7 +77,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
null,
null,
queryExecutorService,
columnConfig,
windowPeriod,
basePersistDirectory,
segmentGranularity,
@ -93,7 +90,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.columnConfig = columnConfig;
}
@Override
@ -113,8 +109,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
columnConfig
queryExecutorService
);
}

View File

@ -37,8 +37,6 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -92,7 +90,6 @@ public class RealtimePlumber implements Plumber
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
private final ColumnConfig columnConfig;
private volatile boolean shuttingDown = false;
private volatile boolean stopped = false;
private volatile ExecutorService persistExecutor = null;
@ -109,8 +106,7 @@ public class RealtimePlumber implements Plumber
ExecutorService queryExecutorService,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
FilteredServerView serverView,
ColumnConfig columnConfig
FilteredServerView serverView
)
{
this.schema = schema;
@ -124,7 +120,6 @@ public class RealtimePlumber implements Plumber
this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.columnConfig = columnConfig;
log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
}
@ -347,7 +342,7 @@ public class RealtimePlumber implements Plumber
mergedTarget
);
QueryableIndex index = IndexIO.loadIndex(mergedFile, columnConfig);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
@ -533,7 +528,7 @@ public class RealtimePlumber implements Plumber
versioningPolicy.getVersion(sinkInterval),
config.getShardSpec()
),
IndexIO.loadIndex(segmentDir, columnConfig)
IndexIO.loadIndex(segmentDir)
),
Integer.parseInt(segmentDir.getName())
)
@ -714,7 +709,7 @@ public class RealtimePlumber implements Plumber
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile, columnConfig)
IndexIO.loadIndex(persistedFile)
)
);

View File

@ -52,7 +52,6 @@ public class RealtimePlumberSchool implements PlumberSchool
private final SegmentPublisher segmentPublisher;
private final FilteredServerView serverView;
private final ExecutorService queryExecutorService;
private final ColumnConfig columnConfig;
// Backwards compatible
private final Period windowPeriod;
@ -71,7 +70,6 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject FilteredServerView serverView,
@JacksonInject @Processing ExecutorService executorService,
@JacksonInject ColumnConfig columnConfig,
// Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -88,7 +86,6 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.queryExecutorService = executorService;
this.columnConfig = columnConfig;
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
@ -153,8 +150,7 @@ public class RealtimePlumberSchool implements PlumberSchool
queryExecutorService,
dataSegmentPusher,
segmentPublisher,
serverView,
columnConfig
serverView
);
}

View File

@ -27,10 +27,10 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import io.druid.guice.ExtensionsConfig;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ExtensionsConfig;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
@ -58,7 +58,7 @@ public class InitializationTest
@Test
public void test02MakeStartupInjector() throws Exception
{
Injector startupInjector = Initialization.makeStartupInjector();
Injector startupInjector = GuiceInjectors.makeStartupInjector();
Assert.assertNotNull(startupInjector);
Assert.assertNotNull(startupInjector.getInstance(ObjectMapper.class));
}
@ -66,7 +66,7 @@ public class InitializationTest
@Test
public void test03ClassLoaderExtensionsLoading()
{
Injector startupInjector = Initialization.makeStartupInjector();
Injector startupInjector = GuiceInjectors.makeStartupInjector();
Function<DruidModule, String> fnClassName = new Function<DruidModule, String>()
{
@ -99,7 +99,7 @@ public class InitializationTest
@Test
public void test04MakeInjectorWithModules() throws Exception
{
Injector startupInjector = Initialization.makeStartupInjector();
Injector startupInjector = GuiceInjectors.makeStartupInjector();
Injector injector = Initialization.makeInjectorWithModules(
startupInjector, ImmutableList.<Object>of(
new com.google.inject.Module()

View File

@ -73,7 +73,7 @@ public class FireDepartmentTest
new RealtimeIOConfig(
null,
new RealtimePlumberSchool(
null, null, null, null, null, null, null, null, null, null, null, null, null, 0
null, null, null, null, null, null, null, null, null, null, null, null, 0
)
),
new RealtimeTuningConfig(

View File

@ -65,15 +65,6 @@ import java.util.concurrent.TimeUnit;
*/
public class RealtimePlumberSchoolTest
{
private final ColumnConfig columnConfig = new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
};
private Plumber plumber;
private DataSegmentAnnouncer announcer;
@ -153,7 +144,6 @@ public class RealtimePlumberSchoolTest
segmentPublisher,
serverView,
MoreExecutors.sameThreadExecutor(),
columnConfig,
new Period("PT10m"),
tmpDir,
Granularity.HOUR,

View File

@ -35,6 +35,7 @@ import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.Jerseys;
import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.Global;
import io.druid.initialization.GuiceInjectors;
import io.druid.initialization.Initialization;
import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.server.Handler;
@ -90,7 +91,7 @@ public class JettyTest
{
setProperties();
Injector injector = Initialization.makeInjectorWithModules(
Initialization.makeStartupInjector(), Lists.<Object>newArrayList(
GuiceInjectors.makeStartupInjector(), Lists.<Object>newArrayList(
new Module()
{
@Override

View File

@ -27,7 +27,7 @@ import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.guice.ExtensionsConfig;
import io.tesla.aether.internal.DefaultTeslaAether;
import java.io.File;

View File

@ -25,13 +25,11 @@ import io.airlift.command.Help;
import io.airlift.command.ParseException;
import io.druid.cli.convert.ConvertProperties;
import io.druid.cli.validate.DruidJsonValidator;
import io.druid.guice.ExtensionsConfig;
import io.druid.initialization.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.util.Collection;
import java.util.List;
/**
*/
@ -75,7 +73,7 @@ public class Main
.withDefaultCommand(Help.class)
.withCommands(CliPeon.class, CliInternalHadoopIndexer.class);
final Injector injector = Initialization.makeStartupInjector();
final Injector injector = GuiceInjectors.makeStartupInjector();
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
final Collection<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class);

View File

@ -27,7 +27,7 @@ import io.airlift.command.Option;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.HadoopIndexTask;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.guice.ExtensionsConfig;
import io.tesla.aether.internal.DefaultTeslaAether;
import java.util.List;