HBASE-5444 Add PB-based calls to HMasterRegionInterface

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1333319 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-05-03 06:20:26 +00:00
parent 7692e2efb7
commit f7c1418d4b
28 changed files with 7387 additions and 254 deletions

View File

@ -37,7 +37,7 @@ org.apache.hadoop.hbase.util.JvmVersion;
org.apache.hadoop.hbase.util.FSUtils;
org.apache.hadoop.hbase.master.HMaster;
org.apache.hadoop.hbase.HConstants;
org.apache.hadoop.hbase.HServerLoad;
org.apache.hadoop.hbase.ServerLoad;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.client.HBaseAdmin;
org.apache.hadoop.hbase.client.HConnectionManager;
@ -213,11 +213,12 @@ org.apache.hadoop.hbase.HBaseConfiguration;
// or be set to 0 to get ephemeral ports
int infoPort = master.getConfiguration().getInt("hbase.regionserver.info.port", 60030);
String url = "http://" + serverName.getHostname() + ":" + infoPort + "/";
HServerLoad hsl = master.getServerManager().getLoad(serverName);
String loadStr = hsl == null? "-": hsl.toString();
if (hsl != null) {
totalRegions += hsl.getNumberOfRegions();
totalRequests += hsl.getNumberOfRequests();
ServerLoad sl = master.getServerManager().getLoad(serverName);
String loadStr = sl == null? "-": sl.toString();
if (sl != null) {
totalRegions += sl.getRegionLoadsCount();
// Is this correct? Adding a rate to a measure.
totalRequests += sl.getRequestsPerSecond();
}
long startcode = serverName.getStartcode();
</%java>

View File

@ -32,7 +32,8 @@ org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
org.apache.hadoop.hbase.util.Bytes;
org.apache.hadoop.hbase.HConstants;
org.apache.hadoop.hbase.HServerInfo;
org.apache.hadoop.hbase.HServerLoad;
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerLoad;
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
org.apache.hadoop.hbase.HRegionInfo;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.HBaseConfiguration;
@ -118,7 +119,7 @@ String url = "http://" + host + "/";
</%java>
<%for HRegionInfo r: onlineRegions %>
<%java>
HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getEncodedName());
RegionLoad load = regionServer.createRegionLoad(r.getEncodedName());
</%java>
<tr><td><% r.getRegionNameAsString() %></td>
<td><% Bytes.toStringBinary(r.getStartKey()) %></td><td><% Bytes.toStringBinary(r.getEndKey()) %></td>

View File

@ -29,10 +29,15 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.HashSet;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.VersionMismatchException;
@ -89,14 +94,48 @@ public class ClusterStatus extends VersionedWritable {
}
public ClusterStatus(final String hbaseVersion, final String clusterid,
final Map<ServerName, HServerLoad> servers,
final Map<ServerName, ServerLoad> servers,
final Collection<ServerName> deadServers,
final ServerName master,
final Collection<ServerName> backupMasters,
final Map<String, RegionState> rit,
final String[] masterCoprocessors) {
this.hbaseVersion = hbaseVersion;
this.liveServers = servers;
// TODO: This conversion of ServerLoad to HServerLoad is temporary,
// will be cleaned up in HBASE-5445. Using the ClusterStatus proto brings
// in a lot of other changes, so it makes sense to break this up.
Map<ServerName, HServerLoad> convertedLoad =
new HashMap<ServerName,HServerLoad>();
for (Map.Entry<ServerName,ServerLoad> entry : servers.entrySet()) {
ServerLoad sl = entry.getValue();
Map<byte[],RegionLoad> regionLoad = new HashMap<byte[],RegionLoad>();
for (HBaseProtos.RegionLoad rl : sl.getRegionLoadsList()) {
Set<String> regionCoprocessors = new HashSet<String>();
for (HBaseProtos.Coprocessor coprocessor
: rl.getCoprocessorsList()) {
regionCoprocessors.add(coprocessor.getName());
}
byte [] regionName = rl.getRegionSpecifier().getValue().toByteArray();
RegionLoad converted = new RegionLoad(regionName,
rl.getStores(),rl.getStorefiles(),rl.getStoreUncompressedSizeMB(),
rl.getStorefileSizeMB(),rl.getMemstoreSizeMB(),
rl.getStorefileIndexSizeMB(),rl.getRootIndexSizeKB(),
rl.getTotalStaticIndexSizeKB(),rl.getTotalStaticBloomSizeKB(),
rl.getReadRequestsCount(),rl.getWriteRequestsCount(),
rl.getTotalCompactingKVs(),rl.getCurrentCompactedKVs(),
regionCoprocessors);
regionLoad.put(regionName, converted);
}
HServerLoad hsl = new HServerLoad(sl.getTotalNumberOfRequests(),
sl.getRequestsPerSecond(),sl.getUsedHeapMB(),sl.getMaxHeapMB(),
regionLoad,new HashSet<String>(Arrays.asList(masterCoprocessors)));
convertedLoad.put(entry.getKey(), hsl);
}
this.liveServers = convertedLoad;
this.deadServers = deadServers;
this.master = master;
this.backupMasters = backupMasters;

View File

@ -0,0 +1,155 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.List;
import java.util.TreeSet;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This class is used for exporting current state of load on a RegionServer.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServerLoad {
public ServerLoad(HBaseProtos.ServerLoad serverLoad) {
this.serverLoad = serverLoad;
}
/* @return the underlying ServerLoad protobuf object */
public HBaseProtos.ServerLoad getServerLoadPB() {
return serverLoad;
}
protected HBaseProtos.ServerLoad serverLoad;
/* @return number of requests per second since last report. */
public int getRequestsPerSecond() {
return serverLoad.getRequestsPerSecond();
}
public boolean hasRequestsPerSecond() {
return serverLoad.hasRequestsPerSecond();
}
/* @return total Number of requests from the start of the region server. */
public int getTotalNumberOfRequests() {
return serverLoad.getTotalNumberOfRequests();
}
public boolean hasTotalNumberOfRequests() {
return serverLoad.hasTotalNumberOfRequests();
}
/* @return the amount of used heap, in MB. */
public int getUsedHeapMB() {
return serverLoad.getUsedHeapMB();
}
public boolean hasUsedHeapMB() {
return serverLoad.hasUsedHeapMB();
}
/* @return the maximum allowable size of the heap, in MB. */
public int getMaxHeapMB() {
return serverLoad.getMaxHeapMB();
}
public boolean hasMaxHeapMB() {
return serverLoad.hasMaxHeapMB();
}
/* Returns list of RegionLoads, which contain information on the load of individual regions. */
public List<RegionLoad> getRegionLoadsList() {
return serverLoad.getRegionLoadsList();
}
public RegionLoad getRegionLoads(int index) {
return serverLoad.getRegionLoads(index);
}
public int getRegionLoadsCount() {
return serverLoad.getRegionLoadsCount();
}
/**
* @return the list Regionserver-level coprocessors, e.g., WALObserver implementations.
* Region-level coprocessors, on the other hand, are stored inside the RegionLoad objects.
*/
public List<Coprocessor> getCoprocessorsList() {
return serverLoad.getCoprocessorsList();
}
public Coprocessor getCoprocessors(int index) {
return serverLoad.getCoprocessors(index);
}
public int getCoprocessorsCount() {
return serverLoad.getCoprocessorsCount();
}
/**
* Return the RegionServer-level coprocessors from a ServerLoad pb.
* @param sl - ServerLoad
* @return string array of loaded RegionServer-level coprocessors
*/
public static String[] getRegionServerCoprocessors(ServerLoad sl) {
if (sl == null) {
return null;
}
List<Coprocessor> list = sl.getCoprocessorsList();
String [] ret = new String[list.size()];
int i = 0;
for (Coprocessor elem : list) {
ret[i++] = elem.getName();
}
return ret;
}
/**
* Return the RegionServer-level and Region-level coprocessors
* from a ServerLoad pb.
* @param sl - ServerLoad
* @return string array of loaded RegionServer-level and
* Region-level coprocessors
*/
public static String[] getAllCoprocessors(ServerLoad sl) {
if (sl == null) {
return null;
}
// Need a set to remove duplicates, but since generated Coprocessor class
// is not Comparable, make it a Set<String> instead of Set<Coprocessor>
TreeSet<String> coprocessSet = new TreeSet<String>();
for (Coprocessor coprocessor : sl.getCoprocessorsList()) {
coprocessSet.add(coprocessor.getName());
}
for (RegionLoad rl : sl.getRegionLoadsList()) {
for (Coprocessor coprocessor : rl.getCoprocessorsList()) {
coprocessSet.add(coprocessor.getName());
}
}
return coprocessSet.toArray(new String[0]);
}
public static final ServerLoad EMPTY_SERVERLOAD =
new ServerLoad(HBaseProtos.ServerLoad.newBuilder().build());
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.util.*;
import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
import java.lang.reflect.Method;
@ -64,7 +65,7 @@ public class HBaseRpcMetrics implements Updater {
context.registerUpdater(this);
initMethods(HMasterInterface.class);
initMethods(HMasterRegionInterface.class);
initMethods(RegionServerStatusProtocol.class);
initMethods(HRegionInterface.class);
rpcStatistics = new HBaseRPCStatistics(this.registry, hostName, port);
}

View File

@ -43,6 +43,7 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -133,6 +134,10 @@ public abstract class HBaseServer implements RpcServer {
new ThreadLocal<RpcServer>();
private volatile boolean started = false;
// For generated protocol classes which doesn't have VERSION field
private static final Map<Class<?>, Long>
PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
private static final Map<String, Class<? extends VersionedProtocol>>
PROTOCOL_CACHE =
new ConcurrentHashMap<String, Class<? extends VersionedProtocol>>();

View File

@ -1,78 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.KerberosInfo;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
/**
* The Master publishes this Interface for RegionServers to register themselves
* on.
*/
@KerberosInfo(
serverPrincipal = "hbase.master.kerberos.principal",
clientPrincipal = "hbase.regionserver.kerberos.principal")
@InterfaceAudience.Private
public interface HMasterRegionInterface extends VersionedProtocol {
/**
* This Interfaces' version. Version changes when the Interface changes.
*/
// All HBase Interfaces used derive from HBaseRPCProtocolVersion. It
// maintained a single global version number on all HBase Interfaces. This
// meant all HBase RPC was broke though only one of the three RPC Interfaces
// had changed. This has since been undone.
public static final long VERSION = 29L;
/**
* Called when a region server first starts.
* @param port Port number this regionserver is up on.
* @param serverStartcode This servers' startcode.
* @param serverCurrentTime The current time of the region server in ms
* @throws IOException e
* @return Configuration for the regionserver to use: e.g. filesystem,
* hbase rootdir, the hostname to use creating the RegionServer ServerName,
* etc.
*/
public MapWritable regionServerStartup(final int port,
final long serverStartcode, final long serverCurrentTime)
throws IOException;
/**
* @param sn {@link ServerName#getVersionedBytes()}
* @param hsl Server load.
* @throws IOException
*/
public void regionServerReport(byte [] sn, HServerLoad hsl)
throws IOException;
/**
* Called by a region server to report a fatal error that is causing
* it to abort.
* @param sn {@link ServerName#getVersionedBytes()}
* @param errorMessage informative text to expose in the master logs and UI
*/
public void reportRSFatalError(byte [] sn, String errorMessage);
}

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.VersionedWritable;
@ -61,6 +63,8 @@ public class Invocation extends VersionedWritable implements Configurable {
Long.valueOf(ClientProtocol.VERSION));
PROTOCOL_VERSION.put(AdminService.BlockingInterface.class,
Long.valueOf(AdminProtocol.VERSION));
PROTOCOL_VERSION.put(RegionServerStatusService.BlockingInterface.class,
Long.valueOf(RegionServerStatusProtocol.VERSION));
}
// For protobuf protocols, which use ServiceException, instead of IOException
@ -70,6 +74,7 @@ public class Invocation extends VersionedWritable implements Configurable {
static {
PROTOBUF_PROTOCOLS.add(ClientProtocol.class);
PROTOBUF_PROTOCOLS.add(AdminProtocol.class);
PROTOBUF_PROTOCOLS.add(RegionServerStatusProtocol.class);
}
private static byte RPC_VERSION = 1;

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol that a RegionServer uses to communicate its status to the Master.
*/
@KerberosInfo(
serverPrincipal = "hbase.master.kerberos.principal")
@TokenInfo("HBASE_AUTH_TOKEN")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface RegionServerStatusProtocol extends
RegionServerStatusService.BlockingInterface, VersionedProtocol {
public static final long VERSION = 1L;
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@ -3171,10 +3172,10 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
Map<ServerName, HServerLoad> onlineSvrs = this.serverManager.getOnlineServers();
Map<ServerName, ServerLoad> onlineSvrs = this.serverManager.getOnlineServers();
// Take care of servers w/o assignments.
for (Map<ServerName,List<HRegionInfo>> map : result.values()) {
for (Map.Entry<ServerName, HServerLoad> svrEntry: onlineSvrs.entrySet()) {
for (Map.Entry<ServerName, ServerLoad> svrEntry: onlineSvrs.entrySet()) {
if (!map.containsKey(svrEntry.getKey())) {
map.put(svrEntry.getKey(), new ArrayList<HRegionInfo>());
}

View File

@ -77,7 +77,8 @@ import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
@ -112,12 +113,21 @@ import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import com.google.protobuf.ServiceException;
/**
* HMaster is the "master server" for HBase. An HBase cluster has one active
@ -133,12 +143,12 @@ import org.apache.zookeeper.Watcher;
* <p>You can also shutdown just this master. Call {@link #stopMaster()}.
*
* @see HMasterInterface
* @see HMasterRegionInterface
* @see MasterRegionInterface
* @see Watcher
*/
@InterfaceAudience.Private
public class HMaster extends HasThread
implements HMasterInterface, HMasterRegionInterface, MasterServices,
implements HMasterInterface, RegionServerStatusProtocol, MasterServices,
Server {
private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
@ -262,7 +272,7 @@ Server {
int numHandlers = conf.getInt("hbase.master.handler.count",
conf.getInt("hbase.regionserver.handler.count", 25));
this.rpcServer = HBaseRPC.getServer(this,
new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
new Class<?>[]{HMasterInterface.class, RegionServerStatusProtocol.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server.
initialIsa.getPort(),
numHandlers,
@ -564,7 +574,7 @@ Server {
// Not registered; add it.
LOG.info("Registering server found up in zk but who has not yet " +
"reported in: " + sn);
this.serverManager.recordNewServer(sn, HServerLoad.EMPTY_HSERVERLOAD);
this.serverManager.recordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD);
}
}
@ -795,8 +805,8 @@ Server {
throws IOException {
if (HMasterInterface.class.getName().equals(protocol)) {
return new ProtocolSignature(HMasterInterface.VERSION, null);
} else if (HMasterRegionInterface.class.getName().equals(protocol)) {
return new ProtocolSignature(HMasterRegionInterface.VERSION, null);
} else if (RegionServerStatusProtocol.class.getName().equals(protocol)) {
return new ProtocolSignature(RegionServerStatusProtocol.VERSION, null);
}
throw new IOException("Unknown protocol: " + protocol);
}
@ -804,8 +814,8 @@ Server {
public long getProtocolVersion(String protocol, long clientVersion) {
if (HMasterInterface.class.getName().equals(protocol)) {
return HMasterInterface.VERSION;
} else if (HMasterRegionInterface.class.getName().equals(protocol)) {
return HMasterRegionInterface.VERSION;
} else if (RegionServerStatusProtocol.class.getName().equals(protocol)) {
return RegionServerStatusProtocol.VERSION;
}
// unknown protocol
LOG.warn("Version requested for unimplemented protocol: "+protocol);
@ -952,18 +962,25 @@ Server {
}
@Override
public MapWritable regionServerStartup(final int port,
final long serverStartCode, final long serverCurrentTime)
throws IOException {
public RegionServerStartupResponse regionServerStartup(
RpcController controller, RegionServerStartupRequest request) throws ServiceException {
// Register with server manager
InetAddress ia = getRemoteInetAddress(port, serverStartCode);
ServerName rs = this.serverManager.regionServerStartup(ia, port,
serverStartCode, serverCurrentTime);
try {
InetAddress ia = getRemoteInetAddress(request.getPort(), request.getServerStartCode());
ServerName rs = this.serverManager.regionServerStartup(ia, request.getPort(),
request.getServerStartCode(), request.getServerCurrentTime());
// Send back some config info
MapWritable mw = createConfigurationSubset();
mw.put(new Text(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER),
new Text(rs.getHostname()));
return mw;
RegionServerStartupResponse.Builder resp = createConfigurationSubset();
NameStringPair.Builder entry = NameStringPair.newBuilder()
.setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
.setValue(rs.getHostname());
resp.addMapEntries(entry.build());
return resp.build();
} catch(IOException ioe) {
throw new ServiceException(ioe);
}
}
/**
@ -981,32 +998,49 @@ Server {
* @return Subset of configuration to pass initializing regionservers: e.g.
* the filesystem to use and root directory to use.
*/
protected MapWritable createConfigurationSubset() {
MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
return addConfig(mw, "fs.default.name");
protected RegionServerStartupResponse.Builder createConfigurationSubset() {
RegionServerStartupResponse.Builder resp = addConfig(
RegionServerStartupResponse.newBuilder(), HConstants.HBASE_DIR);
return addConfig(resp, "fs.default.name");
}
private MapWritable addConfig(final MapWritable mw, final String key) {
mw.put(new Text(key), new Text(this.conf.get(key)));
return mw;
private RegionServerStartupResponse.Builder addConfig(
final RegionServerStartupResponse.Builder resp, final String key) {
NameStringPair.Builder entry = NameStringPair.newBuilder()
.setName(key)
.setValue(this.conf.get(key));
resp.addMapEntries(entry.build());
return resp;
}
@Override
public void regionServerReport(final byte [] sn, final HServerLoad hsl)
throws IOException {
this.serverManager.regionServerReport(ServerName.parseVersionedServerName(sn), hsl);
if (hsl != null && this.metrics != null) {
public RegionServerReportResponse regionServerReport(
RpcController controller,RegionServerReportRequest request) throws ServiceException {
try {
HBaseProtos.ServerLoad sl = request.getLoad();
this.serverManager.regionServerReport(ProtobufUtil.toServerName(request.getServer()), new ServerLoad(sl));
if (sl != null && this.metrics != null) {
// Up our metrics.
this.metrics.incrementRequests(hsl.getTotalNumberOfRequests());
this.metrics.incrementRequests(sl.getTotalNumberOfRequests());
}
} catch(IOException ioe) {
throw new ServiceException(ioe);
}
return RegionServerReportResponse.newBuilder().build();
}
@Override
public void reportRSFatalError(byte [] sn, String errorText) {
String msg = "Region server " + Bytes.toString(sn) +
public ReportRSFatalErrorResponse reportRSFatalError(
RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
String errorText = request.getErrorMessage();
ServerName sn = ProtobufUtil.toServerName(request.getServer());
String msg = "Region server " + Bytes.toString(sn.getVersionedBytes()) +
" reported a fatal error:\n" + errorText;
LOG.error(msg);
rsFatals.add(msg);
return ReportRSFatalErrorResponse.newBuilder().build();
}
public boolean isMasterRunning() {

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.master;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.ServerLoad;
/**
* This is the JMX management interface for Hbase master information
@ -101,7 +101,7 @@ public interface MXBean {
* Get the live region servers
* @return Live region servers
*/
public Map<String, HServerLoad> getRegionServers();
public Map<String, ServerLoad> getRegionServers();
/**
* Get the dead region servers

View File

@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
@ -77,9 +77,9 @@ public class MXBeanImpl implements MXBean {
}
@Override
public Map<String, HServerLoad> getRegionServers() {
Map<String, HServerLoad> data = new HashMap<String, HServerLoad>();
for (final Entry<ServerName, HServerLoad> entry :
public Map<String, ServerLoad> getRegionServers() {
Map<String, ServerLoad> data = new HashMap<String, ServerLoad>();
for (final Entry<ServerName, ServerLoad> entry :
master.getServerManager().getOnlineServers().entrySet()) {
data.put(entry.getKey().getServerName(),
entry.getValue());

View File

@ -32,8 +32,8 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.monitoring.LogMonitoring;
import org.apache.hadoop.hbase.monitoring.StateDumpServlet;
@ -114,9 +114,9 @@ public class MasterDumpServlet extends StateDumpServlet {
}
private void dumpServers(HMaster master, PrintWriter out) {
Map<ServerName, HServerLoad> servers =
Map<ServerName, ServerLoad> servers =
master.getServerManager().getOnlineServers();
for (Map.Entry<ServerName, HServerLoad> e : servers.entrySet()) {
for (Map.Entry<ServerName, ServerLoad> e : servers.entrySet()) {
out.println(e.getKey() + ": " + e.getValue());
}
}

View File

@ -38,7 +38,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@ -51,7 +53,6 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
/**
@ -75,8 +76,8 @@ public class ServerManager {
private volatile boolean clusterShutdown = false;
/** Map of registered servers to their current load */
private final Map<ServerName, HServerLoad> onlineServers =
new ConcurrentHashMap<ServerName, HServerLoad>();
private final Map<ServerName, ServerLoad> onlineServers =
new ConcurrentHashMap<ServerName, ServerLoad>();
// TODO: This is strange to have two maps but HSI above is used on both sides
/**
@ -154,11 +155,11 @@ public class ServerManager {
checkClockSkew(sn, serverCurrentTime);
checkIsDead(sn, "STARTUP");
checkAlreadySameHostPort(sn);
recordNewServer(sn, HServerLoad.EMPTY_HSERVERLOAD);
recordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD);
return sn;
}
void regionServerReport(ServerName sn, HServerLoad hsl)
void regionServerReport(ServerName sn, ServerLoad sl)
throws YouAreDeadException, PleaseHoldException {
checkIsDead(sn, "REPORT");
if (!this.onlineServers.containsKey(sn)) {
@ -169,9 +170,9 @@ public class ServerManager {
// The only thing we are skipping is passing back to the regionserver
// the ServerName to use. Here we presume a master has already done
// that so we'll press on with whatever it gave us for ServerName.
recordNewServer(sn, hsl);
recordNewServer(sn, sl);
} else {
this.onlineServers.put(sn, hsl);
this.onlineServers.put(sn, sl);
}
}
@ -255,9 +256,9 @@ public class ServerManager {
* @param hsl
* @param serverName The remote servers name.
*/
void recordNewServer(final ServerName serverName, final HServerLoad hsl) {
void recordNewServer(final ServerName serverName, final ServerLoad sl) {
LOG.info("Registering server=" + serverName);
this.onlineServers.put(serverName, hsl);
this.onlineServers.put(serverName, sl);
this.serverConnections.remove(serverName);
}
@ -265,7 +266,7 @@ public class ServerManager {
* @param serverName
* @return HServerLoad if serverName is known else null
*/
public HServerLoad getLoad(final ServerName serverName) {
public ServerLoad getLoad(final ServerName serverName) {
return this.onlineServers.get(serverName);
}
@ -274,7 +275,7 @@ public class ServerManager {
* @return HServerLoad if serverName is known else null
* @deprecated Use {@link #getLoad(HServerAddress)}
*/
public HServerLoad getLoad(final HServerAddress address) {
public ServerLoad getLoad(final HServerAddress address) {
ServerName sn = new ServerName(address.toString(), ServerName.NON_STARTCODE);
ServerName actual =
ServerName.findServerWithSameHostnamePort(this.getOnlineServersList(), sn);
@ -291,9 +292,9 @@ public class ServerManager {
int totalLoad = 0;
int numServers = 0;
double averageLoad = 0.0;
for (HServerLoad hsl: this.onlineServers.values()) {
for (ServerLoad sl: this.onlineServers.values()) {
numServers++;
totalLoad += hsl.getNumberOfRegions();
totalLoad += sl.getRegionLoadsCount();
}
averageLoad = (double)totalLoad / (double)numServers;
return averageLoad;
@ -308,7 +309,7 @@ public class ServerManager {
/**
* @return Read-only map of servers to serverinfo
*/
public Map<ServerName, HServerLoad> getOnlineServers() {
public Map<ServerName, ServerLoad> getOnlineServers() {
// Presumption is that iterating the returned Map is OK.
synchronized (this.onlineServers) {
return Collections.unmodifiableMap(this.onlineServers);

View File

@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -1403,5 +1404,18 @@ public final class ProtobufUtil {
}
}
/*
* Get the total (read + write) requests from a RegionLoad pb
* @param rl - RegionLoad pb
* @return total (read + write) requests
*/
public static long getTotalRequestsCount(RegionLoad rl) {
if (rl == null) {
return 0;
}
return rl.getReadRequestsCount() + rl.getWriteRequestsCount();
}
// End helpers for Admin
}

View File

@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
@ -112,7 +111,6 @@ import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.Invocation;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
@ -147,7 +145,6 @@ import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics.util.MBeanUtil;
@ -156,9 +153,24 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@ -191,7 +203,7 @@ public class HRegionServer extends RegionServer
protected final int numRegionsToReport;
// Remote HMaster
private HMasterRegionInterface hbaseMaster;
private RegionServerStatusProtocol hbaseMaster;
// Server to handle client requests. Default access so can be accessed by
// unit tests.
@ -589,7 +601,7 @@ public class HRegionServer extends RegionServer
// Try and register with the Master; tell it we are here. Break if
// server is stopped or the clusterup flag is down or hdfs went wacky.
while (keepLooping()) {
MapWritable w = reportForDuty();
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
LOG.warn("reportForDuty failed; sleeping and then retrying.");
this.sleeper.sleep();
@ -737,15 +749,18 @@ public class HRegionServer extends RegionServer
void tryRegionServerReport()
throws IOException {
HServerLoad hsl = buildServerLoad();
HBaseProtos.ServerLoad sl = buildServerLoad();
// Why we do this?
this.requestCount.set(0);
try {
this.hbaseMaster.regionServerReport(this.serverNameFromMasterPOV.getVersionedBytes(), hsl);
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
ioe = ((RemoteException)ioe).unwrapRemoteException();
}
RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
ServerName sn = ServerName.parseVersionedServerName(
this.serverNameFromMasterPOV.getVersionedBytes());
request.setServer(ProtobufUtil.toServerName(sn));
request.setLoad(sl);
this.hbaseMaster.regionServerReport(null, request.build());
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
if (ioe instanceof YouAreDeadException) {
// This will be caught and handled as a fatal error in run()
throw ioe;
@ -756,19 +771,26 @@ public class HRegionServer extends RegionServer
}
}
HServerLoad buildServerLoad() {
HBaseProtos.ServerLoad buildServerLoad() {
Collection<HRegion> regions = getOnlineRegionsLocalContext();
TreeMap<byte [], HServerLoad.RegionLoad> regionLoads =
new TreeMap<byte [], HServerLoad.RegionLoad>(Bytes.BYTES_COMPARATOR);
for (HRegion region: regions) {
regionLoads.put(region.getRegionName(), createRegionLoad(region));
}
MemoryUsage memory =
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
return new HServerLoad(requestCount.get(),(int)metrics.getRequests(),
(int)(memory.getUsed() / 1024 / 1024),
(int) (memory.getMax() / 1024 / 1024), regionLoads,
this.hlog.getCoprocessorHost().getCoprocessors());
HBaseProtos.ServerLoad.Builder serverLoad = HBaseProtos.ServerLoad.newBuilder();
serverLoad.setRequestsPerSecond((int)metrics.getRequests());
serverLoad.setTotalNumberOfRequests(requestCount.get());
serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
for (String coprocessor : coprocessors) {
serverLoad.addCoprocessors(
Coprocessor.newBuilder().setName(coprocessor).build());
}
for (HRegion region : regions) {
serverLoad.addRegionLoads(createRegionLoad(region));
}
return serverLoad.build();
}
String getOnlineRegionsAsPrintableString() {
@ -858,14 +880,14 @@ public class HRegionServer extends RegionServer
*
* @param c Extra configuration.
*/
protected void handleReportForDutyResponse(final MapWritable c)
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
throws IOException {
try {
for (Map.Entry<Writable, Writable> e :c.entrySet()) {
String key = e.getKey().toString();
for (NameStringPair e : c.getMapEntriesList()) {
String key = e.getName();
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
String hostnameFromMasterPOV = e.getValue().toString();
String hostnameFromMasterPOV = e.getValue();
this.serverNameFromMasterPOV = new ServerName(hostnameFromMasterPOV,
this.isa.getPort(), this.startcode);
LOG.info("Master passed us hostname to use. Was=" +
@ -943,7 +965,7 @@ public class HRegionServer extends RegionServer
*
* @throws IOException
*/
private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
private RegionLoad createRegionLoad(final HRegion r) {
byte[] name = r.getRegionName();
int stores = 0;
int storefiles = 0;
@ -980,20 +1002,38 @@ public class HRegionServer extends RegionServer
(int) (store.getTotalStaticBloomSize() / 1024);
}
}
return new HServerLoad.RegionLoad(name, stores, storefiles,
storeUncompressedSizeMB,
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
totalStaticIndexSizeKB, totalStaticBloomSizeKB,
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
totalCompactingKVs, currentCompactedKVs,
r.getCoprocessorHost().getCoprocessors());
RegionLoad.Builder regionLoad = RegionLoad.newBuilder();
RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
regionSpecifier.setValue(ByteString.copyFrom(name));
regionLoad.setRegionSpecifier(regionSpecifier.build())
.setStores(stores)
.setStorefiles(storefiles)
.setStoreUncompressedSizeMB(storeUncompressedSizeMB)
.setStorefileSizeMB(storefileSizeMB)
.setMemstoreSizeMB(memstoreSizeMB)
.setStorefileIndexSizeMB(storefileIndexSizeMB)
.setRootIndexSizeKB(rootIndexSizeKB)
.setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
.setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
.setReadRequestsCount((int) r.readRequestsCount.get())
.setWriteRequestsCount((int) r.writeRequestsCount.get())
.setTotalCompactingKVs(totalCompactingKVs)
.setCurrentCompactedKVs(currentCompactedKVs);
Set<String> coprocessors = r.getCoprocessorHost().getCoprocessors();
for (String coprocessor : coprocessors) {
regionLoad.addCoprocessors(
Coprocessor.newBuilder().setName(coprocessor).build());
}
return regionLoad.build();
}
/**
* @param encodedRegionName
* @return An instance of RegionLoad.
*/
public HServerLoad.RegionLoad createRegionLoad(final String encodedRegionName) {
public RegionLoad createRegionLoad(final String encodedRegionName) {
HRegion r = null;
r = this.onlineRegions.get(encodedRegionName);
return r != null ? createRegionLoad(r) : null;
@ -1507,8 +1547,14 @@ public class HRegionServer extends RegionServer
msg += "\nCause:\n" + StringUtils.stringifyException(cause);
}
if (hbaseMaster != null) {
ReportRSFatalErrorRequest.Builder builder =
ReportRSFatalErrorRequest.newBuilder();
ServerName sn =
ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
builder.setServer(ProtobufUtil.toServerName(sn));
builder.setErrorMessage(msg);
hbaseMaster.reportRSFatalError(
this.serverNameFromMasterPOV.getVersionedBytes(), msg);
null,builder.build());
}
} catch (Throwable t) {
LOG.warn("Unable to report fatal error to master", t);
@ -1588,7 +1634,7 @@ public class HRegionServer extends RegionServer
private ServerName getMaster() {
ServerName masterServerName = null;
long previousLogTime = 0;
HMasterRegionInterface master = null;
RegionServerStatusProtocol master = null;
boolean refresh = false; // for the first time, use cached data
while (keepLooping() && master == null) {
masterServerName = this.masterAddressManager.getMasterAddress(refresh);
@ -1614,8 +1660,8 @@ public class HRegionServer extends RegionServer
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HMasterRegionInterface.VERSION,
master = (RegionServerStatusProtocol) HBaseRPC.waitForProxy(
RegionServerStatusProtocol.class, RegionServerStatusProtocol.VERSION,
isa, this.conf, -1,
this.rpcTimeout, this.rpcTimeout);
} catch (IOException e) {
@ -1658,8 +1704,8 @@ public class HRegionServer extends RegionServer
* null if we failed to register.
* @throws IOException
*/
private MapWritable reportForDuty() throws IOException {
MapWritable result = null;
private RegionServerStartupResponse reportForDuty() throws IOException {
RegionServerStartupResponse result = null;
ServerName masterServerName = getMaster();
if (masterServerName == null) return result;
try {
@ -1668,18 +1714,20 @@ public class HRegionServer extends RegionServer
"with port=" + this.isa.getPort() + ", startcode=" + this.startcode);
long now = EnvironmentEdgeManager.currentTimeMillis();
int port = this.isa.getPort();
result = this.hbaseMaster.regionServerStartup(port, this.startcode, now);
} catch (RemoteException e) {
IOException ioe = e.unwrapRemoteException();
RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
request.setPort(port);
request.setServerStartCode(this.startcode);
request.setServerCurrentTime(now);
result = this.hbaseMaster.regionServerStartup(null, request.build());
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
if (ioe instanceof ClockOutOfSyncException) {
LOG.fatal("Master rejected startup because clock is out of sync", ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else {
LOG.warn("remote error telling master we are up", e);
LOG.warn("error telling master we are up", se);
}
} catch (IOException e) {
LOG.warn("error telling master we are up", e);
}
return result;
}
@ -3295,8 +3343,9 @@ public class HRegionServer extends RegionServer
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getCoprocessors() {
HServerLoad hsl = buildServerLoad();
return hsl == null? null: hsl.getCoprocessors();
HBaseProtos.ServerLoad sl = buildServerLoad();
return sl == null? null:
ServerLoad.getRegionServerCoprocessors(new ServerLoad(sl));
}
/**

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that are used for MasterRegionProtocol.
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "RegionServerStatusProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "hbase.proto";
message RegionServerStartupRequest {
/** Port number this regionserver is up on */
required uint32 port = 1;
/** This servers' startcode */
required uint64 serverStartCode = 2;
/** Current time of the region server in ms */
required uint64 serverCurrentTime = 3;
}
message RegionServerStartupResponse {
/**
* Configuration for the regionserver to use: e.g. filesystem,
* hbase rootdir, the hostname to use creating the RegionServer ServerName,
* etc
*/
repeated NameStringPair mapEntries = 1;
}
message RegionServerReportRequest {
required ServerName server = 1;
/** load the server is under */
optional ServerLoad load = 2;
}
message RegionServerReportResponse {
}
message ReportRSFatalErrorRequest {
/** name of the server experiencing the error */
required ServerName server = 1;
/** informative text to expose in the master logs and UI */
required string errorMessage = 2;
}
message ReportRSFatalErrorResponse {
}
service RegionServerStatusService {
/** Called when a region server first starts. */
rpc regionServerStartup(RegionServerStartupRequest)
returns(RegionServerStartupResponse);
/** Called to report the load the RegionServer is under. */
rpc regionServerReport(RegionServerReportRequest)
returns(RegionServerReportResponse);
/**
* Called by a region server to report a fatal error that is causing it to
* abort.
*/
rpc reportRSFatalError(ReportRSFatalErrorRequest)
returns(ReportRSFatalErrorResponse);
}

View File

@ -54,6 +54,85 @@ message RegionSpecifier {
}
}
message RegionLoad {
/** the region specifier */
required RegionSpecifier regionSpecifier = 1;
/** the number of stores for the region */
optional uint32 stores = 2;
/** the number of storefiles for the region */
optional uint32 storefiles = 3;
/** the total size of the store files for the region, uncompressed, in MB */
optional uint32 storeUncompressedSizeMB = 4;
/** the current total size of the store files for the region, in MB */
optional uint32 storefileSizeMB = 5;
/** the current size of the memstore for the region, in MB */
optional uint32 memstoreSizeMB = 6;
/**
* The current total size of root-level store file indexes for the region,
* in MB. The same as {@link #rootIndexSizeKB} but in MB.
*/
optional uint32 storefileIndexSizeMB = 7;
/** the current total read requests made to region */
optional uint64 readRequestsCount = 8;
/** the current total write requests made to region */
optional uint64 writeRequestsCount = 9;
/** the total compacting key values in currently running compaction */
optional uint64 totalCompactingKVs = 10;
/** the completed count of key values in currently running compaction */
optional uint64 currentCompactedKVs = 11;
/** The current total size of root-level indexes for the region, in KB. */
optional uint32 rootIndexSizeKB = 12;
/** The total size of all index blocks, not just the root level, in KB. */
optional uint32 totalStaticIndexSizeKB = 13;
/**
* The total size of all Bloom filter blocks, not just loaded into the
* block cache, in KB.
*/
optional uint32 totalStaticBloomSizeKB = 14;
/** Region-level coprocessors. */
repeated Coprocessor coprocessors = 15;
}
/* Server-level protobufs */
message ServerLoad {
/** Number of requests per second since last report. */
optional uint32 requestsPerSecond = 1;
/** Total Number of requests from the start of the region server. */
optional uint32 totalNumberOfRequests = 2;
/** the amount of used heap, in MB. */
optional uint32 usedHeapMB = 3;
/** the maximum allowable size of the heap, in MB. */
optional uint32 maxHeapMB = 4;
/** Information on the load of individual regions. */
repeated RegionLoad regionLoads = 5;
/**
* Regionserver-level coprocessors, e.g., WALObserver implementations.
* Region-level coprocessors, on the other hand, are stored inside RegionLoad
* objects.
*/
repeated Coprocessor coprocessors = 6;
}
/**
* A range of time. Both from and to are Java time
* stamp in milliseconds. If you don't specify a time
@ -104,6 +183,10 @@ message ServerName {
// Comment data structures
message Coprocessor {
required string name = 1;
}
message NameStringPair {
required string name = 1;
required string value = 2;

View File

@ -29,12 +29,14 @@
import="org.apache.hadoop.hbase.HServerAddress"
import="org.apache.hadoop.hbase.ServerName"
import="org.apache.hadoop.hbase.HServerInfo"
import="org.apache.hadoop.hbase.HServerLoad"
import="org.apache.hadoop.hbase.HServerLoad.RegionLoad"
import="org.apache.hadoop.hbase.ServerLoad;"
import="org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad"
import="org.apache.hadoop.hbase.io.ImmutableBytesWritable"
import="org.apache.hadoop.hbase.master.HMaster"
import="org.apache.hadoop.hbase.util.Bytes"
import="org.apache.hadoop.hbase.util.FSUtils"
import="org.apache.hadoop.hbase.protobuf.ProtobufUtil"
import="java.util.List"
import="java.util.Map"
import="org.apache.hadoop.hbase.HConstants"%><%
HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);
@ -176,11 +178,15 @@
String urlRegionServer = null;
if (addr != null) {
HServerLoad sl = master.getServerManager().getLoad(addr);
ServerLoad sl = master.getServerManager().getLoad(addr);
if (sl != null) {
Map<byte[], RegionLoad> map = sl.getRegionsLoad();
if (map.containsKey(regionInfo.getRegionName())) {
req = map.get(regionInfo.getRegionName()).getRequestsCount();
List<RegionLoad> list = sl.getRegionLoadsList();
byte [] regionName = regionInfo.getRegionName();
for (RegionLoad rgLoad : list) {
if (rgLoad.getRegionSpecifier().getValue().toByteArray().equals(regionName)) {
req = ProtobufUtil.getTotalRequestsCount(rgLoad);
break;
}
}
// This port might be wrong if RS actually ended up using something else.
urlRegionServer =

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
@ -112,7 +113,8 @@ public class MiniHBaseCluster {
*/
@Override
protected void handleReportForDutyResponse(MapWritable c) throws IOException {
protected void handleReportForDutyResponse(
final RegionServerStartupResponse c) throws IOException {
super.handleReportForDutyResponse(c);
// Run this thread to shutdown our filesystem on way out.
this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());

View File

@ -26,14 +26,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import javax.tools.*;
import java.io.*;
import java.util.*;
import java.util.Arrays;
import java.util.jar.*;
import org.junit.*;
@ -550,22 +552,21 @@ public class TestClassLoading {
/**
* return the subset of all regionservers
* (actually returns set of HServerLoads)
* (actually returns set of ServerLoads)
* which host some region in a given table.
* used by assertAllRegionServers() below to
* test reporting of loaded coprocessors.
* @param tableName : given table.
* @return subset of all servers.
*/
Map<ServerName, HServerLoad> serversForTable(String tableName) {
Map<ServerName, HServerLoad> serverLoadHashMap =
new HashMap<ServerName, HServerLoad>();
for(Map.Entry<ServerName,HServerLoad> server:
Map<ServerName, ServerLoad> serversForTable(String tableName) {
Map<ServerName, ServerLoad> serverLoadHashMap =
new HashMap<ServerName, ServerLoad>();
for(Map.Entry<ServerName,ServerLoad> server:
TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
getOnlineServers().entrySet()) {
for(Map.Entry<byte[], HServerLoad.RegionLoad> region:
server.getValue().getRegionsLoad().entrySet()) {
if (region.getValue().getNameAsString().equals(tableName)) {
for (RegionLoad region : server.getValue().getRegionLoadsList()) {
if (Bytes.toString(region.getRegionSpecifier().getValue().toByteArray()).equals(tableName)) {
// this server server hosts a region of tableName: add this server..
serverLoadHashMap.put(server.getKey(),server.getValue());
// .. and skip the rest of the regions that it hosts.
@ -578,7 +579,7 @@ public class TestClassLoading {
void assertAllRegionServers(String[] expectedCoprocessors, String tableName)
throws InterruptedException {
Map<ServerName, HServerLoad> servers;
Map<ServerName, ServerLoad> servers;
String[] actualCoprocessors = null;
boolean success = false;
for(int i = 0; i < 5; i++) {
@ -591,8 +592,9 @@ public class TestClassLoading {
servers = serversForTable(tableName);
}
boolean any_failed = false;
for(Map.Entry<ServerName,HServerLoad> server: servers.entrySet()) {
actualCoprocessors = server.getValue().getCoprocessors();
for(Map.Entry<ServerName,ServerLoad> server: servers.entrySet()) {
actualCoprocessors =
ServerLoad.getAllCoprocessors(server.getValue());
if (!Arrays.equals(actualCoprocessors, expectedCoprocessors)) {
LOG.debug("failed comparison: actual: " +
Arrays.toString(actualCoprocessors) +

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
@ -128,9 +128,9 @@ public class TestAssignmentManager {
this.serverManager = Mockito.mock(ServerManager.class);
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true);
final Map<ServerName, HServerLoad> onlineServers = new HashMap<ServerName, HServerLoad>();
onlineServers.put(SERVERNAME_B, new HServerLoad());
onlineServers.put(SERVERNAME_A, new HServerLoad());
final Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(this.serverManager.getOnlineServers()).thenReturn(onlineServers);

View File

@ -24,7 +24,7 @@ import java.util.Set;
import junit.framework.Assert;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.MediumTests;
import org.junit.AfterClass;
@ -48,7 +48,7 @@ public class TestMXBean {
TEST_UTIL.shutdownMiniCluster();
}
private void verifyRegionServers(Map<String, HServerLoad> regions) {
private void verifyRegionServers(Map<String, ServerLoad> regions) {
Set<String> expected = new HashSet<String>();
for (int i = 0; i < 4; ++i) {
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i);
@ -56,7 +56,7 @@ public class TestMXBean {
}
int found = 0;
for (java.util.Map.Entry<String, HServerLoad> entry : regions.entrySet()) {
for (java.util.Map.Entry<String, ServerLoad> entry : regions.entrySet()) {
if (expected.contains(entry.getKey())) {
++found;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@ -46,9 +47,13 @@ import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import com.google.protobuf.ServiceException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -127,7 +132,7 @@ public class TestMasterNoCluster {
*/
@Test
public void testFailover()
throws IOException, KeeperException, InterruptedException {
throws IOException, KeeperException, InterruptedException, ServiceException {
final long now = System.currentTimeMillis();
// Names for our three servers. Make the port numbers match hostname.
// Will come in use down in the server when we need to figure how to respond.
@ -209,7 +214,11 @@ public class TestMasterNoCluster {
while (!master.isRpcServerOpen()) Threads.sleep(10);
// Fake master that there are regionservers out there. Report in.
for (int i = 0; i < sns.length; i++) {
master.regionServerReport(sns[i].getVersionedBytes(), new HServerLoad());
RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();;
ServerName sn = ServerName.parseVersionedServerName(sns[i].getVersionedBytes());
request.setServer(ProtobufUtil.toServerName(sn));
request.setLoad(ServerLoad.EMPTY_SERVERLOAD.getServerLoadPB());
master.regionServerReport(null, request.build());
}
// Master should now come up.
while (!master.isInitialized()) {Threads.sleep(10);}
@ -229,10 +238,11 @@ public class TestMasterNoCluster {
* @throws KeeperException
* @throws InterruptedException
* @throws DeserializationException
* @throws ServiceException
*/
@Test
public void testCatalogDeploys()
throws IOException, KeeperException, InterruptedException, DeserializationException {
throws IOException, KeeperException, InterruptedException, DeserializationException, ServiceException {
final Configuration conf = TESTUTIL.getConfiguration();
final long now = System.currentTimeMillis();
// Name for our single mocked up regionserver.
@ -286,11 +296,19 @@ public class TestMasterNoCluster {
// Wait till master is up ready for RPCs.
while (!master.isRpcServerOpen()) Threads.sleep(10);
// Fake master that there is a regionserver out there. Report in.
MapWritable mw = master.regionServerStartup(rs0.getServerName().getPort(),
rs0.getServerName().getStartcode(), now);
RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
request.setPort(rs0.getServerName().getPort());
request.setServerStartCode(rs0.getServerName().getStartcode());
request.setServerCurrentTime(now);
RegionServerStartupResponse result =
master.regionServerStartup(null, request.build());
String rshostname = new String();
for (NameStringPair e : result.getMapEntriesList()) {
if (e.getName().toString().equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
rshostname = e.getValue();
}
}
// Assert hostname is as expected.
String rshostname =
mw.get(new Text(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)).toString();
assertEquals(rs0.getServerName().getHostname(), rshostname);
// Now master knows there is at least one regionserver checked in and so
// it'll wait a while to see if more and when none, will assign root and

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;