Merge branch 'master' of github.com:metamx/druid

cheddar 2013-08-14 10:04:04 -07:00
commit c0624d9591
44 changed files with 2562 additions and 470 deletions


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>


@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>
@ -79,6 +79,11 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<!-- Tests -->
<dependency>


@ -0,0 +1,14 @@
package com.metamx.druid.indexer.data;
import java.nio.ByteBuffer;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class)
})
public interface ByteBufferInputRowParser extends InputRowParser<ByteBuffer> {
}
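
A note on how this is wired up: the @JsonTypeInfo/@JsonSubTypes annotations above let a serialized parser spec pick the concrete parser through its "type" field. The sketch below is illustrative only and not part of this commit; the "descriptor", "timestampSpec" and "dimensions" property names come from ProtoBufInputRowParser in this diff, while the inner timestampSpec field names ("column", "format") and the assumption that prototest.desc is on the classpath are mine.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.indexer.data.ByteBufferInputRowParser;

public class ParserSpecSketch
{
  public static void main(String[] args) throws Exception
  {
    // "type": "protobuf" resolves to ProtoBufInputRowParser; "string" (or an absent
    // "type", via defaultImpl) resolves to StringInputRowParser.
    String json = "{\"type\": \"protobuf\","
        + " \"descriptor\": \"prototest.desc\","
        + " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"iso\"},"
        + " \"dimensions\": [\"eventType\", \"id\"]}";

    ByteBufferInputRowParser parser = new ObjectMapper().readValue(json, ByteBufferInputRowParser.class);
    // parser.parse(ByteBuffer.wrap(bytes)) would then yield an InputRow.
  }
}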


@ -1,8 +1,15 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes({
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class)
})
public interface InputRowParser<T>
{
public InputRow parse(T input) throws FormattedException;


@ -2,6 +2,7 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
@ -16,18 +17,20 @@ import java.util.Set;
public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{
private final TimestampSpec timestampSpec;
private final DataSpec dataSpec;
private List<String> dimensions;
private final Set<String> dimensionExclusions;
@JsonCreator
public MapInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.timestampSpec = timestampSpec;
this.dataSpec = dataSpec;
if (dimensions != null) {
this.dimensions = ImmutableList.copyOf(dimensions);
}
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
for (String dimensionExclusion : dimensionExclusions) {
@ -40,8 +43,8 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
@Override
public InputRow parse(Map<String, Object> theMap) throws FormattedException
{
final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions()
final List<String> dimensions = hasCustomDimensions()
? this.dimensions
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp;
@ -67,6 +70,10 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
}
private boolean hasCustomDimensions() {
return dimensions != null;
}
@Override
public void addDimensionExclusion(String dimension)
{
@ -79,10 +86,10 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
return timestampSpec;
}
@JsonProperty("data")
public DataSpec getDataSpec()
@JsonProperty
public List<String> getDimensions()
{
return dataSpec;
return dimensions;
}
@JsonProperty


@ -0,0 +1,108 @@
package com.metamx.druid.indexer.data;
import static com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import static com.google.protobuf.Descriptors.Descriptor;
import static com.google.protobuf.Descriptors.FileDescriptor;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
public class ProtoBufInputRowParser implements ByteBufferInputRowParser
{
private final MapInputRowParser inputRowCreator;
private final Descriptor descriptor;
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("descriptor") String descriptorFileInClasspath)
{
descriptor = getDescriptor(descriptorFileInClasspath);
inputRowCreator = new MapInputRowParser(timestampSpec, dimensions, dimensionExclusions);
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
// TODO there should be a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly
Map<String, Object> theMap = buildStringKeyMap(input);
return inputRowCreator.parse(theMap);
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
Map<String, Object> theMap = Maps.newHashMap();
try
{
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet())
{
String name = entry.getKey().getName();
if (theMap.containsKey(name))
{
continue;
// TODO
// throw new RuntimeException("dupicate key " + name + " in " +
// message);
}
Object value = entry.getValue();
if(value instanceof Descriptors.EnumValueDescriptor) {
Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
value = desc.getName();
}
theMap.put(name, value);
}
} catch (InvalidProtocolBufferException e)
{
// TODO
e.printStackTrace();
}
return theMap;
}
private Descriptor getDescriptor(String descriptorFileInClassPath)
{
try
{
InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
FileDescriptor file = FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[]
{});
return file.getMessageTypes().get(0);
} catch (Exception e)
{
throw Throwables.propagate(e);
}
}
@Override
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
}


@ -19,49 +19,106 @@
package com.metamx.druid.indexer.data;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Charsets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow;
import java.util.List;
import java.util.Map;
/**
*/
public class StringInputRowParser implements InputRowParser<String>
public class StringInputRowParser implements ByteBufferInputRowParser
{
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final Parser<String, Object> parser;
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final Parser<String, Object> parser;
@JsonCreator
public StringInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
private CharBuffer chars = null;
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
@JsonCreator
public StringInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
{
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec.getDimensions(), dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
@Override
public InputRow parse(String input) throws FormattedException
{
return inputRowCreator.parse(parser.parse(input));
}
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
return parseMap(buildStringKeyMap(input));
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
int payloadSize = input.remaining();
if (chars == null || chars.remaining() < payloadSize)
{
chars = CharBuffer.allocate(payloadSize);
}
final CoderResult coderResult = Charsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
.decode(input, chars, true);
Map<String, Object> theMap;
if (coderResult.isUnderflow())
{
chars.flip();
try
{
theMap = parseString(chars.toString());
} finally
{
chars.clear();
}
}
else
{
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Failed with CoderResult[%s]", coderResult))
.build();
}
return theMap;
}
private Map<String, Object> parseString(String inputString)
{
return parser.parse(inputString);
}
public InputRow parse(String input) throws FormattedException
{
return parseMap(parseString(input));
}
private InputRow parseMap(Map<String, Object> theMap)
{
return inputRowCreator.parse(theMap);
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
}


@ -0,0 +1,83 @@
package com.metamx.druid.indexer.data;
import static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import com.google.protobuf.ByteString;
import org.joda.time.DateTime;
import org.junit.Test;
import com.metamx.druid.input.InputRow;
public class ProtoBufInputRowParserTest {
public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};
/*
eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
*/
@Test
public void testParse() throws Exception {
//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(new TimestampSpec("timestamp", "iso"),
Arrays.asList(DIMENSIONS), Arrays.<String>asList(), "prototest.desc");
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(ByteBuffer.wrap(out.toByteArray()));
System.out.println(row);
assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
assertDimensionEquals(row, "eventType", CATEGORY_ONE.name());
assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0);
assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0);
assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected) {
List<String> values = row.getDimension(dimension);
assertEquals(1, values.size());
assertEquals(expected, values.get(0));
}
}


@ -0,0 +1,31 @@
package prototest;
option java_package = "com.metamx.druid.indexer.data";
option java_outer_classname = "ProtoTestEventWrapper";
message ProtoTestEvent {
enum EventCategory {
CATEGORY_ZERO = 0;
CATEGORY_ONE = 1;
CATEGORY_TWO = 2;
}
required EventCategory eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
}

Binary file not shown.


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>

pom.xml

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@ -38,7 +38,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.24.0-SNAPSHOT</metamx.java-util.version>
<metamx.java-util.version>0.24.0</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version>
</properties>
@ -325,6 +325,12 @@
<artifactId>lz4</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.4.0a</version>
</dependency>
<!-- Test Scope -->


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>


@ -19,14 +19,14 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.metamx.druid.indexer.data.ByteBufferInputRowParser;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@ -34,131 +34,123 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.InputRowParser;
import com.metamx.druid.input.InputRow;
/**
*/
public class KafkaFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(KafkaFirehoseFactory.class);
private static final Logger log = new Logger(KafkaFirehoseFactory.class);
@JsonProperty
private final Properties consumerProps;
@JsonProperty
private final Properties consumerProps;
@JsonProperty
private final String feed;
@JsonProperty
private final String feed;
@JsonProperty
private final StringInputRowParser parser;
@JsonProperty
private final ByteBufferInputRowParser parser;
@JsonCreator
public KafkaFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
@JsonProperty("parser") StringInputRowParser parser
)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = parser;
@JsonCreator
public KafkaFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
@JsonProperty("parser") ByteBufferInputRowParser parser)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = parser;
parser.addDimensionExclusion("feed");
}
parser.addDimensionExclusion("feed");
}
@Override
public Firehose connect() throws IOException
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
@Override
public Firehose connect() throws IOException
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) {
return null;
}
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1)
{
return null;
}
final KafkaStream<Message> stream = streamList.get(0);
final KafkaStream<Message> stream = streamList.get(0);
return new Firehose()
{
Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
private CharBuffer chars = null;
return new DefaultFirehose(connector, stream, parser);
}
@Override
public boolean hasMore()
{
return iter.hasNext();
}
private static class DefaultFirehose implements Firehose
{
private final ConsumerConnector connector;
private final Iterator<MessageAndMetadata<Message>> iter;
private final InputRowParser<ByteBuffer> parser;
@Override
public InputRow nextRow() throws FormattedException
{
final Message message = iter.next().message();
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, InputRowParser<ByteBuffer> parser)
{
iter = stream.iterator();
this.connector = connector;
this.parser = parser;
}
if (message == null) {
return null;
}
@Override
public boolean hasMore()
{
return iter.hasNext();
}
int payloadSize = message.payloadSize();
if (chars == null || chars.remaining() < payloadSize) {
chars = CharBuffer.allocate(payloadSize);
}
@Override
public InputRow nextRow() throws FormattedException
{
final Message message = iter.next().message();
final CoderResult coderResult = Charsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
.decode(message.payload(), chars, true);
if (message == null)
{
return null;
}
if (coderResult.isUnderflow()) {
chars.flip();
try {
return parser.parse(chars.toString());
}
finally {
chars.clear();
}
}
else {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Failed with CoderResult[%s]", coderResult))
.build();
}
}
return parseMessage(message);
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
/*
This is actually not going to do exactly what we want, cause it will be called asynchronously
after the persist is complete. So, it's going to commit that it's processed more than was actually
persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade
of our Kafka version.
*/
public InputRow parseMessage(Message message) throws FormattedException
{
return parser.parse(message.payload());
}
log.info("committing offsets");
connector.commitOffsets();
}
};
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
/*
* This is actually not going to do exactly what we want, cause it
* will be called asynchronously after the persist is complete. So,
* it's going to commit that it's processed more than was actually
* persisted. This is unfortunate, but good enough for now. Should
* revisit along with an upgrade of our Kafka version.
*/
@Override
public void close() throws IOException
{
connector.shutdown();
}
};
}
log.info("committing offsets");
connector.commitOffsets();
}
};
}
@Override
public void close() throws IOException
{
connector.shutdown();
}
}
}


@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>


@ -46,11 +46,11 @@ public class ReferenceCountingSegment implements Segment
public Segment getBaseSegment()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment;
if (isClosed) {
return null;
}
return null;
return baseSegment;
}
}
@ -68,11 +68,11 @@ public class ReferenceCountingSegment implements Segment
public String getIdentifier()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.getIdentifier();
if (isClosed) {
return null;
}
return null;
return baseSegment.getIdentifier();
}
}
@ -80,11 +80,11 @@ public class ReferenceCountingSegment implements Segment
public Interval getDataInterval()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.getDataInterval();
if (isClosed) {
return null;
}
return null;
return baseSegment.getDataInterval();
}
}
@ -92,11 +92,11 @@ public class ReferenceCountingSegment implements Segment
public QueryableIndex asQueryableIndex()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.asQueryableIndex();
if (isClosed) {
return null;
}
return null;
return baseSegment.asQueryableIndex();
}
}
@ -104,11 +104,11 @@ public class ReferenceCountingSegment implements Segment
public StorageAdapter asStorageAdapter()
{
synchronized (lock) {
if (!isClosed) {
return baseSegment.asStorageAdapter();
if (isClosed) {
return null;
}
return null;
return baseSegment.asStorageAdapter();
}
}
@ -116,24 +116,18 @@ public class ReferenceCountingSegment implements Segment
public void close() throws IOException
{
synchronized (lock) {
log.info("Trying to close %s", baseSegment.getIdentifier());
if (!isClosed) {
if (numReferences > 0) {
log.info(
"%d references to %s still exist. Decrementing instead.",
numReferences,
baseSegment.getIdentifier()
);
decrement();
} else {
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);
baseSegment.close();
isClosed = true;
}
} else {
if (isClosed) {
log.info("Failed to close, %s is closed already", baseSegment.getIdentifier());
return;
}
if (numReferences > 0) {
log.info("%d references to %s still exist. Decrementing.", numReferences, baseSegment.getIdentifier());
decrement();
} else {
log.info("Closing %s", baseSegment.getIdentifier());
innerClose();
}
}
}
@ -141,38 +135,50 @@ public class ReferenceCountingSegment implements Segment
public Closeable increment()
{
synchronized (lock) {
if (!isClosed) {
numReferences++;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
if (isClosed) {
return null;
}
return null;
numReferences++;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
}
}
private void decrement()
{
synchronized (lock) {
if (!isClosed) {
if (--numReferences < 0) {
try {
close();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
if (isClosed) {
return;
}
if (--numReferences < 0) {
try {
innerClose();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
}
}
}
private void innerClose() throws IOException
{
synchronized (lock) {
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);
isClosed = true;
baseSegment.close();
}
}
}


@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import java.util.List;
public interface BalancerStrategy
{
public ServerHolder findNewSegmentHomeBalancer(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
public ServerHolder findNewSegmentHomeReplicator(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);
public void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList);
}


@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import org.joda.time.DateTime;
public interface BalancerStrategyFactory
{
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp);
}


@ -1,95 +1,132 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
/**
* The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in
* computeJointSegmentCosts. It will then propose to move (pseudo-)randomly chosen segments from their
* respective initial servers to other servers, chosen greedily to minimize the cost of the cluster.
*/
public class BalancerCostAnalyzer
public class CostBalancerStrategy implements BalancerStrategy
{
private static final Logger log = new Logger(BalancerCostAnalyzer.class);
private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
private final Random rand;
private final DateTime referenceTimestamp;
public BalancerCostAnalyzer(DateTime referenceTimestamp)
public CostBalancerStrategy(DateTime referenceTimestamp)
{
this.referenceTimestamp = referenceTimestamp;
rand = new Random(0);
}
/**
* Calculates the cost normalization. This is such that the normalized cost is lower bounded
* by 1 (e.g. when each segment gets its own compute node).
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return The normalization value (the sum of the diagonal entries in the
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own compute node.
*/
public double calculateNormalization(final List<ServerHolder> serverHolders)
@Override
public ServerHolder findNewSegmentHomeReplicator(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServer().getSegments().values()) {
cost += computeJointSegmentCosts(segment, segment);
}
ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs;
if (holder != null && !holder.isServingSegment(proposalSegment))
{
return holder;
}
return cost;
return null;
}
/**
* Calculates the initial cost of the Druid segment configuration.
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return The initial cost of the Druid tier.
*/
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
@Override
public ServerHolder findNewSegmentHomeBalancer(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
double cost = 0;
return chooseBestServer(proposalSegment, serverHolders, true).rhs;
}
/**
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
*
* @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier.
*
* @return A ServerHolder with the new home for a segment.
*/
private Pair<Double, ServerHolder> chooseBestServer(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders,
boolean includeCurrentServer
)
{
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize();
for (ServerHolder server : serverHolders) {
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
for (int i = 0; i < segments.length; ++i) {
for (int j = i; j < segments.length; ++j) {
cost += computeJointSegmentCosts(segments[i], segments[j]);
if (includeCurrentServer || !server.isServingSegment(proposalSegment))
{
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
continue;
}
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
if (cost < bestServer.lhs) {
bestServer = Pair.of(cost, server);
}
}
}
return cost;
return bestServer;
}
/**
@ -122,8 +159,10 @@ public class BalancerCostAnalyzer
referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
);
if (maxDiff < SEVEN_DAYS_IN_MILLIS) {
recencyPenalty = 2 - maxDiff / SEVEN_DAYS_IN_MILLIS;
double segment1diff = referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis();
double segment2diff = referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis();
if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) {
recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS);
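// e.g. two segments that both end exactly at referenceTimestamp now get a recency
// penalty of (2 - 0) * (2 - 0) = 4, whereas the earlier maxDiff-based formula gave 2 - 0 = 2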
}
/** gap is null if the two segment intervals overlap or if they're adjacent */
@ -141,129 +180,77 @@ public class BalancerCostAnalyzer
return cost;
}
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
{
ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}
/**
* The balancing application requires us to pick a proposal segment uniformly at random from the set of
* all servers. We use reservoir sampling to do this.
* Calculates the initial cost of the Druid segment configuration.
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return A BalancerSegmentHolder sampled uniformly at random.
* @return The initial cost of the Druid tier.
*/
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
{
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int numSoFar = 0;
double cost = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServer().getSegments().values()) {
int randNum = rand.nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar + 1), swap out the server and segment
if (randNum == numSoFar) {
fromServerHolder = server;
proposalSegment = segment;
numSoFar++;
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
for (int i = 0; i < segments.length; ++i) {
for (int j = i; j < segments.length; ++j) {
cost += computeJointSegmentCosts(segments[i], segments[j]);
}
}
}
return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
return cost;
}
/**
* For balancing, we want to only make a move if the minimum cost server is not already serving the segment.
* Calculates the cost normalization. This is such that the normalized cost is lower bounded
* by 1 (e.g. when each segment gets its own compute node).
*
* @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier.
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return A ServerHolder with the new home for a segment.
* @return The normalization value (the sum of the diagonal entries in the
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own compute node.
*/
public ServerHolder findNewSegmentHomeBalance(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
public double calculateNormalization(final List<ServerHolder> serverHolders)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
if (costsAndServers.isEmpty()) {
return null;
}
ServerHolder toServer = costsAndServers.pollFirst().rhs;
if (!toServer.isServingSegment(proposalSegment)) {
return toServer;
}
return null;
}
/**
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
*
* @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier.
*
* @return A ServerHolder with the new home for a segment.
*/
public ServerHolder findNewSegmentHomeAssign(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
while (!costsAndServers.isEmpty()) {
ServerHolder toServer = costsAndServers.pollFirst().rhs;
if (!toServer.isServingSegment(proposalSegment)) {
return toServer;
}
}
return null;
}
private MinMaxPriorityQueue<Pair<Double, ServerHolder>> computeCosts(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize();
double cost = 0;
for (ServerHolder server : serverHolders) {
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
continue;
}
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
cost += computeJointSegmentCosts(segment, segment);
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
costsAndServers.add(Pair.of(cost, server));
}
return costsAndServers;
return cost;
}
}
@Override
public void emitStats(
String tier,
MasterStats stats, List<ServerHolder> serverHolderList
)
{
final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
final double normalization = calculateNormalization(serverHolderList);
final double normalizedInitialCost = initialTotalCost / normalization;
stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
stats.addToTieredStat("normalization", tier, (long) normalization);
stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
tier,
initialTotalCost,
normalization,
normalizedInitialCost
);
}
}


@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import org.joda.time.DateTime;
public class CostBalancerStrategyFactory implements BalancerStrategyFactory
{
@Override
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
{
return new CostBalancerStrategy(referenceTimestamp);
}
}


@ -92,7 +92,6 @@ public class DruidMaster
private final IndexingServiceClient indexingServiceClient;
private final ScheduledExecutorService exec;
private final LoadQueueTaskMaster taskMaster;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final AtomicReference<LeaderLatch> leaderLatch;
private volatile AtomicReference<MasterSegmentSettings> segmentSettingsAtomicReference;


@ -78,7 +78,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
{
final MasterStats stats = new MasterStats();
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
final int maxSegmentsToMove = params.getMasterSegmentSettings().getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
@ -113,34 +113,25 @@ public class DruidMasterBalancer implements DruidMasterHelper
}
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
}
}
}
final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
final double normalization = analyzer.calculateNormalization(serverHolderList);
final double normalizedInitialCost = initialTotalCost / normalization;
stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
stats.addToTieredStat("normalization", tier, (long) normalization);
stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getMasterSegmentSettings().isEmitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList);
}
log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]",
tier,
initialTotalCost,
normalization,
normalizedInitialCost,
currentlyMovingSegments.get(tier).size()
"[%s]: Segments Moved: [%d]", tier, currentlyMovingSegments.get(tier).size()
);
}
return params.buildFromExisting()


@ -42,12 +42,6 @@ public abstract class DruidMasterConfig
@Default("PT1800s")
public abstract Duration getMasterSegmentMergerPeriod();
@Config("druid.master.millisToWaitBeforeDeleting")
public long getMillisToWaitBeforeDeleting()
{
return 15 * 60 * 1000L;
}
@Config("druid.master.merger.on")
public boolean isMergeSegments()
{
@ -66,22 +60,6 @@ public abstract class DruidMasterConfig
return null;
}
@Config("druid.master.merge.threshold")
public long getMergeBytesLimit()
{
return 100000000L;
}
@Config("druid.master.merge.maxSegments")
public int getMergeSegmentsLimit()
{
return Integer.MAX_VALUE;
}
@Config("druid.master.balancer.maxSegmentsToMove")
@Default("5")
public abstract int getMaxSegmentsToMove();
@Config("druid.master.replicant.lifetime")
@Default("15")
public abstract int getReplicantLifetime();


@ -68,7 +68,6 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager();
for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
if (rule.appliesTo(segment, now)) {


@ -49,6 +49,7 @@ public class DruidMasterRuntimeParams
private final MasterSegmentSettings masterSegmentSettings;
private final MasterStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategyFactory strategyFactory;
public DruidMasterRuntimeParams(
long startTime,
@ -62,7 +63,8 @@ public class DruidMasterRuntimeParams
ServiceEmitter emitter,
MasterSegmentSettings masterSegmentSettings,
MasterStats stats,
DateTime balancerReferenceTimestamp
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
)
{
this.startTime = startTime;
@ -77,6 +79,7 @@ public class DruidMasterRuntimeParams
this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory = strategyFactory;
}
public long getStartTime()
@ -139,9 +142,9 @@ public class DruidMasterRuntimeParams
return balancerReferenceTimestamp;
}
public BalancerCostAnalyzer getBalancerCostAnalyzer(DateTime referenceTimestamp)
public BalancerStrategyFactory getBalancerStrategyFactory()
{
return new BalancerCostAnalyzer(referenceTimestamp);
return strategyFactory;
}
public boolean hasDeletionWaitTimeElapsed()
@ -168,7 +171,8 @@ public class DruidMasterRuntimeParams
emitter,
masterSegmentSettings,
stats,
balancerReferenceTimestamp
balancerReferenceTimestamp,
strategyFactory
);
}
@ -186,6 +190,7 @@ public class DruidMasterRuntimeParams
private MasterSegmentSettings masterSegmentSettings;
private MasterStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategyFactory strategyFactory;
Builder()
{
@ -201,6 +206,7 @@ public class DruidMasterRuntimeParams
this.stats = new MasterStats();
this.masterSegmentSettings = new MasterSegmentSettings.Builder().build();
this.balancerReferenceTimestamp = null;
this.strategyFactory = new CostBalancerStrategyFactory();
}
Builder(
@ -215,7 +221,8 @@ public class DruidMasterRuntimeParams
ServiceEmitter emitter,
MasterSegmentSettings masterSegmentSettings,
MasterStats stats,
DateTime balancerReferenceTimestamp
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
)
{
this.startTime = startTime;
@ -230,6 +237,7 @@ public class DruidMasterRuntimeParams
this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory = strategyFactory;
}
public DruidMasterRuntimeParams build()
@ -246,7 +254,8 @@ public class DruidMasterRuntimeParams
emitter,
masterSegmentSettings,
stats,
balancerReferenceTimestamp
balancerReferenceTimestamp,
strategyFactory
);
}
@ -321,5 +330,11 @@ public class DruidMasterRuntimeParams
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
return this;
}
public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory)
{
this.strategyFactory = strategyFactory;
return this;
}
}
}


@ -28,19 +28,22 @@ public class MasterSegmentSettings
private long mergeBytesLimit = 100000000L;
private int mergeSegmentsLimit = Integer.MAX_VALUE;
private int maxSegmentsToMove = 5;
private boolean emitBalancingStats = false;
@JsonCreator
public MasterSegmentSettings(
@JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
@JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
@JsonProperty("emitBalancingStats") Boolean emitBalancingStats
)
{
this.maxSegmentsToMove = maxSegmentsToMove;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergeBytesLimit = mergeBytesLimit;
this.emitBalancingStats = emitBalancingStats;
}
public static String getConfigKey()
@ -60,6 +63,12 @@ public class MasterSegmentSettings
return mergeBytesLimit;
}
@JsonProperty
public boolean isEmitBalancingStats()
{
return emitBalancingStats;
}
@JsonProperty
public int getMergeSegmentsLimit()
{
@ -80,6 +89,7 @@ public class MasterSegmentSettings
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int maxSegmentsToMove;
private boolean emitBalancingStats;
public Builder()
{
@ -87,14 +97,16 @@ public class MasterSegmentSettings
this.mergeBytesLimit = 100000000L;
this.mergeSegmentsLimit = Integer.MAX_VALUE;
this.maxSegmentsToMove = 5;
this.emitBalancingStats = false;
}
public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove)
public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.emitBalancingStats = emitBalancingStats;
}
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@ -123,7 +135,7 @@ public class MasterSegmentSettings
public MasterSegmentSettings build()
{
return new MasterSegmentSettings(millisToWaitBeforeDeleting, mergeBytesLimit, mergeSegmentsLimit, maxSegmentsToMove);
return new MasterSegmentSettings(millisToWaitBeforeDeleting, mergeBytesLimit, mergeSegmentsLimit, maxSegmentsToMove, emitBalancingStats);
}
}
}
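
For context, these settings (including the new emitBalancingStats flag) are bound from JSON through the @JsonCreator constructor above, and DruidMasterBalancer only calls strategy.emitStats(...) when the flag is true. Below is a minimal, illustrative sketch of that binding; the JSON values are made up and not taken from this commit.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.master.MasterSegmentSettings;

public class MasterSegmentSettingsSketch
{
  public static void main(String[] args) throws Exception
  {
    String json = "{\"millisToWaitBeforeDeleting\": 900000,"
        + " \"mergeBytesLimit\": 100000000,"
        + " \"mergeSegmentsLimit\": 2147483647,"
        + " \"maxSegmentsToMove\": 5,"
        + " \"emitBalancingStats\": true}";

    MasterSegmentSettings settings = new ObjectMapper().readValue(json, MasterSegmentSettings.class);
    System.out.println(settings.isEmitBalancingStats());  // true
  }
}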


@ -0,0 +1,71 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import java.util.List;
import java.util.Random;
public class RandomBalancerStrategy implements BalancerStrategy
{
private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
@Override
public ServerHolder findNewSegmentHomeReplicator(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
if (serverHolders.size() == 1)
{
return null;
}
else
{
ServerHolder holder = serverHolders.get(new Random().nextInt(serverHolders.size()));
while (holder.isServingSegment(proposalSegment))
{
holder = serverHolders.get(new Random().nextInt(serverHolders.size()));
}
return holder;
}
}
@Override
public ServerHolder findNewSegmentHomeBalancer(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@Override
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
{
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}
@Override
public void emitStats(
String tier, MasterStats stats, List<ServerHolder> serverHolderList
)
{
}
}


@ -0,0 +1,30 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import org.joda.time.DateTime;
public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
{
@Override
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
{
return new RandomBalancerStrategy();
}
}


@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import java.util.List;
import java.util.Random;
public class ReservoirSegmentSampler
{
public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
{
final Random rand = new Random();
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int numSoFar = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServer().getSegments().values()) {
int randNum = rand.nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar+1), swap out the server and segment
if (randNum == numSoFar) {
fromServerHolder = server;
proposalSegment = segment;
}
numSoFar++;
}
}
if (fromServerHolder != null) {
return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
} else {
return null;
}
}
}
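
For reference, the uniformity of this sampler follows from the standard reservoir-sampling argument: the i-th segment encountered (1-indexed, so numSoFar == i - 1 when it is examined) replaces the current candidate with probability 1/i and survives each later step j with probability (j - 1)/j, so across N segments in total

    P(segment i is returned) = (1/i) * prod_{j=i+1..N} (j-1)/j = 1/N.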


@ -22,7 +22,7 @@ package com.metamx.druid.master.rules;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.BalancerCostAnalyzer;
import com.metamx.druid.master.BalancerStrategy;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
@ -60,15 +60,14 @@ public abstract class LoadRule implements Rule
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
expectedReplicants,
totalReplicants,
analyzer,
strategy,
serverHolderList,
segment
)
@ -84,7 +83,7 @@ public abstract class LoadRule implements Rule
final ReplicationThrottler replicationManager,
final int expectedReplicants,
int totalReplicants,
final BalancerCostAnalyzer analyzer,
final BalancerStrategy strategy,
final List<ServerHolder> serverHolderList,
final DataSegment segment
)
@ -98,7 +97,7 @@ public abstract class LoadRule implements Rule
break;
}
final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);
final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
if (holder == null) {
log.warn(

View File
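LoadRule now asks a BalancerStrategy, produced by the factory on the runtime params, for a destination instead of calling BalancerCostAnalyzer directly. Pieced together from the overrides and call sites visible in this diff, the interface presumably looks roughly like the reconstruction below (method and parameter names taken from the code above; not the verbatim source):

package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import java.util.List;
public interface BalancerStrategy
{
  // destination for a new replicant of a segment (used by LoadRule.assign above)
  ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders);
  // destination for a segment that is being moved during balancing
  ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders);
  // segment chosen to be moved off its current server
  BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);
  // optional per-tier reporting after a balancing pass
  void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList);
}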

@ -12,7 +12,7 @@ import java.io.Closeable;
*/
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> runner;
private final QueryRunnerFactory<T, Query<T>> factory;
private final ReferenceCountingSegment adapter;
public ReferenceCountingSegmentQueryRunner(
@ -20,9 +20,8 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
ReferenceCountingSegment adapter
)
{
this.factory = factory;
this.adapter = adapter;
this.runner = factory.createRunner(adapter);
}
@Override
@ -30,7 +29,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
final Closeable closeable = adapter.increment();
try {
final Sequence<T> baseSequence = runner.run(query);
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query);
return new ResourceClosingSequence<T>(baseSequence, closeable);
}

View File

@ -4,6 +4,7 @@ $(function () {
document.getElementById("mergeBytes").value = data["mergeBytesLimit"];
document.getElementById("mergeSegments").value = data["mergeSegmentsLimit"];
document.getElementById("maxSegments").value = data["maxSegmentsToMove"];
document.getElementById("emitBalancingStats").value = data["emitBalancingStats"];
});
$("#submit").click( function ()

View File

@ -13,6 +13,7 @@
<br>
maxSegmentsToMove: <input type= "text" name ="maxSegmentsToMove" id ="maxSegments">
<br>
emitBalancingStats: <input type= "text" name ="emitBalancingStats" id="emitBalancingStats">
<button type="button" id="submit"> Submit </button>
</form>
</body>

View File

@ -73,6 +73,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
*/
@ -262,7 +263,7 @@ public class ServerManagerTest
)
);
queryNotifyLatch.await();
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, factory.getSegmentReferences().size());
@ -301,7 +302,7 @@ public class ServerManagerTest
)
);
queryNotifyLatch.await();
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, factory.getSegmentReferences().size());
@ -344,7 +345,7 @@ public class ServerManagerTest
)
);
queryNotifyLatch.await();
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, factory.getSegmentReferences().size());
@ -378,7 +379,7 @@ public class ServerManagerTest
private void waitForTestVerificationAndCleanup(Future future)
{
try {
queryNotifyLatch.await();
queryNotifyLatch.await(25, TimeUnit.MILLISECONDS);
queryWaitYieldLatch.countDown();
queryWaitLatch.countDown();
future.get();
@ -505,13 +506,13 @@ public class ServerManagerTest
if (!(adapter instanceof ReferenceCountingSegment)) {
throw new IAE("Expected instance of ReferenceCountingSegment, got %s", adapter.getClass());
}
segmentReferences.add((ReferenceCountingSegment) adapter);
adapters.add((SegmentForTesting) ((ReferenceCountingSegment) adapter).getBaseSegment());
final ReferenceCountingSegment segment = (ReferenceCountingSegment) adapter;
Assert.assertTrue(segment.getNumReferences() > 0);
segmentReferences.add(segment);
adapters.add((SegmentForTesting) segment.getBaseSegment());
return new BlockingQueryRunner<Result<SearchResultValue>>(
new NoopQueryRunner<Result<SearchResultValue>>(),
waitLatch,
waitYieldLatch,
notifyLatch
new NoopQueryRunner<Result<SearchResultValue>>(), waitLatch, waitYieldLatch, notifyLatch
);
}
@ -702,7 +703,7 @@ public class ServerManagerTest
notifyLatch.countDown();
try {
waitYieldLatch.await();
waitYieldLatch.await(25, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -715,7 +716,7 @@ public class ServerManagerTest
public OutType get()
{
try {
waitLatch.await();
waitLatch.await(25, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
throw Throwables.propagate(e);

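The test latches now wait with a 25 ms timeout instead of blocking indefinitely, so a missed countDown() delays the test briefly rather than hanging the build. The timed await() returns a boolean that these call sites ignore; a small illustrative helper, assumed rather than taken from this commit, that fails loudly when the timeout expires:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
final class Latches
{
  // Throws if the latch is not released within the timeout, instead of
  // silently continuing when the timed await() returns false.
  static void awaitOrFail(CountDownLatch latch, long millis) throws InterruptedException
  {
    if (!latch.await(millis, TimeUnit.MILLISECONDS)) {
      throw new IllegalStateException("latch not released within " + millis + " ms");
    }
  }
}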
View File

@ -10,9 +10,6 @@ import org.junit.Test;
import java.util.Arrays;
/**
* @author jan.rudert
*/
public class DataSegmentPusherUtilTest {
@Test
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception {

View File

@ -132,6 +132,78 @@ public class DruidMasterBalancerTest
EasyMock.verify(druidServer4);
}
@Test
public void testMoveToEmptyServerBalancer()
{
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the master needs
master.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(master);
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServer1, fromPeon),
new ServerHolder(druidServer2, toPeon)
)
)
)
)
)
.withLoadManagementPeons(
ImmutableMap.<String, LoadQueuePeon>of(
"from",
fromPeon,
"to",
toPeon
)
)
.withAvailableSegments(segments.values())
.withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidMasterBalancerTester(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
}
@Test
public void testRun1()
{
@ -203,7 +275,8 @@ public class DruidMasterBalancerTest
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
}
@Test
public void testRun2()
{
// Mock some servers of different usages
@ -295,4 +368,5 @@ public class DruidMasterBalancerTest
params = new DruidMasterBalancerTester(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
}
}

View File

@ -36,7 +36,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
@ -95,24 +94,12 @@ public class DruidMasterTest
return null;
}
@Override
public long getMillisToWaitBeforeDeleting()
{
return super.getMillisToWaitBeforeDeleting();
}
@Override
public String getMergerServiceName()
{
return "";
}
@Override
public int getMaxSegmentsToMove()
{
return 0;
}
@Override
public int getReplicantLifetime()
{

View File

@ -0,0 +1,208 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class ReservoirSegmentSamplerTest
{
private DruidServer druidServer1;
private DruidServer druidServer2;
private DruidServer druidServer3;
private DruidServer druidServer4;
private ServerHolder holder1;
private ServerHolder holder2;
private ServerHolder holder3;
private ServerHolder holder4;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment segment3;
private DataSegment segment4;
Map<String, DataSegment> segmentsMap1;
Map<String, DataSegment> segmentsMap2;
Map<String, DataSegment> segmentsMap3;
Map<String, DataSegment> segmentsMap4;
List<DataSegment> segments;
@Before
public void setUp() throws Exception
{
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer3 = EasyMock.createMock(DruidServer.class);
druidServer4 = EasyMock.createMock(DruidServer.class);
holder1 = EasyMock.createMock(ServerHolder.class);
holder2 = EasyMock.createMock(ServerHolder.class);
holder3 = EasyMock.createMock(ServerHolder.class);
holder4 = EasyMock.createMock(ServerHolder.class);
segment1 = EasyMock.createMock(DataSegment.class);
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);
segment4 = EasyMock.createMock(DataSegment.class);
DateTime start1 = new DateTime("2012-01-01");
DateTime start2 = new DateTime("2012-02-01");
DateTime version = new DateTime("2012-03-01");
segment1 = new DataSegment(
"datasource1",
new Interval(start1, start1.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
11L
);
segment2 = new DataSegment(
"datasource1",
new Interval(start2, start2.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
7L
);
segment3 = new DataSegment(
"datasource2",
new Interval(start1, start1.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
4L
);
segment4 = new DataSegment(
"datasource2",
new Interval(start2, start2.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
8L
);
segments = Lists.newArrayList(segment1, segment2, segment3, segment4);
segmentsMap1 = ImmutableMap.<String, DataSegment>of(
"datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
segment1
);
segmentsMap2 = ImmutableMap.<String, DataSegment>of(
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
segment2
);
segmentsMap3 = ImmutableMap.<String, DataSegment>of(
"datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
segment3
);
segmentsMap4 = ImmutableMap.<String, DataSegment>of(
"datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
segment4
);
}
// Checks that every segment is selected at least once over 5000 trials.
@Test
public void getRandomBalancerSegmentHolderTest()
{
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segmentsMap1).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(segmentsMap2).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer3.getSegments()).andReturn(segmentsMap3).anyTimes();
EasyMock.expect(druidServer3.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer4.getSegments()).andReturn(segmentsMap4).anyTimes();
EasyMock.expect(druidServer4.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer4);
EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
EasyMock.replay(holder1);
EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
EasyMock.replay(holder2);
EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
EasyMock.replay(holder3);
EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes();
EasyMock.replay(holder4);
List<ServerHolder> holderList = Lists.newArrayList();
holderList.add(holder1);
holderList.add(holder2);
holderList.add(holder3);
holderList.add(holder4);
Map<DataSegment, Integer> segmentCountMap = Maps.newHashMap();
ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
for (int i = 0; i < 5000; i++) {
segmentCountMap.put(sampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1);
}
for (DataSegment segment : segments) {
Assert.assertEquals(segmentCountMap.get(segment), new Integer(1));
}
}
}

View File

@ -0,0 +1,249 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.utils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.DruidCluster;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterBalancerTester;
import com.metamx.druid.master.DruidMasterRuleRunner;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.LoadQueuePeon;
import com.metamx.druid.master.LoadQueuePeonTester;
import com.metamx.druid.master.MasterSegmentSettings;
import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.SegmentReplicantLookup;
import com.metamx.druid.master.ServerHolder;
import com.metamx.druid.master.rules.PeriodLoadRule;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DruidMasterBalancerProfiler
{
private static final int MAX_SEGMENTS_TO_MOVE = 5;
private DruidMaster master;
private DruidServer druidServer1;
private DruidServer druidServer2;
Map<String, DataSegment> segments = Maps.newHashMap();
ServiceEmitter emitter;
DatabaseRuleManager manager;
PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), 3, "normal");
List<Rule> rules = ImmutableList.<Rule>of(loadRule);
@Before
public void setUp() throws Exception
{
master = EasyMock.createMock(DruidMaster.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
manager = EasyMock.createMock(DatabaseRuleManager.class);
}
public void bigProfiler()
{
Stopwatch watch = new Stopwatch();
int numSegments = 55000;
int numServers = 50;
EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes();
EasyMock.expect(manager.getRules(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
EasyMock.expect(manager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
EasyMock.replay(manager);
master.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(master);
List<DruidServer> serverList = Lists.newArrayList();
Map<String, LoadQueuePeon> peonMap = Maps.newHashMap();
List<ServerHolder> serverHolderList = Lists.newArrayList();
Map<String, DataSegment> segmentMap = Maps.newHashMap();
for (int i = 0; i < numSegments; i++)
{
segmentMap.put(
"segment" + i,
new DataSegment(
"datasource" + i,
new Interval(new DateTime("2012-01-01"), (new DateTime("2012-01-01")).plusHours(1)),
(new DateTime("2012-03-01")).toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
4L
)
);
}
for (int i = 0; i < numServers; i++)
{
DruidServer server = EasyMock.createMock(DruidServer.class);
EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes();
EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(server.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
if (i == 0)
{
EasyMock.expect(server.getSegments()).andReturn(segmentMap).anyTimes();
}
else
{
EasyMock.expect(server.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
}
EasyMock.expect(server.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(server);
LoadQueuePeon peon = new LoadQueuePeonTester();
peonMap.put(Integer.toString(i), peon);
serverHolderList.add(new ServerHolder(server, peon));
}
DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create(
serverHolderList
)
)
)
)
.withLoadManagementPeons(
peonMap
)
.withAvailableSegments(segmentMap.values())
.withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withEmitter(emitter)
.withDatabaseRuleManager(manager)
.withReplicationManager(new ReplicationThrottler(2, 500))
.withSegmentReplicantLookup(
SegmentReplicantLookup.make(new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create(
serverHolderList
)
)
)
)
)
.build();
DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
DruidMasterRuleRunner runner = new DruidMasterRuleRunner(master, 500, 5);
watch.start();
DruidMasterRuntimeParams balanceParams = tester.run(params);
DruidMasterRuntimeParams assignParams = runner.run(params);
System.out.println(watch.stop());
}
public void profileRun()
{
Stopwatch watch = new Stopwatch();
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
master.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(master);
DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServer1, fromPeon),
new ServerHolder(druidServer2, toPeon)
)
)
)
)
)
.withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon))
.withAvailableSegments(segments.values())
.withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
watch.start();
DruidMasterRuntimeParams balanceParams = tester.run(params);
System.out.println(watch.stop());
}
}

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.37-SNAPSHOT</version>
<version>0.5.38-SNAPSHOT</version>
</parent>
<dependencies>