HBASE-3677 Generate a globally unique cluster ID
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1089297 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
82ffa80bd9
commit
ef3073c22d
|
@ -4,6 +4,8 @@ Release 0.91.0 - Unreleased
|
|||
INCOMPATIBLE CHANGES
|
||||
HBASE-2002 Coprocessors: Client side support; Support RPC interface
|
||||
changes at runtime (Gary Helmling via Andrew Purtell)
|
||||
HBASE-3677 Generate a globally unique cluster ID (changed
|
||||
ClusterStatus serialization)
|
||||
|
||||
BUG FIXES
|
||||
HBASE-3280 YouAreDeadException being swallowed in HRS getMaster
|
||||
|
|
|
@ -45,15 +45,25 @@ import org.apache.hadoop.io.VersionedWritable;
|
|||
* <li>Detailed region server loading and resource usage information,
|
||||
* per server and per region.</li>
|
||||
* <li>Regions in transition at master</li>
|
||||
* <li>The unique cluster ID</li>
|
||||
* </ul>
|
||||
*/
|
||||
public class ClusterStatus extends VersionedWritable {
|
||||
private static final byte VERSION = 0;
|
||||
/**
|
||||
* Version for object serialization. Incremented for changes in serialized
|
||||
* representation.
|
||||
* <dl>
|
||||
* <dt>0</dt> <dd>initial version</dd>
|
||||
* <dt>1</dt> <dd>added cluster ID</dd>
|
||||
* </dl>
|
||||
*/
|
||||
private static final byte VERSION = 1;
|
||||
|
||||
private String hbaseVersion;
|
||||
private Collection<HServerInfo> liveServerInfo;
|
||||
private Collection<String> deadServers;
|
||||
private Map<String, RegionState> intransition;
|
||||
private String clusterId;
|
||||
|
||||
/**
|
||||
* Constructor, for Writable
|
||||
|
@ -194,6 +204,14 @@ public class ClusterStatus extends VersionedWritable {
|
|||
this.intransition = m;
|
||||
}
|
||||
|
||||
public String getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public void setClusterId(String id) {
|
||||
this.clusterId = id;
|
||||
}
|
||||
|
||||
//
|
||||
// Writable
|
||||
//
|
||||
|
@ -214,6 +232,7 @@ public class ClusterStatus extends VersionedWritable {
|
|||
out.writeUTF(e.getKey());
|
||||
e.getValue().write(out);
|
||||
}
|
||||
out.writeUTF(clusterId);
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
|
@ -239,5 +258,6 @@ public class ClusterStatus extends VersionedWritable {
|
|||
regionState.readFields(in);
|
||||
this.intransition.put(key, regionState);
|
||||
}
|
||||
this.clusterId = in.readUTF();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,6 +153,12 @@ public final class HConstants {
|
|||
/** Maximum value length, enforced on KeyValue construction */
|
||||
public static final int MAXIMUM_VALUE_LENGTH = Integer.MAX_VALUE;
|
||||
|
||||
/** name of the file for unique cluster ID */
|
||||
public static final String CLUSTER_ID_FILE_NAME = "hbase.id";
|
||||
|
||||
/** Configuration key storing the cluster ID */
|
||||
public static final String CLUSTER_ID = "hbase.cluster.id";
|
||||
|
||||
// Always store the location of the root table's HRegion.
|
||||
// This HRegion is never split.
|
||||
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKTable;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -230,6 +231,7 @@ public class HConnectionManager {
|
|||
// ZooKeeper-based master address tracker
|
||||
private MasterAddressTracker masterAddressTracker;
|
||||
private RootRegionTracker rootRegionTracker;
|
||||
private ClusterId clusterId;
|
||||
|
||||
private final Object metaRegionLock = new Object();
|
||||
|
||||
|
@ -299,6 +301,8 @@ public class HConnectionManager {
|
|||
|
||||
this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this);
|
||||
this.rootRegionTracker.start();
|
||||
|
||||
this.clusterId = new ClusterId(this.zooKeeper, this);
|
||||
}
|
||||
|
||||
private synchronized void resetZooKeeperTrackers()
|
||||
|
@ -308,6 +312,7 @@ public class HConnectionManager {
|
|||
masterAddressTracker = null;
|
||||
rootRegionTracker.stop();
|
||||
rootRegionTracker = null;
|
||||
clusterId = null;
|
||||
this.zooKeeper = null;
|
||||
setupZookeeperTrackers();
|
||||
}
|
||||
|
@ -349,6 +354,9 @@ public class HConnectionManager {
|
|||
throw new MasterNotRunningException();
|
||||
}
|
||||
|
||||
if (clusterId.hasId()) {
|
||||
conf.set(HConstants.CLUSTER_ID, clusterId.getId());
|
||||
}
|
||||
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
|
||||
HMasterInterface.class, HMasterInterface.VERSION,
|
||||
masterLocation.getInetSocketAddress(), this.conf, this.rpcTimeout);
|
||||
|
@ -954,6 +962,9 @@ public class HConnectionManager {
|
|||
server = this.servers.get(rsName);
|
||||
if (server == null) {
|
||||
try {
|
||||
if (clusterId.hasId()) {
|
||||
conf.set(HConstants.CLUSTER_ID, clusterId.getId());
|
||||
}
|
||||
// definitely a cache miss. establish an RPC for this RS
|
||||
server = (HRegionInterface) HBaseRPC.waitForProxy(
|
||||
serverInterfaceClass, HRegionInterface.VERSION,
|
||||
|
|
|
@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.ipc;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -84,6 +85,7 @@ public class HBaseClient {
|
|||
|
||||
protected final SocketFactory socketFactory; // how to create sockets
|
||||
private int refCount = 1;
|
||||
protected String clusterId;
|
||||
|
||||
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
|
||||
final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
|
||||
|
@ -686,6 +688,7 @@ public class HBaseClient {
|
|||
}
|
||||
this.conf = conf;
|
||||
this.socketFactory = factory;
|
||||
this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -44,7 +44,8 @@ public interface HMasterInterface extends VersionedProtocol {
|
|||
// maintained a single global version number on all HBase Interfaces. This
|
||||
// meant all HBase RPC was broke though only one of the three RPC Interfaces
|
||||
// had changed. This has since been undone.
|
||||
public static final long VERSION = 28L;
|
||||
// 29: 4/3/2011 - changed ClusterStatus serialization
|
||||
public static final long VERSION = 29L;
|
||||
|
||||
/** @return true if master is available */
|
||||
public boolean isMasterRunning();
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.lang.reflect.InvocationTargetException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -82,6 +81,7 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.Sleeper;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
import org.apache.hadoop.hbase.zookeeper.ClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -347,6 +347,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
|
||||
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
|
||||
this.fileSystemManager = new MasterFileSystem(this, metrics);
|
||||
// publish cluster ID
|
||||
ClusterId.setClusterId(this.zooKeeper,
|
||||
fileSystemManager.getClusterId());
|
||||
|
||||
this.connection = HConnectionManager.getConnection(conf);
|
||||
this.executorService = new ExecutorService(getServerName());
|
||||
|
||||
|
@ -1084,9 +1088,14 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
status.setServerInfo(serverManager.getOnlineServers().values());
|
||||
status.setDeadServers(serverManager.getDeadServers());
|
||||
status.setRegionsInTransition(assignmentManager.getRegionsInTransition());
|
||||
status.setClusterId(fileSystemManager.getClusterId());
|
||||
return status;
|
||||
}
|
||||
|
||||
public String getClusterId() {
|
||||
return fileSystemManager.getClusterId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(final String msg, final Throwable t) {
|
||||
if (t != null) LOG.fatal(msg, t);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -58,6 +59,8 @@ public class MasterFileSystem {
|
|||
Server master;
|
||||
// metrics for master
|
||||
MasterMetrics metrics;
|
||||
// Persisted unique cluster ID
|
||||
private String clusterId;
|
||||
// Keep around for convenience.
|
||||
private final FileSystem fs;
|
||||
// Is the filesystem ok?
|
||||
|
@ -148,6 +151,14 @@ public class MasterFileSystem {
|
|||
return this.rootdir;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the unique identifier generated for this cluster
|
||||
* @return
|
||||
*/
|
||||
public String getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inspect the log directory to recover any log file without
|
||||
* an active region server.
|
||||
|
@ -223,7 +234,7 @@ public class MasterFileSystem {
|
|||
* needed populating the directory with necessary bootup files).
|
||||
* @throws IOException
|
||||
*/
|
||||
private static Path checkRootDir(final Path rd, final Configuration c,
|
||||
private Path checkRootDir(final Path rd, final Configuration c,
|
||||
final FileSystem fs)
|
||||
throws IOException {
|
||||
// If FS is in safe mode wait till out of it.
|
||||
|
@ -246,6 +257,14 @@ public class MasterFileSystem {
|
|||
FSUtils.checkVersion(fs, rd, true, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
|
||||
10 * 1000));
|
||||
}
|
||||
// Make sure cluster ID exists
|
||||
if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
|
||||
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
|
||||
FSUtils.setClusterId(fs, rd, UUID.randomUUID().toString(), c.getInt(
|
||||
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
|
||||
}
|
||||
clusterId = FSUtils.getClusterId(fs, rd);
|
||||
|
||||
// Make sure the root region directory exists!
|
||||
if (!FSUtils.rootRegionExists(fs, rd)) {
|
||||
bootstrap(rd, c);
|
||||
|
|
|
@ -274,6 +274,103 @@ public class FSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that a cluster ID file exists in the HBase root directory
|
||||
* @param fs the root directory FileSystem
|
||||
* @param rootdir the HBase root directory in HDFS
|
||||
* @param wait how long to wait between retries
|
||||
* @return <code>true</code> if the file exists, otherwise <code>false</code>
|
||||
* @throws IOException if checking the FileSystem fails
|
||||
*/
|
||||
public static boolean checkClusterIdExists(FileSystem fs, Path rootdir,
|
||||
int wait) throws IOException {
|
||||
while (true) {
|
||||
try {
|
||||
Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
|
||||
return fs.exists(filePath);
|
||||
} catch (IOException ioe) {
|
||||
if (wait > 0) {
|
||||
LOG.warn("Unable to check cluster ID file in " + rootdir.toString() +
|
||||
", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
|
||||
try {
|
||||
Thread.sleep(wait);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.interrupted();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value of the unique cluster ID stored for this HBase instance.
|
||||
* @param fs the root directory FileSystem
|
||||
* @param rootdir the path to the HBase root directory
|
||||
* @return the unique cluster identifier
|
||||
* @throws IOException if reading the cluster ID file fails
|
||||
*/
|
||||
public static String getClusterId(FileSystem fs, Path rootdir)
|
||||
throws IOException {
|
||||
Path idPath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
|
||||
String clusterId = null;
|
||||
if (fs.exists(idPath)) {
|
||||
FSDataInputStream in = fs.open(idPath);
|
||||
try {
|
||||
clusterId = in.readUTF();
|
||||
} catch (EOFException eof) {
|
||||
LOG.warn("Cluster ID file "+idPath.toString()+" was empty");
|
||||
} finally{
|
||||
in.close();
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Cluster ID file does not exist at " + idPath.toString());
|
||||
}
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a new unique identifier for this cluster to the "hbase.id" file
|
||||
* in the HBase root directory
|
||||
* @param fs the root directory FileSystem
|
||||
* @param rootdir the path to the HBase root directory
|
||||
* @param clusterId the unique identifier to store
|
||||
* @param wait how long (in milliseconds) to wait between retries
|
||||
* @throws IOException if writing to the FileSystem fails and no wait value
|
||||
*/
|
||||
public static void setClusterId(FileSystem fs, Path rootdir, String clusterId,
|
||||
int wait) throws IOException {
|
||||
while (true) {
|
||||
try {
|
||||
Path filePath = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
|
||||
FSDataOutputStream s = fs.create(filePath);
|
||||
s.writeUTF(clusterId);
|
||||
s.close();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Created cluster ID file at " + filePath.toString() +
|
||||
" with ID: " + clusterId);
|
||||
}
|
||||
return;
|
||||
} catch (IOException ioe) {
|
||||
if (wait > 0) {
|
||||
LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
|
||||
", retrying in "+wait+"msec: "+StringUtils.stringifyException(ioe));
|
||||
try {
|
||||
Thread.sleep(wait);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.interrupted();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies root directory path is a valid URI with a scheme
|
||||
*
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Publishes and synchronizes a unique identifier specific to a given HBase
|
||||
* cluster. The stored identifier is read from the file system by the active
|
||||
* master on startup, and is subsequently available to all watchers (including
|
||||
* clients).
|
||||
*/
|
||||
public class ClusterId {
|
||||
private ZooKeeperWatcher watcher;
|
||||
private Abortable abortable;
|
||||
private String id;
|
||||
|
||||
public ClusterId(ZooKeeperWatcher watcher, Abortable abortable) {
|
||||
this.watcher = watcher;
|
||||
this.abortable = abortable;
|
||||
}
|
||||
|
||||
public boolean hasId() {
|
||||
return getId() != null;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
try {
|
||||
if (id == null) {
|
||||
id = readClusterIdZNode(watcher);
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
abortable.abort("Unexpected exception from ZooKeeper reading cluster ID",
|
||||
ke);
|
||||
}
|
||||
return id;
|
||||
}
|
||||
|
||||
public static String readClusterIdZNode(ZooKeeperWatcher watcher)
|
||||
throws KeeperException {
|
||||
if (ZKUtil.checkExists(watcher, watcher.clusterIdZNode) != -1) {
|
||||
byte[] data = ZKUtil.getData(watcher, watcher.clusterIdZNode);
|
||||
if (data != null) {
|
||||
return Bytes.toString(data);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void setClusterId(ZooKeeperWatcher watcher, String id)
|
||||
throws KeeperException {
|
||||
ZKUtil.createSetData(watcher, watcher.clusterIdZNode, Bytes.toBytes(id));
|
||||
}
|
||||
}
|
|
@ -87,6 +87,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
|||
public String assignmentZNode;
|
||||
// znode used for table disabling/enabling
|
||||
public String tableZNode;
|
||||
// znode containing the unique cluster ID
|
||||
public String clusterIdZNode;
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
|
@ -191,6 +193,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
|||
conf.get("zookeeper.znode.unassigned", "unassigned"));
|
||||
tableZNode = ZKUtil.joinZNode(baseZNode,
|
||||
conf.get("zookeeper.znode.tableEnableDisable", "table"));
|
||||
clusterIdZNode = ZKUtil.joinZNode(baseZNode,
|
||||
conf.get("zookeeper.znode.clusterId", "hbaseid"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -67,6 +67,7 @@
|
|||
<tr><td>Hadoop Version</td><td><%= org.apache.hadoop.util.VersionInfo.getVersion() %>, r<%= org.apache.hadoop.util.VersionInfo.getRevision() %></td><td>Hadoop version and svn revision</td></tr>
|
||||
<tr><td>Hadoop Compiled</td><td><%= org.apache.hadoop.util.VersionInfo.getDate() %>, <%= org.apache.hadoop.util.VersionInfo.getUser() %></td><td>When Hadoop version was compiled and by whom</td></tr>
|
||||
<tr><td>HBase Root Directory</td><td><%= FSUtils.getRootDir(master.getConfiguration()).toString() %></td><td>Location of HBase home directory</td></tr>
|
||||
<tr><td>HBase Cluster ID</td><td><%= master.getClusterId() != null ? master.getClusterId() : "Not set" %></td><td>Unique identifier generated for each HBase cluster</td></tr>
|
||||
<tr><td>Load average</td><td><%= StringUtils.limitDecimalTo2(master.getServerManager().getAverageLoad()) %></td><td>Average number of regions per regionserver. Naive computation.</td></tr>
|
||||
<% if (showFragmentation) { %>
|
||||
<tr><td>Fragmentation</td><td><%= frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %></td><td>Overall fragmentation of all tables, including .META. and -ROOT-.</td></tr>
|
||||
|
|
Loading…
Reference in New Issue