address code review

This commit is contained in:
fjy 2014-07-17 13:34:05 -07:00
parent 5197ea527a
commit c6078ca841
10 changed files with 38 additions and 112 deletions

View File

@ -22,7 +22,7 @@ The following configs only apply if the overlord is running in remote mode:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a middle manager before throwing an error.|PT5M|
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |none|
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |"0"|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|false|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
@ -80,7 +80,6 @@ Issuing a GET request at the same URL will return the current worker setup spec
|Property|Description|Default|
|--------|-----------|-------|
|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion will be druid.indexer.runner.minWorkerVersion.|none|
|`minNumWorkers`|The minimum number of workers that can be in the cluster at any given time.|0|
|`maxNumWorkers`|The maximum number of workers that can be in the cluster at any given time.|0|
|`nodeData`|A JSON object that describes how to launch new nodes. Currently, only EC2 is supported.|none; required|

View File

@ -806,8 +806,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
);
sortedWorkers.addAll(zkWorkers.values());
final String workerSetupDataMinVer = workerSetupData.get() == null ? null : workerSetupData.get().getMinVersion();
final String minWorkerVer = workerSetupDataMinVer == null ? config.getMinWorkerVersion() : workerSetupDataMinVer;
final String minWorkerVer = config.getMinWorkerVersion();
for (ZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {

View File

@ -274,9 +274,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Override
public boolean apply(ZkWorker zkWorker)
{
final String minVersion = workerSetupData.getMinVersion() != null
? workerSetupData.getMinVersion()
: config.getWorkerVersion();
final String minVersion = config.getWorkerVersion();
if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
}

View File

@ -28,7 +28,6 @@ public class WorkerSetupData
{
public static final String CONFIG_KEY = "worker.setup";
private final String minVersion;
private final int minNumWorkers;
private final int maxNumWorkers;
private final String availabilityZone;
@ -37,7 +36,6 @@ public class WorkerSetupData
@JsonCreator
public WorkerSetupData(
@JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("availabilityZone") String availabilityZone,
@ -45,7 +43,6 @@ public class WorkerSetupData
@JsonProperty("userData") EC2UserData userData
)
{
this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers;
this.maxNumWorkers = maxNumWorkers;
this.availabilityZone = availabilityZone;
@ -53,12 +50,6 @@ public class WorkerSetupData
this.userData = userData;
}
@JsonProperty
public String getMinVersion()
{
return minVersion;
}
@JsonProperty
public int getMinNumWorkers()
{
@ -93,7 +84,6 @@ public class WorkerSetupData
public String toString()
{
return "WorkerSetupData{" +
"minVersion='" + minVersion + '\'' +
", minNumWorkers=" + minNumWorkers +
", maxNumWorkers=" + maxNumWorkers +
", availabilityZone=" + availabilityZone +

View File

@ -51,13 +51,13 @@ public class WorkerCuratorCoordinator
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework curatorFramework;
private final Worker worker;
private final Announcer announcer;
private final String baseAnnouncementsPath;
private final String baseTaskPath;
private final String baseStatusPath;
private volatile Worker worker;
private volatile boolean started;
@Inject
@ -253,10 +253,10 @@ public class WorkerCuratorCoordinator
{
synchronized (lock) {
if (!started) {
log.error("Cannot update worker! Not Started!");
return;
throw new ISE("Cannot update worker! Not Started!");
}
this.worker = newWorker;
announcer.update(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(newWorker));
}
}

View File

@ -48,31 +48,36 @@ import java.io.InputStream;
public class WorkerResource
{
private static final Logger log = new Logger(WorkerResource.class);
private static String DISABLED_VERSION = "";
private final Worker enabledWorker;
private final Worker disabledWorker;
private final WorkerCuratorCoordinator curatorCoordinator;
private final ForkingTaskRunner taskRunner;
@Inject
public WorkerResource(
Worker worker,
WorkerCuratorCoordinator curatorCoordinator,
ForkingTaskRunner taskRunner
) throws Exception
{
this.enabledWorker = worker;
this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), DISABLED_VERSION);
this.curatorCoordinator = curatorCoordinator;
this.taskRunner = taskRunner;
}
@POST
@Path("/disable")
@Produces("application/json")
public Response doDisable()
{
final Worker worker = curatorCoordinator.getWorker();
final Worker newWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), "");
try {
curatorCoordinator.updateWorkerAnnouncement(newWorker);
return Response.ok(ImmutableMap.of(worker.getHost(), "disabled")).build();
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
}
catch (Exception e) {
return Response.serverError().build();
@ -84,10 +89,24 @@ public class WorkerResource
@Produces("application/json")
public Response doEnable()
{
final Worker worker = curatorCoordinator.getWorker();
try {
curatorCoordinator.updateWorkerAnnouncement(worker);
return Response.ok(ImmutableMap.of(worker.getHost(), "enabled")).build();
curatorCoordinator.updateWorkerAnnouncement(enabledWorker);
return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@GET
@Path("/disabled")
@Produces("application/json")
public Response isEnabled()
{
try {
final Worker theWorker = curatorCoordinator.getWorker();
final boolean disabled = theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION);
return Response.ok(ImmutableMap.of(theWorker.getHost(), disabled)).build();
}
catch (Exception e) {
return Response.serverError().build();

View File

@ -380,7 +380,7 @@ public class RemoteTaskRunnerTest
},
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null, null))),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData(0, 1, null, null, null))),
null
);

View File

@ -95,7 +95,6 @@ public class EC2AutoScalingStrategyTest
{
workerSetupData.set(
new WorkerSetupData(
"0",
0,
1,
"",

View File

@ -67,7 +67,7 @@ public class SimpleResourceManagementStrategyTest
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<>(
new WorkerSetupData(
"0", 0, 2, null, null, null
0, 2, null, null, null
)
);
@ -237,7 +237,7 @@ public class SimpleResourceManagementStrategyTest
@Test
public void testDoSuccessfulTerminate() throws Exception
{
workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null));
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
@ -267,7 +267,7 @@ public class SimpleResourceManagementStrategyTest
@Test
public void testSomethingTerminating() throws Exception
{
workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null));
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2);
@ -381,7 +381,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
// Increase minNumWorkers
workerSetupData.set(new WorkerSetupData("0", 3, 5, null, null, null));
workerSetupData.set(new WorkerSetupData(3, 5, null, null, null));
// Should provision two new workers
EasyMock.reset(autoScalingStrategy);
@ -404,85 +404,6 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testMinVersionIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h1", "i2", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Increase minVersion
workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null));
// Provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h2", "i2", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Terminate old workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn(
ImmutableList.of("h1", "h2", "h3", "h4")
);
EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn(
new AutoScalingData(ImmutableList.of("h1", "h2"))
);
EasyMock.replay(autoScalingStrategy);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(null, "h1", "i1", "0"),
new TestZkWorker(null, "h2", "i2", "0"),
new TestZkWorker(NoopTask.create(), "h3", "i3", "1"),
new TestZkWorker(NoopTask.create(), "h4", "i4", "1")
)
);
Assert.assertTrue(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testNullWorkerSetupData() throws Exception
{

View File

@ -91,6 +91,7 @@ public class WorkerResourceTest
curatorCoordinator.start();
workerResource = new WorkerResource(
worker,
curatorCoordinator,
null
);