HBASE-17928 Shell tool to clear compaction queues (Guangxu Cheng)

This commit is contained in:
tedyu 2017-05-09 18:32:38 -07:00
parent b67f6fecc1
commit 815b0f853b
14 changed files with 1300 additions and 42 deletions

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
@ -2026,4 +2027,14 @@ public interface Admin extends Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*/
void disableTableReplication(final TableName tableName) throws IOException;
/**
* Clear compacting queues on a regionserver.
* @param sn the region server name
* @param queues the set of queue name
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
void clearCompactionQueues(final ServerName sn, final Set<String> queues)
throws IOException, InterruptedException;
}

View File

@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@ -4227,4 +4228,25 @@ public class HBaseAdmin implements Admin {
return otherConf;
}
@Override
public void clearCompactionQueues(final ServerName sn, final Set<String> queues)
throws IOException, InterruptedException {
if (queues == null || queues.size() == 0) {
throw new IllegalArgumentException("queues cannot be null or empty");
}
final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
// TODO: There is no timeout on this controller. Set one!
HBaseRpcController controller = rpcControllerFactory.newController();
ClearCompactionQueuesRequest request =
RequestConverter.buildClearCompactionQueuesRequest(queues);
admin.clearCompactionQueues(controller, request);
return null;
}
};
ProtobufUtil.call(callable);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.shaded.protobuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.CellScannable;
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -1696,4 +1698,12 @@ public final class RequestConverter {
builder.setNamespaceName(name);
return builder.build();
}
public static ClearCompactionQueuesRequest buildClearCompactionQueuesRequest(Set<String> queues) {
ClearCompactionQueuesRequest.Builder builder = ClearCompactionQueuesRequest.newBuilder();
for(String name: queues) {
builder.addQueueName(name);
}
return builder.build();
}
}

View File

@ -260,6 +260,13 @@ message GetRegionLoadResponse {
repeated RegionLoad region_loads = 1;
}
message ClearCompactionQueuesRequest {
repeated string queue_name = 1;
}
message ClearCompactionQueuesResponse {
}
service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
@ -314,4 +321,7 @@ service AdminService {
rpc GetRegionLoad(GetRegionLoadRequest)
returns(GetRegionLoadResponse);
rpc ClearCompactionQueues(ClearCompactionQueuesRequest)
returns(ClearCompactionQueuesResponse);
}

View File

@ -171,4 +171,20 @@ public interface RegionServerObserver extends Coprocessor {
default void postReplicateLogEntries(
final ObserverContext<RegionServerCoprocessorEnvironment> ctx,
List<WALEntry> entries, CellScanner cells) throws IOException {}
/**
* This will be called before clearing compaction queues
* @param ctx the environment to interact with the framework and region server.
*/
default void preClearCompactionQueues(
final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {}
/**
* This will be called after clearing compaction queues
* @param ctx the environment to interact with the framework and region server.
*/
default void postClearCompactionQueues(
final ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {}
}

View File

@ -672,4 +672,12 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
void shutdownLongCompactions(){
this.longCompactions.shutdown();
}
public void clearLongCompactionsQueue() {
longCompactions.getQueue().clear();
}
public void clearShortCompactionsQueue() {
shortCompactions.getQueue().clear();
}
}

View File

@ -31,6 +31,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
@ -110,6 +111,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
@ -268,6 +271,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private final long minimumScanTimeLimitDelta;
final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
/**
* An Rpc callback for closing a RegionScanner.
*/
@ -1611,6 +1616,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return builder.build();
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
ClearCompactionQueuesRequest request) throws ServiceException {
LOG.debug("Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress()
+ " clear compactions queue");
ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
requestCount.increment();
if (clearCompactionQueues.compareAndSet(false,true)) {
try {
checkOpen();
regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
for (String queueName : request.getQueueNameList()) {
LOG.debug("clear " + queueName + " compaction queue");
switch (queueName) {
case "long":
regionServer.compactSplitThread.clearLongCompactionsQueue();
break;
case "short":
regionServer.compactSplitThread.clearShortCompactionsQueue();
break;
default:
LOG.warn("Unknown queue name " + queueName);
throw new IOException("Unknown queue name " + queueName);
}
}
regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
clearCompactionQueues.set(false);
}
} else {
LOG.warn("Clear compactions queue is executing by other admin.");
}
return respBuilder.build();
}
/**
* Get some information of the region server.
*

View File

@ -218,6 +218,26 @@ public class RegionServerCoprocessorHost extends
});
}
public void preClearCompactionQueues() throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.preClearCompactionQueues(ctx);
}
});
}
public void postClearCompactionQueues() throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(RegionServerObserver oserver,
ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
oserver.postClearCompactionQueues(ctx);
}
});
}
private <T> T execOperationWithResult(final T defaultValue,
final CoprocessOperationWithResult<T> ctx) throws IOException {
if (ctx == null)

View File

@ -2667,6 +2667,12 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
List<WALEntry> entries, CellScanner cells) throws IOException {
}
@Override
public void preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {
requirePermission(getActiveUser(ctx), "preClearCompactionQueues", Permission.Action.ADMIN);
}
@Override
public void preMoveServersAndTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {

View File

@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
@ -451,6 +453,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return builder.build();
}
@Override
public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
ClearCompactionQueuesRequest request) throws ServiceException {
return null;
}
@Override
public GetStoreFileResponse getStoreFile(RpcController controller,
GetStoreFileRequest request) throws ServiceException {

View File

@ -1215,5 +1215,31 @@ module Hbase
set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
end
#----------------------------------------------------------------------------------------------
# clear compaction queues
def clear_compaction_queues(server_name, queue_name = nil)
names = ['long', 'short']
queues = java.util.HashSet.new
if queue_name.nil?
queues.add('long')
queues.add('short')
elsif queue_name.kind_of?(String)
queues.add(queue_name)
if !(names.include?(queue_name))
raise(ArgumentError, "Unknown queue name #{queue_name}")
end
elsif queue_name.kind_of?(Array)
queue_name.each do |s|
queues.add(s)
if !(names.include?(s))
raise(ArgumentError, "Unknown queue name #{s}")
end
end
else
raise(ArgumentError, "Unknown queue name #{queue_name}")
end
@admin.clearCompactionQueues(ServerName.valueOf(server_name), queues)
end
end
end

View File

@ -357,6 +357,7 @@ Shell.load_command_group(
trace
splitormerge_switch
splitormerge_enabled
clear_compaction_queues
],
# TODO remove older hlog_roll command
:aliases => {

View File

@ -0,0 +1,41 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class ClearCompactionQueues < Command
def help
return <<-EOF
Clear compacting queues on a regionserver.
The queue_name contains short and long.
short is shortCompactions's queue,long is longCompactions's queue.
Examples:
hbase> clear_compaction_queues 'host187.example.com,60020'
hbase> clear_compaction_queues 'host187.example.com,60020','long'
hbase> clear_compaction_queues 'host187.example.com,60020', ['long','short']
EOF
end
def command(server_name, queue_name = nil)
admin.clear_compaction_queues(server_name, queue_name)
end
end
end
end