HBASE-22760 : Pause/Resume/Query Snapshot Auto Cleanup Activity (#618)
This commit is contained in:
parent
909bbaa667
commit
63568854b6
|
@ -3113,4 +3113,26 @@ public interface Admin extends Abortable, Closeable {
|
|||
default List<Boolean> hasUserPermissions(List<Permission> permissions) throws IOException {
|
||||
return hasUserPermissions(null, permissions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn on or off the auto snapshot cleanup based on TTL.
|
||||
*
|
||||
* @param on Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @param synchronous If <code>true</code>, it waits until current snapshot cleanup is completed,
|
||||
* if outstanding.
|
||||
* @return Previous auto snapshot cleanup value
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
boolean snapshotCleanupSwitch(final boolean on, final boolean synchronous)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Query the current state of the auto snapshot cleanup based on TTL.
|
||||
*
|
||||
* @return <code>true</code> if the auto snapshot cleanup is enabled,
|
||||
* <code>false</code> otherwise.
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
boolean isSnapshotCleanupEnabled() throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -1462,4 +1462,27 @@ public interface AsyncAdmin {
|
|||
default CompletableFuture<List<Boolean>> hasUserPermissions(List<Permission> permissions) {
|
||||
return hasUserPermissions(null, permissions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn on or off the auto snapshot cleanup based on TTL.
|
||||
* <p/>
|
||||
* Notice that, the method itself is always non-blocking, which means it will always return
|
||||
* immediately. The {@code sync} parameter only effects when will we complete the returned
|
||||
* {@link CompletableFuture}.
|
||||
*
|
||||
* @param on Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @param sync If <code>true</code>, it waits until current snapshot cleanup is completed,
|
||||
* if outstanding.
|
||||
* @return Previous auto snapshot cleanup value wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> snapshotCleanupSwitch(boolean on, boolean sync);
|
||||
|
||||
/**
|
||||
* Query the current state of the auto snapshot cleanup based on TTL.
|
||||
*
|
||||
* @return true if the auto snapshot cleanup is enabled, false otherwise.
|
||||
* The return value will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> isSnapshotCleanupEnabled();
|
||||
|
||||
}
|
||||
|
|
|
@ -824,4 +824,16 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
List<Permission> permissions) {
|
||||
return wrap(rawAdmin.hasUserPermissions(userName, permissions));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
|
||||
final boolean sync) {
|
||||
return wrap(rawAdmin.snapshotCleanupSwitch(on, sync));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
|
||||
return wrap(rawAdmin.isSnapshotCleanupEnabled());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1505,6 +1505,20 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return stub.restoreSnapshot(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterProtos.SetSnapshotCleanupResponse switchSnapshotCleanup(
|
||||
RpcController controller, MasterProtos.SetSnapshotCleanupRequest request)
|
||||
throws ServiceException {
|
||||
return stub.switchSnapshotCleanup(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterProtos.IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
|
||||
RpcController controller, MasterProtos.IsSnapshotCleanupEnabledRequest request)
|
||||
throws ServiceException {
|
||||
return stub.isSnapshotCleanupEnabled(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterProtos.ExecProcedureResponse execProcedure(
|
||||
RpcController controller, MasterProtos.ExecProcedureRequest request)
|
||||
|
|
|
@ -183,6 +183,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMainte
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
|
||||
|
@ -206,6 +208,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSna
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
|
||||
|
@ -4324,4 +4327,35 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean snapshotCleanupSwitch(boolean on, boolean synchronous) throws IOException {
|
||||
return executeCallable(new MasterCallable<Boolean>(getConnection(),
|
||||
getRpcControllerFactory()) {
|
||||
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
SetSnapshotCleanupRequest req =
|
||||
RequestConverter.buildSetSnapshotCleanupRequest(on, synchronous);
|
||||
return master.switchSnapshotCleanup(getRpcController(), req).getPrevSnapshotCleanup();
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSnapshotCleanupEnabled() throws IOException {
|
||||
return executeCallable(new MasterCallable<Boolean>(getConnection(),
|
||||
getRpcControllerFactory()) {
|
||||
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
IsSnapshotCleanupEnabledRequest req =
|
||||
RequestConverter.buildIsSnapshotCleanupEnabledRequest();
|
||||
return master.isSnapshotCleanupEnabled(getRpcController(), req).getEnabled();
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -205,6 +205,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||
|
@ -255,6 +257,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.SetSnapshotCleanupResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
|
||||
|
@ -3867,4 +3871,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
resp -> resp.getHasUserPermissionList()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
|
||||
final boolean sync) {
|
||||
return this.<Boolean>newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.call(controller, stub,
|
||||
RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
|
||||
MasterService.Interface::switchSnapshotCleanup,
|
||||
SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
|
||||
return this.<Boolean>newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.call(controller, stub,
|
||||
RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
|
||||
MasterService.Interface::isSnapshotCleanupEnabled,
|
||||
IsSnapshotCleanupEnabledResponse::getEnabled))
|
||||
.call();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -100,6 +100,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||
|
@ -150,6 +154,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
|
||||
|
@ -266,6 +272,19 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
|
|||
return stub.restoreSnapshot(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetSnapshotCleanupResponse switchSnapshotCleanup(RpcController controller,
|
||||
SetSnapshotCleanupRequest request) throws ServiceException {
|
||||
return stub.switchSnapshotCleanup(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
|
||||
RpcController controller, IsSnapshotCleanupEnabledRequest request)
|
||||
throws ServiceException {
|
||||
return stub.isSnapshotCleanupEnabled(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
|
||||
RemoveReplicationPeerRequest request) throws ServiceException {
|
||||
|
|
|
@ -120,6 +120,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJ
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
|
||||
|
@ -134,6 +136,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleaner
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.SetSnapshotCleanupRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetTableStateInMetaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
|
||||
|
@ -1891,4 +1895,31 @@ public final class RequestConverter {
|
|||
map(r -> buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME, Bytes.toBytes(r))).
|
||||
collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates SetSnapshotCleanupRequest for turning on/off auto snapshot cleanup
|
||||
*
|
||||
* @param enabled Set to <code>true</code> to enable,
|
||||
* <code>false</code> to disable.
|
||||
* @param synchronous If <code>true</code>, it waits until current snapshot cleanup is completed,
|
||||
* if outstanding.
|
||||
* @return a SetSnapshotCleanupRequest
|
||||
*/
|
||||
public static SetSnapshotCleanupRequest buildSetSnapshotCleanupRequest(
|
||||
final boolean enabled, final boolean synchronous) {
|
||||
return SetSnapshotCleanupRequest.newBuilder().setEnabled(enabled).setSynchronous(synchronous)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates IsSnapshotCleanupEnabledRequest to determine if auto snapshot cleanup
|
||||
* based on TTL expiration is turned on
|
||||
*
|
||||
* @return IsSnapshotCleanupEnabledRequest
|
||||
*/
|
||||
public static IsSnapshotCleanupEnabledRequest buildIsSnapshotCleanupEnabledRequest() {
|
||||
return IsSnapshotCleanupEnabledRequest.newBuilder().build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ public class ZNodePaths {
|
|||
public static final char ZNODE_PATH_SEPARATOR = '/';
|
||||
|
||||
public final static String META_ZNODE_PREFIX = "meta-region-server";
|
||||
private static final String DEFAULT_SNAPSHOT_CLEANUP_ZNODE = "snapshot-cleanup";
|
||||
|
||||
// base znode for this cluster
|
||||
public final String baseZNode;
|
||||
|
@ -89,6 +90,8 @@ public class ZNodePaths {
|
|||
public final String queuesZNode;
|
||||
// znode containing queues of hfile references to be replicated
|
||||
public final String hfileRefsZNode;
|
||||
// znode containing the state of the snapshot auto-cleanup
|
||||
final String snapshotCleanupZNode;
|
||||
|
||||
public ZNodePaths(Configuration conf) {
|
||||
baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||
|
@ -123,20 +126,35 @@ public class ZNodePaths {
|
|||
queuesZNode = joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
|
||||
hfileRefsZNode = joinZNode(replicationZNode,
|
||||
conf.get("zookeeper.znode.replication.hfile.refs", "hfile-refs"));
|
||||
snapshotCleanupZNode = joinZNode(baseZNode,
|
||||
conf.get("zookeeper.znode.snapshot.cleanup", DEFAULT_SNAPSHOT_CLEANUP_ZNODE));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ZNodePaths [baseZNode=" + baseZNode + ", metaReplicaZNodes=" + metaReplicaZNodes
|
||||
+ ", rsZNode=" + rsZNode + ", drainingZNode=" + drainingZNode + ", masterAddressZNode="
|
||||
+ masterAddressZNode + ", backupMasterAddressesZNode=" + backupMasterAddressesZNode
|
||||
+ ", clusterStateZNode=" + clusterStateZNode + ", tableZNode=" + tableZNode
|
||||
+ ", clusterIdZNode=" + clusterIdZNode + ", splitLogZNode=" + splitLogZNode
|
||||
+ ", balancerZNode=" + balancerZNode + ", regionNormalizerZNode=" + regionNormalizerZNode
|
||||
+ ", switchZNode=" + switchZNode + ", tableLockZNode=" + tableLockZNode
|
||||
+ ", namespaceZNode=" + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode
|
||||
+ ", replicationZNode=" + replicationZNode + ", peersZNode=" + peersZNode
|
||||
+ ", queuesZNode=" + queuesZNode + ", hfileRefsZNode=" + hfileRefsZNode + "]";
|
||||
return new StringBuilder()
|
||||
.append("ZNodePaths [baseZNode=").append(baseZNode)
|
||||
.append(", metaReplicaZNodes=").append(metaReplicaZNodes)
|
||||
.append(", rsZNode=").append(rsZNode)
|
||||
.append(", drainingZNode=").append(drainingZNode)
|
||||
.append(", masterAddressZNode=").append(masterAddressZNode)
|
||||
.append(", backupMasterAddressesZNode=").append(backupMasterAddressesZNode)
|
||||
.append(", clusterStateZNode=").append(clusterStateZNode)
|
||||
.append(", tableZNode=").append(tableZNode)
|
||||
.append(", clusterIdZNode=").append(clusterIdZNode)
|
||||
.append(", splitLogZNode=").append(splitLogZNode)
|
||||
.append(", balancerZNode=").append(balancerZNode)
|
||||
.append(", regionNormalizerZNode=").append(regionNormalizerZNode)
|
||||
.append(", switchZNode=").append(switchZNode)
|
||||
.append(", tableLockZNode=").append(tableLockZNode)
|
||||
.append(", namespaceZNode=").append(namespaceZNode)
|
||||
.append(", masterMaintZNode=").append(masterMaintZNode)
|
||||
.append(", replicationZNode=").append(replicationZNode)
|
||||
.append(", peersZNode=").append(peersZNode)
|
||||
.append(", queuesZNode=").append(queuesZNode)
|
||||
.append(", hfileRefsZNode=").append(hfileRefsZNode)
|
||||
.append(", snapshotCleanupZNode=").append(snapshotCleanupZNode)
|
||||
.append("]").toString();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1501,8 +1501,6 @@ public final class HConstants {
|
|||
// User defined Default TTL config key
|
||||
public static final String DEFAULT_SNAPSHOT_TTL_CONFIG_KEY = "hbase.master.snapshot.ttl";
|
||||
|
||||
public static final String SNAPSHOT_CLEANER_DISABLE = "hbase.master.cleaner.snapshot.disable";
|
||||
|
||||
/**
|
||||
* Configurations for master executor services.
|
||||
*/
|
||||
|
|
|
@ -318,6 +318,22 @@ enum MasterSwitchType {
|
|||
MERGE = 1;
|
||||
}
|
||||
|
||||
message SetSnapshotCleanupRequest {
|
||||
required bool enabled = 1;
|
||||
optional bool synchronous = 2;
|
||||
}
|
||||
|
||||
message SetSnapshotCleanupResponse {
|
||||
required bool prev_snapshot_cleanup = 1;
|
||||
}
|
||||
|
||||
message IsSnapshotCleanupEnabledRequest {
|
||||
}
|
||||
|
||||
message IsSnapshotCleanupEnabledResponse {
|
||||
required bool enabled = 1;
|
||||
}
|
||||
|
||||
message SetSplitOrMergeEnabledRequest {
|
||||
required bool enabled = 1;
|
||||
optional bool synchronous = 2;
|
||||
|
@ -896,6 +912,18 @@ service MasterService {
|
|||
*/
|
||||
rpc RestoreSnapshot(RestoreSnapshotRequest) returns(RestoreSnapshotResponse);
|
||||
|
||||
/**
|
||||
* Turn on/off snapshot auto-cleanup based on TTL expiration
|
||||
*/
|
||||
rpc SwitchSnapshotCleanup (SetSnapshotCleanupRequest)
|
||||
returns (SetSnapshotCleanupResponse);
|
||||
|
||||
/**
|
||||
* Determine if snapshot auto-cleanup based on TTL expiration is turned on
|
||||
*/
|
||||
rpc IsSnapshotCleanupEnabled (IsSnapshotCleanupEnabledRequest)
|
||||
returns (IsSnapshotCleanupEnabledResponse);
|
||||
|
||||
/**
|
||||
* Execute a distributed procedure.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
syntax = "proto2";
|
||||
|
||||
// This file contains protocol buffers to represent the state of the snapshot auto cleanup based on TTL
|
||||
package hbase.pb;
|
||||
|
||||
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
|
||||
option java_outer_classname = "SnapshotCleanupProtos";
|
||||
option java_generate_equals_and_hash = true;
|
||||
option optimize_for = SPEED;
|
||||
|
||||
message SnapshotCleanupState {
|
||||
required bool snapshot_cleanup_enabled = 1;
|
||||
}
|
|
@ -203,6 +203,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
|
|||
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
@ -313,6 +314,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
MetaLocationSyncer metaLocationSyncer;
|
||||
// Tracker for active master location, if any client ZK quorum specified
|
||||
MasterAddressSyncer masterAddressSyncer;
|
||||
// Tracker for auto snapshot cleanup state
|
||||
SnapshotCleanupTracker snapshotCleanupTracker;
|
||||
|
||||
// Tracker for split and merge state
|
||||
private SplitOrMergeTracker splitOrMergeTracker;
|
||||
|
@ -770,6 +773,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
|
||||
this.drainingServerTracker.start();
|
||||
|
||||
this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
|
||||
this.snapshotCleanupTracker.start();
|
||||
|
||||
String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
|
||||
boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
|
||||
HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
|
||||
|
@ -1443,15 +1449,15 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
replicationPeerManager);
|
||||
getChoreService().scheduleChore(replicationBarrierCleaner);
|
||||
|
||||
final boolean isSnapshotChoreDisabled = conf.getBoolean(HConstants.SNAPSHOT_CLEANER_DISABLE,
|
||||
false);
|
||||
if (isSnapshotChoreDisabled) {
|
||||
final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker
|
||||
.isSnapshotCleanupEnabled();
|
||||
this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
|
||||
if (isSnapshotChoreEnabled) {
|
||||
getChoreService().scheduleChore(this.snapshotCleanerChore);
|
||||
} else {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Snapshot Cleaner Chore is disabled. Not starting up the chore..");
|
||||
}
|
||||
} else {
|
||||
this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
|
||||
getChoreService().scheduleChore(this.snapshotCleanerChore);
|
||||
}
|
||||
serviceStarted = true;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -1544,6 +1550,37 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
procedureExecutor.startWorkers();
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn on/off Snapshot Cleanup Chore
|
||||
*
|
||||
* @param on indicates whether Snapshot Cleanup Chore is to be run
|
||||
*/
|
||||
void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
|
||||
if (synchronous) {
|
||||
synchronized (this.snapshotCleanerChore) {
|
||||
switchSnapshotCleanup(on);
|
||||
}
|
||||
} else {
|
||||
switchSnapshotCleanup(on);
|
||||
}
|
||||
}
|
||||
|
||||
private void switchSnapshotCleanup(final boolean on) {
|
||||
try {
|
||||
snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
|
||||
if (on) {
|
||||
if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) {
|
||||
getChoreService().scheduleChore(this.snapshotCleanerChore);
|
||||
}
|
||||
} else {
|
||||
getChoreService().cancelChore(this.snapshotCleanerChore);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Error updating snapshot cleanup mode to {}", on, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void stopProcedureExecutor() {
|
||||
if (procedureExecutor != null) {
|
||||
configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
|
||||
|
|
|
@ -219,6 +219,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
|
||||
|
@ -271,6 +275,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.SetSnapshotCleanupRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.SetSnapshotCleanupResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetTableStateInMetaRequest;
|
||||
|
@ -1481,6 +1489,55 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetSnapshotCleanupResponse switchSnapshotCleanup(
|
||||
RpcController controller, SetSnapshotCleanupRequest request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
final boolean enabled = request.getEnabled();
|
||||
final boolean isSynchronous = request.hasSynchronous() && request.getSynchronous();
|
||||
final boolean prevSnapshotCleanupRunning = this.switchSnapshotCleanup(enabled, isSynchronous);
|
||||
return SetSnapshotCleanupResponse.newBuilder()
|
||||
.setPrevSnapshotCleanup(prevSnapshotCleanupRunning).build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
|
||||
RpcController controller, IsSnapshotCleanupEnabledRequest request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
final boolean isSnapshotCleanupEnabled = master.snapshotCleanupTracker
|
||||
.isSnapshotCleanupEnabled();
|
||||
return IsSnapshotCleanupEnabledResponse.newBuilder()
|
||||
.setEnabled(isSnapshotCleanupEnabled).build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn on/off snapshot auto-cleanup based on TTL
|
||||
*
|
||||
* @param enabledNewVal Set to <code>true</code> to enable, <code>false</code> to disable
|
||||
* @param synchronous If <code>true</code>, it waits until current snapshot cleanup is completed,
|
||||
* if outstanding
|
||||
* @return previous snapshot auto-cleanup mode
|
||||
*/
|
||||
private synchronized boolean switchSnapshotCleanup(final boolean enabledNewVal,
|
||||
final boolean synchronous) {
|
||||
final boolean oldValue = master.snapshotCleanupTracker.isSnapshotCleanupEnabled();
|
||||
master.switchSnapshotCleanup(enabledNewVal, synchronous);
|
||||
LOG.info("{} Successfully set snapshot cleanup to {}", master.getClientIdAuditPrefix(),
|
||||
enabledNewVal);
|
||||
return oldValue;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RunCatalogScanResponse runCatalogScan(RpcController c,
|
||||
RunCatalogScanRequest req) throws ServiceException {
|
||||
|
|
|
@ -810,4 +810,31 @@ public class TestAdmin2 extends TestAdminBase {
|
|||
ADMIN.modifyTable(tableDesc);
|
||||
assertEquals(11111111, ADMIN.getDescriptor(tableName).getMaxFileSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnapshotCleanupAsync() throws Exception {
|
||||
testSnapshotCleanup(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnapshotCleanupSync() throws Exception {
|
||||
testSnapshotCleanup(true);
|
||||
}
|
||||
|
||||
private void testSnapshotCleanup(final boolean synchronous) throws IOException {
|
||||
final boolean initialState = ADMIN.isSnapshotCleanupEnabled();
|
||||
// Switch the snapshot auto cleanup state to opposite to initial state
|
||||
boolean prevState = ADMIN.snapshotCleanupSwitch(!initialState, synchronous);
|
||||
// The previous state should be the original state we observed
|
||||
assertEquals(initialState, prevState);
|
||||
// Current state should be opposite of the initial state
|
||||
assertEquals(!initialState, ADMIN.isSnapshotCleanupEnabled());
|
||||
// Reset the state back to what it was initially
|
||||
prevState = ADMIN.snapshotCleanupSwitch(initialState, synchronous);
|
||||
// The previous state should be the opposite of the initial state
|
||||
assertEquals(!initialState, prevState);
|
||||
// Current state should be the original state again
|
||||
assertEquals(initialState, ADMIN.isSnapshotCleanupEnabled());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -105,7 +105,6 @@ public class TestSnapshotCleanerChore {
|
|||
snapshotManager = Mockito.mock(SnapshotManager.class);
|
||||
Stoppable stopper = new StoppableImplementation();
|
||||
Configuration conf = getSnapshotCleanerConf();
|
||||
conf.setStrings("hbase.master.cleaner.snapshot.disable", "false");
|
||||
SnapshotCleanerChore snapshotCleanerChore =
|
||||
new SnapshotCleanerChore(stopper, conf, snapshotManager);
|
||||
List<SnapshotProtos.SnapshotDescription> snapshotDescriptionList = new ArrayList<>();
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Collection;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
|
|||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -74,12 +76,19 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.IsSnapshotCleanupEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
|
||||
.SetSnapshotCleanupRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||
|
||||
/**
|
||||
|
@ -142,6 +151,7 @@ public class TestSnapshotFromMaster {
|
|||
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
|
||||
ConstantSizeRegionSplitPolicy.class.getName());
|
||||
conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
|
||||
conf.setInt("hbase.master.cleaner.snapshot.interval", 500);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -282,6 +292,89 @@ public class TestSnapshotFromMaster {
|
|||
master.getMasterRpcServices().deleteSnapshot(null, request);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCompletedSnapshotsWithCleanup() throws Exception {
|
||||
// Enable auto snapshot cleanup for the cluster
|
||||
SetSnapshotCleanupRequest setSnapshotCleanupRequest =
|
||||
SetSnapshotCleanupRequest.newBuilder().setEnabled(true).build();
|
||||
master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
|
||||
|
||||
// first check when there are no snapshots
|
||||
GetCompletedSnapshotsRequest request = GetCompletedSnapshotsRequest.newBuilder().build();
|
||||
GetCompletedSnapshotsResponse response =
|
||||
master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 0, response.getSnapshotsCount());
|
||||
|
||||
// write one snapshot to the fs
|
||||
createSnapshotWithTtl("snapshot_01", 1L);
|
||||
createSnapshotWithTtl("snapshot_02", 10L);
|
||||
|
||||
// check that we get one snapshot
|
||||
response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 2, response.getSnapshotsCount());
|
||||
|
||||
// check that 1 snapshot is auto cleaned after 1 sec of TTL expiration
|
||||
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
|
||||
response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 1, response.getSnapshotsCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCompletedSnapshotsWithoutCleanup() throws Exception {
|
||||
// Disable auto snapshot cleanup for the cluster
|
||||
SetSnapshotCleanupRequest setSnapshotCleanupRequest =
|
||||
SetSnapshotCleanupRequest.newBuilder().setEnabled(false).build();
|
||||
master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
|
||||
|
||||
// first check when there are no snapshots
|
||||
GetCompletedSnapshotsRequest request = GetCompletedSnapshotsRequest.newBuilder().build();
|
||||
GetCompletedSnapshotsResponse response =
|
||||
master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 0, response.getSnapshotsCount());
|
||||
|
||||
// write one snapshot to the fs
|
||||
createSnapshotWithTtl("snapshot_02", 1L);
|
||||
createSnapshotWithTtl("snapshot_03", 1L);
|
||||
|
||||
// check that we get one snapshot
|
||||
response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 2, response.getSnapshotsCount());
|
||||
|
||||
// check that no snapshot is auto cleaned even after 1 sec of TTL expiration
|
||||
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
|
||||
response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 2, response.getSnapshotsCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnapshotCleanupStatus() throws Exception {
|
||||
// Enable auto snapshot cleanup for the cluster
|
||||
SetSnapshotCleanupRequest setSnapshotCleanupRequest =
|
||||
SetSnapshotCleanupRequest.newBuilder().setEnabled(true).build();
|
||||
master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
|
||||
|
||||
// Check if auto snapshot cleanup is enabled
|
||||
IsSnapshotCleanupEnabledRequest isSnapshotCleanupEnabledRequest =
|
||||
IsSnapshotCleanupEnabledRequest.newBuilder().build();
|
||||
IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabledResponse =
|
||||
master.getMasterRpcServices().isSnapshotCleanupEnabled(null,
|
||||
isSnapshotCleanupEnabledRequest);
|
||||
Assert.assertTrue(isSnapshotCleanupEnabledResponse.getEnabled());
|
||||
|
||||
// Disable auto snapshot cleanup for the cluster
|
||||
setSnapshotCleanupRequest = SetSnapshotCleanupRequest.newBuilder()
|
||||
.setEnabled(false).build();
|
||||
master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
|
||||
|
||||
// Check if auto snapshot cleanup is disabled
|
||||
isSnapshotCleanupEnabledRequest = IsSnapshotCleanupEnabledRequest
|
||||
.newBuilder().build();
|
||||
isSnapshotCleanupEnabledResponse =
|
||||
master.getMasterRpcServices().isSnapshotCleanupEnabled(null,
|
||||
isSnapshotCleanupEnabledRequest);
|
||||
Assert.assertFalse(isSnapshotCleanupEnabledResponse.getEnabled());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the snapshot hfile archive cleaner works correctly. HFiles that are in snapshots
|
||||
* should be retained, while those that are not in a snapshot should be deleted.
|
||||
|
@ -428,6 +521,16 @@ public class TestSnapshotFromMaster {
|
|||
return builder.getSnapshotDescription();
|
||||
}
|
||||
|
||||
private SnapshotDescription createSnapshotWithTtl(final String snapshotName, final long ttl)
|
||||
throws IOException {
|
||||
SnapshotTestingUtils.SnapshotMock snapshotMock =
|
||||
new SnapshotTestingUtils.SnapshotMock(UTIL.getConfiguration(), fs, rootDir);
|
||||
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder =
|
||||
snapshotMock.createSnapshotV2(snapshotName, "test", 0, ttl);
|
||||
builder.commit();
|
||||
return builder.getSnapshotDescription();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
|
||||
// Write some data
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSVisitor;
|
||||
|
@ -671,6 +672,12 @@ public final class SnapshotTestingUtils {
|
|||
return createSnapshot(snapshotName, tableName, numRegions, SnapshotManifestV2.DESCRIPTOR_VERSION);
|
||||
}
|
||||
|
||||
public SnapshotBuilder createSnapshotV2(final String snapshotName, final String tableName,
|
||||
final int numRegions, final long ttl) throws IOException {
|
||||
return createSnapshot(snapshotName, tableName, numRegions,
|
||||
SnapshotManifestV2.DESCRIPTOR_VERSION, ttl);
|
||||
}
|
||||
|
||||
private SnapshotBuilder createSnapshot(final String snapshotName, final String tableName,
|
||||
final int version) throws IOException {
|
||||
return createSnapshot(snapshotName, tableName, TEST_NUM_REGIONS, version);
|
||||
|
@ -692,6 +699,22 @@ public final class SnapshotTestingUtils {
|
|||
return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
|
||||
}
|
||||
|
||||
private SnapshotBuilder createSnapshot(final String snapshotName, final String tableName,
|
||||
final int numRegions, final int version, final long ttl) throws IOException {
|
||||
TableDescriptor htd = createHtd(tableName);
|
||||
RegionData[] regions = createTable(htd, numRegions);
|
||||
SnapshotProtos.SnapshotDescription desc = SnapshotProtos.SnapshotDescription.newBuilder()
|
||||
.setTable(htd.getTableName().getNameAsString())
|
||||
.setName(snapshotName)
|
||||
.setVersion(version)
|
||||
.setCreationTime(EnvironmentEdgeManager.currentTime())
|
||||
.setTtl(ttl)
|
||||
.build();
|
||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
|
||||
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, fs);
|
||||
return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
|
||||
}
|
||||
|
||||
public TableDescriptor createHtd(final String tableName) {
|
||||
return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY))
|
||||
|
|
|
@ -512,6 +512,22 @@ module Hbase
|
|||
@admin.getTableDescriptor(TableName.valueOf(table_name)).toStringTableAttributes
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Enable/disable snapshot auto-cleanup based on TTL expiration
|
||||
# Returns previous snapshot auto-cleanup switch setting.
|
||||
def snapshot_cleanup_switch(enable_disable)
|
||||
@admin.snapshotCleanupSwitch(
|
||||
java.lang.Boolean.valueOf(enable_disable), java.lang.Boolean.valueOf(false)
|
||||
)
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Query the current state of the snapshot auto-cleanup based on TTL
|
||||
# Returns the snapshot auto-cleanup state (true if enabled)
|
||||
def snapshot_cleanup_enabled?
|
||||
@admin.isSnapshotCleanupEnabled
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Truncates table (deletes all records by recreating the table)
|
||||
def truncate(table_name_str)
|
||||
|
|
|
@ -354,6 +354,8 @@ Shell.load_command_group(
|
|||
compact_rs
|
||||
compaction_state
|
||||
trace
|
||||
snapshot_cleanup_switch
|
||||
snapshot_cleanup_enabled
|
||||
splitormerge_switch
|
||||
splitormerge_enabled
|
||||
clear_compaction_queues
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with this
|
||||
# work for additional information regarding copyright ownership. The ASF
|
||||
# licenses this file to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# Prints if snapshot auto cleanup based on TTL is enabled
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class SnapshotCleanupEnabled < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Query the snapshot auto-cleanup state.
|
||||
Examples:
|
||||
|
||||
hbase> snapshot_cleanup_enabled
|
||||
EOF
|
||||
end
|
||||
|
||||
def command
|
||||
state = admin.snapshot_cleanup_enabled?
|
||||
formatter.row([state.to_s])
|
||||
state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,43 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# Switch snapshot auto-cleanup based on TTL expiration
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class SnapshotCleanupSwitch < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Enable/Disable snapshot auto-cleanup based on snapshot TTL.
|
||||
Returns previous snapshot auto-cleanup switch state.
|
||||
Examples:
|
||||
|
||||
hbase> snapshot_cleanup_switch true
|
||||
hbase> snapshot_cleanup_switch false
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(enable_disable)
|
||||
prev_state = admin.snapshot_cleanup_switch(enable_disable) ? 'true' : 'false'
|
||||
formatter.row(["Previous snapshot cleanup state : #{prev_state}"])
|
||||
prev_state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -170,6 +170,20 @@ module Hbase
|
|||
end
|
||||
end
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
define_test 'snapshot auto cleanup should work' do
|
||||
command(:snapshot_cleanup_switch, true)
|
||||
output = capture_stdout { command(:snapshot_cleanup_enabled) }
|
||||
assert(output.include?('true'))
|
||||
|
||||
command(:snapshot_cleanup_switch, false)
|
||||
output = capture_stdout { command(:snapshot_cleanup_enabled) }
|
||||
assert(output.include?('false'))
|
||||
end
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
|
||||
define_test "create should fail with non-string/non-hash column args" do
|
||||
assert_raise(ArgumentError) do
|
||||
command(:create, @create_test_name, 123)
|
||||
|
|
|
@ -1382,6 +1382,16 @@ public class ThriftAdmin implements Admin {
|
|||
throw new NotImplementedException("hasUserPermissions not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean snapshotCleanupSwitch(boolean on, boolean synchronous) {
|
||||
throw new NotImplementedException("snapshotCleanupSwitch not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSnapshotCleanupEnabled() {
|
||||
throw new NotImplementedException("isSnapshotCleanupEnabled not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> splitRegionAsync(byte[] regionName) throws IOException {
|
||||
return splitRegionAsync(regionName, null);
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotCleanupProtos;
|
||||
|
||||
/**
|
||||
* Tracks status of snapshot auto cleanup based on TTL
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SnapshotCleanupTracker extends ZKNodeTracker {
|
||||
|
||||
/**
|
||||
* Constructs a new ZK node tracker.
|
||||
*
|
||||
* <p>After construction, use {@link #start} to kick off tracking.
|
||||
*
|
||||
* @param watcher reference to the {@link ZKWatcher} which also contains configuration and
|
||||
* constants
|
||||
* @param abortable used to abort if a fatal error occurs
|
||||
*/
|
||||
public SnapshotCleanupTracker(ZKWatcher watcher, Abortable abortable) {
|
||||
super(watcher, watcher.getZNodePaths().snapshotCleanupZNode, abortable);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current state of the snapshot auto cleanup based on TTL
|
||||
*
|
||||
* @return <code>true</code> if the snapshot auto cleanup is enabled,
|
||||
* <code>false</code> otherwise.
|
||||
*/
|
||||
public boolean isSnapshotCleanupEnabled() {
|
||||
byte[] snapshotCleanupZNodeData = super.getData(false);
|
||||
try {
|
||||
// if data in ZK is null, use default of on.
|
||||
return snapshotCleanupZNodeData == null ||
|
||||
parseFrom(snapshotCleanupZNodeData).getSnapshotCleanupEnabled();
|
||||
} catch (DeserializationException dex) {
|
||||
LOG.error("ZK state for Snapshot Cleanup could not be parsed " +
|
||||
Bytes.toStringBinary(snapshotCleanupZNodeData), dex);
|
||||
// return false to be safe.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set snapshot auto clean on/off
|
||||
*
|
||||
* @param snapshotCleanupEnabled true if the snapshot auto cleanup should be on,
|
||||
* false otherwise
|
||||
* @throws KeeperException if ZooKeeper operation fails
|
||||
*/
|
||||
public void setSnapshotCleanupEnabled(final boolean snapshotCleanupEnabled)
|
||||
throws KeeperException {
|
||||
byte [] snapshotCleanupZNodeData = toByteArray(snapshotCleanupEnabled);
|
||||
try {
|
||||
ZKUtil.setData(watcher, watcher.getZNodePaths().snapshotCleanupZNode,
|
||||
snapshotCleanupZNodeData);
|
||||
} catch(KeeperException.NoNodeException nne) {
|
||||
ZKUtil.createAndWatch(watcher, watcher.getZNodePaths().snapshotCleanupZNode,
|
||||
snapshotCleanupZNodeData);
|
||||
}
|
||||
super.nodeDataChanged(watcher.getZNodePaths().snapshotCleanupZNode);
|
||||
}
|
||||
|
||||
private byte[] toByteArray(final boolean isSnapshotCleanupEnabled) {
|
||||
SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
|
||||
SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
|
||||
builder.setSnapshotCleanupEnabled(isSnapshotCleanupEnabled);
|
||||
return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
|
||||
}
|
||||
|
||||
private SnapshotCleanupProtos.SnapshotCleanupState parseFrom(final byte[] pbBytes)
|
||||
throws DeserializationException {
|
||||
ProtobufUtil.expectPBMagicPrefix(pbBytes);
|
||||
SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
|
||||
SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
|
||||
try {
|
||||
int magicLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
|
||||
} catch (IOException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -2759,13 +2759,43 @@ Value 0 for this config indicates TTL: FOREVER
|
|||
|
||||
|
||||
|
||||
.Enable/Disable Snapshot Auto Cleanup on running cluster:
|
||||
|
||||
At any point of time, if Snapshot cleanup is supposed to be stopped due to
|
||||
some snapshot restore activity, it is advisable to disable Snapshot Cleaner with
|
||||
config:
|
||||
By default, snapshot auto cleanup based on TTL would be enabled
|
||||
for any new cluster.
|
||||
At any point in time, if snapshot cleanup is supposed to be stopped due to
|
||||
some snapshot restore activity or any other reason, it is advisable
|
||||
to disable it using shell command:
|
||||
|
||||
`hbase.master.cleaner.snapshot.disable`: "true"
|
||||
----
|
||||
hbase> snapshot_cleanup_switch false
|
||||
----
|
||||
|
||||
We can re-enable it using:
|
||||
|
||||
----
|
||||
hbase> snapshot_cleanup_switch true
|
||||
----
|
||||
|
||||
The shell command with switch false would disable snapshot auto
|
||||
cleanup activity based on TTL and return the previous state of
|
||||
the activity(true: running already, false: disabled already)
|
||||
|
||||
A sample output for above commands:
|
||||
----
|
||||
Previous snapshot cleanup state : true
|
||||
Took 0.0069 seconds
|
||||
=> "true"
|
||||
----
|
||||
|
||||
We can query whether snapshot auto cleanup is enabled for
|
||||
cluster using:
|
||||
|
||||
----
|
||||
hbase> snapshot_cleanup_enabled
|
||||
----
|
||||
|
||||
The command would return output in true/false.
|
||||
|
||||
[[ops.snapshots.list]]
|
||||
=== Listing Snapshots
|
||||
|
|
Loading…
Reference in New Issue