HBASE-15292 Refined ZooKeeperWatcher to prevent ZooKeeper's callback while construction (Hiroshi Ikeda)
This commit is contained in:
parent
efcf94def7
commit
8b0ce77942
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* Placeholder of an instance which will be accessed by other threads
|
||||
* but is not yet created. Thread safe.
|
||||
*/
|
||||
class InstancePending<T> {
|
||||
// Based on a subtle part of the Java Language Specification,
|
||||
// in order to avoid a slight overhead of synchronization for each access.
|
||||
|
||||
private final CountDownLatch pendingLatch = new CountDownLatch(1);
|
||||
|
||||
/** Piggybacking on {@code pendingLatch}. */
|
||||
private InstanceHolder<T> instanceHolder;
|
||||
|
||||
private static class InstanceHolder<T> {
|
||||
// The JLS ensures the visibility of a final field and its contents
|
||||
// unless they are exposed to another thread while the construction.
|
||||
final T instance;
|
||||
|
||||
InstanceHolder(T instance) {
|
||||
this.instance = instance;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the instance given by the method {@link #prepare}.
|
||||
* This is an interruptible blocking method
|
||||
* and the interruption flag will be set just before returning if any.
|
||||
*/
|
||||
T get() {
|
||||
InstanceHolder<T> instanceHolder;
|
||||
boolean interrupted = false;
|
||||
|
||||
while ((instanceHolder = this.instanceHolder) == null) {
|
||||
try {
|
||||
pendingLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
return instanceHolder.instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Associates the given instance for the method {@link #get}.
|
||||
* This method should be called once, and {@code instance} should be non-null.
|
||||
* This method is expected to call as soon as possible
|
||||
* because the method {@code get} is uninterruptibly blocked until this method is called.
|
||||
*/
|
||||
void prepare(T instance) {
|
||||
assert instance != null;
|
||||
instanceHolder = new InstanceHolder<T>(instance);
|
||||
pendingLatch.countDown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
|
||||
/**
|
||||
* Placeholder of a watcher which might be triggered before the instance is not yet created.
|
||||
* <p>
|
||||
* {@code ZooKeeper} starts its event thread within its constructor (and that is an anti-pattern),
|
||||
* and the watcher passed to the constructor might be called back by the event thread
|
||||
* before you get the instance of {@code ZooKeeper} from the constructor.
|
||||
* If your watcher calls methods of {@code ZooKeeper},
|
||||
* pass this placeholder to the constructor of the {@code ZooKeeper},
|
||||
* create your watcher using the instance of {@code ZooKeeper},
|
||||
* and then call the method {@code PendingWatcher.prepare}.
|
||||
*/
|
||||
class PendingWatcher implements Watcher {
|
||||
private final InstancePending<Watcher> pending = new InstancePending<Watcher>();
|
||||
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
pending.get().process(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Associates the substantial watcher of processing events.
|
||||
* This method should be called once, and {@code watcher} should be non-null.
|
||||
* This method is expected to call as soon as possible
|
||||
* because the event processing, being invoked by the ZooKeeper event thread,
|
||||
* is uninterruptibly blocked until this method is called.
|
||||
*/
|
||||
void prepare(Watcher watcher) {
|
||||
pending.prepare(watcher);
|
||||
}
|
||||
}
|
|
@ -73,7 +73,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
private String quorum;
|
||||
|
||||
// zookeeper connection
|
||||
private RecoverableZooKeeper recoverableZooKeeper;
|
||||
private final RecoverableZooKeeper recoverableZooKeeper;
|
||||
|
||||
// abortable in case of zk failure
|
||||
protected Abortable abortable;
|
||||
|
@ -130,8 +130,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
|
||||
private final Configuration conf;
|
||||
|
||||
private final Exception constructorCaller;
|
||||
|
||||
/* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
|
||||
private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
|
||||
|
||||
|
@ -162,13 +160,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
Abortable abortable, boolean canCreateBaseZNode)
|
||||
throws IOException, ZooKeeperConnectionException {
|
||||
this.conf = conf;
|
||||
// Capture a stack trace now. Will print it out later if problem so we can
|
||||
// distingush amongst the myriad ZKWs.
|
||||
try {
|
||||
throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
|
||||
} catch (Exception e) {
|
||||
this.constructorCaller = e;
|
||||
}
|
||||
this.quorum = ZKConfig.getZKQuorumServersString(conf);
|
||||
this.prefix = identifier;
|
||||
// Identifier will get the sessionid appended later below down when we
|
||||
|
@ -176,7 +167,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
this.identifier = identifier + "0x0";
|
||||
this.abortable = abortable;
|
||||
setNodeNames(conf);
|
||||
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
|
||||
PendingWatcher pendingWatcher = new PendingWatcher();
|
||||
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
|
||||
pendingWatcher.prepare(this);
|
||||
if (canCreateBaseZNode) {
|
||||
createBaseZNodes();
|
||||
}
|
||||
|
@ -650,27 +643,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
private void connectionEvent(WatchedEvent event) {
|
||||
switch(event.getState()) {
|
||||
case SyncConnected:
|
||||
// Now, this callback can be invoked before the this.zookeeper is set.
|
||||
// Wait a little while.
|
||||
long finished = System.currentTimeMillis() +
|
||||
this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
|
||||
while (System.currentTimeMillis() < finished) {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted while sleeping");
|
||||
throw new RuntimeException("Interrupted while waiting for" +
|
||||
" recoverableZooKeeper is set");
|
||||
}
|
||||
if (this.recoverableZooKeeper != null) break;
|
||||
}
|
||||
|
||||
if (this.recoverableZooKeeper == null) {
|
||||
LOG.error("ZK is null on connection event -- see stack trace " +
|
||||
"for the stack trace when constructor was called on this zkw",
|
||||
this.constructorCaller);
|
||||
throw new NullPointerException("ZK is null");
|
||||
}
|
||||
this.identifier = this.prefix + "-0x" +
|
||||
Long.toHexString(this.recoverableZooKeeper.getSessionId());
|
||||
// Update our identifier. Otherwise ignore.
|
||||
|
@ -759,9 +731,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
if (recoverableZooKeeper != null) {
|
||||
recoverableZooKeeper.close();
|
||||
}
|
||||
recoverableZooKeeper.close();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestInstancePending {
|
||||
@Test(timeout = 1000)
|
||||
public void test() throws Exception {
|
||||
final InstancePending<String> pending = new InstancePending<String>();
|
||||
final AtomicReference<String> getResultRef = new AtomicReference<String>();
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
getResultRef.set(pending.get());
|
||||
}
|
||||
}.start();
|
||||
|
||||
Thread.sleep(100);
|
||||
Assert.assertNull(getResultRef.get());
|
||||
|
||||
pending.prepare("abc");
|
||||
Thread.sleep(100);
|
||||
Assert.assertEquals("abc", getResultRef.get());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue