From fa92b77f5a54fbed5df8ef8cdea31415ef93208d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 10 Feb 2016 11:51:04 -0800 Subject: [PATCH] Harmonize znode writing code in RTR and Worker. - Throw most exceptions rather than suppressing them, which should help detect problems. Continue suppressing exceptions that make sense to suppress. - Handle payload length checks consistently, and improve error message. - Remove unused WorkerCuratorCoordinator.announceTaskAnnouncement method. - Max znode length should be int, not long. - Add tests. --- .../indexing/overlord/RemoteTaskRunner.java | 22 ++- .../config/RemoteTaskRunnerConfig.java | 5 +- .../worker/WorkerCuratorCoordinator.java | 96 ++++--------- .../overlord/TestRemoteTaskRunnerConfig.java | 2 +- .../java/io/druid/curator/CuratorUtils.java | 130 ++++++++++++++++++ .../io/druid/curator/CuratorUtilsTest.java | 117 ++++++++++++++++ 6 files changed, 283 insertions(+), 89 deletions(-) create mode 100644 server/src/main/java/io/druid/curator/CuratorUtils.java create mode 100644 server/src/test/java/io/druid/curator/CuratorUtilsTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index d022a4302cc..a4efbad057f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -54,6 +54,7 @@ import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.concurrent.Execs; +import io.druid.curator.CuratorUtils; import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; @@ -681,20 +682,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId()); - byte[] rawBytes = jsonMapper.writeValueAsBytes(task); - if (rawBytes.length > config.getMaxZnodeBytes()) { - throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()); - } - - String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()); - - if (cf.checkExists().forPath(taskPath) == null) { - cf.create() - .withMode(CreateMode.EPHEMERAL) - .forPath( - taskPath, rawBytes - ); - } + CuratorUtils.createIfNotExists( + cf, + JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()), + CreateMode.EPHEMERAL, + jsonMapper.writeValueAsBytes(task), + config.getMaxZnodeBytes() + ); RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId()); if (workItem == null) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java index 3375aa31049..33182024d33 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java @@ -20,6 +20,7 @@ package io.druid.indexing.overlord.config; import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.curator.CuratorUtils; import org.joda.time.Period; import javax.validation.constraints.Min; @@ -42,7 +43,7 @@ public class RemoteTaskRunnerConfig @JsonProperty @Min(10 * 1024) - private long maxZnodeBytes = 512 * 1024; + private int maxZnodeBytes = CuratorUtils.DEFAULT_MAX_ZNODE_BYTES; @JsonProperty private Period taskShutdownLinkTimeout = new Period("PT1M"); @@ -62,7 +63,7 @@ public class RemoteTaskRunnerConfig return minWorkerVersion; } - public long getMaxZnodeBytes() + public int getMaxZnodeBytes() { return maxZnodeBytes; } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index b97fa5808b0..59d68220bfd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -31,6 +31,7 @@ import com.metamx.common.ISE; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import io.druid.curator.CuratorUtils; import io.druid.curator.announcement.Announcer; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.server.initialization.IndexerZkConfig; @@ -94,16 +95,22 @@ public class WorkerCuratorCoordinator return; } - makePathIfNotExisting( + CuratorUtils.createIfNotExists( + curatorFramework, getTaskPathForWorker(), CreateMode.PERSISTENT, - ImmutableMap.of("created", new DateTime().toString()) + jsonMapper.writeValueAsBytes(ImmutableMap.of("created", new DateTime().toString())), + config.getMaxZnodeBytes() ); - makePathIfNotExisting( + + CuratorUtils.createIfNotExists( + curatorFramework, getStatusPathForWorker(), CreateMode.PERSISTENT, - ImmutableMap.of("created", new DateTime().toString()) + jsonMapper.writeValueAsBytes(ImmutableMap.of("created", new DateTime().toString())), + config.getMaxZnodeBytes() ); + announcer.start(); announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker), false); @@ -125,30 +132,6 @@ public class WorkerCuratorCoordinator } } - public void makePathIfNotExisting(String path, CreateMode mode, Object data) throws Exception - { - if (curatorFramework.checkExists().forPath(path) == null) { - try { - byte[] rawBytes = jsonMapper.writeValueAsBytes(data); - if (rawBytes.length > config.getMaxZnodeBytes()) { - throw new ISE( - "Length of raw bytes for task too large[%,d > %,d]", - rawBytes.length, - config.getMaxZnodeBytes() - ); - } - - curatorFramework.create() - .creatingParentsIfNeeded() - .withMode(mode) - .forPath(path, rawBytes); - } - catch (Exception e) { - log.warn(e, "Could not create path[%s], perhaps it already exists?", path); - } - } - } - public String getPath(Iterable parts) { return JOINER.join(parts); @@ -194,33 +177,6 @@ public class WorkerCuratorCoordinator } } - public void announceTaskAnnouncement(TaskAnnouncement announcement) - { - synchronized (lock) { - if (!started) { - return; - } - - try { - byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement); - if (rawBytes.length > config.getMaxZnodeBytes()) { - throw new ISE( - "Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes() - ); - } - - curatorFramework.create() - .withMode(CreateMode.PERSISTENT) - .forPath( - getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - public void updateAnnouncement(TaskAnnouncement announcement) { synchronized (lock) { @@ -229,21 +185,13 @@ public class WorkerCuratorCoordinator } try { - if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) { - announceTaskAnnouncement(announcement); - return; - } - byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement); - if (rawBytes.length > config.getMaxZnodeBytes()) { - throw new ISE( - "Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes() - ); - } - - curatorFramework.setData() - .forPath( - getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes - ); + CuratorUtils.createOrSet( + curatorFramework, + getStatusPathForId(announcement.getTaskStatus().getId()), + CreateMode.PERSISTENT, + jsonMapper.writeValueAsBytes(announcement), + config.getMaxZnodeBytes() + ); } catch (Exception e) { throw Throwables.propagate(e); @@ -251,7 +199,8 @@ public class WorkerCuratorCoordinator } } - public List getAnnouncements(){ + public List getAnnouncements() + { try { return Lists.transform( curatorFramework.getChildren().forPath(getStatusPathForWorker()), new Function() @@ -261,7 +210,10 @@ public class WorkerCuratorCoordinator public TaskAnnouncement apply(String input) { try { - return jsonMapper.readValue(curatorFramework.getData().forPath(getStatusPathForId(input)),TaskAnnouncement.class); + return jsonMapper.readValue( + curatorFramework.getData().forPath(getStatusPathForId(input)), + TaskAnnouncement.class + ); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java index fa879333146..9ce94e2ab81 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java @@ -46,7 +46,7 @@ public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig } @Override - public long getMaxZnodeBytes() + public int getMaxZnodeBytes() { // make sure this is large enough, otherwise RemoteTaskRunnerTest might fail unexpectedly return 10 * 1024; diff --git a/server/src/main/java/io/druid/curator/CuratorUtils.java b/server/src/main/java/io/druid/curator/CuratorUtils.java new file mode 100644 index 00000000000..d8e5c24016e --- /dev/null +++ b/server/src/main/java/io/druid/curator/CuratorUtils.java @@ -0,0 +1,130 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator; + +import com.metamx.common.IAE; +import com.metamx.common.logger.Logger; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; + +public class CuratorUtils +{ + public static final int DEFAULT_MAX_ZNODE_BYTES = 512 * 1024; + + private static final Logger log = new Logger(CuratorUtils.class); + + /** + * Create znode if it does not already exist. If it does already exist, this does nothing. In particular, the + * existing znode may have a different payload or create mode. + * + * @param curatorFramework curator + * @param path path + * @param mode create mode + * @param rawBytes payload + * @param maxZnodeBytes maximum payload size + * + * @throws IllegalArgumentException if rawBytes.length > maxZnodeBytes + * @throws Exception if Curator throws an Exception + */ + public static void createIfNotExists( + CuratorFramework curatorFramework, + String path, + CreateMode mode, + byte[] rawBytes, + int maxZnodeBytes + ) throws Exception + { + verifySize(path, rawBytes, maxZnodeBytes); + + if (curatorFramework.checkExists().forPath(path) == null) { + try { + curatorFramework.create() + .creatingParentsIfNeeded() + .withMode(mode) + .forPath(path, rawBytes); + } + catch (KeeperException.NodeExistsException e) { + log.info("Skipping create path[%s], since it already exists.", path); + } + } + } + + /** + * Create znode if it does not already exist. If it does already exist, update the payload (but not the create mode). + * If someone deletes the znode while we're trying to set it, just let it stay deleted. + * + * @param curatorFramework curator + * @param path path + * @param mode create mode + * @param rawBytes payload + * @param maxZnodeBytes maximum payload size + * + * @throws IllegalArgumentException if rawBytes.length > maxZnodeBytes + * @throws Exception if Curator throws an Exception + */ + public static void createOrSet( + CuratorFramework curatorFramework, + String path, + CreateMode mode, + byte[] rawBytes, + int maxZnodeBytes + ) throws Exception + { + verifySize(path, rawBytes, maxZnodeBytes); + + boolean created = false; + if (curatorFramework.checkExists().forPath(path) == null) { + try { + curatorFramework.create() + .creatingParentsIfNeeded() + .withMode(mode) + .forPath(path, rawBytes); + + created = true; + } + catch (KeeperException.NodeExistsException e) { + log.debug("Path [%s] created while we were running, will setData instead.", path); + } + } + + if (!created) { + try { + curatorFramework.setData() + .forPath(path, rawBytes); + } + catch (KeeperException.NoNodeException e) { + log.warn("Someone deleted path[%s] while we were trying to set it. Leaving it deleted.", path); + } + } + } + + private static void verifySize(String path, byte[] rawBytes, int maxZnodeBytes) + { + if (rawBytes.length > maxZnodeBytes) { + throw new IAE( + "Length of raw bytes for znode[%s] too large[%,d > %,d]", + path, + rawBytes.length, + maxZnodeBytes + ); + } + } +} diff --git a/server/src/test/java/io/druid/curator/CuratorUtilsTest.java b/server/src/test/java/io/druid/curator/CuratorUtilsTest.java new file mode 100644 index 00000000000..c5b4f795cd9 --- /dev/null +++ b/server/src/test/java/io/druid/curator/CuratorUtilsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator; + +import org.apache.zookeeper.CreateMode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CuratorUtilsTest extends CuratorTestBase +{ + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + } + + @After + public void tearDown() + { + tearDownServerAndCurator(); + } + + @Test(timeout = 10_000L) + public void testCreateIfNotExists() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + + CuratorUtils.createIfNotExists( + curator, + "/foo/bar", + CreateMode.PERSISTENT, + "baz".getBytes(), + CuratorUtils.DEFAULT_MAX_ZNODE_BYTES + ); + Assert.assertEquals("baz", new String(curator.getData().forPath("/foo/bar"))); + + CuratorUtils.createIfNotExists( + curator, + "/foo/bar", + CreateMode.PERSISTENT, + "qux".getBytes(), + CuratorUtils.DEFAULT_MAX_ZNODE_BYTES + ); + Assert.assertEquals("baz", new String(curator.getData().forPath("/foo/bar"))); + } + + @Test(timeout = 10_000L) + public void testCreateIfNotExistsPayloadTooLarge() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + + Exception thrown = null; + try { + CuratorUtils.createIfNotExists(curator, "/foo/bar", CreateMode.PERSISTENT, "baz".getBytes(), 2); + } + catch (Exception e) { + thrown = e; + } + + Assert.assertTrue(thrown instanceof IllegalArgumentException); + Assert.assertNull(curator.checkExists().forPath("/foo/bar")); + } + + @Test(timeout = 10_000L) + public void testCreateOrSet() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + + Assert.assertNull(curator.checkExists().forPath("/foo/bar")); + + CuratorUtils.createOrSet(curator, "/foo/bar", CreateMode.PERSISTENT, "baz".getBytes(), 3); + Assert.assertEquals("baz", new String(curator.getData().forPath("/foo/bar"))); + + CuratorUtils.createOrSet(curator, "/foo/bar", CreateMode.PERSISTENT, "qux".getBytes(), 3); + Assert.assertEquals("qux", new String(curator.getData().forPath("/foo/bar"))); + } + + @Test(timeout = 10_000L) + public void testCreateOrSetPayloadTooLarge() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + + Exception thrown = null; + try { + CuratorUtils.createOrSet(curator, "/foo/bar", CreateMode.PERSISTENT, "baz".getBytes(), 2); + } + catch (Exception e) { + thrown = e; + } + + Assert.assertTrue(thrown instanceof IllegalArgumentException); + Assert.assertNull(curator.checkExists().forPath("/foo/bar")); + } +}