Merge pull request #2784 from metamx/lite-segment-announcements

Add ability to skip loadSpec,dimensions, metrics from DataSegment Announcements
This commit is contained in:
Slim 2016-04-07 09:51:22 -05:00
commit d5787dd3cd
6 changed files with 172 additions and 38 deletions

View File

@ -221,6 +221,11 @@ public class DataSegment implements Comparable<DataSegment>
return builder(this).dimensions(dimensions).build();
}
public DataSegment withMetrics(List<String> metrics)
{
return builder(this).metrics(metrics).build();
}
public DataSegment withSize(long size)
{
return builder(this).size(size).build();

View File

@ -296,3 +296,7 @@ In current Druid, multiple data segments may be announced under the same Znode.
|--------|-----------|-------|
|`druid.announcer.segmentsPerNode`|Each Znode contains info for up to this many segments.|50|
|`druid.announcer.maxBytesPerNode`|Max byte size for Znode.|524288|
|`druid.announcer.skipDimensions`|Skip Dimension list from segment announcements. NOTE: Enabling this will also remove the dimensions list from coordinator and broker endpoints.|false|
|`druid.announcer.skipMetrics`|Skip Metrics list from segment announcements. NOTE: Enabling this will also remove the metrics list from coordinator and broker endpoints.|false|
|`druid.announcer.skipLoadSpec`|Skip segment LoadSpec from segment announcements. NOTE: Enabling this will also remove the loadspec from coordinator and broker endpoints.|false|

View File

@ -1,26 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime;
/**
*/
public abstract class RealtimeCuratorDataSegmentAnnouncerConfig
{
}

View File

@ -21,7 +21,9 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@ -59,11 +61,12 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
private final Function<DataSegment, DataSegment> segmentTransformer;
@Inject
public BatchDataSegmentAnnouncer(
DruidServerMetadata server,
BatchDataSegmentAnnouncerConfig config,
final BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPaths,
Announcer announcer,
ObjectMapper jsonMapper
@ -76,12 +79,31 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
this.server = server;
this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
segmentTransformer = new Function<DataSegment, DataSegment>()
{
@Override
public DataSegment apply(DataSegment input)
{
DataSegment rv = input;
if (config.isSkipDimensions()) {
rv = rv.withDimensions(null);
}
if (config.isSkipMetrics()) {
rv = rv.withMetrics(null);
}
if (config.isSkipLoadSpec()) {
rv = rv.withLoadSpec(null);
}
return rv;
}
};
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
DataSegment toAnnounce = segmentTransformer.apply(segment);
int newBytesLen = jsonMapper.writeValueAsBytes(toAnnounce).length;
if (newBytesLen > config.getMaxBytesPerNode()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode());
}
@ -94,11 +116,15 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);
availableZNode.addSegment(toAnnounce);
log.info("Announcing segment[%s] at existing path[%s]", segment.getIdentifier(), availableZNode.getPath());
log.info(
"Announcing segment[%s] at existing path[%s]",
toAnnounce.getIdentifier(),
availableZNode.getPath()
);
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
segmentLookup.put(toAnnounce, availableZNode);
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
@ -118,11 +144,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
// create new batch
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);
availableZNode.addSegment(toAnnounce);
log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), availableZNode.getPath());
log.info("Announcing segment[%s] at new path[%s]", toAnnounce.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
segmentLookup.put(toAnnounce, availableZNode);
availableZNodes.add(availableZNode);
}
}
@ -154,12 +180,13 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
Iterable<DataSegment> toAnnounce = Iterables.transform(segments, segmentTransformer);
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath());
Set<DataSegment> batch = Sets.newHashSet();
int byteSize = 0;
int count = 0;
for (DataSegment segment : segments) {
for (DataSegment segment : toAnnounce) {
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
if (newBytesLen > config.getMaxBytesPerNode()) {

View File

@ -37,6 +37,18 @@ public class BatchDataSegmentAnnouncerConfig
@Min(1024)
private long maxBytesPerNode = 512 * 1024;
// Skip LoadSpec from segment announcements
@JsonProperty
private boolean skipLoadSpec = false;
// Skip dimension list from segment announcements
@JsonProperty
private boolean skipDimensions = false;
// Skip metrics list from segment announcements
@JsonProperty
private boolean skipMetrics = false;
public int getSegmentsPerNode()
{
return segmentsPerNode;
@ -46,4 +58,19 @@ public class BatchDataSegmentAnnouncerConfig
{
return maxBytesPerNode;
}
public boolean isSkipLoadSpec()
{
return skipLoadSpec;
}
public boolean isSkipDimensions()
{
return skipDimensions;
}
public boolean isSkipMetrics()
{
return skipMetrics;
}
}

View File

@ -23,6 +23,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
@ -66,6 +69,10 @@ public class BatchDataSegmentAnnouncerTest
private Set<DataSegment> testSegments;
private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024);
private Boolean skipDimensions;
private Boolean skipMetrics;
private Boolean skipLoadSpec;
@Before
public void setUp() throws Exception
@ -91,6 +98,9 @@ public class BatchDataSegmentAnnouncerTest
announcer.start();
segmentReader = new SegmentReader(cf, jsonMapper);
skipDimensions = false;
skipMetrics = false;
skipLoadSpec = false;
segmentAnnouncer = new BatchDataSegmentAnnouncer(
new DruidServerMetadata(
"id",
@ -113,6 +123,24 @@ public class BatchDataSegmentAnnouncerTest
{
return maxBytesPerNode.get();
}
@Override
public boolean isSkipDimensions()
{
return skipDimensions;
}
@Override
public boolean isSkipMetrics()
{
return skipMetrics;
}
@Override
public boolean isSkipLoadSpec()
{
return skipLoadSpec;
}
},
new ZkPathsConfig()
{
@ -177,13 +205,79 @@ public class BatchDataSegmentAnnouncerTest
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
@Test
public void testSkipDimensions() throws Exception
{
skipDimensions = true;
Iterator<DataSegment> segIter = testSegments.iterator();
DataSegment firstSegment = segIter.next();
segmentAnnouncer.announceSegment(firstSegment);
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
for (String zNode : zNodes) {
DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
Assert.assertEquals(announcedSegment, firstSegment);
Assert.assertTrue(announcedSegment.getDimensions().isEmpty());
}
segmentAnnouncer.unannounceSegment(firstSegment);
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
@Test
public void testSkipMetrics() throws Exception
{
skipMetrics = true;
Iterator<DataSegment> segIter = testSegments.iterator();
DataSegment firstSegment = segIter.next();
segmentAnnouncer.announceSegment(firstSegment);
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
for (String zNode : zNodes) {
DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
Assert.assertEquals(announcedSegment, firstSegment);
Assert.assertTrue(announcedSegment.getMetrics().isEmpty());
}
segmentAnnouncer.unannounceSegment(firstSegment);
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
@Test
public void testSkipLoadSpec() throws Exception
{
skipLoadSpec = true;
Iterator<DataSegment> segIter = testSegments.iterator();
DataSegment firstSegment = segIter.next();
segmentAnnouncer.announceSegment(firstSegment);
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
for (String zNode : zNodes) {
DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
Assert.assertEquals(announcedSegment, firstSegment);
Assert.assertNull(announcedSegment.getLoadSpec());
}
segmentAnnouncer.unannounceSegment(firstSegment);
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
@Test
public void testSingleAnnounceManyTimes() throws Exception
{
int prevMax = maxBytesPerNode.get();
maxBytesPerNode.set(2048);
// each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node
// so 100 segments makes (100 / 6) + 1 = 17 nodes
// each segment is about 348 bytes long and that makes 2048 / 348 = 5 segments included per node
// so 100 segments makes 100 / 5 = 20 nodes
try {
for (DataSegment segment : testSegments) {
segmentAnnouncer.announceSegment(segment);
@ -194,7 +288,7 @@ public class BatchDataSegmentAnnouncerTest
}
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
Assert.assertEquals(17, zNodes.size());
Assert.assertEquals(20, zNodes.size());
Set<DataSegment> segments = Sets.newHashSet(testSegments);
for (String zNode : zNodes) {
@ -244,6 +338,9 @@ public class BatchDataSegmentAnnouncerTest
)
)
.version(new DateTime().toString())
.dimensions(ImmutableList.<String>of("dim1", "dim2"))
.metrics(ImmutableList.<String>of("met1", "met2"))
.loadSpec(ImmutableMap.<String, Object>of("type", "local"))
.build();
}