HBASE-27514 Move some persistent states from zookeeper to master region (#4925)

Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
Duo Zhang 2022-12-22 12:04:24 +08:00 committed by GitHub
parent 3b714a3d8f
commit dcfde79f7e
25 changed files with 991 additions and 509 deletions

View File

@ -69,10 +69,25 @@ public class ZNodePaths {
// znode used for log splitting work assignment
public final String splitLogZNode;
// znode containing the state of the load balancer
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String balancerZNode;
// znode containing the state of region normalizer
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String regionNormalizerZNode;
// znode containing the state of all switches; currently there are split and merge child nodes.
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String switchZNode;
// znode indicating master maintenance mode
public final String masterMaintZNode;
@ -86,7 +101,12 @@ public class ZNodePaths {
// znode containing queues of hfile references to be replicated
public final String hfileRefsZNode;
// znode containing the state of the snapshot auto-cleanup
final String snapshotCleanupZNode;
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String snapshotCleanupZNode;
public ZNodePaths(Configuration conf) {
baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* Store a boolean state.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UG_SYNC_SET_UNSYNC_GET",
justification = "the flag is volatile")
@InterfaceAudience.Private
public abstract class BooleanStateStore extends MasterStateStore {
private volatile boolean on;
protected BooleanStateStore(MasterRegion masterRegion, String stateName, ZKWatcher watcher,
String zkPath) throws IOException, KeeperException, DeserializationException {
super(masterRegion, stateName, watcher, zkPath);
byte[] state = getState();
this.on = state == null || parseFrom(state);
}
/**
* Returns true if the flag is on, otherwise false
*/
public boolean get() {
return on;
}
/**
* Set the flag on/off.
* @param on true if the flag should be on, false otherwise
* @throws IOException if the operation fails
*/
public synchronized void set(boolean on) throws IOException {
byte[] state = toByteArray(on);
setState(state);
this.on = on;
}
protected abstract byte[] toByteArray(boolean on);
protected abstract boolean parseFrom(byte[] bytes) throws DeserializationException;
}
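
A subclass only has to define the byte encoding of the flag. Below is a minimal sketch of a hypothetical subclass that uses a single raw byte instead of the protobuf framing the real stores in this commit use; the class and state name are illustrative, not part of the commit:

package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
/** Illustrative only: persists the flag as a single byte (1 = on, 0 = off). */
public class ExampleFlagStateStore extends BooleanStateStore {
  public ExampleFlagStateStore(MasterRegion masterRegion, ZKWatcher watcher, String zkPath)
    throws IOException, KeeperException, DeserializationException {
    // "example_flag" becomes the row key of the state cell in the master local region
    super(masterRegion, "example_flag", watcher, zkPath);
  }
  @Override
  protected byte[] toByteArray(boolean on) {
    // encode the flag as a single byte
    return new byte[] { (byte) (on ? 1 : 0) };
  }
  @Override
  protected boolean parseFrom(byte[] bytes) throws DeserializationException {
    if (bytes == null || bytes.length != 1) {
      throw new DeserializationException("expected exactly one byte");
    }
    return bytes[0] != 0;
  }
}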

View File

@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
@ -129,6 +130,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerStateStore;
import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
@ -145,6 +147,7 @@ import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerStateStore;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
@ -174,6 +177,7 @@ import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManage
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotCleanupStateStore;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
@ -246,11 +250,8 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -306,17 +307,17 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
// Draining region server tracker
private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
LoadBalancerTracker loadBalancerTracker;
LoadBalancerStateStore loadBalancerStateStore;
// Tracker for meta location, if any client ZK quorum specified
private MetaLocationSyncer metaLocationSyncer;
// Tracker for active master location, if any client ZK quorum specified
@InterfaceAudience.Private
MasterAddressSyncer masterAddressSyncer;
// Tracker for auto snapshot cleanup state
SnapshotCleanupTracker snapshotCleanupTracker;
SnapshotCleanupStateStore snapshotCleanupStateStore;
// Tracker for split and merge state
private SplitOrMergeTracker splitOrMergeTracker;
private SplitOrMergeStateStore splitOrMergeStateStore;
private ClusterSchemaService clusterSchemaService;
@ -750,7 +751,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
* should have already been initialized along with {@link ServerManager}.
*/
private void initializeZKBasedSystemTrackers()
throws IOException, KeeperException, ReplicationException {
throws IOException, KeeperException, ReplicationException, DeserializationException {
if (maintenanceMode) {
// in maintenance mode, always use MaintenanceLoadBalancer.
conf.unset(LoadBalancer.HBASE_RSGROUP_LOADBALANCER_CLASS);
@ -758,16 +759,14 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
LoadBalancer.class);
}
this.balancer = new RSGroupBasedLoadBalancer();
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.loadBalancerStateStore = new LoadBalancerStateStore(masterRegion, zooKeeper);
this.regionNormalizerManager =
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
RegionNormalizerFactory.createNormalizerManager(conf, masterRegion, zooKeeper, this);
this.configurationManager.registerObserver(regionNormalizerManager);
this.regionNormalizerManager.start();
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
this.splitOrMergeStateStore = new SplitOrMergeStateStore(masterRegion, zooKeeper, conf);
// This is for backwards compatibility. We do not need the CP for rs group now, but if users want
// to load it, we need to enable rs group.
@ -787,8 +786,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
this.snapshotCleanupTracker.start();
this.snapshotCleanupStateStore = new SnapshotCleanupStateStore(masterRegion, zooKeeper);
String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
@ -910,8 +908,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
* Notice that now we will not schedule a special procedure to make meta online (unless it is the
* first time, when meta has not been created yet); we will rely on SCP to bring meta online.
*/
private void finishActiveMasterInitialization(MonitoredTask status)
throws IOException, InterruptedException, KeeperException, ReplicationException {
private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
InterruptedException, KeeperException, ReplicationException, DeserializationException {
/*
* We are active master now... go initialize components we need to run.
*/
@ -1640,7 +1638,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
getChoreService().scheduleChore(replicationBarrierCleaner);
final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker.isSnapshotCleanupEnabled();
final boolean isSnapshotChoreEnabled = this.snapshotCleanupStateStore.get();
this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
if (isSnapshotChoreEnabled) {
getChoreService().scheduleChore(this.snapshotCleanerChore);
@ -1762,7 +1760,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
* Turn on/off Snapshot Cleanup Chore
* @param on indicates whether Snapshot Cleanup Chore is to be run
*/
void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
void switchSnapshotCleanup(final boolean on, final boolean synchronous) throws IOException {
if (synchronous) {
synchronized (this.snapshotCleanerChore) {
switchSnapshotCleanup(on);
@ -1772,16 +1770,12 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
}
}
private void switchSnapshotCleanup(final boolean on) {
try {
snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
if (on) {
getChoreService().scheduleChore(this.snapshotCleanerChore);
} else {
this.snapshotCleanerChore.cancel();
}
} catch (KeeperException e) {
LOG.error("Error updating snapshot cleanup mode to {}", on, e);
private void switchSnapshotCleanup(final boolean on) throws IOException {
snapshotCleanupStateStore.set(on);
if (on) {
getChoreService().scheduleChore(this.snapshotCleanerChore);
} else {
this.snapshotCleanerChore.cancel();
}
}
@ -1955,9 +1949,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder();
if (
loadBalancerTracker == null || !(loadBalancerTracker.isBalancerOn() || request.isDryRun())
) {
if (loadBalancerStateStore == null || !(loadBalancerStateStore.get() || request.isDryRun())) {
return responseBuilder.build();
}
@ -2889,8 +2881,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
break;
}
case BALANCER_ON: {
if (loadBalancerTracker != null) {
builder.setBalancerOn(loadBalancerTracker.isBalancerOn());
if (loadBalancerStateStore != null) {
builder.setBalancerOn(loadBalancerStateStore.get());
}
break;
}
@ -3727,17 +3719,16 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
}
/**
* Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized, false
* is returned.
* Queries the state of the {@link LoadBalancerStateStore}. If the balancer is not initialized,
* false is returned.
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
public boolean isBalancerOn() {
return !isInMaintenanceMode() && loadBalancerTracker != null
&& loadBalancerTracker.isBalancerOn();
return !isInMaintenanceMode() && loadBalancerStateStore != null && loadBalancerStateStore.get();
}
/**
* Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized, false is
* Queries the state of the {@link RegionNormalizerStateStore}. If it's not initialized, false is
* returned.
*/
public boolean isNormalizerOn() {
@ -3745,15 +3736,15 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
}
/**
* Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized, false is
* Queries the state of the {@link SplitOrMergeStateStore}. If it is not initialized, false is
* returned. If switchType is illegal, false will be returned.
* @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
* @return The state of the switch
*/
@Override
public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
return !isInMaintenanceMode() && splitOrMergeTracker != null
&& splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
return !isInMaintenanceMode() && splitOrMergeStateStore != null
&& splitOrMergeStateStore.isSplitOrMergeEnabled(switchType);
}
/**
@ -3768,8 +3759,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
}
public SplitOrMergeTracker getSplitOrMergeTracker() {
return splitOrMergeTracker;
public SplitOrMergeStateStore getSplitOrMergeStateStore() {
return splitOrMergeStateStore;
}
@Override

View File

@ -121,7 +121,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -539,22 +538,18 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
* @return old balancer switch
*/
boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
boolean oldValue = server.loadBalancerTracker.isBalancerOn();
boolean oldValue = server.loadBalancerStateStore.get();
boolean newValue = b;
try {
if (server.cpHost != null) {
server.cpHost.preBalanceSwitch(newValue);
}
try {
if (mode == BalanceSwitchMode.SYNC) {
synchronized (server.getLoadBalancer()) {
server.loadBalancerTracker.setBalancerOn(newValue);
}
} else {
server.loadBalancerTracker.setBalancerOn(newValue);
if (mode == BalanceSwitchMode.SYNC) {
synchronized (server.getLoadBalancer()) {
server.loadBalancerStateStore.set(newValue);
}
} catch (KeeperException ke) {
throw new IOException(ke);
} else {
server.loadBalancerStateStore.set(newValue);
}
LOG.info(server.getClientIdAuditPrefix() + " set balanceSwitch=" + newValue);
if (server.cpHost != null) {
@ -1648,8 +1643,7 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
IsSnapshotCleanupEnabledRequest request) throws ServiceException {
try {
server.checkInitialized();
final boolean isSnapshotCleanupEnabled =
server.snapshotCleanupTracker.isSnapshotCleanupEnabled();
final boolean isSnapshotCleanupEnabled = server.snapshotCleanupStateStore.get();
return IsSnapshotCleanupEnabledResponse.newBuilder().setEnabled(isSnapshotCleanupEnabled)
.build();
} catch (IOException e) {
@ -1665,8 +1659,8 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
* @return previous snapshot auto-cleanup mode
*/
private synchronized boolean switchSnapshotCleanup(final boolean enabledNewVal,
final boolean synchronous) {
final boolean oldValue = server.snapshotCleanupTracker.isSnapshotCleanupEnabled();
final boolean synchronous) throws IOException {
final boolean oldValue = server.snapshotCleanupStateStore.get();
server.switchSnapshotCleanup(enabledNewVal, synchronous);
LOG.info("{} Successfully set snapshot cleanup to {}", server.getClientIdAuditPrefix(),
enabledNewVal);
@ -1901,12 +1895,12 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
if (server.cpHost != null) {
server.cpHost.preSetSplitOrMergeEnabled(newValue, switchType);
}
server.getSplitOrMergeTracker().setSplitOrMergeEnabled(newValue, switchType);
server.getSplitOrMergeStateStore().setSplitOrMergeEnabled(newValue, switchType);
if (server.cpHost != null) {
server.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
}
}
} catch (IOException | KeeperException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
return response.build();
@ -1956,7 +1950,11 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
// only one process with the authority to modify the value.
final boolean prevValue = server.getRegionNormalizerManager().isNormalizerOn();
final boolean newValue = request.getOn();
server.getRegionNormalizerManager().setNormalizerOn(newValue);
try {
server.getRegionNormalizerManager().setNormalizerOn(newValue);
} catch (IOException e) {
throw new ServiceException(e);
}
LOG.info("{} set normalizerSwitch={}", server.getClientIdAuditPrefix(), newValue);
return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
}

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
@ -434,8 +433,8 @@ public interface MasterServices extends Server {
RSGroupInfoManager getRSGroupInfoManager();
/**
* Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized, false
* is returned.
* Queries the state of the {@code LoadBalancerStateStore}. If the balancer is not initialized,
* false is returned.
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
boolean isBalancerOn();

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.STATE_FAMILY;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A state storage which stores the state in master local region.
* <p/>
* We used to store some persistent state on zookeeper, so here we provide the ability to migrate
* the state from zookeeper.
* <p/>
* Since parsing the state may introduce some extra overhead, we make this class abstract and the
* get and set state methods protected. Subclasses should store their state in decoded format to
* save the extra parsing overhead.
*/
@InterfaceAudience.Private
public abstract class MasterStateStore {
private static final Logger LOG = LoggerFactory.getLogger(MasterStateStore.class);
private static final byte[] QUALIFIER = Bytes.toBytes("d");
private final MasterRegion masterRegion;
private final byte[] stateName;
protected MasterStateStore(MasterRegion masterRegion, String stateName, ZKWatcher watcher,
String zkPath) throws IOException, KeeperException {
this.masterRegion = masterRegion;
this.stateName = Bytes.toBytes(stateName);
tryMigrate(watcher, zkPath);
}
protected final byte[] getState() throws IOException {
return get().getValue(STATE_FAMILY, QUALIFIER);
}
protected final void setState(byte[] state) throws IOException {
update(state);
}
private Result get() throws IOException {
return masterRegion.get(new Get(stateName).addColumn(STATE_FAMILY, QUALIFIER));
}
private void update(byte[] s) throws IOException {
masterRegion.update(r -> r.put(new Put(stateName).addColumn(STATE_FAMILY, QUALIFIER, s)));
}
private byte[] migrate(ZKWatcher watcher, String zkPath) throws KeeperException, IOException {
byte[] zkData = ZKUtil.getDataNoWatch(watcher, zkPath, null);
if (zkData == null || zkData.length == 0) {
return null;
}
update(zkData);
return zkData;
}
private void tryMigrate(ZKWatcher watcher, String zkPath) throws IOException, KeeperException {
Result result = get();
if (result.isEmpty()) {
// migrate
migrate(watcher, zkPath);
}
// We may fail in the middle, so even if the value is available in the master local region, we
// may still have left a znode on zookeeper. Always try to delete the znode here, since it is
// not very expensive.
try {
ZKUtil.deleteNodeFailSilent(watcher, zkPath);
} catch (Exception e) {
LOG.warn("failed to delete migrated zk state node {}, ignore and continue", zkPath);
}
}
}
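
Each state lands in the master local region as a single cell: the row key is the state name, the family is STATE_FAMILY ("state", added to the master region table descriptor later in this commit), and the qualifier is the fixed "d". A sketch of reading such a cell back directly, assuming a MasterRegion handle like the REGION field used in the tests below; the value is still the PB-magic-prefixed protobuf bytes the subclasses encode:

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.util.Bytes;
// Illustrative only: reading a persisted state cell straight from the master local region.
byte[] row = Bytes.toBytes("load_balancer_on"); // a state name, e.g. LoadBalancerStateStore.STATE_NAME
byte[] qualifier = Bytes.toBytes("d");          // fixed qualifier used by MasterStateStore
Result result = masterRegion.get(new Get(row)
  .addColumn(MasterRegionFactory.STATE_FAMILY, qualifier));
byte[] state = result.getValue(MasterRegionFactory.STATE_FAMILY, qualifier);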

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
/**
* Stores the split and merge switch states in the master local region.
*/
@InterfaceAudience.Private
public class SplitOrMergeStateStore {
private static final String SPLIT_STATE_NAME = "split_enabled";
private static final String MERGE_STATE_NAME = "merge_enabled";
private SwitchStateStore splitStateStore;
private SwitchStateStore mergeStateStore;
public SplitOrMergeStateStore(MasterRegion masterRegion, ZKWatcher watcher, Configuration conf)
throws IOException, KeeperException, DeserializationException {
@SuppressWarnings("deprecation")
String splitZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.split", "split"));
@SuppressWarnings("deprecation")
String mergeZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.merge", "merge"));
splitStateStore = new SwitchStateStore(masterRegion, SPLIT_STATE_NAME, watcher, splitZnode);
mergeStateStore = new SwitchStateStore(masterRegion, MERGE_STATE_NAME, watcher, mergeZnode);
}
public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
switch (switchType) {
case SPLIT:
return splitStateStore.get();
case MERGE:
return mergeStateStore.get();
default:
break;
}
return false;
}
public void setSplitOrMergeEnabled(boolean enabled, MasterSwitchType switchType)
throws IOException {
switch (switchType) {
case SPLIT:
splitStateStore.set(enabled);
break;
case MERGE:
mergeStateStore.set(enabled);
break;
default:
break;
}
}
private static final class SwitchStateStore extends BooleanStateStore {
public SwitchStateStore(MasterRegion masterRegion, String stateName, ZKWatcher watcher,
String zkPath) throws IOException, KeeperException, DeserializationException {
super(masterRegion, stateName, watcher, zkPath);
}
@Override
protected byte[] toByteArray(boolean enabled) {
ZooKeeperProtos.SwitchState.Builder builder = ZooKeeperProtos.SwitchState.newBuilder();
builder.setEnabled(enabled);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
@Override
protected boolean parseFrom(byte[] bytes) throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
ZooKeeperProtos.SwitchState.Builder builder = ZooKeeperProtos.SwitchState.newBuilder();
try {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, bytes, magicLen, bytes.length - magicLen);
} catch (IOException e) {
throw new DeserializationException(e);
}
return builder.build().getEnabled();
}
}
}
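
For context, these switches are not flipped against the store directly; clients go through the Admin API, and MasterRpcServices routes the call into SplitOrMergeStateStore. A sketch of the client-side path, using standard HBase 2.x Admin calls (the helper method name is illustrative):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Admin;
// Illustrative only: pause splits and merges around a maintenance window.
static void pauseSplitsAndMerges(Admin admin) throws IOException {
  boolean prevSplit = admin.splitSwitch(false, true); // returns the previous value; true = synchronous
  boolean prevMerge = admin.mergeSwitch(false, true);
  // ... maintenance that must not race with splits/merges ...
  admin.splitSwitch(prevSplit, false);
  admin.mergeSwitch(prevMerge, false);
}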

View File

@ -1,149 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SwitchState;
/**
* Tracks the switch of split and merge states in ZK
*/
@InterfaceAudience.Private
public class SplitOrMergeTracker {
private String splitZnode;
private String mergeZnode;
private SwitchStateTracker splitStateTracker;
private SwitchStateTracker mergeStateTracker;
public SplitOrMergeTracker(ZKWatcher watcher, Configuration conf, Abortable abortable) {
try {
if (ZKUtil.checkExists(watcher, watcher.getZNodePaths().switchZNode) < 0) {
ZKUtil.createAndFailSilent(watcher, watcher.getZNodePaths().switchZNode);
}
} catch (KeeperException e) {
throw new RuntimeException(e);
}
splitZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.split", "split"));
mergeZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.merge", "merge"));
splitStateTracker = new SwitchStateTracker(watcher, splitZnode, abortable);
mergeStateTracker = new SwitchStateTracker(watcher, mergeZnode, abortable);
}
public void start() {
splitStateTracker.start();
mergeStateTracker.start();
}
public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
switch (switchType) {
case SPLIT:
return splitStateTracker.isSwitchEnabled();
case MERGE:
return mergeStateTracker.isSwitchEnabled();
default:
break;
}
return false;
}
public void setSplitOrMergeEnabled(boolean enabled, MasterSwitchType switchType)
throws KeeperException {
switch (switchType) {
case SPLIT:
splitStateTracker.setSwitchEnabled(enabled);
break;
case MERGE:
mergeStateTracker.setSwitchEnabled(enabled);
break;
default:
break;
}
}
private static class SwitchStateTracker extends ZKNodeTracker {
public SwitchStateTracker(ZKWatcher watcher, String node, Abortable abortable) {
super(watcher, node, abortable);
}
/**
* Return true if the switch is on, false otherwise
*/
public boolean isSwitchEnabled() {
byte[] upData = super.getData(false);
try {
// if data in ZK is null, use default of on.
return upData == null || parseFrom(upData).getEnabled();
} catch (DeserializationException dex) {
LOG.error("ZK state for LoadBalancer could not be parsed " + Bytes.toStringBinary(upData));
// return false to be safe.
return false;
}
}
/**
* Set the switch on/off
* @param enabled whether the switch should be enabled
* @throws KeeperException if a ZooKeeper operation fails
*/
public void setSwitchEnabled(boolean enabled) throws KeeperException {
byte[] upData = toByteArray(enabled);
try {
ZKUtil.setData(watcher, node, upData);
} catch (KeeperException.NoNodeException nne) {
ZKUtil.createAndWatch(watcher, node, upData);
}
super.nodeDataChanged(node);
}
private byte[] toByteArray(boolean enabled) {
SwitchState.Builder builder = SwitchState.newBuilder();
builder.setEnabled(enabled);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
private SwitchState parseFrom(byte[] bytes) throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
SwitchState.Builder builder = SwitchState.newBuilder();
try {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, bytes, magicLen, bytes.length - magicLen);
} catch (IOException e) {
throw new DeserializationException(e);
}
return builder.build();
}
}
}

View File

@ -15,71 +15,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
package org.apache.hadoop.hbase.master.balancer;
import java.io.IOException;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.master.BooleanStateStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LoadBalancerProtos;
/**
* Tracks the load balancer state up in ZK
* Store the balancer state.
*/
@InterfaceAudience.Private
public class LoadBalancerTracker extends ZKNodeTracker {
private static final Logger LOG = LoggerFactory.getLogger(LoadBalancerTracker.class);
public class LoadBalancerStateStore extends BooleanStateStore {
public LoadBalancerTracker(ZKWatcher watcher, Abortable abortable) {
super(watcher, watcher.getZNodePaths().balancerZNode, abortable);
public static final String STATE_NAME = "load_balancer_on";
@SuppressWarnings("deprecation")
public LoadBalancerStateStore(MasterRegion masterRegion, ZKWatcher watcher)
throws IOException, KeeperException, DeserializationException {
super(masterRegion, STATE_NAME, watcher, watcher.getZNodePaths().balancerZNode);
}
/**
* Return true if the balance switch is on, false otherwise
*/
public boolean isBalancerOn() {
byte[] upData = super.getData(false);
try {
// if data in ZK is null, use default of on.
return upData == null || parseFrom(upData).getBalancerOn();
} catch (DeserializationException dex) {
LOG.error("ZK state for LoadBalancer could not be parsed {}", Bytes.toStringBinary(upData));
// return false to be safe.
return false;
}
}
/**
* Set the balancer on/off.
* @param balancerOn true if the balancer should be on, false otherwise
* @throws KeeperException if a ZooKeeper operation fails
*/
public void setBalancerOn(boolean balancerOn) throws KeeperException {
byte[] upData = toByteArray(balancerOn);
try {
ZKUtil.setData(watcher, watcher.getZNodePaths().balancerZNode, upData);
} catch (KeeperException.NoNodeException nne) {
ZKUtil.createAndWatch(watcher, watcher.getZNodePaths().balancerZNode, upData);
}
super.nodeDataChanged(watcher.getZNodePaths().balancerZNode);
}
private byte[] toByteArray(boolean isBalancerOn) {
@Override
protected byte[] toByteArray(boolean isBalancerOn) {
LoadBalancerProtos.LoadBalancerState.Builder builder =
LoadBalancerProtos.LoadBalancerState.newBuilder();
builder.setBalancerOn(isBalancerOn);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
private LoadBalancerProtos.LoadBalancerState parseFrom(byte[] pbBytes)
throws DeserializationException {
@Override
protected boolean parseFrom(byte[] pbBytes) throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(pbBytes);
LoadBalancerProtos.LoadBalancerState.Builder builder =
LoadBalancerProtos.LoadBalancerState.newBuilder();
@ -89,6 +61,6 @@ public class LoadBalancerTracker extends ZKNodeTracker {
} catch (IOException e) {
throw new DeserializationException(e);
}
return builder.build();
return builder.build().getBalancerOn();
}
}
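
The byte format here is identical to what LoadBalancerTracker used to write to zookeeper: the 4-byte "PBUF" magic prefix followed by the serialized protobuf message. Keeping the format unchanged is what lets MasterStateStore.tryMigrate copy the znode data into the master region byte for byte. A round-trip sketch of that framing, using the shaded ProtobufUtil helpers imported above (the helper method names are illustrative):

import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LoadBalancerProtos;
// Illustrative only: encode a state the same way the store does ...
static byte[] encode(boolean balancerOn) {
  return ProtobufUtil.prependPBMagic(LoadBalancerProtos.LoadBalancerState.newBuilder()
    .setBalancerOn(balancerOn).build().toByteArray());
}
// ... and decode it, mirroring parseFrom above.
static boolean decode(byte[] encoded) throws IOException, DeserializationException {
  ProtobufUtil.expectPBMagicPrefix(encoded); // throws DeserializationException if the magic is absent
  int magicLen = ProtobufUtil.lengthOfPBMagic();
  LoadBalancerProtos.LoadBalancerState.Builder builder =
    LoadBalancerProtos.LoadBalancerState.newBuilder();
  ProtobufUtil.mergeFrom(builder, encoded, magicLen, encoded.length - magicLen);
  return builder.getBalancerOn();
}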

View File

@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* Factory to create instance of {@link RegionNormalizer} as configured.
@ -35,12 +38,14 @@ public final class RegionNormalizerFactory {
private RegionNormalizerFactory() {
}
// TODO: consolidate this down to MasterServices
public static RegionNormalizerManager createNormalizerManager(final Configuration conf,
final ZKWatcher zkWatcher, final HMaster master // TODO: consolidate this down to MasterServices
) {
final MasterRegion masterRegion, final ZKWatcher zkWatcher, final HMaster master)
throws DeserializationException, IOException, KeeperException {
final RegionNormalizer regionNormalizer = getRegionNormalizer(conf);
regionNormalizer.setMasterServices(master);
final RegionNormalizerTracker tracker = new RegionNormalizerTracker(zkWatcher, master);
final RegionNormalizerStateStore stateStore =
new RegionNormalizerStateStore(masterRegion, zkWatcher);
final RegionNormalizerChore chore =
master.isInMaintenanceMode() ? null : new RegionNormalizerChore(master);
final RegionNormalizerWorkQueue<TableName> workQueue =
@ -48,7 +53,7 @@ public final class RegionNormalizerFactory {
final RegionNormalizerWorker worker = master.isInMaintenanceMode()
? null
: new RegionNormalizerWorker(conf, master, regionNormalizer, workQueue);
return new RegionNormalizerManager(tracker, chore, workQueue, worker);
return new RegionNormalizerManager(stateStore, chore, workQueue, worker);
}
/**

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.normalizer;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -27,9 +28,7 @@ import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,7 +41,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
public class RegionNormalizerManager implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
private final RegionNormalizerTracker regionNormalizerTracker;
private final RegionNormalizerStateStore regionNormalizerStateStore;
private final RegionNormalizerChore regionNormalizerChore;
private final RegionNormalizerWorkQueue<TableName> workQueue;
private final RegionNormalizerWorker worker;
@ -52,11 +51,11 @@ public class RegionNormalizerManager implements PropagatingConfigurationObserver
private boolean started = false;
private boolean stopped = false;
RegionNormalizerManager(@NonNull final RegionNormalizerTracker regionNormalizerTracker,
RegionNormalizerManager(@NonNull final RegionNormalizerStateStore regionNormalizerStateStore,
@Nullable final RegionNormalizerChore regionNormalizerChore,
@Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
@Nullable final RegionNormalizerWorker worker) {
this.regionNormalizerTracker = regionNormalizerTracker;
this.regionNormalizerStateStore = regionNormalizerStateStore;
this.regionNormalizerChore = regionNormalizerChore;
this.workQueue = workQueue;
this.worker = worker;
@ -90,7 +89,6 @@ public class RegionNormalizerManager implements PropagatingConfigurationObserver
if (started) {
return;
}
regionNormalizerTracker.start();
if (worker != null) {
// worker will be null when master is in maintenance mode.
pool.submit(worker);
@ -108,7 +106,6 @@ public class RegionNormalizerManager implements PropagatingConfigurationObserver
return;
}
pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
regionNormalizerTracker.stop();
stopped = true;
}
}
@ -121,19 +118,15 @@ public class RegionNormalizerManager implements PropagatingConfigurationObserver
* Return {@code true} if region normalizer is on, {@code false} otherwise
*/
public boolean isNormalizerOn() {
return regionNormalizerTracker.isNormalizerOn();
return regionNormalizerStateStore.get();
}
/**
* Set region normalizer on/off
* @param normalizerOn whether normalizer should be on or off
*/
public void setNormalizerOn(boolean normalizerOn) {
try {
regionNormalizerTracker.setNormalizerOn(normalizerOn);
} catch (KeeperException e) {
LOG.warn("Error flipping normalizer switch", e);
}
public void setNormalizerOn(boolean normalizerOn) throws IOException {
regionNormalizerStateStore.set(normalizerOn);
}
/**

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.BooleanStateStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionNormalizerProtos;
/**
* Store region normalizer state.
*/
@InterfaceAudience.Private
public class RegionNormalizerStateStore extends BooleanStateStore {
public static final String STATE_NAME = "region_normalizer_on";
@SuppressWarnings("deprecation")
public RegionNormalizerStateStore(MasterRegion masterRegion, ZKWatcher watcher)
throws IOException, KeeperException, DeserializationException {
super(masterRegion, STATE_NAME, watcher, watcher.getZNodePaths().regionNormalizerZNode);
}
@Override
protected byte[] toByteArray(boolean isNormalizerOn) {
RegionNormalizerProtos.RegionNormalizerState.Builder builder =
RegionNormalizerProtos.RegionNormalizerState.newBuilder();
builder.setNormalizerOn(isNormalizerOn);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
@Override
protected boolean parseFrom(byte[] pbBytes) throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(pbBytes);
RegionNormalizerProtos.RegionNormalizerState.Builder builder =
RegionNormalizerProtos.RegionNormalizerState.newBuilder();
try {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
} catch (IOException e) {
throw new DeserializationException(e);
}
return builder.build().getNormalizerOn();
}
}
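
Client-side, this state maps to Admin.normalizerSwitch and Admin.isNormalizerEnabled; the RPC lands in RegionNormalizerManager.setNormalizerOn and from there in this store. A short sketch using standard 2.x Admin calls (the helper method name is illustrative):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Admin;
// Illustrative only: toggling the normalizer through the Admin API.
static boolean disableNormalizer(Admin admin) throws IOException {
  boolean previous = admin.normalizerSwitch(false); // returns the previous switch value
  assert !admin.isNormalizerEnabled();
  return previous;
}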

View File

@ -72,9 +72,9 @@
* The Region Normalizer subsystem is composed of a handful of related classes:
* <ul>
* <li>
* The {@link org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker} provides a system by
* which the Normalizer can be disabled at runtime. It currently does this by managing a znode,
* but this is an implementation detail.
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerStateStore} provides a
* system by which the Normalizer can be disabled at runtime. It currently does this by
* storing the state in master local region, but this is an implementation detail.
* </li>
* <li>
* The {@link org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorkQueue} is a

View File

@ -143,12 +143,6 @@ public final class MasterRegion {
flusherAndCompactor.onUpdate();
}
/**
* The design for master region is to only load all the data to memory at once when starting, so
* typically you should not use the get method to get a single row of data at runtime.
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public Result get(Get get) throws IOException {
return region.get(get);
}

View File

@ -89,13 +89,16 @@ public final class MasterRegionFactory {
public static final byte[] REGION_SERVER_FAMILY = Bytes.toBytes("rs");
public static final byte[] STATE_FAMILY = Bytes.toBytes("state");
private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
.setMaxVersions(HConstants.DEFAULT_HBASE_META_VERSIONS).setInMemory(true)
.setBlocksize(HConstants.DEFAULT_HBASE_META_BLOCK_SIZE).setBloomFilterType(BloomType.ROWCOL)
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(REGION_SERVER_FAMILY)).build();
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(REGION_SERVER_FAMILY))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(STATE_FAMILY)).build();
private static TableDescriptor withTrackerConfigs(Configuration conf) {
String trackerImpl = conf.get(TRACKER_IMPL, conf.get(StoreFileTrackerFactory.TRACKER_IMPL,

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.BooleanStateStore;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotCleanupProtos;
/**
* Store the snapshot cleanup enabled state.
*/
@InterfaceAudience.Private
public class SnapshotCleanupStateStore extends BooleanStateStore {
public static final String STATE_NAME = "snapshot_cleanup_enabled";
@SuppressWarnings("deprecation")
public SnapshotCleanupStateStore(MasterRegion masterRegion, ZKWatcher watcher)
throws IOException, KeeperException, DeserializationException {
super(masterRegion, STATE_NAME, watcher, watcher.getZNodePaths().snapshotCleanupZNode);
}
@Override
protected byte[] toByteArray(boolean isSnapshotCleanupEnabled) {
SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
builder.setSnapshotCleanupEnabled(isSnapshotCleanupEnabled);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
@Override
protected boolean parseFrom(byte[] pbBytes) throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(pbBytes);
SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
try {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
} catch (IOException e) {
throw new DeserializationException(e);
}
return builder.build().getSnapshotCleanupEnabled();
}
}
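
The same pattern applies to the snapshot cleanup switch: Admin.snapshotCleanupSwitch reaches this store through MasterRpcServices and HMaster.switchSnapshotCleanup. A short sketch using the Admin calls available in recent 2.x releases (the helper method name is illustrative):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Admin;
// Illustrative only: toggling snapshot auto-cleanup through the Admin API.
static boolean disableSnapshotCleanup(Admin admin) throws IOException {
  boolean previous = admin.snapshotCleanupSwitch(false, true); // true = apply synchronously
  assert !admin.isSnapshotCleanupEnabled();
  return previous;
}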

View File

@ -23,8 +23,6 @@ import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
@ -33,7 +31,6 @@ import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.junit.AfterClass;
import org.junit.Rule;
import org.slf4j.Logger;
@ -98,21 +95,7 @@ public class MetaWithReplicasTestBase {
destinationServerName);
}
// Disable the balancer
LoadBalancerTracker l =
new LoadBalancerTracker(TEST_UTIL.getZooKeeperWatcher(), new Abortable() {
AtomicBoolean aborted = new AtomicBoolean(false);
@Override
public boolean isAborted() {
return aborted.get();
}
@Override
public void abort(String why, Throwable e) {
aborted.set(true);
}
});
l.setBalancerOn(false);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
LOG.debug("All meta replicas assigned");
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.master.region.MasterRegionParams;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public abstract class MasterStateStoreTestBase {
protected static HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
protected static MasterRegion REGION;
protected static ChoreService CHORE_SERVICE;
protected static DirScanPool HFILE_CLEANER_POOL;
protected static DirScanPool LOG_CLEANER_POOL;
protected static TableDescriptor TD =
TableDescriptorBuilder.newBuilder(TableName.valueOf("test:local"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(MasterRegionFactory.STATE_FAMILY)).build();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
// Runs on local filesystem. Test does not need sync. Turn off checks.
conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
CHORE_SERVICE = new ChoreService("TestMasterStateStore");
HFILE_CLEANER_POOL = DirScanPool.getHFileCleanerScanPool(conf);
LOG_CLEANER_POOL = DirScanPool.getLogCleanerScanPool(conf);
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName())
.thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
when(server.getChoreService()).thenReturn(CHORE_SERVICE);
Path testDir = UTIL.getDataTestDir();
CommonFSUtils.setRootDir(conf, testDir);
MasterRegionParams params = new MasterRegionParams();
TableDescriptor td = TableDescriptorBuilder
.newBuilder(TD).setValue(StoreFileTrackerFactory.TRACKER_IMPL, conf
.get(StoreFileTrackerFactory.TRACKER_IMPL, StoreFileTrackerFactory.Trackers.DEFAULT.name()))
.build();
params.server(server).regionDirName("local").tableDescriptor(td)
.flushSize(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE).flushPerChanges(1_000_000)
.flushIntervalMs(TimeUnit.MINUTES.toMillis(15)).compactMin(4).maxWals(32).useHsync(false)
.ringBufferSlotCount(16).rollPeriodMs(TimeUnit.MINUTES.toMillis(15))
.archivedWalSuffix(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX);
REGION = MasterRegion.create(params);
UTIL.startMiniZKCluster();
}
@AfterClass
public static void tearDownAfterClass() throws IOException {
REGION.close(true);
HFILE_CLEANER_POOL.shutdownNow();
LOG_CLEANER_POOL.shutdownNow();
CHORE_SERVICE.shutdown();
UTIL.shutdownMiniZKCluster();
UTIL.cleanupTestDir();
}
protected static void cleanup() throws IOException {
try (ResultScanner scanner = REGION.getScanner(new Scan())) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
REGION.update(r -> r.delete(new Delete(result.getRow())));
}
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@Category({ MasterTests.class, MediumTests.class })
public class TestSplitOrMergeStateStore extends MasterStateStoreTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSplitOrMergeStateStore.class);
@After
public void tearDown() throws Exception {
cleanup();
ZKUtil.deleteNodeRecursively(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().switchZNode);
}
@Test
public void testSplit() throws Exception {
testReadWrite(MasterSwitchType.SPLIT);
}
@Test
public void testMerge() throws Exception {
testReadWrite(MasterSwitchType.MERGE);
}
@Test
public void testSplitMigrate() throws Exception {
testMigrate(MasterSwitchType.SPLIT,
ZNodePaths.joinZNode(UTIL.getZooKeeperWatcher().getZNodePaths().switchZNode,
UTIL.getConfiguration().get("zookeeper.znode.switch.split", "split")));
}
@Test
public void testMergeMigrate() throws Exception {
testMigrate(MasterSwitchType.MERGE,
ZNodePaths.joinZNode(UTIL.getZooKeeperWatcher().getZNodePaths().switchZNode,
UTIL.getConfiguration().get("zookeeper.znode.switch.merge", "merge")));
}
private void testReadWrite(MasterSwitchType type) throws Exception {
SplitOrMergeStateStore store =
new SplitOrMergeStateStore(REGION, UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
assertTrue(store.isSplitOrMergeEnabled(type));
store.setSplitOrMergeEnabled(false, type);
assertFalse(store.isSplitOrMergeEnabled(type));
// restart
store = new SplitOrMergeStateStore(REGION, UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
assertFalse(store.isSplitOrMergeEnabled(type));
store.setSplitOrMergeEnabled(true, type);
assertTrue(store.isSplitOrMergeEnabled(type));
}
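// on first construction the store should load the legacy state from zookeeper, persist it in the
// master local region, and then delete the znode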
private void testMigrate(MasterSwitchType type, String zkPath) throws Exception {
// prepare data on zk which sets the split/merge switch state to enabled=false, since the
// default value is true
byte[] zkData = ProtobufUtil.prependPBMagic(
ZooKeeperProtos.SwitchState.newBuilder().setEnabled(false).build().toByteArray());
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(), zkPath, zkData);
SplitOrMergeStateStore store =
new SplitOrMergeStateStore(REGION, UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
assertFalse(store.isSplitOrMergeEnabled(type));
// should have deleted the node on zk
assertEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), zkPath));
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.master.MasterStateStoreTestBase;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LoadBalancerProtos;
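/**
 * Make sure the load balancer state is stored in the master local region and can be migrated from
 * the legacy zookeeper znode.
 */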
@Category({ MasterTests.class, MediumTests.class })
public class TestLoadBalancerStateStore extends MasterStateStoreTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLoadBalancerStateStore.class);
@After
public void tearDown() throws Exception {
cleanup();
ZKUtil.deleteNodeFailSilent(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().balancerZNode);
}
@Test
public void testReadWrite() throws Exception {
LoadBalancerStateStore store = new LoadBalancerStateStore(REGION, UTIL.getZooKeeperWatcher());
assertTrue(store.get());
store.set(false);
assertFalse(store.get());
// restart
store = new LoadBalancerStateStore(REGION, UTIL.getZooKeeperWatcher());
assertFalse(store.get());
store.set(true);
assertTrue(store.get());
}
@Test
public void testMigrate() throws Exception {
// prepare data on zk which sets balancerOn to false, since the default value is true
byte[] zkData = ProtobufUtil.prependPBMagic(
LoadBalancerProtos.LoadBalancerState.newBuilder().setBalancerOn(false).build().toByteArray());
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().balancerZNode, zkData);
LoadBalancerStateStore store = new LoadBalancerStateStore(REGION, UTIL.getZooKeeperWatcher());
assertFalse(store.get());
// should have deleted the node on zk
assertEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().balancerZNode));
}
}

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@ -60,7 +59,7 @@ public class TestRegionNormalizerManagerConfigurationObserver {
@Mock
private MasterServices masterServices;
@Mock
private RegionNormalizerTracker tracker;
private RegionNormalizerStateStore stateStore;
@Mock
private RegionNormalizerChore chore;
@Mock
@ -75,7 +74,7 @@ public class TestRegionNormalizerManagerConfigurationObserver {
normalizer = new SimpleRegionNormalizer();
worker = new RegionNormalizerWorker(conf, masterServices, normalizer, queue);
final RegionNormalizerManager normalizerManager =
new RegionNormalizerManager(tracker, chore, queue, worker);
new RegionNormalizerManager(stateStore, chore, queue, worker);
configurationManager = new ConfigurationManager();
configurationManager.registerObserver(normalizerManager);
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.master.MasterStateStoreTestBase;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionNormalizerProtos;
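/**
 * Make sure the region normalizer state is stored in the master local region and can be migrated
 * from the legacy zookeeper znode.
 */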
@Category({ MasterTests.class, MediumTests.class })
public class TestRegionNormalizerStateStore extends MasterStateStoreTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionNormalizerStateStore.class);
@After
public void tearDown() throws Exception {
cleanup();
ZKUtil.deleteNodeFailSilent(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().regionNormalizerZNode);
}
@Test
public void testReadWrite() throws Exception {
RegionNormalizerStateStore store =
new RegionNormalizerStateStore(REGION, UTIL.getZooKeeperWatcher());
assertTrue(store.get());
store.set(false);
assertFalse(store.get());
// restart
store = new RegionNormalizerStateStore(REGION, UTIL.getZooKeeperWatcher());
assertFalse(store.get());
store.set(true);
assertTrue(store.get());
}
@Test
public void testMigrate() throws Exception {
// prepare data on zk which sets normalizerOn to false, since the default value is true
byte[] zkData = ProtobufUtil.prependPBMagic(RegionNormalizerProtos.RegionNormalizerState
.newBuilder().setNormalizerOn(false).build().toByteArray());
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().regionNormalizerZNode, zkData);
RegionNormalizerStateStore store =
new RegionNormalizerStateStore(REGION, UTIL.getZooKeeperWatcher());
assertFalse(store.get());
// should have deleted the node on zk
assertEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().regionNormalizerZNode));
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.master.MasterStateStoreTestBase;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotCleanupProtos;
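/**
 * Make sure the snapshot auto cleanup state is stored in the master local region and can be
 * migrated from the legacy zookeeper znode.
 */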
@Category({ MasterTests.class, MediumTests.class })
public class TestSnapshotCleanupStateStore extends MasterStateStoreTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSnapshotCleanupStateStore.class);
@After
public void tearDown() throws Exception {
cleanup();
ZKUtil.deleteNodeFailSilent(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().snapshotCleanupZNode);
}
@Test
public void testReadWrite() throws Exception {
SnapshotCleanupStateStore store =
new SnapshotCleanupStateStore(REGION, UTIL.getZooKeeperWatcher());
assertTrue(store.get());
store.set(false);
assertFalse(store.get());
// restart
store = new SnapshotCleanupStateStore(REGION, UTIL.getZooKeeperWatcher());
assertFalse(store.get());
store.set(true);
assertTrue(store.get());
}
@Test
public void testMigrate() throws Exception {
// prepare data on zk which sets snapshot cleanup enabled to false, since the default value is
// true
byte[] zkData = ProtobufUtil.prependPBMagic(SnapshotCleanupProtos.SnapshotCleanupState
.newBuilder().setSnapshotCleanupEnabled(false).build().toByteArray());
ZKUtil.createSetData(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().snapshotCleanupZNode, zkData);
SnapshotCleanupStateStore store =
new SnapshotCleanupStateStore(REGION, UTIL.getZooKeeperWatcher());
assertFalse(store.get());
// should have deleted the node on zk
assertEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
UTIL.getZooKeeperWatcher().getZNodePaths().snapshotCleanupZNode));
}
}

View File

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionNormalizerProtos;
/**
* Tracks region normalizer state up in ZK
*/
@InterfaceAudience.Private
public class RegionNormalizerTracker extends ZKNodeTracker {
private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerTracker.class);
public RegionNormalizerTracker(ZKWatcher watcher, Abortable abortable) {
super(watcher, watcher.getZNodePaths().regionNormalizerZNode, abortable);
}
/**
* Return true if region normalizer is on, false otherwise
*/
public boolean isNormalizerOn() {
byte[] upData = super.getData(false);
try {
// if data in ZK is null, use default of on.
return upData == null || parseFrom(upData).getNormalizerOn();
} catch (DeserializationException dex) {
LOG
.error("ZK state for RegionNormalizer could not be parsed " + Bytes.toStringBinary(upData));
// return false to be safe.
return false;
}
}
/**
* Set region normalizer on/off
* @param normalizerOn whether normalizer should be on or off
* @throws KeeperException if a ZooKeeper operation fails
*/
public void setNormalizerOn(boolean normalizerOn) throws KeeperException {
byte[] upData = toByteArray(normalizerOn);
try {
ZKUtil.setData(watcher, watcher.getZNodePaths().regionNormalizerZNode, upData);
} catch (KeeperException.NoNodeException nne) {
ZKUtil.createAndWatch(watcher, watcher.getZNodePaths().regionNormalizerZNode, upData);
}
super.nodeDataChanged(watcher.getZNodePaths().regionNormalizerZNode);
}
private byte[] toByteArray(boolean isNormalizerOn) {
RegionNormalizerProtos.RegionNormalizerState.Builder builder =
RegionNormalizerProtos.RegionNormalizerState.newBuilder();
builder.setNormalizerOn(isNormalizerOn);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
private RegionNormalizerProtos.RegionNormalizerState parseFrom(byte[] pbBytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(pbBytes);
RegionNormalizerProtos.RegionNormalizerState.Builder builder =
RegionNormalizerProtos.RegionNormalizerState.newBuilder();
try {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
} catch (IOException e) {
throw new DeserializationException(e);
}
return builder.build();
}
}

View File

@ -1,106 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotCleanupProtos;
/**
* Tracks status of snapshot auto cleanup based on TTL
*/
@InterfaceAudience.Private
public class SnapshotCleanupTracker extends ZKNodeTracker {
/**
* Constructs a new ZK node tracker.
* <p>
* After construction, use {@link #start} to kick off tracking.
* @param watcher reference to the {@link ZKWatcher} which also contains configuration and
* constants
* @param abortable used to abort if a fatal error occurs
*/
public SnapshotCleanupTracker(ZKWatcher watcher, Abortable abortable) {
super(watcher, watcher.getZNodePaths().snapshotCleanupZNode, abortable);
}
/**
* Returns the current state of the snapshot auto cleanup based on TTL
* @return <code>true</code> if the snapshot auto cleanup is enabled, <code>false</code>
* otherwise.
*/
public boolean isSnapshotCleanupEnabled() {
byte[] snapshotCleanupZNodeData = super.getData(false);
try {
// if data in ZK is null, use default of on.
return snapshotCleanupZNodeData == null
|| parseFrom(snapshotCleanupZNodeData).getSnapshotCleanupEnabled();
} catch (DeserializationException dex) {
LOG.error("ZK state for Snapshot Cleanup could not be parsed "
+ Bytes.toStringBinary(snapshotCleanupZNodeData), dex);
// return false to be safe.
return false;
}
}
/**
* Set snapshot auto clean on/off
* @param snapshotCleanupEnabled true if the snapshot auto cleanup should be on, false otherwise
* @throws KeeperException if ZooKeeper operation fails
*/
public void setSnapshotCleanupEnabled(final boolean snapshotCleanupEnabled)
throws KeeperException {
byte[] snapshotCleanupZNodeData = toByteArray(snapshotCleanupEnabled);
try {
ZKUtil.setData(watcher, watcher.getZNodePaths().snapshotCleanupZNode,
snapshotCleanupZNodeData);
} catch (KeeperException.NoNodeException nne) {
ZKUtil.createAndWatch(watcher, watcher.getZNodePaths().snapshotCleanupZNode,
snapshotCleanupZNodeData);
}
super.nodeDataChanged(watcher.getZNodePaths().snapshotCleanupZNode);
}
private byte[] toByteArray(final boolean isSnapshotCleanupEnabled) {
SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
builder.setSnapshotCleanupEnabled(isSnapshotCleanupEnabled);
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
}
private SnapshotCleanupProtos.SnapshotCleanupState parseFrom(final byte[] pbBytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(pbBytes);
SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
try {
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
} catch (IOException e) {
throw new DeserializationException(e);
}
return builder.build();
}
}