From 5197ea527ac096a0380ab6f759f05eab7bc677db Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 17 Jul 2014 12:35:45 -0700 Subject: [PATCH 1/5] disable middlemanagers based on worker version --- .../worker/WorkerCuratorCoordinator.java | 23 ++- .../indexing/worker/http/WorkerResource.java | 70 ++++++++- .../worker/http/WorkerResourceTest.java | 135 ++++++++++++++++++ 3 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index 6669556580b..feda8e81a90 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -129,7 +129,11 @@ public class WorkerCuratorCoordinator try { byte[] rawBytes = jsonMapper.writeValueAsBytes(data); if (rawBytes.length > config.getMaxZnodeBytes()) { - throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()); + throw new ISE( + "Length of raw bytes for task too large[%,d > %,d]", + rawBytes.length, + config.getMaxZnodeBytes() + ); } curatorFramework.create() @@ -173,6 +177,11 @@ public class WorkerCuratorCoordinator return getPath(Arrays.asList(baseStatusPath, statusId)); } + public Worker getWorker() + { + return worker; + } + public void unannounceTask(String taskId) { try { @@ -239,4 +248,16 @@ public class WorkerCuratorCoordinator } } } + + public void updateWorkerAnnouncement(Worker newWorker) throws Exception + { + synchronized (lock) { + if (!started) { + log.error("Cannot update worker! 
Not Started!"); + return; + } + + announcer.update(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(newWorker)); + } + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java index f38acb23982..b70aa64c111 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java @@ -19,12 +19,18 @@ package io.druid.indexing.worker.http; +import com.google.api.client.util.Lists; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import com.google.common.io.InputSupplier; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.indexing.overlord.ForkingTaskRunner; +import io.druid.indexing.overlord.TaskRunnerWorkItem; +import io.druid.indexing.worker.Worker; +import io.druid.indexing.worker.WorkerCuratorCoordinator; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -43,17 +49,78 @@ public class WorkerResource { private static final Logger log = new Logger(WorkerResource.class); + private final WorkerCuratorCoordinator curatorCoordinator; private final ForkingTaskRunner taskRunner; @Inject public WorkerResource( + WorkerCuratorCoordinator curatorCoordinator, ForkingTaskRunner taskRunner ) throws Exception { + this.curatorCoordinator = curatorCoordinator; this.taskRunner = taskRunner; } + @POST + @Path("/disable") + @Produces("application/json") + public Response doDisable() + { + final Worker worker = curatorCoordinator.getWorker(); + final Worker newWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""); + try { + curatorCoordinator.updateWorkerAnnouncement(newWorker); + return Response.ok(ImmutableMap.of(worker.getHost(), 
"disabled")).build(); + } + catch (Exception e) { + return Response.serverError().build(); + } + } + + @POST + @Path("/enable") + @Produces("application/json") + public Response doEnable() + { + final Worker worker = curatorCoordinator.getWorker(); + try { + curatorCoordinator.updateWorkerAnnouncement(worker); + return Response.ok(ImmutableMap.of(worker.getHost(), "enabled")).build(); + } + catch (Exception e) { + return Response.serverError().build(); + } + } + + @GET + @Path("/tasks") + @Produces("application/json") + public Response getTasks() + { + try { + return Response.ok( + Lists.newArrayList( + Collections2.transform( + taskRunner.getKnownTasks(), + new Function() + { + @Override + public String apply(TaskRunnerWorkItem input) + { + return input.getTaskId(); + } + } + ) + ) + ).build(); + } + catch (Exception e) { + return Response.serverError().build(); + } + } + @POST @Path("/task/{taskid}/shutdown") @Produces("application/json") @@ -82,7 +149,8 @@ public class WorkerResource if (stream.isPresent()) { try { return Response.ok(stream.get().getInput()).build(); - } catch (Exception e) { + } + catch (Exception e) { log.warn(e, "Failed to read log for task: %s", taskid); return Response.serverError().build(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java new file mode 100644 index 00000000000..5e80a523b56 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -0,0 +1,135 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.worker.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.curator.PotentiallyGzippedCompressionProvider; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.worker.Worker; +import io.druid.indexing.worker.WorkerCuratorCoordinator; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.initialization.ZkPathsConfig; +import junit.framework.Assert; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.Response; + +/** + */ +public class WorkerResourceTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final String basePath = "/test/druid"; + private static final String announcementsPath = String.format("%s/indexer/announcements/host", basePath); + + private TestingCluster testingCluster; + private CuratorFramework cf; + + private Worker worker; + + private WorkerCuratorCoordinator curatorCoordinator; + private WorkerResource workerResource; + + @Before + public void setUp() throws Exception + { + testingCluster = new TestingCluster(1); + testingCluster.start(); + + cf = CuratorFrameworkFactory.builder() + .connectString(testingCluster.getConnectString()) + .retryPolicy(new 
ExponentialBackoffRetry(1, 10)) + .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) + .build(); + cf.start(); + cf.create().creatingParentsIfNeeded().forPath(basePath); + + worker = new Worker( + "host", + "ip", + 3, + "v1" + ); + + curatorCoordinator = new WorkerCuratorCoordinator( + jsonMapper, + new ZkPathsConfig() + { + @Override + public String getZkBasePath() + { + return basePath; + } + }, + new RemoteTaskRunnerConfig(), + cf, + worker + ); + curatorCoordinator.start(); + + workerResource = new WorkerResource( + curatorCoordinator, + null + ); + } + + @After + public void tearDown() throws Exception + { + curatorCoordinator.stop(); + cf.close(); + testingCluster.close(); + } + + @Test + public void testDoDisable() throws Exception + { + Worker theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class); + Assert.assertEquals("v1", theWorker.getVersion()); + + Response res = workerResource.doDisable(); + Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus()); + + theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class); + Assert.assertTrue(theWorker.getVersion().isEmpty()); + } + + @Test + public void testDoEnable() throws Exception + { + // Disable the worker + Response res = workerResource.doDisable(); + Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus()); + Worker theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class); + Assert.assertTrue(theWorker.getVersion().isEmpty()); + + // Enable the worker + res = workerResource.doEnable(); + Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus()); + theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class); + Assert.assertEquals("v1", theWorker.getVersion()); + } +} From c6078ca841d3ebec4ccf2f7a09b4d9a00193ee58 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 17 Jul 2014 13:34:05 -0700 Subject: [PATCH 2/5] address code 
review --- docs/content/Indexing-Service-Config.md | 3 +- .../indexing/overlord/RemoteTaskRunner.java | 3 +- .../SimpleResourceManagementStrategy.java | 4 +- .../overlord/setup/WorkerSetupData.java | 10 --- .../worker/WorkerCuratorCoordinator.java | 6 +- .../indexing/worker/http/WorkerResource.java | 33 +++++-- .../overlord/RemoteTaskRunnerTest.java | 2 +- .../scaling/EC2AutoScalingStrategyTest.java | 1 - .../SimpleResourceManagementStrategyTest.java | 87 +------------------ .../worker/http/WorkerResourceTest.java | 1 + 10 files changed, 38 insertions(+), 112 deletions(-) diff --git a/docs/content/Indexing-Service-Config.md b/docs/content/Indexing-Service-Config.md index 122a6623504..8916c5f5372 100644 --- a/docs/content/Indexing-Service-Config.md +++ b/docs/content/Indexing-Service-Config.md @@ -22,7 +22,7 @@ The following configs only apply if the overlord is running in remote mode: |Property|Description|Default| |--------|-----------|-------| |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a middle manager before throwing an error.|PT5M| -|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |none| +|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |"0"| |`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|false| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| @@ -80,7 +80,6 @@ Issuing a GET request at the same URL will return the current worker setup spec |Property|Description|Default| |--------|-----------|-------| -|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. 
If this is not specified, the minVersion will be druid.indexer.runner.minWorkerVersion.|none| |`minNumWorkers`|The minimum number of workers that can be in the cluster at any given time.|0| |`maxNumWorkers`|The maximum number of workers that can be in the cluster at any given time.|0| |`nodeData`|A JSON object that describes how to launch new nodes. Currently, only EC2 is supported.|none; required| diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index c546fbcce06..7b0dab4d512 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -806,8 +806,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } ); sortedWorkers.addAll(zkWorkers.values()); - final String workerSetupDataMinVer = workerSetupData.get() == null ? null : workerSetupData.get().getMinVersion(); - final String minWorkerVer = workerSetupDataMinVer == null ? 
config.getMinWorkerVersion() : workerSetupDataMinVer; + final String minWorkerVer = config.getMinWorkerVersion(); for (ZkWorker zkWorker : sortedWorkers) { if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java index 6d3dd904c5e..6f645f3aec0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java @@ -274,9 +274,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat @Override public boolean apply(ZkWorker zkWorker) { - final String minVersion = workerSetupData.getMinVersion() != null - ? workerSetupData.getMinVersion() - : config.getWorkerVersion(); + final String minVersion = config.getWorkerVersion(); if (minVersion == null) { throw new ISE("No minVersion found! 
It should be set in your runtime properties or configuration database."); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java index e792f347aed..ab778622e3e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSetupData.java @@ -28,7 +28,6 @@ public class WorkerSetupData { public static final String CONFIG_KEY = "worker.setup"; - private final String minVersion; private final int minNumWorkers; private final int maxNumWorkers; private final String availabilityZone; @@ -37,7 +36,6 @@ public class WorkerSetupData @JsonCreator public WorkerSetupData( - @JsonProperty("minVersion") String minVersion, @JsonProperty("minNumWorkers") int minNumWorkers, @JsonProperty("maxNumWorkers") int maxNumWorkers, @JsonProperty("availabilityZone") String availabilityZone, @@ -45,7 +43,6 @@ public class WorkerSetupData @JsonProperty("userData") EC2UserData userData ) { - this.minVersion = minVersion; this.minNumWorkers = minNumWorkers; this.maxNumWorkers = maxNumWorkers; this.availabilityZone = availabilityZone; @@ -53,12 +50,6 @@ public class WorkerSetupData this.userData = userData; } - @JsonProperty - public String getMinVersion() - { - return minVersion; - } - @JsonProperty public int getMinNumWorkers() { @@ -93,7 +84,6 @@ public class WorkerSetupData public String toString() { return "WorkerSetupData{" + - "minVersion='" + minVersion + '\'' + ", minNumWorkers=" + minNumWorkers + ", maxNumWorkers=" + maxNumWorkers + ", availabilityZone=" + availabilityZone + diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index feda8e81a90..32dbf5b85ff 100644 --- 
a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -51,13 +51,13 @@ public class WorkerCuratorCoordinator private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; private final CuratorFramework curatorFramework; - private final Worker worker; private final Announcer announcer; private final String baseAnnouncementsPath; private final String baseTaskPath; private final String baseStatusPath; + private volatile Worker worker; private volatile boolean started; @Inject @@ -253,10 +253,10 @@ public class WorkerCuratorCoordinator { synchronized (lock) { if (!started) { - log.error("Cannot update worker! Not Started!"); - return; + throw new ISE("Cannot update worker! Not Started!"); } + this.worker = newWorker; announcer.update(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(newWorker)); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java index b70aa64c111..1084fe28168 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java @@ -48,31 +48,36 @@ import java.io.InputStream; public class WorkerResource { private static final Logger log = new Logger(WorkerResource.class); + private static String DISABLED_VERSION = ""; + private final Worker enabledWorker; + private final Worker disabledWorker; private final WorkerCuratorCoordinator curatorCoordinator; private final ForkingTaskRunner taskRunner; @Inject public WorkerResource( + Worker worker, WorkerCuratorCoordinator curatorCoordinator, ForkingTaskRunner taskRunner ) throws Exception { + this.enabledWorker = worker; + this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), 
DISABLED_VERSION); this.curatorCoordinator = curatorCoordinator; this.taskRunner = taskRunner; } + @POST @Path("/disable") @Produces("application/json") public Response doDisable() { - final Worker worker = curatorCoordinator.getWorker(); - final Worker newWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""); try { - curatorCoordinator.updateWorkerAnnouncement(newWorker); - return Response.ok(ImmutableMap.of(worker.getHost(), "disabled")).build(); + curatorCoordinator.updateWorkerAnnouncement(disabledWorker); + return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build(); } catch (Exception e) { return Response.serverError().build(); @@ -84,10 +89,24 @@ public class WorkerResource @Produces("application/json") public Response doEnable() { - final Worker worker = curatorCoordinator.getWorker(); try { - curatorCoordinator.updateWorkerAnnouncement(worker); - return Response.ok(ImmutableMap.of(worker.getHost(), "enabled")).build(); + curatorCoordinator.updateWorkerAnnouncement(enabledWorker); + return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build(); + } + catch (Exception e) { + return Response.serverError().build(); + } + } + + @GET + @Path("/disabled") + @Produces("application/json") + public Response isEnabled() + { + try { + final Worker theWorker = curatorCoordinator.getWorker(); + final boolean disabled = theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION); + return Response.ok(ImmutableMap.of(theWorker.getHost(), disabled)).build(); } catch (Exception e) { return Response.serverError().build(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index fcf9715fe62..26aa7077c56 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ 
b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -380,7 +380,7 @@ public class RemoteTaskRunnerTest }, cf, new SimplePathChildrenCacheFactory.Builder().build(), - DSuppliers.of(new AtomicReference(new WorkerSetupData("0", 0, 1, null, null, null))), + DSuppliers.of(new AtomicReference(new WorkerSetupData(0, 1, null, null, null))), null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java index 1ccacc66df4..7be9a6eaad4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/EC2AutoScalingStrategyTest.java @@ -95,7 +95,6 @@ public class EC2AutoScalingStrategyTest { workerSetupData.set( new WorkerSetupData( - "0", 0, 1, "", diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java index 21f3277a653..6c13a0704c0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java @@ -67,7 +67,7 @@ public class SimpleResourceManagementStrategyTest autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); workerSetupData = new AtomicReference<>( new WorkerSetupData( - "0", 0, 2, null, null, null + 0, 2, null, null, null ) ); @@ -237,7 +237,7 @@ public class SimpleResourceManagementStrategyTest @Test public void testDoSuccessfulTerminate() throws Exception { - workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null)); + workerSetupData.set(new WorkerSetupData(0, 1, null, null, null)); 
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList()); @@ -267,7 +267,7 @@ public class SimpleResourceManagementStrategyTest @Test public void testSomethingTerminating() throws Exception { - workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null)); + workerSetupData.set(new WorkerSetupData(0, 1, null, null, null)); EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) .andReturn(Lists.newArrayList("ip")).times(2); @@ -381,7 +381,7 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScalingStrategy); // Increase minNumWorkers - workerSetupData.set(new WorkerSetupData("0", 3, 5, null, null, null)); + workerSetupData.set(new WorkerSetupData(3, 5, null, null, null)); // Should provision two new workers EasyMock.reset(autoScalingStrategy); @@ -404,85 +404,6 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScalingStrategy); } - @Test - public void testMinVersionIncrease() throws Exception - { - // Don't terminate anything - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); - EasyMock.replay(autoScalingStrategy); - boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create(), "h1", "i1", "0"), - new TestZkWorker(NoopTask.create(), "h1", "i2", "0") - ) - ); - Assert.assertFalse(terminatedSomething); - EasyMock.verify(autoScalingStrategy); - - // Don't provision anything - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); - EasyMock.replay(autoScalingStrategy); - boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create()), - new TestZkWorker(NoopTask.create()) - ) - ); - 
Assert.assertFalse(provisionedSomething); - EasyMock.verify(autoScalingStrategy); - - // Increase minVersion - workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null)); - - // Provision two new workers - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) - .andReturn(Lists.newArrayList("ip")); - EasyMock.expect(autoScalingStrategy.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("h3")) - ); - EasyMock.expect(autoScalingStrategy.provision()).andReturn( - new AutoScalingData(Lists.newArrayList("h4")) - ); - EasyMock.replay(autoScalingStrategy); - provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(NoopTask.create(), "h1", "i1", "0"), - new TestZkWorker(NoopTask.create(), "h2", "i2", "0") - ) - ); - Assert.assertTrue(provisionedSomething); - EasyMock.verify(autoScalingStrategy); - - // Terminate old workers - EasyMock.reset(autoScalingStrategy); - EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn( - ImmutableList.of("h1", "h2", "h3", "h4") - ); - EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn( - new AutoScalingData(ImmutableList.of("h1", "h2")) - ); - EasyMock.replay(autoScalingStrategy); - terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList(), - Arrays.asList( - new TestZkWorker(null, "h1", "i1", "0"), - new TestZkWorker(null, "h2", "i2", "0"), - new TestZkWorker(NoopTask.create(), "h3", "i3", "1"), - new TestZkWorker(NoopTask.create(), "h4", "i4", "1") - ) - ); - Assert.assertTrue(terminatedSomething); - EasyMock.verify(autoScalingStrategy); - } - @Test public void testNullWorkerSetupData() throws Exception { diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java 
b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java index 5e80a523b56..2a0a1e4a3c3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -91,6 +91,7 @@ public class WorkerResourceTest curatorCoordinator.start(); workerResource = new WorkerResource( + worker, curatorCoordinator, null ); From d765ff5e75245d4628efc0d85b88fbd50ef6ccfe Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 17 Jul 2014 14:53:26 -0700 Subject: [PATCH 3/5] docs for updating --- docs/content/Rolling-Updates.md | 44 +++++++++++++++++++++++++++++++++ docs/content/toc.textile | 1 + 2 files changed, 45 insertions(+) create mode 100644 docs/content/Rolling-Updates.md diff --git a/docs/content/Rolling-Updates.md b/docs/content/Rolling-Updates.md new file mode 100644 index 00000000000..a6d5cd005bd --- /dev/null +++ b/docs/content/Rolling-Updates.md @@ -0,0 +1,44 @@ +--- +layout: doc_page +--- + + +Rolling Updates +=============== + +For rolling Druid cluster updates with no downtime, we recommend updating Druid nodes in the following order: + +1. Historical Nodes +2. Indexing Service/Real-time Nodes +3. Broker Nodes +4. Coordinator Nodes + +## Historical Nodes + +Historical nodes can be updated one at a time. Each historical node has a startup time to memory map all the segments it was serving before the update. The startup time typically takes a few seconds to a few minutes, depending on the hardware of the node. As long as each historical node is updated with a sufficient delay (greater than the time required to start a single node), you can perform a rolling update of the entire historical cluster. + +## Standalone Real-time nodes + +Standalone real-time nodes can be updated one at a time in a rolling fashion. 
+ +## Indexing Service + +### With Autoscaling + +Overlord nodes will try to launch new middle manager nodes and terminate old ones without dropping data. This process is based on the configuration `druid.indexer.runner.minWorkerVersion=#{VERSION}`. Each time you update your overlord node, the `VERSION` value should be increased. + +### Without Autoscaling + +Middle managers can be updated in a rolling fashion based on API. + +To prepare a middle manager for update, send a POST request to `/druid/worker/v1/disable`. The overlord will now no longer send tasks to this middle manager. + +Current tasks will still try to complete. To view all existing tasks, send a GET request to `/druid/worker/v1/tasks`. When this list is empty, the middle manager can be updated. After the middle manager is updated, it is automatically enabled again. You can also manually enable middle managers by POSTing to `/druid/worker/v1/enable`. + +## Broker Nodes + +Broker nodes can be updated one at a time in a rolling fashion. There needs to be some delay between updating each node as brokers must load the entire state of the cluster before they return valid results. + +## Coordinator Nodes + +Coordinator nodes can be updated in a rolling fashion. \ No newline at end of file diff --git a/docs/content/toc.textile b/docs/content/toc.textile index a4769a6b2f5..76ba3e31133 100644 --- a/docs/content/toc.textile +++ b/docs/content/toc.textile @@ -17,6 +17,7 @@ h2. Getting Started h2. Booting a Druid Cluster * "Simple Cluster Configuration":Simple-Cluster-Configuration.html * "Production Cluster Configuration":Production-Cluster-Configuration.html +* "Rolling Cluster Updates":Rolling-Updates.html h2. 
Configuration * "Common Configuration":Configuration.html From 83cb7931ac531debec560d4e6e471a70ace4922e Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 17 Jul 2014 14:59:56 -0700 Subject: [PATCH 4/5] add more stuff to docs --- docs/content/Rolling-Updates.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/content/Rolling-Updates.md b/docs/content/Rolling-Updates.md index a6d5cd005bd..6c8680de0cb 100644 --- a/docs/content/Rolling-Updates.md +++ b/docs/content/Rolling-Updates.md @@ -27,6 +27,8 @@ Standalone real-time nodes can be updated one at a time in a rolling fashion. Overlord nodes will try to launch new middle manager nodes and terminate old ones without dropping data. This process is based on the configuration `druid.indexer.runner.minWorkerVersion=#{VERSION}`. Each time you update your overlord node, the `VERSION` value should be increased. +The config `druid.indexer.autoscale.workerVersion=#{VERSION}` also needs to be set. + ### Without Autoscaling Middle managers can be updated in a rolling fashion based on API. 
From beac0be45b6c3893ace90020a7916ccc48caba0d Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 17 Jul 2014 18:04:36 -0700 Subject: [PATCH 5/5] fix enabled endpoint --- .../java/io/druid/indexing/worker/http/WorkerResource.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java index 1084fe28168..ddbbd7f0929 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java @@ -99,14 +99,14 @@ public class WorkerResource } @GET - @Path("/disabled") + @Path("/enabled") @Produces("application/json") public Response isEnabled() { try { final Worker theWorker = curatorCoordinator.getWorker(); - final boolean disabled = theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION); - return Response.ok(ImmutableMap.of(theWorker.getHost(), disabled)).build(); + final boolean enabled = !theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION); + return Response.ok(ImmutableMap.of(theWorker.getHost(), enabled)).build(); } catch (Exception e) { return Response.serverError().build();