HBASE-6260 balancer state should be stored in ZK

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1384593 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
gchanan 2012-09-13 23:00:46 +00:00
parent 3ddce6a96d
commit c9d3cb1c3b
6 changed files with 653 additions and 16 deletions

View File

@ -177,6 +177,7 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker; import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -232,6 +233,8 @@ Server {
private RegionServerTracker regionServerTracker; private RegionServerTracker regionServerTracker;
// Draining region server tracker // Draining region server tracker
private DrainingServerTracker drainingServerTracker; private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
private LoadBalancerTracker loadBalancerTracker;
// RPC server for the HMaster // RPC server for the HMaster
private final RpcServer rpcServer; private final RpcServer rpcServer;
@ -281,8 +284,6 @@ Server {
private LoadBalancer balancer; private LoadBalancer balancer;
private Thread balancerChore; private Thread balancerChore;
// If 'true', the balancer is 'on'. If 'false', the balancer will not run.
private volatile boolean balanceSwitch = true;
private CatalogJanitor catalogJanitorChore; private CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner; private LogCleaner logCleaner;
@ -516,6 +517,8 @@ Server {
this.catalogTracker.start(); this.catalogTracker.start();
this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.assignmentManager = new AssignmentManager(this, serverManager, this.assignmentManager = new AssignmentManager(this, serverManager,
this.catalogTracker, this.balancer, this.executorService, this.metrics); this.catalogTracker, this.balancer, this.executorService, this.metrics);
zooKeeper.registerListenerFirst(assignmentManager); zooKeeper.registerListenerFirst(assignmentManager);
@ -1250,7 +1253,7 @@ Server {
return false; return false;
} }
// If balance not true, don't run balancer. // If balance not true, don't run balancer.
if (!this.balanceSwitch) return false; if (!this.loadBalancerTracker.isBalancerOn()) return false;
// Do this call outside of synchronized block. // Do this call outside of synchronized block.
int maximumBalanceTime = getBalancerCutoffTime(); int maximumBalanceTime = getBalancerCutoffTime();
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
@ -1339,19 +1342,23 @@ Server {
* @param mode BalanceSwitchMode * @param mode BalanceSwitchMode
* @return old balancer switch * @return old balancer switch
*/ */
public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) { public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
boolean oldValue = this.balanceSwitch; boolean oldValue = this.loadBalancerTracker.isBalancerOn();
boolean newValue = b; boolean newValue = b;
try { try {
if (this.cpHost != null) { if (this.cpHost != null) {
newValue = this.cpHost.preBalanceSwitch(newValue); newValue = this.cpHost.preBalanceSwitch(newValue);
} }
if (mode == BalanceSwitchMode.SYNC) { try {
synchronized (this.balancer) { if (mode == BalanceSwitchMode.SYNC) {
this.balanceSwitch = newValue; synchronized (this.balancer) {
this.loadBalancerTracker.setBalancerOn(newValue);
}
} else {
this.loadBalancerTracker.setBalancerOn(newValue);
} }
} else { } catch (KeeperException ke) {
this.balanceSwitch = newValue; throw new IOException(ke);
} }
LOG.info("BalanceSwitch=" + newValue); LOG.info("BalanceSwitch=" + newValue);
if (this.cpHost != null) { if (this.cpHost != null) {
@ -1363,20 +1370,24 @@ Server {
return oldValue; return oldValue;
} }
public boolean synchronousBalanceSwitch(final boolean b) { public boolean synchronousBalanceSwitch(final boolean b) throws IOException {
return switchBalancer(b, BalanceSwitchMode.SYNC); return switchBalancer(b, BalanceSwitchMode.SYNC);
} }
public boolean balanceSwitch(final boolean b) { public boolean balanceSwitch(final boolean b) throws IOException {
return switchBalancer(b, BalanceSwitchMode.ASYNC); return switchBalancer(b, BalanceSwitchMode.ASYNC);
} }
@Override @Override
public SetBalancerRunningResponse setBalancerRunning( public SetBalancerRunningResponse setBalancerRunning(
RpcController controller, SetBalancerRunningRequest req) throws ServiceException { RpcController controller, SetBalancerRunningRequest req) throws ServiceException {
boolean prevValue = (req.getSynchronous())? try {
synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn()); boolean prevValue = (req.getSynchronous())?
return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build(); synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn());
return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
} }
/** /**
@ -1815,7 +1826,7 @@ Server {
this.serverName, this.serverName,
backupMasters, backupMasters,
this.assignmentManager.getRegionStates().getRegionsInTransition(), this.assignmentManager.getRegionStates().getRegionsInTransition(),
this.getCoprocessors(), this.balanceSwitch); this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn());
} }
public String getClusterId() { public String getClusterId() {

View File

@ -0,0 +1,424 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: LoadBalancer.proto
package org.apache.hadoop.hbase.protobuf.generated;
public final class LoadBalancerProtos {
private LoadBalancerProtos() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public interface LoadBalancerStateOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// optional bool balancerOn = 1;
boolean hasBalancerOn();
boolean getBalancerOn();
}
public static final class LoadBalancerState extends
com.google.protobuf.GeneratedMessage
implements LoadBalancerStateOrBuilder {
// Use LoadBalancerState.newBuilder() to construct.
private LoadBalancerState(Builder builder) {
super(builder);
}
private LoadBalancerState(boolean noInit) {}
private static final LoadBalancerState defaultInstance;
public static LoadBalancerState getDefaultInstance() {
return defaultInstance;
}
public LoadBalancerState getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_fieldAccessorTable;
}
private int bitField0_;
// optional bool balancerOn = 1;
public static final int BALANCERON_FIELD_NUMBER = 1;
private boolean balancerOn_;
public boolean hasBalancerOn() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public boolean getBalancerOn() {
return balancerOn_;
}
private void initFields() {
balancerOn_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBool(1, balancerOn_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(1, balancerOn_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState other = (org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState) obj;
boolean result = true;
result = result && (hasBalancerOn() == other.hasBalancerOn());
if (hasBalancerOn()) {
result = result && (getBalancerOn()
== other.getBalancerOn());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
@java.lang.Override
public int hashCode() {
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
if (hasBalancerOn()) {
hash = (37 * hash) + BALANCERON_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getBalancerOn());
}
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerStateOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_fieldAccessorTable;
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
balancerOn_ = false;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.getDescriptor();
}
public org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState build() {
org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState result = new org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.balancerOn_ = balancerOn_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.getDefaultInstance()) return this;
if (other.hasBalancerOn()) {
setBalancerOn(other.getBalancerOn());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
case 8: {
bitField0_ |= 0x00000001;
balancerOn_ = input.readBool();
break;
}
}
}
}
private int bitField0_;
// optional bool balancerOn = 1;
private boolean balancerOn_ ;
public boolean hasBalancerOn() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public boolean getBalancerOn() {
return balancerOn_;
}
public Builder setBalancerOn(boolean value) {
bitField0_ |= 0x00000001;
balancerOn_ = value;
onChanged();
return this;
}
public Builder clearBalancerOn() {
bitField0_ = (bitField0_ & ~0x00000001);
balancerOn_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:LoadBalancerState)
}
static {
defaultInstance = new LoadBalancerState(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:LoadBalancerState)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_LoadBalancerState_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_LoadBalancerState_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\022LoadBalancer.proto\"\'\n\021LoadBalancerStat" +
"e\022\022\n\nbalancerOn\030\001 \001(\010BE\n*org.apache.hado" +
"op.hbase.protobuf.generatedB\022LoadBalance" +
"rProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_LoadBalancerState_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_LoadBalancerState_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_LoadBalancerState_descriptor,
new java.lang.String[] { "BalancerOn", },
org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.class,
org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Tracks the load balancer switch up in ZK
*/
@InterfaceAudience.Private
public class LoadBalancerTracker extends ZooKeeperNodeTracker {
private static final Log LOG = LogFactory.getLog(LoadBalancerTracker.class);
public LoadBalancerTracker(ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, watcher.balancerZNode, abortable);
}
/**
* Return true if the balance switch is on, false otherwise
*/
public boolean isBalancerOn() {
byte [] upData = super.getData(false);
try {
// is data in ZK is null, use default of on.
return upData == null || parseFrom(upData).getBalancerOn();
} catch (DeserializationException dex) {
LOG.error("ZK state for LoadBalancer could not be parsed " + Bytes.toStringBinary(upData));
// return false to be safe.
return false;
}
}
/**
* Set the balancer on/off
* @param balancerOn
* @throws KeeperException
*/
public void setBalancerOn(boolean balancerOn) throws KeeperException {
byte [] upData = toByteArray(balancerOn);
try {
ZKUtil.createAndWatch(watcher, watcher.balancerZNode, upData);
} catch(KeeperException.NodeExistsException nee) {
ZKUtil.setData(watcher, watcher.balancerZNode, upData);
}
}
private byte [] toByteArray(boolean isBalancerOn) {
LoadBalancerProtos.LoadBalancerState.Builder builder =
LoadBalancerProtos.LoadBalancerState.newBuilder();
builder.setBalancerOn(isBalancerOn);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
private LoadBalancerProtos.LoadBalancerState parseFrom(byte [] pbBytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(pbBytes);
LoadBalancerProtos.LoadBalancerState.Builder builder =
LoadBalancerProtos.LoadBalancerState.newBuilder();
try {
int magicLen = ProtobufUtil.lengthOfPBMagic();
builder.mergeFrom(pbBytes, magicLen, pbBytes.length - magicLen);
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return builder.build();
}
}

View File

@ -101,6 +101,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public String clusterIdZNode; public String clusterIdZNode;
// znode used for log splitting work assignment // znode used for log splitting work assignment
public String splitLogZNode; public String splitLogZNode;
// znode containing the state of the load balancer
public String balancerZNode;
// Certain ZooKeeper nodes need to be world-readable // Certain ZooKeeper nodes need to be world-readable
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE = public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
@ -211,6 +213,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
conf.get("zookeeper.znode.clusterId", "hbaseid")); conf.get("zookeeper.znode.clusterId", "hbaseid"));
splitLogZNode = ZKUtil.joinZNode(baseZNode, splitLogZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
balancerZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.balancer", "balancer"));
} }
/** /**

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers to represent the state of the load balancer.
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "LoadBalancerProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message LoadBalancerState {
optional bool balancerOn = 1;
}

View File

@ -1021,6 +1021,83 @@ public class TestMasterFailover {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
/**
* return the index of the active master in the cluster
* @throws MasterNotRunningException if no active master found
*/
private int getActiveMasterIndex(MiniHBaseCluster cluster) throws MasterNotRunningException {
// get all the master threads
List<MasterThread> masterThreads = cluster.getMasterThreads();
for (int i = 0; i < masterThreads.size(); i++) {
if (masterThreads.get(i).getMaster().isActiveMaster()) {
return i;
}
}
throw new MasterNotRunningException();
}
/**
* Kill the master and wait for a new active master to show up
* @param cluster
* @return the new active master
* @throws InterruptedException
* @throws MasterNotRunningException
*/
private HMaster killActiveAndWaitForNewActive(MiniHBaseCluster cluster)
throws InterruptedException, MasterNotRunningException {
int activeIndex = getActiveMasterIndex(cluster);
HMaster active = cluster.getMaster();
cluster.stopMaster(activeIndex);
cluster.waitOnMaster(activeIndex);
assertTrue(cluster.waitForActiveAndReadyMaster());
// double check this is actually a new master
HMaster newActive = cluster.getMaster();
assertFalse(active == newActive);
return newActive;
}
/**
* Test that if the master fails, the load balancer maintains its
* state (running or not) when the next master takes over
* @throws Exception
*/
@Test (timeout=240000)
public void testMasterFailoverBalancerPersistence() throws Exception {
final int NUM_MASTERS = 3;
final int NUM_RS = 1;
// Start the cluster
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
assertTrue(cluster.waitForActiveAndReadyMaster());
HMaster active = cluster.getMaster();
// check that the balancer is on by default for the active master
ClusterStatus clusterStatus = active.getClusterStatus();
assertTrue(clusterStatus.isBalancerOn());
active = killActiveAndWaitForNewActive(cluster);
// ensure the load balancer is still running on new master
clusterStatus = active.getClusterStatus();
assertTrue(clusterStatus.isBalancerOn());
// turn off the load balancer
active.balanceSwitch(false);
// once more, kill active master and wait for new active master to show up
active = killActiveAndWaitForNewActive(cluster);
// ensure the load balancer is not running on the new master
clusterStatus = active.getClusterStatus();
assertFalse(clusterStatus.isBalancerOn());
// Stop the cluster
TEST_UTIL.shutdownMiniCluster();
}
@org.junit.Rule @org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =