From f6995bc9084d3a56121fe6e83b9d62ad4d204529 Mon Sep 17 00:00:00 2001
From: Slim
Date: Mon, 31 Oct 2016 10:05:58 -0700
Subject: [PATCH] offline appenderator factory. (#3483)

* adding default offline appenderator

* adding test

* fix comments

* fix comments
---
 .../appenderator/AppenderatorFactory.java     |   3 +-
 .../appenderator/AppenderatorImpl.java        |   8 +-
 .../realtime/appenderator/Appenderators.java  |   1 -
 .../DefaultOfflineAppenderatorFactory.java    |  60 ++++++
 ...> DefaultRealtimeAppenderatorFactory.java} |   5 +-
 ...DefaultOfflineAppenderatorFactoryTest.java | 172 ++++++++++++++++++
 6 files changed, 242 insertions(+), 7 deletions(-)
 create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
 rename server/src/main/java/io/druid/segment/realtime/appenderator/{DefaultAppenderatorFactory.java => DefaultRealtimeAppenderatorFactory.java} (97%)
 create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java

diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java
index 039fb8995bc..5e21a64f10a 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorFactory.java
@@ -28,7 +28,8 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "default", value = DefaultAppenderatorFactory.class)
+    @JsonSubTypes.Type(name = "default", value = DefaultRealtimeAppenderatorFactory.class),
+    @JsonSubTypes.Type(name = "offline", value = DefaultOfflineAppenderatorFactory.class)
 })
 public interface AppenderatorFactory
 {
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 1d39538db0a..bbe2b2bb68b 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -143,7 +143,7 @@ public class AppenderatorImpl implements Appenderator
     this.segmentAnnouncer = Preconditions.checkNotNull(segmentAnnouncer, "segmentAnnouncer");
     this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
     this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
-    this.cache = Preconditions.checkNotNull(cache, "cache");
+    this.cache = cache;
     this.texasRanger = conglomerate == null ? null : new SinkQuerySegmentWalker(
         schema.getDataSource(),
         sinkTimeline,
@@ -151,7 +151,7 @@ public class AppenderatorImpl implements Appenderator
         emitter,
         conglomerate,
         queryExecutorService,
-        cache,
+        Preconditions.checkNotNull(cache, "cache"),
         cacheConfig
     );
 
@@ -912,7 +912,9 @@ public class AppenderatorImpl implements Appenderator
         identifier.getShardSpec().createChunk(sink)
     );
     for (FireHydrant hydrant : sink) {
-      cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+      if (cache != null) {
+        cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+      }
     }
 
     if (removeOnDiskData) {
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
index 41614315448..f978221e955 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
@@ -128,5 +128,4 @@ public class Appenderators
     );
   }
 
-
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
new file mode 100644
index 00000000000..2b81e4f4d85
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMerger;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+
+
+public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory
+{
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+
+  @JsonCreator
+  public DefaultOfflineAppenderatorFactory(
+      @JacksonInject DataSegmentPusher dataSegmentPusher,
+      @JacksonInject ObjectMapper objectMapper,
+      @JacksonInject IndexIO indexIO,
+      @JacksonInject IndexMerger indexMerger
+  ) {
+    this.dataSegmentPusher = dataSegmentPusher;
+    this.objectMapper = objectMapper;
+    this.indexIO = indexIO;
+    this.indexMerger = indexMerger;
+  }
+
+  @Override
+  public Appenderator build(
+      DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
+  )
+  {
+    return Appenderators.createOffline(schema, config, metrics, dataSegmentPusher, objectMapper, indexIO, indexMerger);
+  }
+}
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
similarity index 97%
rename from server/src/main/java/io/druid/segment/realtime/appenderator/DefaultAppenderatorFactory.java
rename to server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index a7b8d06bd26..9ff9c8edf00 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultAppenderatorFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -39,7 +39,8 @@ import io.druid.timeline.partition.ShardSpec;
 import java.io.File;
 import java.util.concurrent.ExecutorService;
 
-public class DefaultAppenderatorFactory implements AppenderatorFactory
+public class
+DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
 {
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
@@ -52,7 +53,7 @@
   private final Cache cache;
   private final CacheConfig cacheConfig;
 
-  public DefaultAppenderatorFactory(
+  public DefaultRealtimeAppenderatorFactory(
       @JacksonInject ServiceEmitter emitter,
       @JacksonInject QueryRunnerFactoryConglomerate conglomerate,
       @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
new file mode 100644
index 00000000000..53c8c177a37
--- /dev/null
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import com.metamx.common.Granularity;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.JSONParseSpec;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.guice.GuiceInjectors;
+import io.druid.initialization.Initialization;
+import io.druid.query.DruidProcessingConfig;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.plumber.Committers;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class DefaultOfflineAppenderatorFactoryTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testBuild() throws IOException, SegmentNotWritableException
+  {
+    Injector injector = Initialization.makeInjectorWithModules(
+        GuiceInjectors.makeStartupInjector(),
+        ImmutableList.of(new Module()
+        {
+          @Override
+          public void configure(Binder binder)
+          {
+            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
+            binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
+            binder.bind(DruidProcessingConfig.class).toInstance(
+                new DruidProcessingConfig()
+                {
+                  @Override
+                  public String getFormatString()
+                  {
+                    return "processing-%s";
+                  }
+
+                  @Override
+                  public int intermediateComputeSizeBytes()
+                  {
+                    return 100 * 1024 * 1024;
+                  }
+
+                  @Override
+                  public int getNumThreads()
+                  {
+                    return 1;
+                  }
+
+                  @Override
+                  public int columnCacheSizeBytes()
+                  {
+                    return 25 * 1024 * 1024;
+                  }
+                }
+            );
+            binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
+          }
+        }
+        )
+    );
+    ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+    AppenderatorFactory defaultOfflineAppenderatorFactory = objectMapper.reader(AppenderatorFactory.class)
+                                                                        .readValue("{\"type\":\"offline\"}");
+
+    final Map<String, Object> parserMap = objectMapper.convertValue(
+        new MapInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("ts", "auto", null),
+                new DimensionsSpec(null, null, null),
+                null,
+                null
+            )
+        ),
+        Map.class
+    );
+    DataSchema schema = new DataSchema(
+        "dataSourceName",
+        parserMap,
+        new AggregatorFactory[]{
+            new CountAggregatorFactory("count"),
+            new LongSumAggregatorFactory("met", "met")
+        },
+        new UniformGranularitySpec(Granularity.MINUTE, QueryGranularities.NONE, null),
+        objectMapper
+    );
+
+    RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+        75000,
+        null,
+        null,
+        temporaryFolder.newFolder(),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        0,
+        0,
+        null,
+        null
+    );
+
+    try (Appenderator appenderator = defaultOfflineAppenderatorFactory.build(
+        schema,
+        tuningConfig,
+        new FireDepartmentMetrics()
+    )) {
+      Assert.assertEquals("dataSourceName", appenderator.getDataSource());
+      Assert.assertEquals(null, appenderator.startJob());
+      SegmentIdentifier identifier = new SegmentIdentifier(
+          "dataSourceName",
+          new Interval("2000/2001"),
+          "A",
+          new LinearShardSpec(0)
+      );
+      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      appenderator.add(identifier, AppenderatorTest.IR("2000", "bar", 1), Suppliers.ofInstance(Committers.nil()));
+      Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      appenderator.add(identifier, AppenderatorTest.IR("2000", "baz", 1), Suppliers.ofInstance(Committers.nil()));
+      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      appenderator.close();
+      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+    }
+  }
+}
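
Usage sketch (illustrative, not part of the commit): with the new "offline" binding registered in AppenderatorFactory, a spec selects this factory with just {"type":"offline"}; the DataSegmentPusher, ObjectMapper, IndexIO, and IndexMerger constructor arguments are supplied through @JacksonInject rather than through the spec itself. Reusing the schema, tuningConfig, and injector-provided objectMapper constructed in the test above (and, like the test method, assuming IOException and SegmentNotWritableException are declared by the caller), the factory could be exercised roughly as follows:

    // Deserialize the factory from a spec fragment; Jackson resolves the @JacksonInject
    // constructor arguments (DataSegmentPusher, ObjectMapper, IndexIO, IndexMerger) from
    // the injector-backed ObjectMapper, exactly as the test above does.
    AppenderatorFactory factory = objectMapper.reader(AppenderatorFactory.class)
                                              .readValue("{\"type\":\"offline\"}");

    // Build an offline appenderator. Unlike the "default" (realtime) type, no emitter,
    // query runner conglomerate, or cache is wired in, so it is not meant to serve queries;
    // it only accumulates, persists, and pushes segments via the injected DataSegmentPusher.
    try (Appenderator appenderator = factory.build(schema, tuningConfig, new FireDepartmentMetrics())) {
      appenderator.startJob();
      appenderator.add(identifier, AppenderatorTest.IR("2000", "bar", 1), Suppliers.ofInstance(Committers.nil()));
      // ... persist/push as the batch task requires, then close (handled by try-with-resources).
    }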