mirror of https://github.com/apache/druid.git
Merge pull request #2126 from guobingkun/remove_single_announcer
Remove SingleDataSegmentAnnouncer in favor of BatchDataSegmentAnnouncer
This commit is contained in:
commit
c2a2d19d62
|
@ -269,19 +269,7 @@ This config is used to find the [Coordinator](../design/coordinator.html) using
|
||||||
|
|
||||||
### Announcing Segments
|
### Announcing Segments
|
||||||
|
|
||||||
You can optionally configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs.
|
You can configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs.
|
||||||
|
|
||||||
#### Data Segment Announcer
|
|
||||||
|
|
||||||
Data segment announcers are used to announce segments.
|
|
||||||
|
|
||||||
|Property|Description|Default|
|
|
||||||
|--------|-----------|-------|
|
|
||||||
|`druid.announcer.type`|Choices: legacy or batch. The type of data segment announcer to use.|batch|
|
|
||||||
|
|
||||||
##### Single Data Segment Announcer
|
|
||||||
|
|
||||||
In legacy Druid, each segment served by a node would be announced as an individual Znode.
|
|
||||||
|
|
||||||
##### Batch Data Segment Announcer
|
##### Batch Data Segment Announcer
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,6 @@ import io.druid.curator.announcement.Announcer;
|
||||||
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
|
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
|
||||||
import io.druid.server.coordination.DataSegmentAnnouncer;
|
import io.druid.server.coordination.DataSegmentAnnouncer;
|
||||||
import io.druid.server.coordination.DataSegmentAnnouncerProvider;
|
import io.druid.server.coordination.DataSegmentAnnouncerProvider;
|
||||||
import io.druid.server.coordination.SingleDataSegmentAnnouncer;
|
|
||||||
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
|
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
|
||||||
|
@ -42,7 +41,6 @@ public class AnnouncerModule implements Module
|
||||||
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
|
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
|
||||||
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
|
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
|
||||||
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
|
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
|
||||||
binder.bind(SingleDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
|
|
|
@ -1,89 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. Metamarkets licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.druid.server.coordination;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.google.inject.Inject;
|
|
||||||
import com.metamx.common.logger.Logger;
|
|
||||||
import io.druid.curator.announcement.Announcer;
|
|
||||||
import io.druid.server.initialization.ZkPathsConfig;
|
|
||||||
import io.druid.timeline.DataSegment;
|
|
||||||
import org.apache.curator.utils.ZKPaths;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|
||||||
{
|
|
||||||
private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class);
|
|
||||||
|
|
||||||
private final Announcer announcer;
|
|
||||||
private final ObjectMapper jsonMapper;
|
|
||||||
private final String servedSegmentsLocation;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public SingleDataSegmentAnnouncer(
|
|
||||||
DruidServerMetadata server,
|
|
||||||
ZkPathsConfig config,
|
|
||||||
Announcer announcer,
|
|
||||||
ObjectMapper jsonMapper
|
|
||||||
)
|
|
||||||
{
|
|
||||||
super(server, config, announcer, jsonMapper);
|
|
||||||
|
|
||||||
this.announcer = announcer;
|
|
||||||
this.jsonMapper = jsonMapper;
|
|
||||||
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void announceSegment(DataSegment segment) throws IOException
|
|
||||||
{
|
|
||||||
final String path = makeServedSegmentPath(segment);
|
|
||||||
log.info("Announcing segment[%s] to path[%s]", segment.getIdentifier(), path);
|
|
||||||
announcer.announce(path, jsonMapper.writeValueAsBytes(segment));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void unannounceSegment(DataSegment segment) throws IOException
|
|
||||||
{
|
|
||||||
final String path = makeServedSegmentPath(segment);
|
|
||||||
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), path);
|
|
||||||
announcer.unannounce(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void announceSegments(Iterable<DataSegment> segments) throws IOException
|
|
||||||
{
|
|
||||||
for (DataSegment segment : segments) {
|
|
||||||
announceSegment(segment);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
|
|
||||||
{
|
|
||||||
for (DataSegment segment : segments) {
|
|
||||||
unannounceSegment(segment);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String makeServedSegmentPath(DataSegment segment)
|
|
||||||
{
|
|
||||||
return ZKPaths.makePath(servedSegmentsLocation, segment.getIdentifier());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -19,35 +19,6 @@
|
||||||
|
|
||||||
package io.druid.server.coordination;
|
package io.druid.server.coordination;
|
||||||
|
|
||||||
import io.druid.client.cache.CacheConfig;
|
|
||||||
import io.druid.client.cache.LocalCacheProvider;
|
|
||||||
import io.druid.concurrent.Execs;
|
|
||||||
import io.druid.curator.CuratorTestBase;
|
|
||||||
import io.druid.curator.announcement.Announcer;
|
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
|
||||||
import io.druid.query.NoopQueryRunnerFactoryConglomerate;
|
|
||||||
import io.druid.segment.IndexIO;
|
|
||||||
import io.druid.segment.loading.CacheTestSegmentLoader;
|
|
||||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
|
||||||
import io.druid.server.initialization.ZkPathsConfig;
|
|
||||||
import io.druid.server.metrics.NoopServiceEmitter;
|
|
||||||
import io.druid.timeline.DataSegment;
|
|
||||||
import io.druid.timeline.partition.NoneShardSpec;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
|
||||||
import org.joda.time.Interval;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
@ -61,6 +32,34 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
|
import io.druid.client.cache.CacheConfig;
|
||||||
|
import io.druid.client.cache.LocalCacheProvider;
|
||||||
|
import io.druid.concurrent.Execs;
|
||||||
|
import io.druid.curator.CuratorTestBase;
|
||||||
|
import io.druid.curator.announcement.Announcer;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import io.druid.query.NoopQueryRunnerFactoryConglomerate;
|
||||||
|
import io.druid.segment.IndexIO;
|
||||||
|
import io.druid.segment.loading.CacheTestSegmentLoader;
|
||||||
|
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||||
|
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
|
||||||
|
import io.druid.server.initialization.ZkPathsConfig;
|
||||||
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
|
import io.druid.timeline.DataSegment;
|
||||||
|
import io.druid.timeline.partition.NoneShardSpec;
|
||||||
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -117,8 +116,12 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
announceCount = new AtomicInteger(0);
|
announceCount = new AtomicInteger(0);
|
||||||
announcer = new DataSegmentAnnouncer()
|
announcer = new DataSegmentAnnouncer()
|
||||||
{
|
{
|
||||||
private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer(
|
private final DataSegmentAnnouncer delegate = new BatchDataSegmentAnnouncer(
|
||||||
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
|
me,
|
||||||
|
new BatchDataSegmentAnnouncerConfig(),
|
||||||
|
zkPaths,
|
||||||
|
new Announcer(curator, Execs.singleThreaded("blah")),
|
||||||
|
jsonMapper
|
||||||
);
|
);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -191,7 +194,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
public void testLoadCache() throws Exception
|
public void testLoadCache() throws Exception
|
||||||
{
|
{
|
||||||
List<DataSegment> segments = Lists.newLinkedList();
|
List<DataSegment> segments = Lists.newLinkedList();
|
||||||
for(int i = 0; i < COUNT; ++i) {
|
for (int i = 0; i < COUNT; ++i) {
|
||||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
|
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
|
||||||
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
|
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
|
||||||
segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
|
segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
|
||||||
|
@ -216,7 +219,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
|
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
|
||||||
zkCoordinator.start();
|
zkCoordinator.start();
|
||||||
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
|
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
|
||||||
for(int i = 0; i < COUNT; ++i) {
|
for (int i = 0; i < COUNT; ++i) {
|
||||||
Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
|
Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
|
||||||
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
|
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue