Harmonize znode writing code in RTR and Worker.

- Throw most exceptions rather than suppressing them, which should help
  detect problems. Continue suppressing exceptions that make sense to
  suppress.
- Handle payload length checks consistently, and improve error message.
- Remove unused WorkerCuratorCoordinator.announceTaskAnnouncement method.
- Max znode length should be int, not long.
- Add tests.
This commit is contained in:
Gian Merlino 2016-02-10 11:51:04 -08:00
parent 69a6bdcf03
commit fa92b77f5a
6 changed files with 283 additions and 89 deletions

View File

@ -54,6 +54,7 @@ import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
@ -681,20 +682,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId()); log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());
byte[] rawBytes = jsonMapper.writeValueAsBytes(task); CuratorUtils.createIfNotExists(
if (rawBytes.length > config.getMaxZnodeBytes()) { cf,
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()); JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
} CreateMode.EPHEMERAL,
jsonMapper.writeValueAsBytes(task),
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()); config.getMaxZnodeBytes()
if (cf.checkExists().forPath(taskPath) == null) {
cf.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
taskPath, rawBytes
); );
}
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId()); RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
if (workItem == null) { if (workItem == null) {

View File

@ -20,6 +20,7 @@
package io.druid.indexing.overlord.config; package io.druid.indexing.overlord.config;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.curator.CuratorUtils;
import org.joda.time.Period; import org.joda.time.Period;
import javax.validation.constraints.Min; import javax.validation.constraints.Min;
@ -42,7 +43,7 @@ public class RemoteTaskRunnerConfig
@JsonProperty @JsonProperty
@Min(10 * 1024) @Min(10 * 1024)
private long maxZnodeBytes = 512 * 1024; private int maxZnodeBytes = CuratorUtils.DEFAULT_MAX_ZNODE_BYTES;
@JsonProperty @JsonProperty
private Period taskShutdownLinkTimeout = new Period("PT1M"); private Period taskShutdownLinkTimeout = new Period("PT1M");
@ -62,7 +63,7 @@ public class RemoteTaskRunnerConfig
return minWorkerVersion; return minWorkerVersion;
} }
public long getMaxZnodeBytes() public int getMaxZnodeBytes()
{ {
return maxZnodeBytes; return maxZnodeBytes;
} }

View File

@ -31,6 +31,7 @@ import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.curator.CuratorUtils;
import io.druid.curator.announcement.Announcer; import io.druid.curator.announcement.Announcer;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.IndexerZkConfig;
@ -94,16 +95,22 @@ public class WorkerCuratorCoordinator
return; return;
} }
makePathIfNotExisting( CuratorUtils.createIfNotExists(
curatorFramework,
getTaskPathForWorker(), getTaskPathForWorker(),
CreateMode.PERSISTENT, CreateMode.PERSISTENT,
ImmutableMap.of("created", new DateTime().toString()) jsonMapper.writeValueAsBytes(ImmutableMap.of("created", new DateTime().toString())),
config.getMaxZnodeBytes()
); );
makePathIfNotExisting(
CuratorUtils.createIfNotExists(
curatorFramework,
getStatusPathForWorker(), getStatusPathForWorker(),
CreateMode.PERSISTENT, CreateMode.PERSISTENT,
ImmutableMap.of("created", new DateTime().toString()) jsonMapper.writeValueAsBytes(ImmutableMap.of("created", new DateTime().toString())),
config.getMaxZnodeBytes()
); );
announcer.start(); announcer.start();
announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker), false); announcer.announce(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(worker), false);
@ -125,30 +132,6 @@ public class WorkerCuratorCoordinator
} }
} }
public void makePathIfNotExisting(String path, CreateMode mode, Object data) throws Exception
{
if (curatorFramework.checkExists().forPath(path) == null) {
try {
byte[] rawBytes = jsonMapper.writeValueAsBytes(data);
if (rawBytes.length > config.getMaxZnodeBytes()) {
throw new ISE(
"Length of raw bytes for task too large[%,d > %,d]",
rawBytes.length,
config.getMaxZnodeBytes()
);
}
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(path, rawBytes);
}
catch (Exception e) {
log.warn(e, "Could not create path[%s], perhaps it already exists?", path);
}
}
}
public String getPath(Iterable<String> parts) public String getPath(Iterable<String> parts)
{ {
return JOINER.join(parts); return JOINER.join(parts);
@ -194,33 +177,6 @@ public class WorkerCuratorCoordinator
} }
} }
public void announceTaskAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
return;
}
try {
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
if (rawBytes.length > config.getMaxZnodeBytes()) {
throw new ISE(
"Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()
);
}
curatorFramework.create()
.withMode(CreateMode.PERSISTENT)
.forPath(
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
public void updateAnnouncement(TaskAnnouncement announcement) public void updateAnnouncement(TaskAnnouncement announcement)
{ {
synchronized (lock) { synchronized (lock) {
@ -229,20 +185,12 @@ public class WorkerCuratorCoordinator
} }
try { try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) { CuratorUtils.createOrSet(
announceTaskAnnouncement(announcement); curatorFramework,
return; getStatusPathForId(announcement.getTaskStatus().getId()),
} CreateMode.PERSISTENT,
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement); jsonMapper.writeValueAsBytes(announcement),
if (rawBytes.length > config.getMaxZnodeBytes()) { config.getMaxZnodeBytes()
throw new ISE(
"Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()
);
}
curatorFramework.setData()
.forPath(
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
); );
} }
catch (Exception e) { catch (Exception e) {
@ -251,7 +199,8 @@ public class WorkerCuratorCoordinator
} }
} }
public List<TaskAnnouncement> getAnnouncements(){ public List<TaskAnnouncement> getAnnouncements()
{
try { try {
return Lists.transform( return Lists.transform(
curatorFramework.getChildren().forPath(getStatusPathForWorker()), new Function<String, TaskAnnouncement>() curatorFramework.getChildren().forPath(getStatusPathForWorker()), new Function<String, TaskAnnouncement>()
@ -261,7 +210,10 @@ public class WorkerCuratorCoordinator
public TaskAnnouncement apply(String input) public TaskAnnouncement apply(String input)
{ {
try { try {
return jsonMapper.readValue(curatorFramework.getData().forPath(getStatusPathForId(input)),TaskAnnouncement.class); return jsonMapper.readValue(
curatorFramework.getData().forPath(getStatusPathForId(input)),
TaskAnnouncement.class
);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -46,7 +46,7 @@ public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
} }
@Override @Override
public long getMaxZnodeBytes() public int getMaxZnodeBytes()
{ {
// make sure this is large enough, otherwise RemoteTaskRunnerTest might fail unexpectedly // make sure this is large enough, otherwise RemoteTaskRunnerTest might fail unexpectedly
return 10 * 1024; return 10 * 1024;

View File

@ -0,0 +1,130 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator;
import com.metamx.common.IAE;
import com.metamx.common.logger.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
public class CuratorUtils
{
public static final int DEFAULT_MAX_ZNODE_BYTES = 512 * 1024;
private static final Logger log = new Logger(CuratorUtils.class);
/**
* Create znode if it does not already exist. If it does already exist, this does nothing. In particular, the
* existing znode may have a different payload or create mode.
*
* @param curatorFramework curator
* @param path path
* @param mode create mode
* @param rawBytes payload
* @param maxZnodeBytes maximum payload size
*
* @throws IllegalArgumentException if rawBytes.length > maxZnodeBytes
* @throws Exception if Curator throws an Exception
*/
public static void createIfNotExists(
CuratorFramework curatorFramework,
String path,
CreateMode mode,
byte[] rawBytes,
int maxZnodeBytes
) throws Exception
{
verifySize(path, rawBytes, maxZnodeBytes);
if (curatorFramework.checkExists().forPath(path) == null) {
try {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(path, rawBytes);
}
catch (KeeperException.NodeExistsException e) {
log.info("Skipping create path[%s], since it already exists.", path);
}
}
}
/**
* Create znode if it does not already exist. If it does already exist, update the payload (but not the create mode).
* If someone deletes the znode while we're trying to set it, just let it stay deleted.
*
* @param curatorFramework curator
* @param path path
* @param mode create mode
* @param rawBytes payload
* @param maxZnodeBytes maximum payload size
*
* @throws IllegalArgumentException if rawBytes.length > maxZnodeBytes
* @throws Exception if Curator throws an Exception
*/
public static void createOrSet(
CuratorFramework curatorFramework,
String path,
CreateMode mode,
byte[] rawBytes,
int maxZnodeBytes
) throws Exception
{
verifySize(path, rawBytes, maxZnodeBytes);
boolean created = false;
if (curatorFramework.checkExists().forPath(path) == null) {
try {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(path, rawBytes);
created = true;
}
catch (KeeperException.NodeExistsException e) {
log.debug("Path [%s] created while we were running, will setData instead.", path);
}
}
if (!created) {
try {
curatorFramework.setData()
.forPath(path, rawBytes);
}
catch (KeeperException.NoNodeException e) {
log.warn("Someone deleted path[%s] while we were trying to set it. Leaving it deleted.", path);
}
}
}
private static void verifySize(String path, byte[] rawBytes, int maxZnodeBytes)
{
if (rawBytes.length > maxZnodeBytes) {
throw new IAE(
"Length of raw bytes for znode[%s] too large[%,d > %,d]",
path,
rawBytes.length,
maxZnodeBytes
);
}
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class CuratorUtilsTest extends CuratorTestBase
{
@Before
public void setUp() throws Exception
{
setupServerAndCurator();
}
@After
public void tearDown()
{
tearDownServerAndCurator();
}
@Test(timeout = 10_000L)
public void testCreateIfNotExists() throws Exception
{
curator.start();
curator.blockUntilConnected();
CuratorUtils.createIfNotExists(
curator,
"/foo/bar",
CreateMode.PERSISTENT,
"baz".getBytes(),
CuratorUtils.DEFAULT_MAX_ZNODE_BYTES
);
Assert.assertEquals("baz", new String(curator.getData().forPath("/foo/bar")));
CuratorUtils.createIfNotExists(
curator,
"/foo/bar",
CreateMode.PERSISTENT,
"qux".getBytes(),
CuratorUtils.DEFAULT_MAX_ZNODE_BYTES
);
Assert.assertEquals("baz", new String(curator.getData().forPath("/foo/bar")));
}
@Test(timeout = 10_000L)
public void testCreateIfNotExistsPayloadTooLarge() throws Exception
{
curator.start();
curator.blockUntilConnected();
Exception thrown = null;
try {
CuratorUtils.createIfNotExists(curator, "/foo/bar", CreateMode.PERSISTENT, "baz".getBytes(), 2);
}
catch (Exception e) {
thrown = e;
}
Assert.assertTrue(thrown instanceof IllegalArgumentException);
Assert.assertNull(curator.checkExists().forPath("/foo/bar"));
}
@Test(timeout = 10_000L)
public void testCreateOrSet() throws Exception
{
curator.start();
curator.blockUntilConnected();
Assert.assertNull(curator.checkExists().forPath("/foo/bar"));
CuratorUtils.createOrSet(curator, "/foo/bar", CreateMode.PERSISTENT, "baz".getBytes(), 3);
Assert.assertEquals("baz", new String(curator.getData().forPath("/foo/bar")));
CuratorUtils.createOrSet(curator, "/foo/bar", CreateMode.PERSISTENT, "qux".getBytes(), 3);
Assert.assertEquals("qux", new String(curator.getData().forPath("/foo/bar")));
}
@Test(timeout = 10_000L)
public void testCreateOrSetPayloadTooLarge() throws Exception
{
curator.start();
curator.blockUntilConnected();
Exception thrown = null;
try {
CuratorUtils.createOrSet(curator, "/foo/bar", CreateMode.PERSISTENT, "baz".getBytes(), 2);
}
catch (Exception e) {
thrown = e;
}
Assert.assertTrue(thrown instanceof IllegalArgumentException);
Assert.assertNull(curator.checkExists().forPath("/foo/bar"));
}
}