Remove SingleDataSegmentAnnouncer in favor of BatchDataSegmentAnnouncer

Bingkun Guo 2015-12-18 15:19:01 -06:00
parent 3d135e35b4
commit 951a4e9b35
4 changed files with 37 additions and 137 deletions

View File

@@ -269,19 +269,7 @@ This config is used to find the [Coordinator](../design/coordinator.html) using
### Announcing Segments
You can optionally configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs.
#### Data Segment Announcer
Data segment announcers are used to announce segments.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.announcer.type`|Choices: legacy or batch. The type of data segment announcer to use.|batch|
##### Single Data Segment Announcer
In legacy Druid, each segment served by a node would be announced as an individual Znode.
You can configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs.
##### Batch Data Segment Announcer
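
The removed table documents `druid.announcer.type`, which selects the announcer implementation by name. As a rough, self-contained illustration of how a `type` key like this can pick a concrete class through Jackson's polymorphic deserialization (the `AnnouncerProvider` classes below are hypothetical stand-ins, not Druid's actual `DataSegmentAnnouncerProvider`), consider:

```java
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical stand-ins for the provider hierarchy selected by "type".
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchAnnouncerProvider.class)
@JsonSubTypes({
    @JsonSubTypes.Type(name = "batch", value = BatchAnnouncerProvider.class),
    @JsonSubTypes.Type(name = "legacy", value = LegacyAnnouncerProvider.class)
})
interface AnnouncerProvider {}

class BatchAnnouncerProvider implements AnnouncerProvider {}

class LegacyAnnouncerProvider implements AnnouncerProvider {}

public class AnnouncerTypeDemo
{
  public static void main(String[] args) throws Exception
  {
    // A property such as druid.announcer.type=batch is, conceptually, the
    // JSON document {"type": "batch"}; the "type" key chooses the subclass.
    ObjectMapper mapper = new ObjectMapper();
    AnnouncerProvider provider = mapper.readValue("{\"type\": \"batch\"}", AnnouncerProvider.class);
    System.out.println(provider.getClass().getSimpleName()); // BatchAnnouncerProvider
  }
}
```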

View File

@@ -27,7 +27,6 @@ import io.druid.curator.announcement.Announcer;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncerProvider;
import io.druid.server.coordination.SingleDataSegmentAnnouncer;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.curator.framework.CuratorFramework;
@@ -42,7 +41,6 @@ public class AnnouncerModule implements Module
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
}
@Provides
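
After this change the module manages only the batch announcer's lifecycle, while the `DataSegmentAnnouncer` interface is still bound through a provider, so injection sites do not change. Below is a minimal sketch of that Guice bind-to-provider pattern using hypothetical types (not Druid's), assuming only the stock `com.google.inject` API:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provider;

// Hypothetical stand-ins for DataSegmentAnnouncer and its provider.
interface SegmentAnnouncer {}

class BatchSegmentAnnouncer implements SegmentAnnouncer {}

class SegmentAnnouncerProvider implements Provider<SegmentAnnouncer>
{
  @Override
  public SegmentAnnouncer get()
  {
    // Real code would consult configuration here; this sketch always returns batch.
    return new BatchSegmentAnnouncer();
  }
}

public class ProviderBindingDemo
{
  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(new AbstractModule()
    {
      @Override
      protected void configure()
      {
        // Mirrors: binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class)
        bind(SegmentAnnouncer.class).toProvider(SegmentAnnouncerProvider.class);
      }
    });
    System.out.println(injector.getInstance(SegmentAnnouncer.class).getClass().getSimpleName()); // BatchSegmentAnnouncer
  }
}
```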

View File

@@ -1,89 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.curator.announcement.Announcer;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class);
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation;
@Inject
public SingleDataSegmentAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,
Announcer announcer,
ObjectMapper jsonMapper
)
{
super(server, config, announcer, jsonMapper);
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
}
public void announceSegment(DataSegment segment) throws IOException
{
final String path = makeServedSegmentPath(segment);
log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path);
announcer.announce(path, jsonMapper.writeValueAsBytes(segment));
}
public void unannounceSegment(DataSegment segment) throws IOException
{
final String path = makeServedSegmentPath(segment);
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path);
announcer.unannounce(path);
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
announceSegment(segment);
}
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
unannounceSegment(segment);
}
}
private String makeServedSegmentPath(DataSegment segment)
{
return ZKPaths.makePath(servedSegmentsLocation, segment.getIdentifier());
}
}
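
The deleted class announced every served segment as its own Znode under `servedSegmentsPath/<server>/<segmentId>`, which is exactly the per-segment overhead the batch announcer avoids by packing many segment payloads into a single Znode. The sketch below only illustrates that per-segment path construction with Curator's `ZKPaths.makePath`, as the removed code did; the root path and segment identifiers are invented for the example:

```java
import org.apache.curator.utils.ZKPaths;

import java.util.Arrays;
import java.util.List;

public class LegacyAnnouncePathDemo
{
  public static void main(String[] args)
  {
    // Hypothetical served-segments root and segment identifiers.
    String servedSegmentsLocation = ZKPaths.makePath("/druid/servedSegments", "historical-host:8083");
    List<String> segmentIds = Arrays.asList(
        "wikipedia_2015-12-01T00:00:00.000Z_2015-12-02T00:00:00.000Z_v1",
        "wikipedia_2015-12-02T00:00:00.000Z_2015-12-03T00:00:00.000Z_v1"
    );

    // Legacy behavior: one Znode per segment, so N segments mean N Znodes and N writes.
    for (String id : segmentIds) {
      System.out.println(ZKPaths.makePath(servedSegmentsLocation, id));
    }
    // The batch announcer instead serializes many segment descriptors into a
    // single Znode, subject to the limits in BatchDataSegmentAnnouncerConfig.
  }
}
```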

View File

@@ -19,35 +19,6 @@
package io.druid.server.coordination;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.NoopQueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.loading.CacheTestSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -61,6 +32,34 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.NoopQueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.loading.CacheTestSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.curator.framework.CuratorFramework;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@@ -117,8 +116,12 @@ public class ZkCoordinatorTest extends CuratorTestBase
announceCount = new AtomicInteger(0);
announcer = new DataSegmentAnnouncer()
{
private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer(
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
private final DataSegmentAnnouncer delegate = new BatchDataSegmentAnnouncer(
me,
new BatchDataSegmentAnnouncerConfig(),
zkPaths,
new Announcer(curator, Execs.singleThreaded("blah")),
jsonMapper
);
@Override
@@ -191,7 +194,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
public void testLoadCache() throws Exception
{
List<DataSegment> segments = Lists.newLinkedList();
for(int i = 0; i < COUNT; ++i) {
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
@@ -216,7 +219,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
zkCoordinator.start();
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
for(int i = 0; i < COUNT; ++i) {
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
}
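
The test keeps its existing shape: an anonymous `DataSegmentAnnouncer` bumps `announceCount` and then forwards to a real delegate, and only that delegate changes from `SingleDataSegmentAnnouncer` to `BatchDataSegmentAnnouncer`. A stripped-down sketch of that counting-decorator idea, using a hypothetical single-method interface instead of Druid's:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified version of the announcer interface used in the test.
interface SegmentAnnouncer
{
  void announceSegment(String segmentId);
}

public class CountingAnnouncerDemo
{
  public static void main(String[] args)
  {
    final AtomicInteger announceCount = new AtomicInteger(0);

    // Stand-in for the real (batch) announcer being wrapped.
    final SegmentAnnouncer delegate = new SegmentAnnouncer()
    {
      @Override
      public void announceSegment(String segmentId)
      {
        System.out.println("announced " + segmentId);
      }
    };

    // Decorator: count every announcement, then hand off to the delegate.
    SegmentAnnouncer counting = new SegmentAnnouncer()
    {
      @Override
      public void announceSegment(String segmentId)
      {
        announceCount.incrementAndGet();
        delegate.announceSegment(segmentId);
      }
    };

    counting.announceSegment("test_2011-04-01_v1");
    counting.announceSegment("test_2011-04-02_v1");
    System.out.println("announceCount = " + announceCount.get()); // prints 2
  }
}
```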