From 8b0ce77942e99751394cc23a40436d9f20d4254c Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 5 May 2016 11:14:32 -0700 Subject: [PATCH] HBASE-15292 Refined ZooKeeperWatcher to prevent ZooKeeper's callback while construction (Hiroshi Ikeda) --- .../hbase/zookeeper/InstancePending.java | 80 +++++++++++++++++++ .../hbase/zookeeper/PendingWatcher.java | 53 ++++++++++++ .../hbase/zookeeper/ZooKeeperWatcher.java | 40 ++-------- .../hbase/zookeeper/TestInstancePending.java | 49 ++++++++++++ 4 files changed, 187 insertions(+), 35 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java new file mode 100644 index 00000000000..7464dbbcf17 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.util.concurrent.CountDownLatch; + +/** + * Placeholder of an instance which will be accessed by other threads + * but is not yet created. Thread safe. + */ +class InstancePending { + // Based on a subtle part of the Java Language Specification, + // in order to avoid a slight overhead of synchronization for each access. + + private final CountDownLatch pendingLatch = new CountDownLatch(1); + + /** Piggybacking on {@code pendingLatch}. */ + private InstanceHolder instanceHolder; + + private static class InstanceHolder { + // The JLS ensures the visibility of a final field and its contents + // unless they are exposed to another thread while the construction. + final T instance; + + InstanceHolder(T instance) { + this.instance = instance; + } + } + + /** + * Returns the instance given by the method {@link #prepare}. + * This is an interruptible blocking method + * and the interruption flag will be set just before returning if any. + */ + T get() { + InstanceHolder instanceHolder; + boolean interrupted = false; + + while ((instanceHolder = this.instanceHolder) == null) { + try { + pendingLatch.await(); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + return instanceHolder.instance; + } + + /** + * Associates the given instance for the method {@link #get}. + * This method should be called once, and {@code instance} should be non-null. + * This method is expected to call as soon as possible + * because the method {@code get} is uninterruptibly blocked until this method is called. + */ + void prepare(T instance) { + assert instance != null; + instanceHolder = new InstanceHolder(instance); + pendingLatch.countDown(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java new file mode 100644 index 00000000000..11d0e5dfc44 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +/** + * Placeholder of a watcher which might be triggered before the instance is not yet created. + *

+ * {@code ZooKeeper} starts its event thread within its constructor (and that is an anti-pattern), + * and the watcher passed to the constructor might be called back by the event thread + * before you get the instance of {@code ZooKeeper} from the constructor. + * If your watcher calls methods of {@code ZooKeeper}, + * pass this placeholder to the constructor of the {@code ZooKeeper}, + * create your watcher using the instance of {@code ZooKeeper}, + * and then call the method {@code PendingWatcher.prepare}. + */ +class PendingWatcher implements Watcher { + private final InstancePending pending = new InstancePending(); + + @Override + public void process(WatchedEvent event) { + pending.get().process(event); + } + + /** + * Associates the substantial watcher of processing events. + * This method should be called once, and {@code watcher} should be non-null. + * This method is expected to call as soon as possible + * because the event processing, being invoked by the ZooKeeper event thread, + * is uninterruptibly blocked until this method is called. + */ + void prepare(Watcher watcher) { + pending.prepare(watcher); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 205d3970fb8..dcbcf30c67b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -73,7 +73,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private String quorum; // zookeeper connection - private RecoverableZooKeeper recoverableZooKeeper; + private final RecoverableZooKeeper recoverableZooKeeper; // abortable in case of zk failure protected Abortable abortable; @@ -130,8 +130,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private final Configuration conf; - private final Exception constructorCaller; - /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); @@ -162,13 +160,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { this.conf = conf; - // Capture a stack trace now. Will print it out later if problem so we can - // distingush amongst the myriad ZKWs. - try { - throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING"); - } catch (Exception e) { - this.constructorCaller = e; - } this.quorum = ZKConfig.getZKQuorumServersString(conf); this.prefix = identifier; // Identifier will get the sessionid appended later below down when we @@ -176,7 +167,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { this.identifier = identifier + "0x0"; this.abortable = abortable; setNodeNames(conf); - this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); + PendingWatcher pendingWatcher = new PendingWatcher(); + this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); + pendingWatcher.prepare(this); if (canCreateBaseZNode) { createBaseZNodes(); } @@ -650,27 +643,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private void connectionEvent(WatchedEvent event) { switch(event.getState()) { case SyncConnected: - // Now, this callback can be invoked before the this.zookeeper is set. - // Wait a little while. - long finished = System.currentTimeMillis() + - this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000); - while (System.currentTimeMillis() < finished) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - LOG.warn("Interrupted while sleeping"); - throw new RuntimeException("Interrupted while waiting for" + - " recoverableZooKeeper is set"); - } - if (this.recoverableZooKeeper != null) break; - } - - if (this.recoverableZooKeeper == null) { - LOG.error("ZK is null on connection event -- see stack trace " + - "for the stack trace when constructor was called on this zkw", - this.constructorCaller); - throw new NullPointerException("ZK is null"); - } this.identifier = this.prefix + "-0x" + Long.toHexString(this.recoverableZooKeeper.getSessionId()); // Update our identifier. Otherwise ignore. @@ -759,9 +731,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { @Override public void close() { try { - if (recoverableZooKeeper != null) { - recoverableZooKeeper.close(); - } + recoverableZooKeeper.close(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java new file mode 100644 index 00000000000..667fed80a33 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestInstancePending { + @Test(timeout = 1000) + public void test() throws Exception { + final InstancePending pending = new InstancePending(); + final AtomicReference getResultRef = new AtomicReference(); + + new Thread() { + @Override + public void run() { + getResultRef.set(pending.get()); + } + }.start(); + + Thread.sleep(100); + Assert.assertNull(getResultRef.get()); + + pending.prepare("abc"); + Thread.sleep(100); + Assert.assertEquals("abc", getResultRef.get()); + } +}