replication working too

This commit is contained in:
Dhruv Parthasarathy 2013-08-05 14:51:58 -07:00
parent 1045aac4b8
commit 575dfe3679
2 changed files with 39 additions and 4 deletions

View File

@ -57,7 +57,7 @@ public class DruidMasterBalancerTester extends DruidMasterBalancer
{ {
serverMap.put(toServerName,0); serverMap.put(toServerName,0);
} }
serverMap.put(fromServerName,serverMap.get(fromServerName)-1); // serverMap.put(fromServerName,serverMap.get(fromServerName)-1);
serverMap.put(toServerName, serverMap.get(toServerName)+1); serverMap.put(toServerName, serverMap.get(toServerName)+1);
} }
catch (Exception e) { catch (Exception e) {

View File

@ -1,26 +1,34 @@
package com.metamx.druid.utils; package com.metamx.druid.utils;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.DruidCluster; import com.metamx.druid.master.DruidCluster;
import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterBalancerTester; import com.metamx.druid.master.DruidMasterBalancerTester;
import com.metamx.druid.master.DruidMasterRuleRunner;
import com.metamx.druid.master.DruidMasterRuntimeParams; import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback; import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.LoadQueuePeon; import com.metamx.druid.master.LoadQueuePeon;
import com.metamx.druid.master.LoadQueuePeonTester; import com.metamx.druid.master.LoadQueuePeonTester;
import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.SegmentReplicantLookup;
import com.metamx.druid.master.ServerHolder; import com.metamx.druid.master.ServerHolder;
import com.metamx.druid.master.rules.PeriodLoadRule;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -39,6 +47,9 @@ public class DruidMasterBalancerProfiler
DateTime start1 = new DateTime("2012-01-01"); DateTime start1 = new DateTime("2012-01-01");
DateTime version = new DateTime("2012-03-01"); DateTime version = new DateTime("2012-03-01");
ServiceEmitter emitter; ServiceEmitter emitter;
DatabaseRuleManager manager;
PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"),3,"normal");
List<Rule> rules = ImmutableList.<Rule>of(loadRule);
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
@ -47,6 +58,7 @@ public class DruidMasterBalancerProfiler
druidServer2 = EasyMock.createMock(DruidServer.class); druidServer2 = EasyMock.createMock(DruidServer.class);
emitter = EasyMock.createMock(ServiceEmitter.class); emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter); EmittingLogger.registerEmitter(emitter);
manager = EasyMock.createMock(DatabaseRuleManager.class);
} }
@Test @Test
@ -54,13 +66,18 @@ public class DruidMasterBalancerProfiler
{ {
Stopwatch watch = new Stopwatch(); Stopwatch watch = new Stopwatch();
int numServers = 1000; int numServers = 1000;
EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes();
EasyMock.expect(manager.getRules(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
EasyMock.expect(manager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
EasyMock.replay(manager);
master.moveSegment( master.moveSegment(
EasyMock.<String>anyObject(), EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(), EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(), EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject() EasyMock.<LoadPeonCallback>anyObject()
); );
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();
EasyMock.replay(master); EasyMock.replay(master);
@ -92,7 +109,9 @@ public class DruidMasterBalancerProfiler
EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes(); EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes();
EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce(); EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(server.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce(); EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
if (i==0) if (i==0)
{ {
EasyMock.expect(server.getSegments()).andReturn(segmentMap).anyTimes(); EasyMock.expect(server.getSegments()).andReturn(segmentMap).anyTimes();
@ -107,7 +126,7 @@ public class DruidMasterBalancerProfiler
LoadQueuePeon peon = new LoadQueuePeonTester(); LoadQueuePeon peon = new LoadQueuePeonTester();
peonMap.put(Integer.toString(i),peon); peonMap.put(Integer.toString(i),peon);
serverHolderList.add(new ServerHolder(server, peon)); serverHolderList.add(new ServerHolder(server, peon));
} }
DruidMasterRuntimeParams params = DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder() DruidMasterRuntimeParams.newBuilder()
@ -129,13 +148,29 @@ public class DruidMasterBalancerProfiler
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withEmitter(emitter) .withEmitter(emitter)
.withDatabaseRuleManager(manager)
.withReplicationManager(new ReplicationThrottler(2, 500))
.withSegmentReplicantLookup(
SegmentReplicantLookup.make(new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create(
serverHolderList
)
)
)
)
)
.build(); .build();
DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master); DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
DruidMasterRuleRunner runner = new DruidMasterRuleRunner(master,500,5);
watch.start(); watch.start();
while (!tester.isBalanced(20,50)) while (!tester.isBalanced(40,50))
{ {
params = tester.run(params); params = tester.run(params);
params = runner.run(params);
} }
System.out.println(watch.stop()); System.out.println(watch.stop());
} }