HBASE-27028 Add a shell command for flushing master local region (#4457)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
LiangJun He 2022-06-13 11:09:05 +08:00 committed by GitHub
parent 5e34cdf1ef
commit 002c92cd7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 220 additions and 8 deletions

View File

@ -2539,4 +2539,9 @@ public interface Admin extends Abortable, Closeable {
*/ */
List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, ServerType serverType, List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, ServerType serverType,
int limit, Map<String, Object> filterParams) throws IOException; int limit, Map<String, Object> filterParams) throws IOException;
/**
* Flush master local region
*/
void flushMasterStore() throws IOException;
} }

View File

@ -1078,4 +1078,9 @@ class AdminOverAsyncAdmin implements Admin {
ServerType serverType, int limit, Map<String, Object> filterParams) throws IOException { ServerType serverType, int limit, Map<String, Object> filterParams) throws IOException {
return get(admin.getLogEntries(serverNames, logType, serverType, limit, filterParams)); return get(admin.getLogEntries(serverNames, logType, serverType, limit, filterParams));
} }
@Override
public void flushMasterStore() throws IOException {
get(admin.flushMasterStore());
}
} }

View File

@ -1785,4 +1785,9 @@ public interface AsyncAdmin {
*/ */
CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, String logType, CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams); ServerType serverType, int limit, Map<String, Object> filterParams);
/**
* Flush master local region
*/
CompletableFuture<Void> flushMasterStore();
} }

View File

@ -949,4 +949,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
String logType, ServerType serverType, int limit, Map<String, Object> filterParams) { String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
return wrap(rawAdmin.getLogEntries(serverNames, logType, serverType, limit, filterParams)); return wrap(rawAdmin.getLogEntries(serverNames, logType, serverType, limit, filterParams));
} }
@Override
public CompletableFuture<Void> flushMasterStore() {
return wrap(rawAdmin.flushMasterStore());
}
} }

View File

@ -178,6 +178,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@ -4280,4 +4282,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return CompletableFuture.completedFuture(Collections.emptyList()); return CompletableFuture.completedFuture(Collections.emptyList());
} }
} }
@Override
public CompletableFuture<Void> flushMasterStore() {
FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
return this.<Void> newMasterCaller()
.action(((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
Void> call(controller, stub, request.build(),
(s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)))
.call();
}
} }

View File

@ -758,6 +758,9 @@ message ModifyColumnStoreFileTrackerResponse {
optional uint64 proc_id = 1; optional uint64 proc_id = 1;
} }
message FlushMasterStoreRequest {}
message FlushMasterStoreResponse {}
service MasterService { service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */ /** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@ -1197,6 +1200,9 @@ service MasterService {
rpc ModifyColumnStoreFileTracker(ModifyColumnStoreFileTrackerRequest) rpc ModifyColumnStoreFileTracker(ModifyColumnStoreFileTrackerRequest)
returns(ModifyColumnStoreFileTrackerResponse); returns(ModifyColumnStoreFileTrackerResponse);
rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);
} }
// HBCK Service definitions. // HBCK Service definitions.

View File

@ -1000,6 +1000,22 @@ public interface MasterObserver {
final TableName tableName) throws IOException { final TableName tableName) throws IOException {
} }
/**
* Called before the master local region memstore is flushed to disk.
* @param ctx the environment to interact with the framework and master
*/
default void preMasterStoreFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
/**
* Called after the master local region memstore is flushed to disk.
* @param ctx the environment to interact with the framework and master
*/
default void postMasterStoreFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
/** /**
* Called before the quota for the user is stored. * Called before the quota for the user is stored.
* @param ctx the environment to interact with the framework and master * @param ctx the environment to interact with the framework and master

View File

@ -4225,6 +4225,26 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
return metaRegionLocationCache.getMetaRegionLocations(); return metaRegionLocationCache.getMetaRegionLocations();
} }
@Override
public void flushMasterStore() throws IOException {
LOG.info("Force flush master local region.");
if (this.cpHost != null) {
try {
cpHost.preMasterStoreFlush();
} catch (IOException ioe) {
LOG.error("Error invoking master coprocessor preMasterStoreFlush()", ioe);
}
}
masterRegion.flush(true);
if (this.cpHost != null) {
try {
cpHost.postMasterStoreFlush();
} catch (IOException ioe) {
LOG.error("Error invoking master coprocessor postMasterStoreFlush()", ioe);
}
}
}
public Collection<ServerName> getLiveRegionServers() { public Collection<ServerName> getLiveRegionServers() {
return regionServerTracker.getRegionServers(); return regionServerTracker.getRegionServers();
} }

View File

@ -1218,6 +1218,24 @@ public class MasterCoprocessorHost
}); });
} }
public void preMasterStoreFlush() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preMasterStoreFlush(this);
}
});
}
public void postMasterStoreFlush() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postMasterStoreFlush(this);
}
});
}
public void preSetUserQuota(final String user, final GlobalQuotaSettings quotas) public void preSetUserQuota(final String user, final GlobalQuotaSettings quotas)
throws IOException { throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {

View File

@ -230,6 +230,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@ -3469,4 +3471,16 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
ReplicateWALEntryRequest request) throws ServiceException { ReplicateWALEntryRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master")); throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
} }
@Override
public FlushMasterStoreResponse flushMasterStore(RpcController controller,
FlushMasterStoreRequest request) throws ServiceException {
rpcPreCheck("flushMasterStore");
try {
server.flushMasterStore();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
return FlushMasterStoreResponse.newBuilder().build();
}
} }

View File

@ -510,4 +510,9 @@ public interface MasterServices extends Server {
* We need to get this in MTP to tell the syncer the new meta replica count. * We need to get this in MTP to tell the syncer the new meta replica count.
*/ */
MetaLocationSyncer getMetaLocationSyncer(); MetaLocationSyncer getMetaLocationSyncer();
/**
* Flush master local region
*/
void flushMasterStore() throws IOException;
} }

View File

@ -161,10 +161,11 @@ public final class MasterRegion {
return region.getScanner(scan); return region.getScanner(scan);
} }
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public FlushResult flush(boolean force) throws IOException { public FlushResult flush(boolean force) throws IOException {
return region.flush(force); flusherAndCompactor.resetChangesAfterLastFlush();
FlushResult flushResult = region.flush(force);
flusherAndCompactor.recordLastFlushTime();
return flushResult;
} }
@RestrictedApi(explanation = "Should only be called in tests", link = "", @RestrictedApi(explanation = "Should only be called in tests", link = "",

View File

@ -180,7 +180,7 @@ class MasterRegionFlusherAndCompactor implements Closeable {
} }
private void flushLoop() { private void flushLoop() {
lastFlushTime = EnvironmentEdgeManager.currentTime(); recordLastFlushTime();
while (!closed) { while (!closed) {
flushLock.lock(); flushLock.lock();
try { try {
@ -202,10 +202,10 @@ class MasterRegionFlusherAndCompactor implements Closeable {
flushLock.unlock(); flushLock.unlock();
} }
assert flushRequest; assert flushRequest;
changesAfterLastFlush.set(0); resetChangesAfterLastFlush();
try { try {
region.flush(true); region.flush(true);
lastFlushTime = EnvironmentEdgeManager.currentTime(); recordLastFlushTime();
} catch (IOException e) { } catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e); LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
abortable.abort("Failed to flush master local region", e); abortable.abort("Failed to flush master local region", e);
@ -263,6 +263,14 @@ class MasterRegionFlusherAndCompactor implements Closeable {
} }
} }
void resetChangesAfterLastFlush() {
changesAfterLastFlush.set(0);
}
void recordLastFlushTime() {
lastFlushTime = EnvironmentEdgeManager.currentTime();
}
@Override @Override
public void close() { public void close() {
closed = true; closed = true;

View File

@ -189,6 +189,8 @@ public class TestMasterObserver {
private boolean postRequestLockCalled; private boolean postRequestLockCalled;
private boolean preLockHeartbeatCalled; private boolean preLockHeartbeatCalled;
private boolean postLockHeartbeatCalled; private boolean postLockHeartbeatCalled;
private boolean preMasterStoreFlushCalled;
private boolean postMasterStoreFlushCalled;
public void resetStates() { public void resetStates() {
preCreateTableRegionInfosCalled = false; preCreateTableRegionInfosCalled = false;
@ -280,6 +282,8 @@ public class TestMasterObserver {
postRequestLockCalled = false; postRequestLockCalled = false;
preLockHeartbeatCalled = false; preLockHeartbeatCalled = false;
postLockHeartbeatCalled = false; postLockHeartbeatCalled = false;
preMasterStoreFlushCalled = false;
postMasterStoreFlushCalled = false;
} }
@Override @Override
@ -1045,6 +1049,18 @@ public class TestMasterObserver {
TableName tableName) throws IOException { TableName tableName) throws IOException {
} }
@Override
public void preMasterStoreFlush(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
preMasterStoreFlushCalled = true;
}
@Override
public void postMasterStoreFlush(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
postMasterStoreFlushCalled = true;
}
@Override @Override
public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx, public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final String userName, final GlobalQuotaSettings quotas) throws IOException { final String userName, final GlobalQuotaSettings quotas) throws IOException {
@ -1680,4 +1696,23 @@ public class TestMasterObserver {
ProcedureTestingUtility.waitNoProcedureRunning(master.getMasterProcedureExecutor()); ProcedureTestingUtility.waitNoProcedureRunning(master.getMasterProcedureExecutor());
ProcedureTestingUtility.assertProcNotFailed(master.getMasterProcedureExecutor(), procId); ProcedureTestingUtility.assertProcNotFailed(master.getMasterProcedureExecutor(), procId);
} }
@Test
public void testMasterStoreOperations() throws Exception {
SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
HMaster master = cluster.getMaster();
MasterCoprocessorHost host = master.getMasterCoprocessorHost();
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
cp.resetStates();
assertFalse("No master store flush call", cp.preMasterStoreFlushCalled);
assertFalse("No master store flush call", cp.postMasterStoreFlushCalled);
try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
Admin admin = connection.getAdmin()) {
admin.flushMasterStore();
assertTrue("Master store flush called", cp.preMasterStoreFlushCalled);
assertTrue("Master store flush called", cp.postMasterStoreFlushCalled);
}
}
} }

View File

@ -503,6 +503,10 @@ public class MockNoopMasterServices implements MasterServices {
return null; return null;
} }
@Override
public void flushMasterStore() {
}
@Override @Override
public long modifyTableStoreFileTracker(TableName tableName, String dstSFT, long nonceGroup, public long modifyTableStoreFileTracker(TableName tableName, String dstSFT, long nonceGroup,
long nonce) throws IOException { long nonce) throws IOException {

View File

@ -148,6 +148,5 @@ public class TestMasterRegionFlush {
assertEquals(1, flushCalled.get()); assertEquals(1, flushCalled.get());
Thread.sleep(1000); Thread.sleep(1000);
assertEquals(2, flushCalled.get()); assertEquals(2, flushCalled.get());
} }
} }

View File

@ -934,4 +934,9 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
throws IOException { throws IOException {
return admin.modifyTableStoreFileTrackerAsync(tableName, dstSFT); return admin.modifyTableStoreFileTrackerAsync(tableName, dstSFT);
} }
@Override
public void flushMasterStore() throws IOException {
admin.flushMasterStore();
}
} }

View File

@ -1868,11 +1868,17 @@ module Hbase
@admin.modifyTableStoreFileTracker(tableName, sft) @admin.modifyTableStoreFileTracker(tableName, sft)
end end
#---------------------------------------------------------------------------------------------- #----------------------------------------------------------------------------------------------
# Change table column family's sft # Change table column family's sft
def modify_table_family_sft(tableName, family_bytes, sft) def modify_table_family_sft(tableName, family_bytes, sft)
@admin.modifyColumnFamilyStoreFileTracker(tableName, family_bytes, sft) @admin.modifyColumnFamilyStoreFileTracker(tableName, family_bytes, sft)
end end
#----------------------------------------------------------------------------------------------
# Flush master local region
def flush_master_store()
@admin.flushMasterStore()
end
end end
# rubocop:enable Metrics/ClassLength # rubocop:enable Metrics/ClassLength
end end

View File

@ -446,6 +446,7 @@ Shell.load_command_group(
compact compact
compaction_switch compaction_switch
flush flush
flush_master_store
get_balancer_decisions get_balancer_decisions
get_balancer_rejections get_balancer_rejections
get_slowlog_responses get_slowlog_responses

View File

@ -0,0 +1,37 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class FlushMasterStore < Command
def help
<<-EOF
Flush master local region.
For example:
hbase> flush_master_store
EOF
end
def command()
admin.flush_master_store()
end
end
end
end

View File

@ -1309,4 +1309,9 @@ public class ThriftAdmin implements Admin {
throw new NotImplementedException( throw new NotImplementedException(
"modifyTableStoreFileTrackerAsync not supported in ThriftAdmin"); "modifyTableStoreFileTrackerAsync not supported in ThriftAdmin");
} }
@Override
public void flushMasterStore() throws IOException {
throw new NotImplementedException("flushMasterStore not supported in ThriftAdmin");
}
} }