offline appenderator factory. (#3483)

* adding default offline appenderator

* adding test

* fix comments

* fix comments
This commit is contained in:
Slim 2016-10-31 10:05:58 -07:00 committed by GitHub
parent 317d62e18c
commit f6995bc908
6 changed files with 242 additions and 7 deletions

View File

@ -28,7 +28,8 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "default", value = DefaultAppenderatorFactory.class)
@JsonSubTypes.Type(name = "default", value = DefaultRealtimeAppenderatorFactory.class),
@JsonSubTypes.Type(name = "offline", value = DefaultOfflineAppenderatorFactory.class)
})
public interface AppenderatorFactory
{

View File

@ -143,7 +143,7 @@ public class AppenderatorImpl implements Appenderator
this.segmentAnnouncer = Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cache = cache;
this.texasRanger = conglomerate == null ? null : new SinkQuerySegmentWalker(
schema.getDataSource(),
sinkTimeline,
@ -151,7 +151,7 @@ public class AppenderatorImpl implements Appenderator
emitter,
conglomerate,
queryExecutorService,
cache,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig
);
@ -912,7 +912,9 @@ public class AppenderatorImpl implements Appenderator
identifier.getShardSpec().createChunk(sink)
);
for (FireHydrant hydrant : sink) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
if (cache != null) {
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
}
}
if (removeOnDiskData) {

View File

@ -128,5 +128,4 @@ public class Appenderators
);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory
{
private final DataSegmentPusher dataSegmentPusher;
private final ObjectMapper objectMapper;
private final IndexIO indexIO;
private final IndexMerger indexMerger;
@JsonCreator
public DefaultOfflineAppenderatorFactory(
@JacksonInject DataSegmentPusher dataSegmentPusher,
@JacksonInject ObjectMapper objectMapper,
@JacksonInject IndexIO indexIO,
@JacksonInject IndexMerger indexMerger
) {
this.dataSegmentPusher = dataSegmentPusher;
this.objectMapper = objectMapper;
this.indexIO = indexIO;
this.indexMerger = indexMerger;
}
@Override
public Appenderator build(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return Appenderators.createOffline(schema, config, metrics, dataSegmentPusher, objectMapper, indexIO, indexMerger);
}
}

View File

@ -39,7 +39,8 @@ import io.druid.timeline.partition.ShardSpec;
import java.io.File;
import java.util.concurrent.ExecutorService;
public class DefaultAppenderatorFactory implements AppenderatorFactory
public class
DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
{
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
@ -52,7 +53,7 @@ public class DefaultAppenderatorFactory implements AppenderatorFactory
private final Cache cache;
private final CacheConfig cacheConfig;
public DefaultAppenderatorFactory(
public DefaultRealtimeAppenderatorFactory(
@JacksonInject ServiceEmitter emitter,
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,

View File

@ -0,0 +1,172 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Map;
public class DefaultOfflineAppenderatorFactoryTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testBuild() throws IOException, SegmentNotWritableException
{
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
binder.bind(DruidProcessingConfig.class).toInstance(
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return "processing-%s";
}
@Override
public int intermediateComputeSizeBytes()
{
return 100 * 1024 * 1024;
}
@Override
public int getNumThreads()
{
return 1;
}
@Override
public int columnCacheSizeBytes()
{
return 25 * 1024 * 1024;
}
}
);
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
}
}
)
);
ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
AppenderatorFactory defaultOfflineAppenderatorFactory = objectMapper.reader(AppenderatorFactory.class)
.readValue("{\"type\":\"offline\"}");
final Map<String, Object> parserMap = objectMapper.convertValue(
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(null, null, null),
null,
null
)
),
Map.class
);
DataSchema schema = new DataSchema(
"dataSourceName",
parserMap,
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("met", "met")
},
new UniformGranularitySpec(Granularity.MINUTE, QueryGranularities.NONE, null),
objectMapper
);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
75000,
null,
null,
temporaryFolder.newFolder(),
null,
null,
null,
null,
null,
null,
0,
0,
null,
null
);
try (Appenderator appenderator = defaultOfflineAppenderatorFactory.build(
schema,
tuningConfig,
new FireDepartmentMetrics()
)) {
Assert.assertEquals("dataSourceName", appenderator.getDataSource());
Assert.assertEquals(null, appenderator.startJob());
SegmentIdentifier identifier = new SegmentIdentifier(
"dataSourceName",
new Interval("2000/2001"),
"A",
new LinearShardSpec(0)
);
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(identifier, AppenderatorTest.IR("2000", "bar", 1), Suppliers.ofInstance(Committers.nil()));
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(identifier, AppenderatorTest.IR("2000", "baz", 1), Suppliers.ofInstance(Committers.nil()));
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
}
}