diff --git a/docs/content/Thanks.md b/docs/content/Thanks.md
deleted file mode 100644
index 97ec7e0904a..00000000000
--- a/docs/content/Thanks.md
+++ /dev/null
@@ -1,12 +0,0 @@
----
-layout: doc_page
----
-
-YourKit supports the Druid open source projects with its
-full-featured Java Profiler.
-YourKit, LLC is the creator of innovative and intelligent tools for profiling
-Java and .NET applications. Take a look at YourKit's software products:
-YourKit Java
-Profiler and
-YourKit .NET
-Profiler.
diff --git a/docs/content/toc.textile b/docs/content/toc.textile
index cc3c86eac99..62a9ec0776a 100644
--- a/docs/content/toc.textile
+++ b/docs/content/toc.textile
@@ -93,4 +93,4 @@ h2. Development
* "Libraries":./Libraries.html
h2. Misc
-* "Thanks":./Thanks.html
+* "Thanks":/thanks.html
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java
index f657b6a2435..f78f576de67 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ImmutableZkWorker.java
@@ -32,7 +32,7 @@ public class ImmutableZkWorker
{
private final Worker worker;
private final int currCapacityUsed;
- private final Set availabilityGroups;
+ private final ImmutableSet availabilityGroups;
public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set availabilityGroups)
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
index e9bba479c64..e382c6dee3c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java
@@ -164,7 +164,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
{
- Worker worker;
+ final Worker worker;
switch (event.getType()) {
case CHILD_ADDED:
worker = jsonMapper.readValue(
@@ -198,6 +198,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
);
break;
+ case CHILD_UPDATED:
+ worker = jsonMapper.readValue(
+ event.getData().getData(),
+ Worker.class
+ );
+ updateWorker(worker);
+ break;
+
case CHILD_REMOVED:
worker = jsonMapper.readValue(
event.getData().getData(),
@@ -745,6 +753,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
}
+ /**
+ * We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without
+ * dropping themselves and re-announcing.
+ */
+ private void updateWorker(final Worker worker)
+ {
+ final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
+ if (zkWorker != null) {
+ log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), zkWorker.getWorker(), worker);
+ zkWorker.setWorker(worker);
+ } else {
+ log.warn(
+ "WTF, worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.",
+ worker.getHost()
+ );
+ }
+ }
+
/**
* When a ephemeral worker node disappears from ZK, incomplete running tasks will be retried by
* the logic in the status listener. We still have to make sure there are no tasks assigned
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java
index abc4da0ad57..54b09da2b29 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ZkWorker.java
@@ -22,11 +22,11 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
@@ -46,15 +46,15 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ZkWorker implements Closeable
{
- private final Worker worker;
private final PathChildrenCache statusCache;
private final Function cacheConverter;
+ private AtomicReference worker;
private AtomicReference lastCompletedTaskTime = new AtomicReference(new DateTime());
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
- this.worker = worker;
+ this.worker = new AtomicReference<>(worker);
this.statusCache = statusCache;
this.cacheConverter = new Function()
{
@@ -84,7 +84,7 @@ public class ZkWorker implements Closeable
@JsonProperty("worker")
public Worker getWorker()
{
- return worker;
+ return worker.get();
}
@JsonProperty("runningTasks")
@@ -137,30 +137,28 @@ public class ZkWorker implements Closeable
return getRunningTasks().containsKey(taskId);
}
- public boolean isAtCapacity()
- {
- return getCurrCapacityUsed() >= worker.getCapacity();
- }
-
public boolean isValidVersion(String minVersion)
{
- return worker.getVersion().compareTo(minVersion) >= 0;
+ return worker.get().getVersion().compareTo(minVersion) >= 0;
}
- public boolean canRunTask(Task task)
+ public void setWorker(Worker newWorker)
{
- return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
- && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
+ final Worker oldWorker = worker.get();
+ Preconditions.checkArgument(newWorker.getHost().equals(oldWorker.getHost()), "Cannot change Worker host");
+ Preconditions.checkArgument(newWorker.getIp().equals(oldWorker.getIp()), "Cannot change Worker ip");
+
+ worker.set(newWorker);
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
- lastCompletedTaskTime.getAndSet(completedTaskTime);
+ lastCompletedTaskTime.set(completedTaskTime);
}
public ImmutableZkWorker toImmutable()
{
- return new ImmutableZkWorker(worker, getCurrCapacityUsed(), getAvailabilityGroups());
+ return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());
}
@Override
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 11c7c85c639..b5f86ea7494 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -361,6 +361,29 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
+ @Test
+ public void testWorkerDisabled() throws Exception
+ {
+ doSetup();
+ final ListenableFuture result = remoteTaskRunner.run(task);
+
+ Assert.assertTrue(taskAnnounced(task.getId()));
+ mockWorkerRunningTask(task);
+ Assert.assertTrue(workerRunningTask(task.getId()));
+
+ // Disable while task running
+ disableWorker();
+
+ // Continue test
+ mockWorkerCompleteSuccessfulTask(task);
+ Assert.assertTrue(workerCompletedTask(result));
+ Assert.assertEquals(task.getId(), result.get().getId());
+ Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
+
+ // Confirm RTR thinks the worker is disabled.
+ Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
+ }
+
private void doSetup() throws Exception
{
makeWorker();
@@ -405,6 +428,14 @@ public class RemoteTaskRunnerTest
);
}
+ private void disableWorker() throws Exception
+ {
+ cf.setData().forPath(
+ announcementsPath,
+ jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
+ );
+ }
+
private boolean taskAnnounced(final String taskId)
{
return pathExists(joiner.join(tasksPath, taskId));
diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
index b1db05a2284..071a91b3c3e 100644
--- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java
@@ -63,12 +63,12 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
public CardinalityAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldNames") final List fieldNames,
- @JsonProperty("byRow") final Boolean byRow
+ @JsonProperty("byRow") final boolean byRow
)
{
this.name = name;
this.fieldNames = fieldNames;
- this.byRow = byRow == null ? false : byRow;
+ this.byRow = byRow;
}
@Override
@@ -203,6 +203,12 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
return fieldNames;
}
+ @JsonProperty
+ public boolean isByRow()
+ {
+ return byRow;
+ }
+
@Override
public byte[] getCacheKey()
{
diff --git a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java
index 4d27df07303..9d8ac7c721f 100644
--- a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java
+++ b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java
@@ -19,16 +19,20 @@
package io.druid.query.aggregation.cardinality;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
@@ -378,4 +382,15 @@ public class CardinalityAggregatorTest
0.05
);
}
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ CardinalityAggregatorFactory factory = new CardinalityAggregatorFactory("billy", ImmutableList.of("b", "a", "c"), true);
+ ObjectMapper objectMapper = new DefaultObjectMapper();
+ Assert.assertEquals(
+ factory,
+ objectMapper.readValue(objectMapper.writeValueAsString(factory), AggregatorFactory.class)
+ );
+ }
}