diff --git a/docs/content/Indexing-Service-Config.md b/docs/content/Indexing-Service-Config.md index 122a6623504..8916c5f5372 100644 --- a/docs/content/Indexing-Service-Config.md +++ b/docs/content/Indexing-Service-Config.md @@ -22,7 +22,7 @@ The following configs only apply if the overlord is running in remote mode: |Property|Description|Default| |--------|-----------|-------| |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a middle manager before throwing an error.|PT5M| -|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |none| +|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |"0"| |`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|false| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| @@ -80,7 +80,6 @@ Issuing a GET request at the same URL will return the current worker setup spec |Property|Description|Default| |--------|-----------|-------| -|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion will be druid.indexer.runner.minWorkerVersion.|none| |`minNumWorkers`|The minimum number of workers that can be in the cluster at any given time.|0| |`maxNumWorkers`|The maximum number of workers that can be in the cluster at any given time.|0| |`nodeData`|A JSON object that describes how to launch new nodes. Currently, only EC2 is supported.|none; required| diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index c546fbcce06..7b0dab4d512 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -806,8 +806,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } ); sortedWorkers.addAll(zkWorkers.values()); - final String workerSetupDataMinVer = workerSetupData.get() == null ? null : workerSetupData.get().getMinVersion(); - final String minWorkerVer = workerSetupDataMinVer == null ? config.getMinWorkerVersion() : workerSetupDataMinVer; + final String minWorkerVer = config.getMinWorkerVersion(); for (ZkWorker zkWorker : sortedWorkers) { if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java index 6d3dd904c5e..6f645f3aec0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java @@ -274,9 +274,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat @Override public boolean apply(ZkWorker zkWorker) { - final String minVersion = workerSetupData.getMinVersion() != null - ? workerSetupData.getMinVersion() - : config.getWorkerVersion(); + final String minVersion = config.getWorkerVersion(); if (minVersion == null) { throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database."); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java index e792f347aed..ab778622e3e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java @@ -28,7 +28,6 @@ public class WorkerSetupData { public static final String CONFIG_KEY = "worker.setup"; - private final String minVersion; private final int minNumWorkers; private final int maxNumWorkers; private final String availabilityZone; @@ -37,7 +36,6 @@ public class WorkerSetupData @JsonCreator public WorkerSetupData( - @JsonProperty("minVersion") String minVersion, @JsonProperty("minNumWorkers") int minNumWorkers, @JsonProperty("maxNumWorkers") int maxNumWorkers, @JsonProperty("availabilityZone") String availabilityZone, @@ -45,7 +43,6 @@ public class WorkerSetupData @JsonProperty("userData") EC2UserData userData ) { - this.minVersion = minVersion; this.minNumWorkers = minNumWorkers; this.maxNumWorkers = maxNumWorkers; this.availabilityZone = availabilityZone; @@ -53,12 +50,6 @@ public class WorkerSetupData this.userData = userData; } - @JsonProperty - public String getMinVersion() - { - return minVersion; - } - @JsonProperty public int getMinNumWorkers() { @@ -93,7 +84,6 @@ public class WorkerSetupData public String toString() { return "WorkerSetupData{" + - "minVersion='" + minVersion + '\'' + ", minNumWorkers=" + minNumWorkers + ", maxNumWorkers=" + maxNumWorkers + ", availabilityZone=" + availabilityZone + diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index feda8e81a90..32dbf5b85ff 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -51,13 +51,13 @@ public class WorkerCuratorCoordinator private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; private final CuratorFramework curatorFramework; - private final Worker worker; private final Announcer announcer; private final String baseAnnouncementsPath; private final String baseTaskPath; private final String baseStatusPath; + private volatile Worker worker; private volatile boolean started; @Inject @@ -253,10 +253,10 @@ public class WorkerCuratorCoordinator { synchronized (lock) { if (!started) { - log.error("Cannot update worker! Not Started!"); - return; + throw new ISE("Cannot update worker! Not Started!"); } + this.worker = newWorker; announcer.update(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(newWorker)); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java index b70aa64c111..1084fe28168 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java @@ -48,31 +48,36 @@ import java.io.InputStream; public class WorkerResource { private static final Logger log = new Logger(WorkerResource.class); + private static String DISABLED_VERSION = ""; + private final Worker enabledWorker; + private final Worker disabledWorker; private final WorkerCuratorCoordinator curatorCoordinator; private final ForkingTaskRunner taskRunner; @Inject public WorkerResource( + Worker worker, WorkerCuratorCoordinator curatorCoordinator, ForkingTaskRunner taskRunner ) throws Exception { + this.enabledWorker = worker; + this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), DISABLED_VERSION); this.curatorCoordinator = curatorCoordinator; this.taskRunner = taskRunner; } + @POST @Path("/disable") @Produces("application/json") public Response doDisable() { - final Worker worker = curatorCoordinator.getWorker(); - final Worker newWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""); try { - curatorCoordinator.updateWorkerAnnouncement(newWorker); - return Response.ok(ImmutableMap.of(worker.getHost(), "disabled")).build(); + curatorCoordinator.updateWorkerAnnouncement(disabledWorker); + return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build(); } catch (Exception e) { return Response.serverError().build(); @@ -84,10 +89,24 @@ public class WorkerResource @Produces("application/json") public Response doEnable() { - final Worker worker = curatorCoordinator.getWorker(); try { - curatorCoordinator.updateWorkerAnnouncement(worker); - return Response.ok(ImmutableMap.of(worker.getHost(), "enabled")).build(); + curatorCoordinator.updateWorkerAnnouncement(enabledWorker); + return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build(); + } + catch (Exception e) { + return Response.serverError().build(); + } + } + + @GET + @Path("/disabled") + @Produces("application/json") + public Response isEnabled() + { + try { + final Worker theWorker = curatorCoordinator.getWorker(); + final boolean disabled = theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION); + return Response.ok(ImmutableMap.of(theWorker.getHost(), disabled)).build(); } catch (Exception e) { return Response.serverError().build(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index fcf9715fe62..26aa7077c56 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -380,7 +380,7 @@ public class RemoteTaskRunnerTest }, cf, new SimplePathChildrenCacheFactory.Builder().build(), - DSuppliers.of(new AtomicReference(new WorkerSetupData("0", 0, 1, null, null, null))), + DSuppliers.of(new AtomicReference(new WorkerSetupData(0, 1, null, null, null))), null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java index 1ccacc66df4..7be9a6eaad4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java @@ -95,7 +95,6 @@ public class EC2AutoScalingStrategyTest { workerSetupData.set( new WorkerSetupData( - "0", 0, 1, "", diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java index 21f3277a653..6c13a0704c0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java @@ -67,7 +67,7 @@ public class SimpleResourceManagementStrategyTest autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); workerSetupData = new AtomicReference<>( new WorkerSetupData( - "0", 0, 2, null, null, null + 0, 2, null, null, null ) ); @@ -237,7 +237,7 @@ public class SimpleResourceManagementStrategyTest @Test public void testDoSuccessfulTerminate() throws Exception { - workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null)); + workerSetupData.set(new WorkerSetupData(0, 1, null, null, null)); EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList()); @@ -267,7 +267,7 @@ public class SimpleResourceManagementStrategyTest @Test public void testSomethingTerminating() throws Exception { - workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null)); + workerSetupData.set(new WorkerSetupData(0, 1, null, null, null)); EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList("ip")).times(2); @@ -381,7 +381,7 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScalingStrategy); // Increase minNumWorkers - workerSetupData.set(new WorkerSetupData("0", 3, 5, null, null, null)); + workerSetupData.set(new WorkerSetupData(3, 5, null, null, null)); // Should provision two new workers EasyMock.reset(autoScalingStrategy); @@ -404,85 +404,6 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScalingStrategy); } - @Test - public void testMinVersionIncrease() throws Exception - { - // Don't terminate anything - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); - EasyMock.replay(autoScalingStrategy); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create(), "h1", "i1", "0"), - new TestZkWorker(NoopTask.create(), "h1", "i2", "0") - ) - ); - Assert.assertFalse(terminatedSomething); - EasyMock.verify(autoScalingStrategy); - - // Don't provision anything - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); - EasyMock.replay(autoScalingStrategy); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create()), - new TestZkWorker(NoopTask.create()) - ) - ); - Assert.assertFalse(provisionedSomething); - EasyMock.verify(autoScalingStrategy); - - // Increase minVersion - workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null)); - - // Provision two new workers - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); - EasyMock.expect(autoScalingStrategy.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("h3")) - ); - EasyMock.expect(autoScalingStrategy.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("h4")) - ); - EasyMock.replay(autoScalingStrategy); - provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create(), "h1", "i1", "0"), - new TestZkWorker(NoopTask.create(), "h2", "i2", "0") - ) - ); - Assert.assertTrue(provisionedSomething); - EasyMock.verify(autoScalingStrategy); - - // Terminate old workers - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn( - ImmutableList.of("h1", "h2", "h3", "h4") - ); - EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn( - new AutoScalingData(ImmutableList.of("h1", "h2")) - ); - EasyMock.replay(autoScalingStrategy); - terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(null, "h1", "i1", "0"), - new TestZkWorker(null, "h2", "i2", "0"), - new TestZkWorker(NoopTask.create(), "h3", "i3", "1"), - new TestZkWorker(NoopTask.create(), "h4", "i4", "1") - ) - ); - Assert.assertTrue(terminatedSomething); - EasyMock.verify(autoScalingStrategy); - } - @Test public void testNullWorkerSetupData() throws Exception { diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java index 5e80a523b56..2a0a1e4a3c3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -91,6 +91,7 @@ public class WorkerResourceTest curatorCoordinator.start(); workerResource = new WorkerResource( + worker, curatorCoordinator, null );