HBASE-3065 Retry all 'retryable' zk operations; e.g. connection loss
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1151751 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
22bb923beb
commit
b11659d679
|
@ -342,6 +342,7 @@ Release 0.91.0 - Unreleased
|
||||||
HBASE-1938 Make in-memory table scanning faster (nkeywal)
|
HBASE-1938 Make in-memory table scanning faster (nkeywal)
|
||||||
HBASE-4143 HTable.doPut(List) should check the writebuffer length every so often
|
HBASE-4143 HTable.doPut(List) should check the writebuffer length every so often
|
||||||
(Doug Meil via Ted Yu)
|
(Doug Meil via Ted Yu)
|
||||||
|
HBASE-3065 Retry all 'retryable' zk operations; e.g. connection loss (Liyin Tang)
|
||||||
|
|
||||||
TASKS
|
TASKS
|
||||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||||
|
|
|
@ -1706,8 +1706,8 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
this.servers.clear();
|
this.servers.clear();
|
||||||
if (this.zooKeeper != null) {
|
if (this.zooKeeper != null) {
|
||||||
LOG.info("Closed zookeeper sessionid=0x"
|
LOG.info("Closed zookeeper sessionid=0x" +
|
||||||
+ Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
|
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
|
||||||
this.zooKeeper.close();
|
this.zooKeeper.close();
|
||||||
this.zooKeeper = null;
|
this.zooKeeper = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1145,7 +1145,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
|
LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
|
||||||
// Async exists to set a watcher so we'll get triggered when
|
// Async exists to set a watcher so we'll get triggered when
|
||||||
// unassigned node changes.
|
// unassigned node changes.
|
||||||
this.zkw.getZooKeeper().exists(path, this.zkw,
|
this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
|
||||||
new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
|
new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -370,7 +370,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
|
|
||||||
LOG.info("Server active/primary master; " + this.serverName +
|
LOG.info("Server active/primary master; " + this.serverName +
|
||||||
", sessionid=0x" +
|
", sessionid=0x" +
|
||||||
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) +
|
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
|
||||||
", cluster-up flag was=" + wasUp);
|
", cluster-up flag was=" + wasUp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,6 +197,7 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* if there was an error while splitting any log file
|
* if there was an error while splitting any log file
|
||||||
* @return cumulative size of the logfiles split
|
* @return cumulative size of the logfiles split
|
||||||
|
* @throws KeeperException
|
||||||
*/
|
*/
|
||||||
public long splitLogDistributed(final Path logDir) throws IOException {
|
public long splitLogDistributed(final Path logDir) throws IOException {
|
||||||
List<Path> logDirs = new ArrayList<Path>();
|
List<Path> logDirs = new ArrayList<Path>();
|
||||||
|
@ -370,7 +371,8 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
|
|
||||||
|
|
||||||
private void getDataSetWatch(String path, Long retry_count) {
|
private void getDataSetWatch(String path, Long retry_count) {
|
||||||
this.watcher.getZooKeeper().getData(path, this.watcher,
|
this.watcher.getRecoverableZooKeeper().getZooKeeper().
|
||||||
|
getData(path, this.watcher,
|
||||||
new GetDataAsyncCallback(), retry_count);
|
new GetDataAsyncCallback(), retry_count);
|
||||||
tot_mgr_get_data_queued.incrementAndGet();
|
tot_mgr_get_data_queued.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
@ -524,7 +526,8 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
|
|
||||||
private void deleteNode(String path, Long retries) {
|
private void deleteNode(String path, Long retries) {
|
||||||
tot_mgr_node_delete_queued.incrementAndGet();
|
tot_mgr_node_delete_queued.incrementAndGet();
|
||||||
this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(),
|
this.watcher.getRecoverableZooKeeper().getZooKeeper().
|
||||||
|
delete(path, -1, new DeleteAsyncCallback(),
|
||||||
retries);
|
retries);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,9 +554,11 @@ public class SplitLogManager extends ZooKeeperListener {
|
||||||
/**
|
/**
|
||||||
* signal the workers that a task was resubmitted by creating the
|
* signal the workers that a task was resubmitted by creating the
|
||||||
* RESCAN node.
|
* RESCAN node.
|
||||||
|
* @throws KeeperException
|
||||||
*/
|
*/
|
||||||
private void createRescanNode(long retries) {
|
private void createRescanNode(long retries) {
|
||||||
watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher),
|
this.watcher.getRecoverableZooKeeper().getZooKeeper().
|
||||||
|
create(ZKSplitLog.getRescanNode(watcher),
|
||||||
TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT_SEQUENTIAL,
|
CreateMode.PERSISTENT_SEQUENTIAL,
|
||||||
new CreateRescanAsyncCallback(), new Long(retries));
|
new CreateRescanAsyncCallback(), new Long(retries));
|
||||||
|
|
|
@ -879,7 +879,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
LOG.info("Serving as " + this.serverNameFromMasterPOV +
|
LOG.info("Serving as " + this.serverNameFromMasterPOV +
|
||||||
", RPC listening on " + this.isa +
|
", RPC listening on " + this.isa +
|
||||||
", sessionid=0x" +
|
", sessionid=0x" +
|
||||||
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
|
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
|
||||||
isOnline = true;
|
isOnline = true;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
this.isOnline = false;
|
this.isOnline = false;
|
||||||
|
|
|
@ -329,7 +329,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
*/
|
*/
|
||||||
private boolean ownTask(boolean isFirstTime) {
|
private boolean ownTask(boolean isFirstTime) {
|
||||||
try {
|
try {
|
||||||
Stat stat = this.watcher.getZooKeeper().setData(currentTask,
|
Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
|
||||||
TaskState.TASK_OWNED.get(serverName), currentVersion);
|
TaskState.TASK_OWNED.get(serverName), currentVersion);
|
||||||
if (stat == null) {
|
if (stat == null) {
|
||||||
LOG.warn("zk.setData() returned null for path " + currentTask);
|
LOG.warn("zk.setData() returned null for path " + currentTask);
|
||||||
|
@ -392,7 +392,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
void getDataSetWatchAsync() {
|
void getDataSetWatchAsync() {
|
||||||
this.watcher.getZooKeeper().getData(currentTask, this.watcher,
|
this.watcher.getRecoverableZooKeeper().getZooKeeper().
|
||||||
|
getData(currentTask, this.watcher,
|
||||||
new GetDataAsyncCallback(), null);
|
new GetDataAsyncCallback(), null);
|
||||||
tot_wkr_get_data_queued.incrementAndGet();
|
tot_wkr_get_data_queued.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
|
@ -311,6 +311,15 @@ public class Bytes {
|
||||||
return toStringBinary(b, 0, b.length);
|
return toStringBinary(b, 0, b.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The same as {@link #toStringBinary(byte[])}, but returns a string "null"
|
||||||
|
* if given a null argument.
|
||||||
|
*/
|
||||||
|
public static String toStringBinarySafe(final byte [] b) {
|
||||||
|
if (b == null)
|
||||||
|
return "null";
|
||||||
|
return toStringBinary(b, 0, b.length);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Write a printable representation of a byte array. Non-printable
|
* Write a printable representation of a byte array. Non-printable
|
||||||
* characters are hex escaped in the format \\x%02X, eg:
|
* characters are hex escaped in the format \\x%02X, eg:
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2011 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class RetryCounter {
|
||||||
|
private final int maxRetries;
|
||||||
|
private int retriesRemaining;
|
||||||
|
private final int retryIntervalMillis;
|
||||||
|
private final TimeUnit timeUnit;
|
||||||
|
|
||||||
|
public RetryCounter(int maxRetries,
|
||||||
|
int retryIntervalMillis, TimeUnit timeUnit) {
|
||||||
|
this.maxRetries = maxRetries;
|
||||||
|
this.retriesRemaining = maxRetries;
|
||||||
|
this.retryIntervalMillis = retryIntervalMillis;
|
||||||
|
this.timeUnit = timeUnit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxRetries() {
|
||||||
|
return maxRetries;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sleepUntilNextRetry() throws InterruptedException {
|
||||||
|
timeUnit.sleep(retryIntervalMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean shouldRetry() {
|
||||||
|
return retriesRemaining > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void useRetry() {
|
||||||
|
retriesRemaining--;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAttemptTimes() {
|
||||||
|
return maxRetries-retriesRemaining+1;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2011 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class RetryCounterFactory {
|
||||||
|
private final int maxRetries;
|
||||||
|
private final int retryIntervalMillis;
|
||||||
|
|
||||||
|
public RetryCounterFactory(int maxRetries, int retryIntervalMillis) {
|
||||||
|
this.maxRetries = maxRetries;
|
||||||
|
this.retryIntervalMillis = retryIntervalMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RetryCounter create() {
|
||||||
|
return new RetryCounter(
|
||||||
|
maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,661 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2011 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.zookeeper;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||||
|
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||||
|
import org.apache.zookeeper.AsyncCallback;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.Watcher;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.ZooKeeper.States;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A zookeeper that can handle 'recoverable' errors.
|
||||||
|
* To handle recoverable errors, developers need to realize that there are two
|
||||||
|
* classes of requests: idempotent and non-idempotent requests. Read requests
|
||||||
|
* and unconditional sets and deletes are examples of idempotent requests, they
|
||||||
|
* can be reissued with the same results.
|
||||||
|
* (Although, the delete may throw a NoNodeException on reissue its effect on
|
||||||
|
* the ZooKeeper state is the same.) Non-idempotent requests need special
|
||||||
|
* handling, application and library writers need to keep in mind that they may
|
||||||
|
* need to encode information in the data or name of znodes to detect
|
||||||
|
* retries. A simple example is a create that uses a sequence flag.
|
||||||
|
* If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
|
||||||
|
* loss exception, that process will reissue another
|
||||||
|
* create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
|
||||||
|
* getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
|
||||||
|
* that x-109 was the result of the previous create, so the process actually
|
||||||
|
* owns both x-109 and x-111. An easy way around this is to use "x-process id-"
|
||||||
|
* when doing the create. If the process is using an id of 352, before reissuing
|
||||||
|
* the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
|
||||||
|
* "x-352-109", x-333-110". The process will know that the original create
|
||||||
|
* succeeded an the znode it created is "x-352-109".
|
||||||
|
* @see http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
|
||||||
|
*/
|
||||||
|
public class RecoverableZooKeeper {
|
||||||
|
private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
|
||||||
|
// the actual ZooKeeper client instance
|
||||||
|
private ZooKeeper zk;
|
||||||
|
private final RetryCounterFactory retryCounterFactory;
|
||||||
|
// An identifier of this process in the cluster
|
||||||
|
private final String identifier;
|
||||||
|
private final byte[] id;
|
||||||
|
private int retryIntervalMillis;
|
||||||
|
|
||||||
|
private static final int ID_OFFSET = Bytes.SIZEOF_INT;
|
||||||
|
// the magic number is to be backward compatible
|
||||||
|
private static final byte MAGIC =(byte) 0XFF;
|
||||||
|
private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
|
||||||
|
|
||||||
|
public RecoverableZooKeeper(String quorumServers, int seesionTimeout,
|
||||||
|
Watcher watcher, int maxRetries, int retryIntervalMillis)
|
||||||
|
throws IOException {
|
||||||
|
this.zk = new ZooKeeper(quorumServers, seesionTimeout, watcher);
|
||||||
|
this.retryCounterFactory =
|
||||||
|
new RetryCounterFactory(maxRetries, retryIntervalMillis);
|
||||||
|
this.retryIntervalMillis = retryIntervalMillis;
|
||||||
|
|
||||||
|
// the identifier = processID@hostName
|
||||||
|
this.identifier = ManagementFactory.getRuntimeMXBean().getName();
|
||||||
|
LOG.info("The identifier of this process is " + identifier);
|
||||||
|
this.id = Bytes.toBytes(identifier);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* delete is an idempotent operation. Retry before throw out exception.
|
||||||
|
* This function will not throw out NoNodeException if the path is not existed
|
||||||
|
* @param path
|
||||||
|
* @param version
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws KeeperException
|
||||||
|
*/
|
||||||
|
public void delete(String path, int version)
|
||||||
|
throws InterruptedException, KeeperException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
boolean isRetry = false; // False for first attempt, true for all retries.
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
zk.delete(path, version);
|
||||||
|
return;
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case NONODE:
|
||||||
|
if (isRetry) {
|
||||||
|
LOG.info("Node " + path + " already deleted. Assuming that a " +
|
||||||
|
"previous attempt succeeded.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.warn("Node " + path + " already deleted, and this is not a " +
|
||||||
|
"retry");
|
||||||
|
throw e;
|
||||||
|
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper delete failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
isRetry = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* exists is an idempotent operation. Retry before throw out exception
|
||||||
|
* @param path
|
||||||
|
* @param watcher
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public Stat exists(String path, Watcher watcher)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return zk.exists(path, watcher);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper exists failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* exists is an idempotent operation. Retry before throw out exception
|
||||||
|
* @param path
|
||||||
|
* @param watch
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public Stat exists(String path, boolean watch)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return zk.exists(path, watch);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper exists failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getChildren is an idempotent operation. Retry before throw out exception
|
||||||
|
* @param path
|
||||||
|
* @param watcher
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public List<String> getChildren(String path, Watcher watcher)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return zk.getChildren(path, watcher);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper getChildren failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getChildren is an idempotent operation. Retry before throw out exception
|
||||||
|
* @param path
|
||||||
|
* @param watch
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public List<String> getChildren(String path, boolean watch)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return zk.getChildren(path, watch);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper getChildren failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getData is an idempotent operation. Retry before throw out exception
|
||||||
|
* @param path
|
||||||
|
* @param watcher
|
||||||
|
* @param stat
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public byte[] getData(String path, Watcher watcher, Stat stat)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
byte[] revData = zk.getData(path, watcher, stat);
|
||||||
|
return this.removeMetaData(revData);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper getData failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getData is an idemnpotent operation. Retry before throw out exception
|
||||||
|
* @param path
|
||||||
|
* @param watch
|
||||||
|
* @param stat
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public byte[] getData(String path, boolean watch, Stat stat)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
byte[] revData = zk.getData(path, watch, stat);
|
||||||
|
return this.removeMetaData(revData);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper getData failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* setData is NOT an idempotent operation. Retry may cause BadVersion Exception
|
||||||
|
* Adding an identifier field into the data to check whether
|
||||||
|
* badversion is caused by the result of previous correctly setData
|
||||||
|
* @param path
|
||||||
|
* @param data
|
||||||
|
* @param version
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public Stat setData(String path, byte[] data, int version)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
byte[] newData = appendMetaData(data);
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return zk.setData(path, newData, version);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper setData failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case BADVERSION:
|
||||||
|
// try to verify whether the previous setData success or not
|
||||||
|
try{
|
||||||
|
Stat stat = new Stat();
|
||||||
|
byte[] revData = zk.getData(path, false, stat);
|
||||||
|
int idLength = Bytes.toInt(revData, ID_OFFSET);
|
||||||
|
int dataLength = revData.length-ID_OFFSET-idLength;
|
||||||
|
int dataOffset = ID_OFFSET+idLength;
|
||||||
|
|
||||||
|
if(Bytes.compareTo(revData, ID_OFFSET, id.length,
|
||||||
|
revData, dataOffset, dataLength) == 0) {
|
||||||
|
// the bad version is caused by previous successful setData
|
||||||
|
return stat;
|
||||||
|
}
|
||||||
|
} catch(KeeperException keeperException){
|
||||||
|
// the ZK is not reliable at this moment. just throw out exception
|
||||||
|
throw keeperException;
|
||||||
|
}
|
||||||
|
|
||||||
|
// throw out other exceptions and verified bad version exceptions
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* NONSEQUENTIAL create is idempotent operation.
|
||||||
|
* Retry before throw out exceptions.
|
||||||
|
* But this function will not throw out the NodeExist exception back to the
|
||||||
|
* application.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* But SEQUENTIAL is NOT idempotent operation. It is necessary to add
|
||||||
|
* identifier to the path to verify, whether the previous one is successful
|
||||||
|
* or not.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param path
|
||||||
|
* @param data
|
||||||
|
* @param acl
|
||||||
|
* @param createMode
|
||||||
|
* @return
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
public String create(String path, byte[] data, List<ACL> acl,
|
||||||
|
CreateMode createMode)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
byte[] newData = appendMetaData(data);
|
||||||
|
switch (createMode) {
|
||||||
|
case EPHEMERAL:
|
||||||
|
case PERSISTENT:
|
||||||
|
return createNonSequential(path, newData, acl, createMode);
|
||||||
|
|
||||||
|
case EPHEMERAL_SEQUENTIAL:
|
||||||
|
case PERSISTENT_SEQUENTIAL:
|
||||||
|
return createSequential(path, newData, acl, createMode);
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Unrecognized CreateMode: " +
|
||||||
|
createMode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createNonSequential(String path, byte[] data, List<ACL> acl,
|
||||||
|
CreateMode createMode) throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
boolean isRetry = false; // False for first attempt, true for all retries.
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return zk.create(path, data, acl, createMode);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case NODEEXISTS:
|
||||||
|
if (isRetry) {
|
||||||
|
// If the connection was lost, there is still a possibility that
|
||||||
|
// we have successfully created the node at our previous attempt,
|
||||||
|
// so we read the node and compare.
|
||||||
|
byte[] currentData = zk.getData(path, false, null);
|
||||||
|
if (currentData != null &&
|
||||||
|
Bytes.compareTo(currentData, data) == 0) {
|
||||||
|
// We successfully created a non-sequential node
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
LOG.error("Node " + path + " already exists with " +
|
||||||
|
Bytes.toStringBinarySafe(currentData) + ", could not write " +
|
||||||
|
Bytes.toStringBinarySafe(data));
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
LOG.error("Node " + path + " already exists and this is not a " +
|
||||||
|
"retry");
|
||||||
|
throw e;
|
||||||
|
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper create failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
isRetry = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createSequential(String path, byte[] data,
|
||||||
|
List<ACL> acl, CreateMode createMode)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
boolean first = true;
|
||||||
|
String newPath = path+this.identifier;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
if (!first) {
|
||||||
|
// Check if we succeeded on a previous attempt
|
||||||
|
String previousResult = findPreviousSequentialNode(newPath);
|
||||||
|
if (previousResult != null) {
|
||||||
|
return previousResult;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
first = false;
|
||||||
|
return zk.create(newPath, data, acl, createMode);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
LOG.warn("Possibly transient ZooKeeper exception: " + e);
|
||||||
|
if (!retryCounter.shouldRetry()) {
|
||||||
|
LOG.error("ZooKeeper create failed after "
|
||||||
|
+ retryCounter.getMaxRetries() + " retries");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
|
||||||
|
"ZooKeeper after sleeping "+retryIntervalMillis+" ms");
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String findPreviousSequentialNode(String path)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
int lastSlashIdx = path.lastIndexOf('/');
|
||||||
|
assert(lastSlashIdx != -1);
|
||||||
|
String parent = path.substring(0, lastSlashIdx);
|
||||||
|
String nodePrefix = path.substring(lastSlashIdx+1);
|
||||||
|
|
||||||
|
List<String> nodes = zk.getChildren(parent, false);
|
||||||
|
List<String> matching = filterByPrefix(nodes, nodePrefix);
|
||||||
|
for (String node : matching) {
|
||||||
|
String nodePath = parent + "/" + node;
|
||||||
|
Stat stat = zk.exists(nodePath, false);
|
||||||
|
if (stat != null) {
|
||||||
|
return nodePath;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] removeMetaData(byte[] data) {
|
||||||
|
if(data == null || data.length == 0) {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
// check the magic data; to be backward compatible
|
||||||
|
byte magic = data[0];
|
||||||
|
if(magic != MAGIC) {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
int idLength = Bytes.toInt(data, MAGIC_OFFSET);
|
||||||
|
int dataLength = data.length-MAGIC_OFFSET-ID_OFFSET-idLength;
|
||||||
|
int dataOffset = MAGIC_OFFSET+ID_OFFSET+idLength;
|
||||||
|
|
||||||
|
byte[] newData = new byte[dataLength];
|
||||||
|
System.arraycopy(data, dataOffset, newData, 0, dataLength);
|
||||||
|
|
||||||
|
return newData;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] appendMetaData(byte[] data) {
|
||||||
|
if(data == null){
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length];
|
||||||
|
int pos = 0;
|
||||||
|
pos = Bytes.putByte(newData, pos, MAGIC);
|
||||||
|
pos = Bytes.putInt(newData, pos, id.length);
|
||||||
|
pos = Bytes.putBytes(newData, pos, id, 0, id.length);
|
||||||
|
pos = Bytes.putBytes(newData, pos, data, 0, data.length);
|
||||||
|
|
||||||
|
return newData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSessionId() {
|
||||||
|
return zk.getSessionId();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws InterruptedException {
|
||||||
|
zk.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public States getState() {
|
||||||
|
return zk.getState();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ZooKeeper getZooKeeper() {
|
||||||
|
return zk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getSessionPasswd() {
|
||||||
|
return zk.getSessionPasswd();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
|
||||||
|
this.zk.sync(path, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filters the given node list by the given prefixes.
|
||||||
|
* This method is all-inclusive--if any element in the node list starts
|
||||||
|
* with any of the given prefixes, then it is included in the result.
|
||||||
|
*
|
||||||
|
* @param nodes the nodes to filter
|
||||||
|
* @param prefixes the prefixes to include in the result
|
||||||
|
* @return list of every element that starts with one of the prefixes
|
||||||
|
*/
|
||||||
|
private static List<String> filterByPrefix(List<String> nodes,
|
||||||
|
String... prefixes) {
|
||||||
|
List<String> lockChildren = new ArrayList<String>();
|
||||||
|
for (String child : nodes){
|
||||||
|
for (String prefix : prefixes){
|
||||||
|
if (child.startsWith(prefix)){
|
||||||
|
lockChildren.add(child);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lockChildren;
|
||||||
|
}
|
||||||
|
}
|
|
@ -70,20 +70,20 @@ public class ZKUtil {
|
||||||
* @return connection to zookeeper
|
* @return connection to zookeeper
|
||||||
* @throws IOException if unable to connect to zk or config problem
|
* @throws IOException if unable to connect to zk or config problem
|
||||||
*/
|
*/
|
||||||
public static ZooKeeper connect(Configuration conf, Watcher watcher)
|
public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Properties properties = ZKConfig.makeZKProps(conf);
|
Properties properties = ZKConfig.makeZKProps(conf);
|
||||||
String ensemble = ZKConfig.getZKQuorumServersString(properties);
|
String ensemble = ZKConfig.getZKQuorumServersString(properties);
|
||||||
return connect(conf, ensemble, watcher);
|
return connect(conf, ensemble, watcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ZooKeeper connect(Configuration conf, String ensemble,
|
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
|
||||||
Watcher watcher)
|
Watcher watcher)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return connect(conf, ensemble, watcher, "");
|
return connect(conf, ensemble, watcher, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ZooKeeper connect(Configuration conf, String ensemble,
|
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
|
||||||
Watcher watcher, final String descriptor)
|
Watcher watcher, final String descriptor)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if(ensemble == null) {
|
if(ensemble == null) {
|
||||||
|
@ -92,7 +92,11 @@ public class ZKUtil {
|
||||||
int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000);
|
int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000);
|
||||||
LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
|
LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
|
||||||
ensemble + ")");
|
ensemble + ")");
|
||||||
return new ZooKeeper(ensemble, timeout, watcher);
|
int retry = conf.getInt("zookeeper.recovery.retry", 3);
|
||||||
|
int retryIntervalMillis =
|
||||||
|
conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
|
||||||
|
return new RecoverableZooKeeper(ensemble, timeout, watcher,
|
||||||
|
retry, retryIntervalMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
@ -214,7 +218,7 @@ public class ZKUtil {
|
||||||
public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
|
public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
Stat s = zkw.getZooKeeper().exists(znode, zkw);
|
Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
|
||||||
LOG.debug(zkw.prefix("Set watcher on existing znode " + znode));
|
LOG.debug(zkw.prefix("Set watcher on existing znode " + znode));
|
||||||
return s != null ? true : false;
|
return s != null ? true : false;
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
|
@ -242,7 +246,7 @@ public class ZKUtil {
|
||||||
public static int checkExists(ZooKeeperWatcher zkw, String znode)
|
public static int checkExists(ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
Stat s = zkw.getZooKeeper().exists(znode, null);
|
Stat s = zkw.getRecoverableZooKeeper().exists(znode, null);
|
||||||
return s != null ? s.getVersion() : -1;
|
return s != null ? s.getVersion() : -1;
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
|
LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
|
||||||
|
@ -279,7 +283,7 @@ public class ZKUtil {
|
||||||
ZooKeeperWatcher zkw, String znode)
|
ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
List<String> children = zkw.getZooKeeper().getChildren(znode, zkw);
|
List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
|
||||||
return children;
|
return children;
|
||||||
} catch(KeeperException.NoNodeException ke) {
|
} catch(KeeperException.NoNodeException ke) {
|
||||||
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
|
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
|
||||||
|
@ -339,7 +343,7 @@ public class ZKUtil {
|
||||||
List<String> children = null;
|
List<String> children = null;
|
||||||
try {
|
try {
|
||||||
// List the children without watching
|
// List the children without watching
|
||||||
children = zkw.getZooKeeper().getChildren(znode, null);
|
children = zkw.getRecoverableZooKeeper().getChildren(znode, null);
|
||||||
} catch(KeeperException.NoNodeException nne) {
|
} catch(KeeperException.NoNodeException nne) {
|
||||||
return null;
|
return null;
|
||||||
} catch(InterruptedException ie) {
|
} catch(InterruptedException ie) {
|
||||||
|
@ -389,7 +393,7 @@ public class ZKUtil {
|
||||||
public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
|
public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
return !zkw.getZooKeeper().getChildren(znode, null).isEmpty();
|
return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty();
|
||||||
} catch(KeeperException.NoNodeException ke) {
|
} catch(KeeperException.NoNodeException ke) {
|
||||||
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
|
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
|
||||||
"because node does not exist (not an error)"));
|
"because node does not exist (not an error)"));
|
||||||
|
@ -421,7 +425,7 @@ public class ZKUtil {
|
||||||
public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
|
public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
Stat stat = zkw.getZooKeeper().exists(znode, null);
|
Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null);
|
||||||
return stat == null ? 0 : stat.getNumChildren();
|
return stat == null ? 0 : stat.getNumChildren();
|
||||||
} catch(KeeperException e) {
|
} catch(KeeperException e) {
|
||||||
LOG.warn(zkw.prefix("Unable to get children of node " + znode));
|
LOG.warn(zkw.prefix("Unable to get children of node " + znode));
|
||||||
|
@ -443,7 +447,7 @@ public class ZKUtil {
|
||||||
public static byte [] getData(ZooKeeperWatcher zkw, String znode)
|
public static byte [] getData(ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
byte [] data = zkw.getZooKeeper().getData(znode, null, null);
|
byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null);
|
||||||
logRetrievedMsg(zkw, znode, data, false);
|
logRetrievedMsg(zkw, znode, data, false);
|
||||||
return data;
|
return data;
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
|
@ -475,7 +479,7 @@ public class ZKUtil {
|
||||||
public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
|
public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
|
byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, null);
|
||||||
logRetrievedMsg(zkw, znode, data, true);
|
logRetrievedMsg(zkw, znode, data, true);
|
||||||
return data;
|
return data;
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
|
@ -512,7 +516,7 @@ public class ZKUtil {
|
||||||
Stat stat)
|
Stat stat)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
byte [] data = zkw.getZooKeeper().getData(znode, null, stat);
|
byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat);
|
||||||
logRetrievedMsg(zkw, znode, data, false);
|
logRetrievedMsg(zkw, znode, data, false);
|
||||||
return data;
|
return data;
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
|
@ -549,7 +553,7 @@ public class ZKUtil {
|
||||||
byte [] data, int expectedVersion)
|
byte [] data, int expectedVersion)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
zkw.getZooKeeper().setData(znode, data, expectedVersion);
|
zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion);
|
||||||
} catch(InterruptedException ie) {
|
} catch(InterruptedException ie) {
|
||||||
zkw.interruptedException(ie);
|
zkw.interruptedException(ie);
|
||||||
}
|
}
|
||||||
|
@ -583,7 +587,7 @@ public class ZKUtil {
|
||||||
byte [] data, int expectedVersion)
|
byte [] data, int expectedVersion)
|
||||||
throws KeeperException, KeeperException.NoNodeException {
|
throws KeeperException, KeeperException.NoNodeException {
|
||||||
try {
|
try {
|
||||||
return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null;
|
return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null;
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
zkw.interruptedException(e);
|
zkw.interruptedException(e);
|
||||||
return false;
|
return false;
|
||||||
|
@ -654,7 +658,7 @@ public class ZKUtil {
|
||||||
String znode, byte [] data)
|
String znode, byte [] data)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.EPHEMERAL);
|
CreateMode.EPHEMERAL);
|
||||||
} catch (KeeperException.NodeExistsException nee) {
|
} catch (KeeperException.NodeExistsException nee) {
|
||||||
if(!watchAndCheckExists(zkw, znode)) {
|
if(!watchAndCheckExists(zkw, znode)) {
|
||||||
|
@ -693,11 +697,11 @@ public class ZKUtil {
|
||||||
ZooKeeperWatcher zkw, String znode, byte [] data)
|
ZooKeeperWatcher zkw, String znode, byte [] data)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
} catch (KeeperException.NodeExistsException nee) {
|
} catch (KeeperException.NodeExistsException nee) {
|
||||||
try {
|
try {
|
||||||
zkw.getZooKeeper().exists(znode, zkw);
|
zkw.getRecoverableZooKeeper().exists(znode, zkw);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
zkw.interruptedException(e);
|
zkw.interruptedException(e);
|
||||||
return false;
|
return false;
|
||||||
|
@ -730,9 +734,9 @@ public class ZKUtil {
|
||||||
String znode, byte [] data)
|
String znode, byte [] data)
|
||||||
throws KeeperException, KeeperException.NodeExistsException {
|
throws KeeperException, KeeperException.NodeExistsException {
|
||||||
try {
|
try {
|
||||||
zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
return zkw.getZooKeeper().exists(znode, zkw).getVersion();
|
return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
zkw.interruptedException(e);
|
zkw.interruptedException(e);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -757,7 +761,7 @@ public class ZKUtil {
|
||||||
public static void asyncCreate(ZooKeeperWatcher zkw,
|
public static void asyncCreate(ZooKeeperWatcher zkw,
|
||||||
String znode, byte [] data, final AsyncCallback.StringCallback cb,
|
String znode, byte [] data, final AsyncCallback.StringCallback cb,
|
||||||
final Object ctx) {
|
final Object ctx) {
|
||||||
zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT, cb, ctx);
|
CreateMode.PERSISTENT, cb, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -775,7 +779,7 @@ public class ZKUtil {
|
||||||
String znode)
|
String znode)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
ZooKeeper zk = zkw.getZooKeeper();
|
RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
|
||||||
if (zk.exists(znode, false) == null) {
|
if (zk.exists(znode, false) == null) {
|
||||||
zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
@ -783,7 +787,7 @@ public class ZKUtil {
|
||||||
} catch(KeeperException.NodeExistsException nee) {
|
} catch(KeeperException.NodeExistsException nee) {
|
||||||
} catch(KeeperException.NoAuthException nee){
|
} catch(KeeperException.NoAuthException nee){
|
||||||
try {
|
try {
|
||||||
if (null == zkw.getZooKeeper().exists(znode, false)) {
|
if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) {
|
||||||
// If we failed to create the file and it does not already exist.
|
// If we failed to create the file and it does not already exist.
|
||||||
throw(nee);
|
throw(nee);
|
||||||
}
|
}
|
||||||
|
@ -813,7 +817,7 @@ public class ZKUtil {
|
||||||
if(znode == null) {
|
if(znode == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
} catch(KeeperException.NodeExistsException nee) {
|
} catch(KeeperException.NodeExistsException nee) {
|
||||||
return;
|
return;
|
||||||
|
@ -845,7 +849,7 @@ public class ZKUtil {
|
||||||
int version)
|
int version)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
zkw.getZooKeeper().delete(node, version);
|
zkw.getRecoverableZooKeeper().delete(node, version);
|
||||||
return true;
|
return true;
|
||||||
} catch(KeeperException.BadVersionException bve) {
|
} catch(KeeperException.BadVersionException bve) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -864,7 +868,7 @@ public class ZKUtil {
|
||||||
public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
|
public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
try {
|
try {
|
||||||
zkw.getZooKeeper().delete(node, -1);
|
zkw.getRecoverableZooKeeper().delete(node, -1);
|
||||||
} catch(KeeperException.NoNodeException nne) {
|
} catch(KeeperException.NoNodeException nne) {
|
||||||
} catch(InterruptedException ie) {
|
} catch(InterruptedException ie) {
|
||||||
zkw.interruptedException(ie);
|
zkw.interruptedException(ie);
|
||||||
|
@ -886,7 +890,7 @@ public class ZKUtil {
|
||||||
deleteNodeRecursively(zkw, joinZNode(node, child));
|
deleteNodeRecursively(zkw, joinZNode(node, child));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
zkw.getZooKeeper().delete(node, -1);
|
zkw.getRecoverableZooKeeper().delete(node, -1);
|
||||||
} catch(InterruptedException ie) {
|
} catch(InterruptedException ie) {
|
||||||
zkw.interruptedException(ie);
|
zkw.interruptedException(ie);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.zookeeper;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.WatchedEvent;
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
import org.apache.zookeeper.Watcher;
|
import org.apache.zookeeper.Watcher;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acts as the single ZooKeeper Watcher. One instance of this is instantiated
|
* Acts as the single ZooKeeper Watcher. One instance of this is instantiated
|
||||||
|
@ -58,7 +56,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
private String quorum;
|
private String quorum;
|
||||||
|
|
||||||
// zookeeper connection
|
// zookeeper connection
|
||||||
private ZooKeeper zooKeeper;
|
private RecoverableZooKeeper recoverableZooKeeper;
|
||||||
|
|
||||||
// abortable in case of zk failure
|
// abortable in case of zk failure
|
||||||
private Abortable abortable;
|
private Abortable abortable;
|
||||||
|
@ -116,51 +114,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
this.identifier = descriptor;
|
this.identifier = descriptor;
|
||||||
this.abortable = abortable;
|
this.abortable = abortable;
|
||||||
setNodeNames(conf);
|
setNodeNames(conf);
|
||||||
this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
|
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
|
||||||
try {
|
try {
|
||||||
// Create all the necessary "directories" of znodes
|
// Create all the necessary "directories" of znodes
|
||||||
// TODO: Move this to an init method somewhere so not everyone calls it?
|
// TODO: Move this to an init method somewhere so not everyone calls it?
|
||||||
|
|
||||||
// The first call against zk can fail with connection loss. Seems common.
|
|
||||||
// Apparently this is recoverable. Retry a while.
|
|
||||||
// See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
|
|
||||||
// TODO: Generalize out in ZKUtil.
|
|
||||||
long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
|
|
||||||
HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME);
|
|
||||||
long finished = System.currentTimeMillis() + wait;
|
|
||||||
KeeperException ke = null;
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
ZKUtil.createAndFailSilent(this, baseZNode);
|
ZKUtil.createAndFailSilent(this, baseZNode);
|
||||||
ke = null;
|
|
||||||
break;
|
|
||||||
} catch (KeeperException.ConnectionLossException e) {
|
|
||||||
if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) {
|
|
||||||
LOG.debug("Retrying zk create for another " +
|
|
||||||
(finished - System.currentTimeMillis()) +
|
|
||||||
"ms; set 'hbase.zookeeper.recoverable.waittime' to change " +
|
|
||||||
"wait time); " + e.getMessage());
|
|
||||||
}
|
|
||||||
ke = e;
|
|
||||||
}
|
|
||||||
} while (isFinishedRetryingRecoverable(finished));
|
|
||||||
// Convert connectionloss exception to ZKCE.
|
|
||||||
if (ke != null) {
|
|
||||||
try {
|
|
||||||
// If we don't close it, the zk connection managers won't be killed
|
|
||||||
this.zooKeeper.close();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
LOG.warn("Interrupted while closing", e);
|
|
||||||
}
|
|
||||||
throw new ZooKeeperConnectionException("HBase is able to connect to" +
|
|
||||||
" ZooKeeper but the connection closes immediately. This could be" +
|
|
||||||
" a sign that the server has too many connections (30 is the" +
|
|
||||||
" default). Consider inspecting your ZK server logs for that" +
|
|
||||||
" error and then make sure you are reusing HBaseConfiguration" +
|
|
||||||
" as often as you can. See HTable's javadoc for more information.",
|
|
||||||
ke);
|
|
||||||
}
|
|
||||||
ZKUtil.createAndFailSilent(this, assignmentZNode);
|
ZKUtil.createAndFailSilent(this, assignmentZNode);
|
||||||
ZKUtil.createAndFailSilent(this, rsZNode);
|
ZKUtil.createAndFailSilent(this, rsZNode);
|
||||||
ZKUtil.createAndFailSilent(this, tableZNode);
|
ZKUtil.createAndFailSilent(this, tableZNode);
|
||||||
|
@ -235,8 +193,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
* Get the connection to ZooKeeper.
|
* Get the connection to ZooKeeper.
|
||||||
* @return connection reference to zookeeper
|
* @return connection reference to zookeeper
|
||||||
*/
|
*/
|
||||||
public ZooKeeper getZooKeeper() {
|
public RecoverableZooKeeper getRecoverableZooKeeper() {
|
||||||
return zooKeeper;
|
return recoverableZooKeeper;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -321,16 +279,16 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
|
this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
|
||||||
while (System.currentTimeMillis() < finished) {
|
while (System.currentTimeMillis() < finished) {
|
||||||
Threads.sleep(1);
|
Threads.sleep(1);
|
||||||
if (this.zooKeeper != null) break;
|
if (this.recoverableZooKeeper != null) break;
|
||||||
}
|
}
|
||||||
if (this.zooKeeper == null) {
|
if (this.recoverableZooKeeper == null) {
|
||||||
LOG.error("ZK is null on connection event -- see stack trace " +
|
LOG.error("ZK is null on connection event -- see stack trace " +
|
||||||
"for the stack trace when constructor was called on this zkw",
|
"for the stack trace when constructor was called on this zkw",
|
||||||
this.constructorCaller);
|
this.constructorCaller);
|
||||||
throw new NullPointerException("ZK is null");
|
throw new NullPointerException("ZK is null");
|
||||||
}
|
}
|
||||||
this.identifier = this.identifier + "-0x" +
|
this.identifier = this.identifier + "-0x" +
|
||||||
Long.toHexString(this.zooKeeper.getSessionId());
|
Long.toHexString(this.recoverableZooKeeper.getSessionId());
|
||||||
// Update our identifier. Otherwise ignore.
|
// Update our identifier. Otherwise ignore.
|
||||||
LOG.debug(this.identifier + " connected");
|
LOG.debug(this.identifier + " connected");
|
||||||
break;
|
break;
|
||||||
|
@ -365,7 +323,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
* is up-to-date from when we begin the operation.
|
* is up-to-date from when we begin the operation.
|
||||||
*/
|
*/
|
||||||
public void sync(String path) {
|
public void sync(String path) {
|
||||||
this.zooKeeper.sync(path, null, null);
|
this.recoverableZooKeeper.sync(path, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -408,8 +366,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
*/
|
*/
|
||||||
public void close() {
|
public void close() {
|
||||||
try {
|
try {
|
||||||
if (zooKeeper != null) {
|
if (recoverableZooKeeper != null) {
|
||||||
zooKeeper.close();
|
recoverableZooKeeper.close();
|
||||||
// super.close();
|
// super.close();
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
|
@ -1126,7 +1126,7 @@ public class HBaseTestingUtility {
|
||||||
Configuration c = new Configuration(this.conf);
|
Configuration c = new Configuration(this.conf);
|
||||||
String quorumServers = ZKConfig.getZKQuorumServersString(c);
|
String quorumServers = ZKConfig.getZKQuorumServersString(c);
|
||||||
int sessionTimeout = 5 * 1000; // 5 seconds
|
int sessionTimeout = 5 * 1000; // 5 seconds
|
||||||
ZooKeeper zk = nodeZK.getZooKeeper();
|
ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
|
||||||
byte[] password = zk.getSessionPasswd();
|
byte[] password = zk.getSessionPasswd();
|
||||||
long sessionID = zk.getSessionId();
|
long sessionID = zk.getSessionId();
|
||||||
|
|
||||||
|
|
|
@ -99,9 +99,8 @@ public class TestZooKeeper {
|
||||||
int sessionTimeout = 5 * 1000; // 5 seconds
|
int sessionTimeout = 5 * 1000; // 5 seconds
|
||||||
HConnection connection = HConnectionManager.getConnection(c);
|
HConnection connection = HConnectionManager.getConnection(c);
|
||||||
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
|
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
|
||||||
long sessionID = connectionZK.getZooKeeper().getSessionId();
|
long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId();
|
||||||
byte [] password = connectionZK.getZooKeeper().getSessionPasswd();
|
byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd();
|
||||||
|
|
||||||
ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
|
ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
|
||||||
EmptyWatcher.instance, sessionID, password);
|
EmptyWatcher.instance, sessionID, password);
|
||||||
LOG.info("Session timeout=" + zk.getSessionTimeout() +
|
LOG.info("Session timeout=" + zk.getSessionTimeout() +
|
||||||
|
@ -116,14 +115,15 @@ public class TestZooKeeper {
|
||||||
|
|
||||||
// Check that the old ZK connection is closed, means we did expire
|
// Check that the old ZK connection is closed, means we did expire
|
||||||
System.err.println("ZooKeeper should have timed out");
|
System.err.println("ZooKeeper should have timed out");
|
||||||
String state = connectionZK.getZooKeeper().getState().toString();
|
String state = connectionZK.getRecoverableZooKeeper().getState().toString();
|
||||||
Assert.assertTrue("State=" + state,
|
LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState());
|
||||||
connectionZK.getZooKeeper().getState().equals(States.CLOSED));
|
Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().
|
||||||
|
equals(States.CLOSED));
|
||||||
|
|
||||||
// Check that the client recovered
|
// Check that the client recovered
|
||||||
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
|
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
|
||||||
LOG.info("state=" + newConnectionZK.getZooKeeper().getState());
|
LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState());
|
||||||
Assert.assertTrue(newConnectionZK.getZooKeeper().getState().equals(
|
Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals(
|
||||||
States.CONNECTED));
|
States.CONNECTED));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -198,7 +198,7 @@ public class TestSplitLogManager {
|
||||||
LOG.info("TestOrphanTaskAcquisition");
|
LOG.info("TestOrphanTaskAcquisition");
|
||||||
|
|
||||||
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
|
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
|
||||||
zkw.getZooKeeper().create(tasknode,
|
zkw.getRecoverableZooKeeper().create(tasknode,
|
||||||
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -231,7 +231,7 @@ public class TestSplitLogManager {
|
||||||
" startup");
|
" startup");
|
||||||
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
|
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
|
||||||
//create an unassigned orphan task
|
//create an unassigned orphan task
|
||||||
zkw.getZooKeeper().create(tasknode,
|
zkw.getRecoverableZooKeeper().create(tasknode,
|
||||||
TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
int version = ZKUtil.checkExists(zkw, tasknode);
|
int version = ZKUtil.checkExists(zkw, tasknode);
|
||||||
|
@ -391,7 +391,7 @@ public class TestSplitLogManager {
|
||||||
|
|
||||||
// create an orphan task in OWNED state
|
// create an orphan task in OWNED state
|
||||||
String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
|
String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
|
||||||
zkw.getZooKeeper().create(tasknode1,
|
zkw.getRecoverableZooKeeper().create(tasknode1,
|
||||||
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
|
|
@ -129,7 +129,7 @@ public class TestSplitLogWorker {
|
||||||
LOG.info("testAcquireTaskAtStartup");
|
LOG.info("testAcquireTaskAtStartup");
|
||||||
ZKSplitLog.Counters.resetCounters();
|
ZKSplitLog.Counters.resetCounters();
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
|
||||||
TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -161,7 +161,7 @@ public class TestSplitLogWorker {
|
||||||
LOG.info("testRaceForTask");
|
LOG.info("testRaceForTask");
|
||||||
ZKSplitLog.Counters.resetCounters();
|
ZKSplitLog.Counters.resetCounters();
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -200,7 +200,7 @@ public class TestSplitLogWorker {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
// this time create a task node after starting the splitLogWorker
|
// this time create a task node after starting the splitLogWorker
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -228,7 +228,7 @@ public class TestSplitLogWorker {
|
||||||
Thread.yield(); // let the worker start
|
Thread.yield(); // let the worker start
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -236,7 +236,7 @@ public class TestSplitLogWorker {
|
||||||
// now the worker is busy doing the above task
|
// now the worker is busy doing the above task
|
||||||
|
|
||||||
// create another task
|
// create another task
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -264,7 +264,7 @@ public class TestSplitLogWorker {
|
||||||
Thread.yield(); // let the worker start
|
Thread.yield(); // let the worker start
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
@ -277,7 +277,7 @@ public class TestSplitLogWorker {
|
||||||
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
|
||||||
|
|
||||||
// create a RESCAN node
|
// create a RESCAN node
|
||||||
zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
|
||||||
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT_SEQUENTIAL);
|
CreateMode.PERSISTENT_SEQUENTIAL);
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ public class TestSplitTransactionOnCluster {
|
||||||
String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
|
String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
|
||||||
hri.getEncodedName());
|
hri.getEncodedName());
|
||||||
Stat stats =
|
Stat stats =
|
||||||
t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
|
t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
|
||||||
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
|
LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
|
||||||
RegionTransitionData rtd =
|
RegionTransitionData rtd =
|
||||||
ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
|
ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
|
||||||
|
@ -162,7 +162,7 @@ public class TestSplitTransactionOnCluster {
|
||||||
assertTrue(daughters.contains(r));
|
assertTrue(daughters.contains(r));
|
||||||
}
|
}
|
||||||
// Finally assert that the ephemeral SPLIT znode was cleaned up.
|
// Finally assert that the ephemeral SPLIT znode was cleaned up.
|
||||||
stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
|
stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
|
||||||
LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
|
LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
|
||||||
assertTrue(stats == null);
|
assertTrue(stats == null);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue