From c9d3cb1c3b654776cfd86a9b5322b8c86c1acaa3 Mon Sep 17 00:00:00 2001 From: gchanan Date: Thu, 13 Sep 2012 23:00:46 +0000 Subject: [PATCH] HBASE-6260 balancer state should be stored in ZK git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1384593 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/master/HMaster.java | 43 +- .../generated/LoadBalancerProtos.java | 424 ++++++++++++++++++ .../hbase/zookeeper/LoadBalancerTracker.java | 93 ++++ .../hbase/zookeeper/ZooKeeperWatcher.java | 4 + .../src/main/protobuf/LoadBalancer.proto | 28 ++ .../hbase/master/TestMasterFailover.java | 77 ++++ 6 files changed, 653 insertions(+), 16 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/LoadBalancerProtos.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/LoadBalancerTracker.java create mode 100644 hbase-server/src/main/protobuf/LoadBalancer.proto diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index a3956d76f07..9e69eae1b49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -177,6 +177,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker; +import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -232,6 +233,8 @@ Server { private RegionServerTracker regionServerTracker; // Draining region server tracker private DrainingServerTracker drainingServerTracker; + // Tracker for load balancer state + private 
LoadBalancerTracker loadBalancerTracker; // RPC server for the HMaster private final RpcServer rpcServer; @@ -281,8 +284,6 @@ Server { private LoadBalancer balancer; private Thread balancerChore; - // If 'true', the balancer is 'on'. If 'false', the balancer will not run. - private volatile boolean balanceSwitch = true; private CatalogJanitor catalogJanitorChore; private LogCleaner logCleaner; @@ -516,6 +517,8 @@ Server { this.catalogTracker.start(); this.balancer = LoadBalancerFactory.getLoadBalancer(conf); + this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this); + this.loadBalancerTracker.start(); this.assignmentManager = new AssignmentManager(this, serverManager, this.catalogTracker, this.balancer, this.executorService, this.metrics); zooKeeper.registerListenerFirst(assignmentManager); @@ -1250,7 +1253,7 @@ Server { return false; } // If balance not true, don't run balancer. - if (!this.balanceSwitch) return false; + if (!this.loadBalancerTracker.isBalancerOn()) return false; // Do this call outside of synchronized block. 
int maximumBalanceTime = getBalancerCutoffTime(); long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; @@ -1339,19 +1342,23 @@ Server { * @param mode BalanceSwitchMode * @return old balancer switch */ - public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) { - boolean oldValue = this.balanceSwitch; + public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException { + boolean oldValue = this.loadBalancerTracker.isBalancerOn(); boolean newValue = b; try { if (this.cpHost != null) { newValue = this.cpHost.preBalanceSwitch(newValue); } - if (mode == BalanceSwitchMode.SYNC) { - synchronized (this.balancer) { - this.balanceSwitch = newValue; + try { + if (mode == BalanceSwitchMode.SYNC) { + synchronized (this.balancer) { + this.loadBalancerTracker.setBalancerOn(newValue); + } + } else { + this.loadBalancerTracker.setBalancerOn(newValue); } - } else { - this.balanceSwitch = newValue; + } catch (KeeperException ke) { + throw new IOException(ke); } LOG.info("BalanceSwitch=" + newValue); if (this.cpHost != null) { @@ -1363,20 +1370,24 @@ Server { return oldValue; } - public boolean synchronousBalanceSwitch(final boolean b) { + public boolean synchronousBalanceSwitch(final boolean b) throws IOException { return switchBalancer(b, BalanceSwitchMode.SYNC); } - public boolean balanceSwitch(final boolean b) { + public boolean balanceSwitch(final boolean b) throws IOException { return switchBalancer(b, BalanceSwitchMode.ASYNC); } @Override public SetBalancerRunningResponse setBalancerRunning( RpcController controller, SetBalancerRunningRequest req) throws ServiceException { - boolean prevValue = (req.getSynchronous())? - synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn()); - return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build(); + try { + boolean prevValue = (req.getSynchronous())? 
+ synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn()); + return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } } /** @@ -1815,7 +1826,7 @@ Server { this.serverName, backupMasters, this.assignmentManager.getRegionStates().getRegionsInTransition(), - this.getCoprocessors(), this.balanceSwitch); + this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn()); } public String getClusterId() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/LoadBalancerProtos.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/LoadBalancerProtos.java new file mode 100644 index 00000000000..48243e64557 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/LoadBalancerProtos.java @@ -0,0 +1,424 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: LoadBalancer.proto + +package org.apache.hadoop.hbase.protobuf.generated; + +public final class LoadBalancerProtos { + private LoadBalancerProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface LoadBalancerStateOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional bool balancerOn = 1; + boolean hasBalancerOn(); + boolean getBalancerOn(); + } + public static final class LoadBalancerState extends + com.google.protobuf.GeneratedMessage + implements LoadBalancerStateOrBuilder { + // Use LoadBalancerState.newBuilder() to construct. 
+ private LoadBalancerState(Builder builder) { + super(builder); + } + private LoadBalancerState(boolean noInit) {} + + private static final LoadBalancerState defaultInstance; + public static LoadBalancerState getDefaultInstance() { + return defaultInstance; + } + + public LoadBalancerState getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_fieldAccessorTable; + } + + private int bitField0_; + // optional bool balancerOn = 1; + public static final int BALANCERON_FIELD_NUMBER = 1; + private boolean balancerOn_; + public boolean hasBalancerOn() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public boolean getBalancerOn() { + return balancerOn_; + } + + private void initFields() { + balancerOn_ = false; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBool(1, balancerOn_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(1, balancerOn_); + } + size += getUnknownFields().getSerializedSize(); + 
memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState other = (org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState) obj; + + boolean result = true; + result = result && (hasBalancerOn() == other.hasBalancerOn()); + if (hasBalancerOn()) { + result = result && (getBalancerOn() + == other.getBalancerOn()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasBalancerOn()) { + hash = (37 * hash) + BALANCERON_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getBalancerOn()); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static 
org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return 
newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerStateOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.internal_static_LoadBalancerState_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder 
create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + balancerOn_ = false; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState build() { + org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState result = new org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.balancerOn_ = balancerOn_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder 
mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.getDefaultInstance()) return this; + if (other.hasBalancerOn()) { + setBalancerOn(other.getBalancerOn()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + balancerOn_ = input.readBool(); + break; + } + } + } + } + + private int bitField0_; + + // optional bool balancerOn = 1; + private boolean balancerOn_ ; + public boolean hasBalancerOn() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public boolean getBalancerOn() { + return balancerOn_; + } + public Builder setBalancerOn(boolean value) { + bitField0_ |= 0x00000001; + balancerOn_ = value; + onChanged(); + return this; + } + public Builder clearBalancerOn() { + bitField0_ = (bitField0_ & ~0x00000001); + balancerOn_ = 
false; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:LoadBalancerState) + } + + static { + defaultInstance = new LoadBalancerState(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:LoadBalancerState) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_LoadBalancerState_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_LoadBalancerState_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\022LoadBalancer.proto\"\'\n\021LoadBalancerStat" + + "e\022\022\n\nbalancerOn\030\001 \001(\010BE\n*org.apache.hado" + + "op.hbase.protobuf.generatedB\022LoadBalance" + + "rProtosH\001\240\001\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_LoadBalancerState_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_LoadBalancerState_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_LoadBalancerState_descriptor, + new java.lang.String[] { "BalancerOn", }, + org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.class, + org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos.LoadBalancerState.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // 
@@protoc_insertion_point(outer_class_scope) +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/LoadBalancerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/LoadBalancerTracker.java new file mode 100644 index 00000000000..e76d6903a0e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/LoadBalancerTracker.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.LoadBalancerProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * Tracks the load balancer switch up in ZK + */ +@InterfaceAudience.Private +public class LoadBalancerTracker extends ZooKeeperNodeTracker { + private static final Log LOG = LogFactory.getLog(LoadBalancerTracker.class); + + public LoadBalancerTracker(ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, watcher.balancerZNode, abortable); + } + + /** + * Return true if the balance switch is on, false otherwise + */ + public boolean isBalancerOn() { + byte [] upData = super.getData(false); + try { + // If data in ZK is null, use default of on. + return upData == null || parseFrom(upData).getBalancerOn(); + } catch (DeserializationException dex) { + LOG.error("ZK state for LoadBalancer could not be parsed " + Bytes.toStringBinary(upData)); + // return false to be safe.
+ return false; + } + } + + /** + * Set the balancer on/off + * @param balancerOn + * @throws KeeperException + */ + public void setBalancerOn(boolean balancerOn) throws KeeperException { + byte [] upData = toByteArray(balancerOn); + try { + ZKUtil.createAndWatch(watcher, watcher.balancerZNode, upData); + } catch(KeeperException.NodeExistsException nee) { + ZKUtil.setData(watcher, watcher.balancerZNode, upData); + } + } + + private byte [] toByteArray(boolean isBalancerOn) { + LoadBalancerProtos.LoadBalancerState.Builder builder = + LoadBalancerProtos.LoadBalancerState.newBuilder(); + builder.setBalancerOn(isBalancerOn); + return ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + } + + private LoadBalancerProtos.LoadBalancerState parseFrom(byte [] pbBytes) + throws DeserializationException { + ProtobufUtil.expectPBMagicPrefix(pbBytes); + LoadBalancerProtos.LoadBalancerState.Builder builder = + LoadBalancerProtos.LoadBalancerState.newBuilder(); + try { + int magicLen = ProtobufUtil.lengthOfPBMagic(); + builder.mergeFrom(pbBytes, magicLen, pbBytes.length - magicLen); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException(e); + } + return builder.build(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 568b9947337..128a0d9f79c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -101,6 +101,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String clusterIdZNode; // znode used for log splitting work assignment public String splitLogZNode; + // znode containing the state of the load balancer + public String balancerZNode; // Certain ZooKeeper nodes need to be world-readable public static final ArrayList 
CREATOR_ALL_AND_WORLD_READABLE = @@ -211,6 +213,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { conf.get("zookeeper.znode.clusterId", "hbaseid")); splitLogZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); + balancerZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.balancer", "balancer")); } /** diff --git a/hbase-server/src/main/protobuf/LoadBalancer.proto b/hbase-server/src/main/protobuf/LoadBalancer.proto new file mode 100644 index 00000000000..df1c04965b2 --- /dev/null +++ b/hbase-server/src/main/protobuf/LoadBalancer.proto @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file contains protocol buffers to represent the state of the load balancer. 
+ +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "LoadBalancerProtos"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +message LoadBalancerState { + optional bool balancerOn = 1; +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index d8b893dba3d..63d96a32d8d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -1021,6 +1021,83 @@ public class TestMasterFailover { TEST_UTIL.shutdownMiniCluster(); } + /** + * return the index of the active master in the cluster + * @throws MasterNotRunningException if no active master found + */ + private int getActiveMasterIndex(MiniHBaseCluster cluster) throws MasterNotRunningException { + // get all the master threads + List masterThreads = cluster.getMasterThreads(); + + for (int i = 0; i < masterThreads.size(); i++) { + if (masterThreads.get(i).getMaster().isActiveMaster()) { + return i; + } + } + throw new MasterNotRunningException(); + } + + /** + * Kill the master and wait for a new active master to show up + * @param cluster + * @return the new active master + * @throws InterruptedException + * @throws MasterNotRunningException + */ + private HMaster killActiveAndWaitForNewActive(MiniHBaseCluster cluster) + throws InterruptedException, MasterNotRunningException { + int activeIndex = getActiveMasterIndex(cluster); + HMaster active = cluster.getMaster(); + cluster.stopMaster(activeIndex); + cluster.waitOnMaster(activeIndex); + assertTrue(cluster.waitForActiveAndReadyMaster()); + // double check this is actually a new master + HMaster newActive = cluster.getMaster(); + assertFalse(active == newActive); + return newActive; + } + + /** + * Test that if the master fails, the load 
balancer maintains its + * state (running or not) when the next master takes over + * @throws Exception + */ + @Test (timeout=240000) + public void testMasterFailoverBalancerPersistence() throws Exception { + final int NUM_MASTERS = 3; + final int NUM_RS = 1; + + // Start the cluster + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + + assertTrue(cluster.waitForActiveAndReadyMaster()); + HMaster active = cluster.getMaster(); + // check that the balancer is on by default for the active master + ClusterStatus clusterStatus = active.getClusterStatus(); + assertTrue(clusterStatus.isBalancerOn()); + + active = killActiveAndWaitForNewActive(cluster); + + // ensure the load balancer is still running on new master + clusterStatus = active.getClusterStatus(); + assertTrue(clusterStatus.isBalancerOn()); + + // turn off the load balancer + active.balanceSwitch(false); + + // once more, kill active master and wait for new active master to show up + active = killActiveAndWaitForNewActive(cluster); + + // ensure the load balancer is not running on the new master + clusterStatus = active.getClusterStatus(); + assertFalse(clusterStatus.isBalancerOn()); + + // Stop the cluster + TEST_UTIL.shutdownMiniCluster(); + } @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =