diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3bd3d53911f..a2b780026a7 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -575,6 +575,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11604. Prevent ConcurrentModificationException while closing domain
sockets during shutdown of DomainSocketWatcher thread. (cnauroth)
+ HADOOP-11612. Workaround for Curator's ChildReaper requiring Guava 15+.
+ (rkanter)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
new file mode 100644
index 00000000000..3bff187a287
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.curator.utils.PathUtils;
+
+/**
+ * This is a copy of Curator 2.7.1's ChildReaper class, modified to work with
+ * Guava 11.0.2. The problem is the 'paths' Collection, which calls Guava's
+ * Sets.newConcurrentHashSet(), which was added in Guava 15.0.
+ *
+ * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
+ * the node and adds empty nodes to an internally managed {@link Reaper}
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ChildReaper implements Closeable
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final Reaper reaper;
+ private final AtomicReference state = new AtomicReference(State.LATENT);
+ private final CuratorFramework client;
+ private final Collection paths = newConcurrentHashSet();
+ private final Reaper.Mode mode;
+ private final CloseableScheduledExecutorService executor;
+ private final int reapingThresholdMs;
+
+ private volatile Future> task;
+
+ // This is copied from Curator's Reaper class
+ static final int DEFAULT_REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
+
+ // This is copied from Guava
+ /**
+ * Creates a thread-safe set backed by a hash map. The set is backed by a
+ * {@link ConcurrentHashMap} instance, and thus carries the same concurrency
+ * guarantees.
+ *
+ * Unlike {@code HashSet}, this class does NOT allow {@code null} to be
+ * used as an element. The set is serializable.
+ *
+ * @return a new, empty thread-safe {@code Set}
+ * @since 15.0
+ */
+ public static Set newConcurrentHashSet() {
+ return Sets.newSetFromMap(new ConcurrentHashMap());
+ }
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * @param client the client
+ * @param path path to reap children from
+ * @param mode reaping mode
+ */
+ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
+ {
+ this(client, path, mode, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null);
+ }
+
+ /**
+ * @param client the client
+ * @param path path to reap children from
+ * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+ * @param mode reaping mode
+ */
+ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
+ {
+ this(client, path, mode, newExecutorService(), reapingThresholdMs, null);
+ }
+
+ /**
+ * @param client the client
+ * @param path path to reap children from
+ * @param executor executor to use for background tasks
+ * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+ * @param mode reaping mode
+ */
+ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
+ {
+ this(client, path, mode, executor, reapingThresholdMs, null);
+ }
+
+ /**
+ * @param client the client
+ * @param path path to reap children from
+ * @param executor executor to use for background tasks
+ * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+ * @param mode reaping mode
+ * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
+ */
+ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
+ {
+ this.client = client;
+ this.mode = mode;
+ this.executor = new CloseableScheduledExecutorService(executor);
+ this.reapingThresholdMs = reapingThresholdMs;
+ this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
+ addPath(path);
+ }
+
+ /**
+ * The reaper must be started
+ *
+ * @throws Exception errors
+ */
+ public void start() throws Exception
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+
+ task = executor.scheduleWithFixedDelay
+ (
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doWork();
+ }
+ },
+ reapingThresholdMs,
+ reapingThresholdMs,
+ TimeUnit.MILLISECONDS
+ );
+
+ reaper.start();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ CloseableUtils.closeQuietly(reaper);
+ task.cancel(true);
+ }
+ }
+
+ /**
+ * Add a path to reap children from
+ *
+ * @param path the path
+ * @return this for chaining
+ */
+ public ChildReaper addPath(String path)
+ {
+ paths.add(PathUtils.validatePath(path));
+ return this;
+ }
+
+ /**
+ * Remove a path from reaping
+ *
+ * @param path the path
+ * @return true if the path existed and was removed
+ */
+ public boolean removePath(String path)
+ {
+ return paths.remove(PathUtils.validatePath(path));
+ }
+
+ private static ScheduledExecutorService newExecutorService()
+ {
+ return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
+ }
+
+ private void doWork()
+ {
+ for ( String path : paths )
+ {
+ try
+ {
+ List children = client.getChildren().forPath(path);
+ for ( String name : children )
+ {
+ String thisPath = ZKPaths.makePath(path, name);
+ Stat stat = client.checkExists().forPath(thisPath);
+ if ( (stat != null) && (stat.getNumChildren() == 0) )
+ {
+ reaper.addPath(thisPath, mode);
+ }
+ }
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not get children for path: " + path, e);
+ }
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java
new file mode 100644
index 00000000000..11b254fc697
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestChildReaper.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Timing;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.BindException;
+import java.util.Random;
+
+/**
+ * This is a copy of Curator 2.7.1's TestChildReaper class, with minor
+ * modifications to make it work with JUnit (some setup code taken from
+ * Curator's BaseClassForTests). This is to ensure that the ChildReaper
+ * class we modified is still correct.
+ */
+public class TestChildReaper
+{
+ protected TestingServer server;
+
+ @Before
+ public void setup() throws Exception {
+ while(this.server == null) {
+ try {
+ this.server = new TestingServer();
+ } catch (BindException var2) {
+ System.err.println("Getting bind exception - retrying to allocate server");
+ this.server = null;
+ }
+ }
+ }
+
+ @After
+ public void teardown() throws Exception {
+ this.server.close();
+ this.server = null;
+ }
+
+ @Test
+ public void testSomeNodes() throws Exception
+ {
+
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ Random r = new Random();
+ int nonEmptyNodes = 0;
+ for ( int i = 0; i < 10; ++i )
+ {
+ client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+ if ( r.nextBoolean() )
+ {
+ client.create().forPath("/test/" + Integer.toString(i) + "/foo");
+ ++nonEmptyNodes;
+ }
+ }
+
+ reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+ reaper.start();
+
+ timing.forWaiting().sleepABit();
+
+ Stat stat = client.checkExists().forPath("/test");
+ Assert.assertEquals(stat.getNumChildren(), nonEmptyNodes);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testSimple() throws Exception
+ {
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ for ( int i = 0; i < 10; ++i )
+ {
+ client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+ }
+
+ reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+ reaper.start();
+
+ timing.forWaiting().sleepABit();
+
+ Stat stat = client.checkExists().forPath("/test");
+ Assert.assertEquals(stat.getNumChildren(), 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testMultiPath() throws Exception
+ {
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ for ( int i = 0; i < 10; ++i )
+ {
+ client.create().creatingParentsIfNeeded().forPath("/test1/" + Integer.toString(i));
+ client.create().creatingParentsIfNeeded().forPath("/test2/" + Integer.toString(i));
+ client.create().creatingParentsIfNeeded().forPath("/test3/" + Integer.toString(i));
+ }
+
+ reaper = new ChildReaper(client, "/test2", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+ reaper.start();
+ reaper.addPath("/test1");
+
+ timing.forWaiting().sleepABit();
+
+ Stat stat = client.checkExists().forPath("/test1");
+ Assert.assertEquals(stat.getNumChildren(), 0);
+ stat = client.checkExists().forPath("/test2");
+ Assert.assertEquals(stat.getNumChildren(), 0);
+ stat = client.checkExists().forPath("/test3");
+ Assert.assertEquals(stat.getNumChildren(), 10);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testNamespace() throws Exception
+ {
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .sessionTimeoutMs(timing.session())
+ .connectionTimeoutMs(timing.connection())
+ .retryPolicy(new RetryOneTime(1))
+ .namespace("foo")
+ .build();
+ try
+ {
+ client.start();
+
+ for ( int i = 0; i < 10; ++i )
+ {
+ client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+ }
+
+ reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, 1);
+ reaper.start();
+
+ timing.forWaiting().sleepABit();
+
+ Stat stat = client.checkExists().forPath("/test");
+ Assert.assertEquals(stat.getNumChildren(), 0);
+
+ stat = client.usingNamespace(null).checkExists().forPath("/foo/test");
+ Assert.assertNotNull(stat);
+ Assert.assertEquals(stat.getNumChildren(), 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+}