HBASE-21588 Procedure v2 wal splitting implementation
This commit is contained in:
parent
348c2dfe9b
commit
f02ac310d2
@ -1347,6 +1347,14 @@ public final class HConstants {
|
|||||||
public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
|
public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
|
||||||
"hbase.client.fast.fail.interceptor.impl";
|
"hbase.client.fast.fail.interceptor.impl";
|
||||||
|
|
||||||
|
public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated";
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true;
|
||||||
|
|
||||||
|
public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters";
|
||||||
|
|
||||||
|
public static final int DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER = 2;
|
||||||
|
|
||||||
/** Config key for if the server should send backpressure and if the client should listen to
|
/** Config key for if the server should send backpressure and if the client should listen to
|
||||||
* that backpressure from the server */
|
* that backpressure from the server */
|
||||||
public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled";
|
public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled";
|
||||||
|
@ -308,6 +308,8 @@ enum ServerCrashState {
|
|||||||
SERVER_CRASH_WAIT_ON_ASSIGN = 9;
|
SERVER_CRASH_WAIT_ON_ASSIGN = 9;
|
||||||
SERVER_CRASH_SPLIT_META_LOGS = 10;
|
SERVER_CRASH_SPLIT_META_LOGS = 10;
|
||||||
SERVER_CRASH_ASSIGN_META = 11;
|
SERVER_CRASH_ASSIGN_META = 11;
|
||||||
|
SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12;
|
||||||
|
SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13;
|
||||||
SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
|
SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
|
||||||
SERVER_CRASH_FINISH = 100;
|
SERVER_CRASH_FINISH = 100;
|
||||||
}
|
}
|
||||||
@ -502,4 +504,27 @@ message SwitchRpcThrottleStateData {
|
|||||||
message SwitchRpcThrottleRemoteStateData {
|
message SwitchRpcThrottleRemoteStateData {
|
||||||
required ServerName target_server = 1;
|
required ServerName target_server = 1;
|
||||||
required bool rpc_throttle_enabled = 2;
|
required bool rpc_throttle_enabled = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SplitWALParameter {
|
||||||
|
required string wal_path = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
message SplitWALData {
|
||||||
|
required string wal_path = 1;
|
||||||
|
required ServerName crashed_server = 2;
|
||||||
|
optional ServerName worker = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SplitWALRemoteData {
|
||||||
|
required string wal_path = 1;
|
||||||
|
required ServerName crashed_server = 2;
|
||||||
|
required ServerName worker = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum SplitWALState {
|
||||||
|
ACQUIRE_SPLIT_WAL_WORKER = 1;
|
||||||
|
DISPATCH_WAL_TO_WORKER = 2;
|
||||||
|
RELEASE_SPLIT_WORKER = 3;
|
||||||
}
|
}
|
@ -48,9 +48,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface SplitLogWorkerCoordination {
|
public interface SplitLogWorkerCoordination {
|
||||||
|
|
||||||
/* SplitLogWorker part */
|
|
||||||
int DEFAULT_MAX_SPLITTERS = 2;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize internal values. This method should be used when corresponding SplitLogWorker
|
* Initialize internal values. This method should be used when corresponding SplitLogWorker
|
||||||
* instance is created
|
* instance is created
|
||||||
|
@ -19,6 +19,9 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.hbase.coordination;
|
package org.apache.hadoop.hbase.coordination;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -135,7 +138,8 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
|
|||||||
this.server = server;
|
this.server = server;
|
||||||
this.worker = worker;
|
this.worker = worker;
|
||||||
this.splitTaskExecutor = splitExecutor;
|
this.splitTaskExecutor = splitExecutor;
|
||||||
maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
|
maxConcurrentTasks =
|
||||||
|
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
|
||||||
reportPeriod =
|
reportPeriod =
|
||||||
conf.getInt("hbase.splitlog.report.period",
|
conf.getInt("hbase.splitlog.report.period",
|
||||||
conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
|
conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
|
||||||
|
@ -17,7 +17,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
|
||||||
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
|
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
|
||||||
|
|
||||||
import com.google.protobuf.Descriptors;
|
import com.google.protobuf.Descriptors;
|
||||||
import com.google.protobuf.Service;
|
import com.google.protobuf.Service;
|
||||||
@ -335,6 +337,13 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||||||
private MasterFileSystem fileSystemManager;
|
private MasterFileSystem fileSystemManager;
|
||||||
private MasterWalManager walManager;
|
private MasterWalManager walManager;
|
||||||
|
|
||||||
|
// manager to manage procedure-based WAL splitting, can be null if current
|
||||||
|
// is zk-based WAL splitting. SplitWALManager will replace SplitLogManager
|
||||||
|
// and MasterWalManager, which means zk-based WAL splitting code will be
|
||||||
|
// useless after we switch to the procedure-based one. our eventual goal
|
||||||
|
// is to remove all the zk-based WAL splitting code.
|
||||||
|
private SplitWALManager splitWALManager;
|
||||||
|
|
||||||
// server manager to deal with region server info
|
// server manager to deal with region server info
|
||||||
private volatile ServerManager serverManager;
|
private volatile ServerManager serverManager;
|
||||||
|
|
||||||
@ -942,6 +951,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||||||
|
|
||||||
status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
|
status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
|
||||||
this.serverManager = createServerManager(this);
|
this.serverManager = createServerManager(this);
|
||||||
|
if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
|
||||||
|
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
|
||||||
|
this.splitWALManager = new SplitWALManager(this);
|
||||||
|
}
|
||||||
createProcedureExecutor();
|
createProcedureExecutor();
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType =
|
Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType =
|
||||||
@ -1378,6 +1391,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||||||
return this.walManager;
|
return this.walManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SplitWALManager getSplitWALManager() {
|
||||||
|
return splitWALManager;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TableStateManager getTableStateManager() {
|
public TableStateManager getTableStateManager() {
|
||||||
return tableStateManager;
|
return tableStateManager;
|
||||||
|
@ -495,4 +495,11 @@ public interface MasterServices extends Server {
|
|||||||
* @return True if cluster is up; false if cluster is not up (we are shutting down).
|
* @return True if cluster is up; false if cluster is not up (we are shutting down).
|
||||||
*/
|
*/
|
||||||
boolean isClusterUp();
|
boolean isClusterUp();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return return null if current is zk-based WAL splitting
|
||||||
|
*/
|
||||||
|
default SplitWALManager getSplitWALManager(){
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -60,7 +60,8 @@ public class MasterWalManager {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final static PathFilter NON_META_FILTER = new PathFilter() {
|
@VisibleForTesting
|
||||||
|
public final static PathFilter NON_META_FILTER = new PathFilter() {
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Path p) {
|
public boolean accept(Path p) {
|
||||||
return !AbstractFSWALProvider.isMetaFile(p);
|
return !AbstractFSWALProvider.isMetaFile(p);
|
||||||
@ -167,7 +168,6 @@ public class MasterWalManager {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @return listing of ServerNames found by parsing WAL directory paths in FS.
|
* @return listing of ServerNames found by parsing WAL directory paths in FS.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public Set<ServerName> getServerNamesFromWALDirPath(final PathFilter filter) throws IOException {
|
public Set<ServerName> getServerNamesFromWALDirPath(final PathFilter filter) throws IOException {
|
||||||
FileStatus[] walDirForServerNames = getWALDirPaths(filter);
|
FileStatus[] walDirForServerNames = getWALDirPaths(filter);
|
||||||
@ -290,7 +290,7 @@ public class MasterWalManager {
|
|||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
|
||||||
"We only release this lock when we set it. Updates to code that uses it should verify use " +
|
"We only release this lock when we set it. Updates to code that uses it should verify use " +
|
||||||
"of the guard boolean.")
|
"of the guard boolean.")
|
||||||
private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
|
List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
|
||||||
List<Path> logDirs = new ArrayList<>();
|
List<Path> logDirs = new ArrayList<>();
|
||||||
boolean needReleaseLock = false;
|
boolean needReleaseLock = false;
|
||||||
if (!this.services.isInitialized()) {
|
if (!this.services.isInitialized()) {
|
||||||
|
@ -0,0 +1,239 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
|
||||||
|
import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each
|
||||||
|
* {@link SplitWALProcedure}.
|
||||||
|
* Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER).
|
||||||
|
* Helps assign and release workers for split tasks.
|
||||||
|
* Provide helper method to delete split WAL file and directory.
|
||||||
|
*
|
||||||
|
* The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta)
|
||||||
|
* can get the files that need to split via getWALsToSplit(crashedServer, splitMeta)
|
||||||
|
* can delete the splitting WAL and directory via deleteSplitWAL(wal)
|
||||||
|
* and deleteSplitWAL(crashedServer)
|
||||||
|
* can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath)
|
||||||
|
* can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure)
|
||||||
|
* and releaseSplitWALWorker(worker, scheduler)
|
||||||
|
*
|
||||||
|
* This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager},
|
||||||
|
* {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
|
||||||
|
* {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed
|
||||||
|
* after we switch to procedure-based WAL splitting.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SplitWALManager {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);
|
||||||
|
|
||||||
|
private final MasterServices master;
|
||||||
|
private final SplitWorkerAssigner splitWorkerAssigner;
|
||||||
|
private final Path rootDir;
|
||||||
|
private final FileSystem fs;
|
||||||
|
private final Configuration conf;
|
||||||
|
|
||||||
|
public SplitWALManager(MasterServices master) {
|
||||||
|
this.master = master;
|
||||||
|
this.conf = master.getConfiguration();
|
||||||
|
this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
|
||||||
|
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
|
||||||
|
this.rootDir = master.getMasterFileSystem().getWALRootDir();
|
||||||
|
this.fs = master.getMasterFileSystem().getFileSystem();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
// 1. list all splitting files
|
||||||
|
List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta);
|
||||||
|
// 2. create corresponding procedures
|
||||||
|
return createSplitWALProcedures(splittingFiles, crashedServer);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("failed to create procedures for splitting logs of {}", crashedServer, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta)
|
||||||
|
throws IOException {
|
||||||
|
List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
|
||||||
|
FileStatus[] fileStatuses =
|
||||||
|
SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
|
||||||
|
LOG.info("size of WALs of {} is {}, isMeta: {}", serverName, fileStatuses.length, splitMeta);
|
||||||
|
return Lists.newArrayList(fileStatuses);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path getWALSplitDir(ServerName serverName) {
|
||||||
|
Path logDir =
|
||||||
|
new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
|
||||||
|
return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteSplitWAL(String wal) throws IOException {
|
||||||
|
fs.delete(new Path(wal), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteWALDir(ServerName serverName) throws IOException {
|
||||||
|
Path splitDir = getWALSplitDir(serverName);
|
||||||
|
fs.delete(splitDir, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSplitWALFinished(String walPath) throws IOException {
|
||||||
|
return !fs.exists(new Path(rootDir, walPath));
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
|
||||||
|
ServerName crashedServer) {
|
||||||
|
return splittingWALs.stream()
|
||||||
|
.map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* try to acquire an worker from online servers which is executring
|
||||||
|
* @param procedure split WAL task
|
||||||
|
* @return an available region server which could execute this task
|
||||||
|
* @throws ProcedureSuspendedException if there is no available worker,
|
||||||
|
* it will throw this exception to let the procedure wait
|
||||||
|
*/
|
||||||
|
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
Optional<ServerName> worker = splitWorkerAssigner.acquire();
|
||||||
|
LOG.debug("acquired a worker {} to split a WAL", worker);
|
||||||
|
if (worker.isPresent()) {
|
||||||
|
return worker.get();
|
||||||
|
}
|
||||||
|
splitWorkerAssigner.suspend(procedure);
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* After the worker finished the split WAL task, it will release the worker, and wake up all the
|
||||||
|
* suspend procedures in the ProcedureEvent
|
||||||
|
* @param worker worker which is about to release
|
||||||
|
* @param scheduler scheduler which is to wake up the procedure event
|
||||||
|
*/
|
||||||
|
public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
|
||||||
|
LOG.debug("release a worker {} to split a WAL", worker);
|
||||||
|
splitWorkerAssigner.release(worker);
|
||||||
|
splitWorkerAssigner.wake(scheduler);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL
|
||||||
|
* tasks running on the region server side, they will not be count by the new splitWorkerAssigner.
|
||||||
|
* Thus we should add the workers of running tasks to the assigner when we load the procedures
|
||||||
|
* from MasterProcWALs.
|
||||||
|
* @param worker region server which is executing a split WAL task
|
||||||
|
*/
|
||||||
|
public void addUsedSplitWALWorker(ServerName worker){
|
||||||
|
splitWorkerAssigner.addUsedWorker(worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* help assign and release a worker for each WAL splitting task
|
||||||
|
* For each worker, concurrent running splitting task should be no more than maxSplitTasks
|
||||||
|
* If a task failed to acquire a worker, it will suspend and wait for workers available
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static final class SplitWorkerAssigner implements ServerListener {
|
||||||
|
private int maxSplitTasks;
|
||||||
|
private final ProcedureEvent<?> event;
|
||||||
|
private Map<ServerName, Integer> currentWorkers = new HashMap<>();
|
||||||
|
private MasterServices master;
|
||||||
|
|
||||||
|
public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
|
||||||
|
this.maxSplitTasks = maxSplitTasks;
|
||||||
|
this.master = master;
|
||||||
|
this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
|
||||||
|
this.master.getServerManager().registerListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized Optional<ServerName> acquire() {
|
||||||
|
List<ServerName> serverList = master.getServerManager().getOnlineServersList();
|
||||||
|
Collections.shuffle(serverList);
|
||||||
|
Optional<ServerName> worker = serverList.stream().filter(
|
||||||
|
serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
|
||||||
|
.findAny();
|
||||||
|
if (worker.isPresent()) {
|
||||||
|
currentWorkers.compute(worker.get(), (serverName,
|
||||||
|
availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
|
||||||
|
}
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void release(ServerName serverName) {
|
||||||
|
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void suspend(Procedure<?> proc) {
|
||||||
|
event.suspend();
|
||||||
|
event.suspendIfNotReady(proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void wake(MasterProcedureScheduler scheduler) {
|
||||||
|
if (!event.isReady()) {
|
||||||
|
event.wake(scheduler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serverAdded(ServerName worker) {
|
||||||
|
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void addUsedWorker(ServerName worker) {
|
||||||
|
// load used worker when master restart
|
||||||
|
currentWorkers.compute(worker, (serverName,
|
||||||
|
availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -17,6 +17,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -28,9 +31,11 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
|||||||
import org.apache.hadoop.hbase.client.TableState;
|
import org.apache.hadoop.hbase.client.TableState;
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.MasterWalManager;
|
import org.apache.hadoop.hbase.master.MasterWalManager;
|
||||||
|
import org.apache.hadoop.hbase.master.SplitWALManager;
|
||||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||||
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
@ -107,6 +112,7 @@ public class ServerCrashProcedure
|
|||||||
protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
|
protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
|
||||||
throws ProcedureSuspendedException, ProcedureYieldException {
|
throws ProcedureSuspendedException, ProcedureYieldException {
|
||||||
final MasterServices services = env.getMasterServices();
|
final MasterServices services = env.getMasterServices();
|
||||||
|
final AssignmentManager am = env.getAssignmentManager();
|
||||||
// HBASE-14802
|
// HBASE-14802
|
||||||
// If we have not yet notified that we are processing a dead server, we should do now.
|
// If we have not yet notified that we are processing a dead server, we should do now.
|
||||||
if (!notifiedDeadServer) {
|
if (!notifiedDeadServer) {
|
||||||
@ -117,6 +123,7 @@ public class ServerCrashProcedure
|
|||||||
switch (state) {
|
switch (state) {
|
||||||
case SERVER_CRASH_START:
|
case SERVER_CRASH_START:
|
||||||
case SERVER_CRASH_SPLIT_META_LOGS:
|
case SERVER_CRASH_SPLIT_META_LOGS:
|
||||||
|
case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
|
||||||
case SERVER_CRASH_ASSIGN_META:
|
case SERVER_CRASH_ASSIGN_META:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -137,8 +144,24 @@ public class ServerCrashProcedure
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case SERVER_CRASH_SPLIT_META_LOGS:
|
case SERVER_CRASH_SPLIT_META_LOGS:
|
||||||
splitMetaLogs(env);
|
if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
|
||||||
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
|
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
|
||||||
|
splitMetaLogs(env);
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
|
||||||
|
} else {
|
||||||
|
am.getRegionStates().metaLogSplitting(serverName);
|
||||||
|
addChildProcedure(createSplittingWalProcedures(env, true));
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
|
||||||
|
if(isSplittingDone(env, true)){
|
||||||
|
cleanupSplitDir(env);
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
|
||||||
|
am.getRegionStates().metaLogSplit(serverName);
|
||||||
|
} else {
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case SERVER_CRASH_ASSIGN_META:
|
case SERVER_CRASH_ASSIGN_META:
|
||||||
assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
|
assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
|
||||||
@ -156,8 +179,24 @@ public class ServerCrashProcedure
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case SERVER_CRASH_SPLIT_LOGS:
|
case SERVER_CRASH_SPLIT_LOGS:
|
||||||
splitLogs(env);
|
if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
|
||||||
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
|
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
|
||||||
|
splitLogs(env);
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
|
||||||
|
} else {
|
||||||
|
am.getRegionStates().logSplitting(this.serverName);
|
||||||
|
addChildProcedure(createSplittingWalProcedures(env, false));
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case SERVER_CRASH_DELETE_SPLIT_WALS_DIR:
|
||||||
|
if (isSplittingDone(env, false)) {
|
||||||
|
cleanupSplitDir(env);
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
|
||||||
|
am.getRegionStates().logSplit(this.serverName);
|
||||||
|
} else {
|
||||||
|
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case SERVER_CRASH_ASSIGN:
|
case SERVER_CRASH_ASSIGN:
|
||||||
// If no regions to assign, skip assign and skip to the finish.
|
// If no regions to assign, skip assign and skip to the finish.
|
||||||
@ -179,6 +218,7 @@ public class ServerCrashProcedure
|
|||||||
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
|
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
|
||||||
break;
|
break;
|
||||||
case SERVER_CRASH_FINISH:
|
case SERVER_CRASH_FINISH:
|
||||||
|
LOG.info("removed crashed server {} after splitting done", serverName);
|
||||||
services.getAssignmentManager().getRegionStates().removeServer(serverName);
|
services.getAssignmentManager().getRegionStates().removeServer(serverName);
|
||||||
services.getServerManager().getDeadServers().finish(serverName);
|
services.getServerManager().getDeadServers().finish(serverName);
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
@ -191,6 +231,34 @@ public class ServerCrashProcedure
|
|||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanupSplitDir(MasterProcedureEnv env) {
|
||||||
|
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
|
||||||
|
try {
|
||||||
|
splitWALManager.deleteWALDir(serverName);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("remove WAL directory of server {} failed, ignore...", serverName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
|
||||||
|
LOG.debug("check if splitting WALs of {} done? isMeta: {}", serverName, splitMeta);
|
||||||
|
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
|
||||||
|
try {
|
||||||
|
return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("get filelist of serverName {} failed, retry...", serverName, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta)
|
||||||
|
throws IOException {
|
||||||
|
LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta);
|
||||||
|
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
|
||||||
|
List<Procedure> procedures = splitWALManager.splitWALs(serverName, splitMeta);
|
||||||
|
return procedures.toArray(new Procedure[procedures.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
private boolean filterDefaultMetaRegions() {
|
private boolean filterDefaultMetaRegions() {
|
||||||
if (regionsOnCrashedServer == null) {
|
if (regionsOnCrashedServer == null) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -27,7 +27,18 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface ServerProcedureInterface {
|
public interface ServerProcedureInterface {
|
||||||
public enum ServerOperationType {
|
public enum ServerOperationType {
|
||||||
CRASH_HANDLER, SWITCH_RPC_THROTTLE
|
CRASH_HANDLER, SWITCH_RPC_THROTTLE,
|
||||||
|
/**
|
||||||
|
* help find a available region server as worker and release worker after task done invoke
|
||||||
|
* SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker manage the split wal
|
||||||
|
* task flow, will retry if SPLIT_WAL_REMOTE failed
|
||||||
|
*/
|
||||||
|
SPLIT_WAL,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* send the split WAL request to region server and handle the response
|
||||||
|
*/
|
||||||
|
SPLIT_WAL_REMOTE
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -36,6 +36,8 @@ class ServerQueue extends Queue<ServerName> {
|
|||||||
case CRASH_HANDLER:
|
case CRASH_HANDLER:
|
||||||
return true;
|
return true;
|
||||||
case SWITCH_RPC_THROTTLE:
|
case SWITCH_RPC_THROTTLE:
|
||||||
|
case SPLIT_WAL:
|
||||||
|
case SPLIT_WAL_REMOTE:
|
||||||
return false;
|
return false;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
@ -0,0 +1,199 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.master.SplitWALManager;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The procedure is to split a WAL. It will get an available region server and
|
||||||
|
* schedule a {@link SplitWALRemoteProcedure} to actually send the request to region
|
||||||
|
* server to split this WAL.
|
||||||
|
* It also check if the split wal task really succeed. If the WAL still exists, it will
|
||||||
|
* schedule another region server to split this WAL.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SplitWALProcedure
|
||||||
|
extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
|
||||||
|
implements ServerProcedureInterface {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(SplitWALProcedure.class);
|
||||||
|
private String walPath;
|
||||||
|
private ServerName worker;
|
||||||
|
private ServerName crashedServer;
|
||||||
|
private int attempts = 0;
|
||||||
|
|
||||||
|
public SplitWALProcedure() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SplitWALProcedure(String walPath, ServerName crashedServer) {
|
||||||
|
this.walPath = walPath;
|
||||||
|
this.crashedServer = crashedServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
|
||||||
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||||
|
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
|
||||||
|
switch (state) {
|
||||||
|
case ACQUIRE_SPLIT_WAL_WORKER:
|
||||||
|
worker = splitWALManager.acquireSplitWALWorker(this);
|
||||||
|
setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case DISPATCH_WAL_TO_WORKER:
|
||||||
|
assert worker != null;
|
||||||
|
addChildProcedure(new SplitWALRemoteProcedure(worker, crashedServer, walPath));
|
||||||
|
setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case RELEASE_SPLIT_WORKER:
|
||||||
|
boolean finished;
|
||||||
|
try {
|
||||||
|
finished = splitWALManager.isSplitWALFinished(walPath);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
long backoff = ProcedureUtil.getBackoffTimeMs(attempts++);
|
||||||
|
LOG.warn(
|
||||||
|
"Failed to check whether splitting wal {} success, wait {} seconds to retry",
|
||||||
|
walPath, backoff / 1000, ioe);
|
||||||
|
throw suspend(backoff);
|
||||||
|
}
|
||||||
|
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
|
||||||
|
if (!finished) {
|
||||||
|
LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
|
||||||
|
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
}
|
||||||
|
return Flow.NO_MORE_STATE;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollbackState(MasterProcedureEnv env,
|
||||||
|
MasterProcedureProtos.SplitWALState splitOneWalState)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
if (splitOneWalState == getInitialState()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MasterProcedureProtos.SplitWALState getState(int stateId) {
|
||||||
|
return MasterProcedureProtos.SplitWALState.forNumber(stateId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getStateId(MasterProcedureProtos.SplitWALState state) {
|
||||||
|
return state.getNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MasterProcedureProtos.SplitWALState getInitialState() {
|
||||||
|
return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
super.serializeStateData(serializer);
|
||||||
|
MasterProcedureProtos.SplitWALData.Builder builder =
|
||||||
|
MasterProcedureProtos.SplitWALData.newBuilder();
|
||||||
|
builder.setWalPath(walPath).setCrashedServer(ProtobufUtil.toServerName(crashedServer));
|
||||||
|
if (worker != null) {
|
||||||
|
builder.setWorker(ProtobufUtil.toServerName(worker));
|
||||||
|
}
|
||||||
|
serializer.serialize(builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
super.deserializeStateData(serializer);
|
||||||
|
MasterProcedureProtos.SplitWALData data =
|
||||||
|
serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
|
||||||
|
walPath = data.getWalPath();
|
||||||
|
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
|
||||||
|
if (data.hasWorker()) {
|
||||||
|
worker = ProtobufUtil.toServerName(data.getWorker());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
|
||||||
|
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||||
|
env.getProcedureScheduler().addFront(this);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final ProcedureSuspendedException suspend(long backoff)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
attempts++;
|
||||||
|
setTimeout(Math.toIntExact(backoff));
|
||||||
|
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||||
|
skipPersistence();
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getWAL() {
|
||||||
|
return walPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public ServerName getWorker(){
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerName getServerName() {
|
||||||
|
return this.crashedServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMetaTableRegion() {
|
||||||
|
return AbstractFSWALProvider.isMetaFile(new Path(walPath));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerOperationType getServerOperationType() {
|
||||||
|
return ServerOperationType.SPLIT_WAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void afterReplay(MasterProcedureEnv env){
|
||||||
|
if(worker != null){
|
||||||
|
env.getMasterServices().getSplitWALManager().addUsedSplitWALWorker(worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,195 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||||
|
/**
|
||||||
|
* A remote procedure which is used to send split WAL request to region server.
|
||||||
|
* it will return null if the task is succeed or return a DoNotRetryIOException
|
||||||
|
* {@link SplitWALProcedure} will help handle the situation that encounter
|
||||||
|
* DoNotRetryIOException. Otherwise it will retry until succeed.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
|
||||||
|
implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>,
|
||||||
|
ServerProcedureInterface {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class);
|
||||||
|
private String walPath;
|
||||||
|
private ServerName worker;
|
||||||
|
private ServerName crashedServer;
|
||||||
|
private boolean dispatched;
|
||||||
|
private ProcedureEvent<?> event;
|
||||||
|
private boolean success = false;
|
||||||
|
|
||||||
|
public SplitWALRemoteProcedure() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) {
|
||||||
|
this.worker = worker;
|
||||||
|
this.crashedServer = crashedServer;
|
||||||
|
this.walPath = wal;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||||
|
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||||
|
if (dispatched) {
|
||||||
|
if (success) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
dispatched = false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
env.getRemoteDispatcher().addOperationToNode(worker, this);
|
||||||
|
} catch (NoNodeDispatchException | NullTargetServerDispatchException
|
||||||
|
| NoServerDispatchException e) {
|
||||||
|
// When send to a wrong target server, it need construct a new SplitWALRemoteProcedure.
|
||||||
|
// Thus return null for this procedure and let SplitWALProcedure to handle this.
|
||||||
|
LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
dispatched = true;
|
||||||
|
event = new ProcedureEvent<>(this);
|
||||||
|
event.suspendIfNotReady(this);
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean abort(MasterProcedureEnv env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
|
||||||
|
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
|
||||||
|
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker))
|
||||||
|
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
|
||||||
|
serializer.serialize(builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
MasterProcedureProtos.SplitWALRemoteData data =
|
||||||
|
serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class);
|
||||||
|
walPath = data.getWalPath();
|
||||||
|
worker = ProtobufUtil.toServerName(data.getWorker());
|
||||||
|
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
|
||||||
|
ServerName serverName) {
|
||||||
|
return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class,
|
||||||
|
MasterProcedureProtos.SplitWALParameter.newBuilder().setWalPath(walPath).build()
|
||||||
|
.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
|
||||||
|
IOException exception) {
|
||||||
|
complete(env, exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remoteOperationCompleted(MasterProcedureEnv env) {
|
||||||
|
complete(env, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void complete(MasterProcedureEnv env, Throwable error) {
|
||||||
|
if (event == null) {
|
||||||
|
LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
|
||||||
|
getProcId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (error == null) {
|
||||||
|
LOG.info("split WAL {} on {} succeeded", walPath, worker);
|
||||||
|
try {
|
||||||
|
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
|
||||||
|
} catch (IOException e){
|
||||||
|
LOG.warn("remove WAL {} failed, ignore...", walPath, e);
|
||||||
|
}
|
||||||
|
success = true;
|
||||||
|
} else {
|
||||||
|
if (error instanceof DoNotRetryIOException) {
|
||||||
|
LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server",
|
||||||
|
walPath, worker, error);
|
||||||
|
success = true;
|
||||||
|
} else {
|
||||||
|
LOG.warn("split WAL {} failed, retry...", walPath, error);
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
event.wake(env.getProcedureScheduler());
|
||||||
|
event = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
|
||||||
|
complete(env, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getWAL() {
|
||||||
|
return this.walPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerName getServerName() {
|
||||||
|
// return the crashed server is to use the queue of root ServerCrashProcedure
|
||||||
|
return this.crashedServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMetaTableRegion() {
|
||||||
|
return AbstractFSWALProvider.isMetaFile(new Path(walPath));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerOperationType getServerOperationType() {
|
||||||
|
return ServerOperationType.SPLIT_WAL_REMOTE;
|
||||||
|
}
|
||||||
|
}
|
@ -17,6 +17,11 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.Thread.UncaughtExceptionHandler;
|
import java.lang.Thread.UncaughtExceptionHandler;
|
||||||
import java.lang.management.MemoryType;
|
import java.lang.management.MemoryType;
|
||||||
@ -88,7 +93,6 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
|
|||||||
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
|
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
|
||||||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||||
@ -616,7 +620,10 @@ public class HRegionServer extends HasThread implements
|
|||||||
rpcServices.isa.getPort(), this, canCreateBaseZNode());
|
rpcServices.isa.getPort(), this, canCreateBaseZNode());
|
||||||
// If no master in cluster, skip trying to track one or look for a cluster status.
|
// If no master in cluster, skip trying to track one or look for a cluster status.
|
||||||
if (!this.masterless) {
|
if (!this.masterless) {
|
||||||
this.csm = new ZkCoordinatedStateManager(this);
|
if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
|
||||||
|
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
|
||||||
|
this.csm = new ZkCoordinatedStateManager(this);
|
||||||
|
}
|
||||||
|
|
||||||
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
|
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
|
||||||
masterAddressTracker.start();
|
masterAddressTracker.start();
|
||||||
@ -1923,7 +1930,7 @@ public class HRegionServer extends HasThread implements
|
|||||||
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
|
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
|
||||||
}
|
}
|
||||||
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
|
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
|
||||||
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
|
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
|
||||||
// Start the threads for compacted files discharger
|
// Start the threads for compacted files discharger
|
||||||
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
|
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
|
||||||
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
|
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
|
||||||
@ -1966,9 +1973,10 @@ public class HRegionServer extends HasThread implements
|
|||||||
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
|
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
|
||||||
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
|
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
|
||||||
if (this.csm != null) {
|
if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
|
||||||
|
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
|
||||||
// SplitLogWorker needs csm. If none, don't start this.
|
// SplitLogWorker needs csm. If none, don't start this.
|
||||||
this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
|
this.splitLogWorker = new SplitLogWorker(sinkConf, this,
|
||||||
this, walFactory);
|
this, walFactory);
|
||||||
splitLogWorker.start();
|
splitLogWorker.start();
|
||||||
} else {
|
} else {
|
||||||
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
|||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||||
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
@ -78,58 +79,60 @@ public class SplitLogWorker implements Runnable {
|
|||||||
coordination.init(server, conf, splitTaskExecutor, this);
|
coordination.init(server, conf, splitTaskExecutor, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SplitLogWorker(final Server hserver, final Configuration conf,
|
public SplitLogWorker(Configuration conf, RegionServerServices server,
|
||||||
final RegionServerServices server, final LastSequenceId sequenceIdChecker,
|
LastSequenceId sequenceIdChecker, WALFactory factory) {
|
||||||
final WALFactory factory) {
|
this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory));
|
||||||
this(hserver, conf, server, new TaskExecutor() {
|
}
|
||||||
@Override
|
|
||||||
public Status exec(String filename, CancelableProgressable p) {
|
static Status splitLog(String filename, CancelableProgressable p, Configuration conf,
|
||||||
Path walDir;
|
RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
|
||||||
FileSystem fs;
|
Path walDir;
|
||||||
try {
|
FileSystem fs;
|
||||||
walDir = FSUtils.getWALRootDir(conf);
|
try {
|
||||||
fs = walDir.getFileSystem(conf);
|
walDir = FSUtils.getWALRootDir(conf);
|
||||||
} catch (IOException e) {
|
fs = walDir.getFileSystem(conf);
|
||||||
LOG.warn("could not find root dir or fs", e);
|
} catch (IOException e) {
|
||||||
return Status.RESIGNED;
|
LOG.warn("could not find root dir or fs", e);
|
||||||
}
|
return Status.RESIGNED;
|
||||||
// TODO have to correctly figure out when log splitting has been
|
}
|
||||||
// interrupted or has encountered a transient error and when it has
|
// TODO have to correctly figure out when log splitting has been
|
||||||
// encountered a bad non-retry-able persistent error.
|
// interrupted or has encountered a transient error and when it has
|
||||||
try {
|
// encountered a bad non-retry-able persistent error.
|
||||||
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
|
try {
|
||||||
fs, conf, p, sequenceIdChecker,
|
SplitLogWorkerCoordination splitLogWorkerCoordination =
|
||||||
server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
|
server.getCoordinatedStateManager() == null ? null
|
||||||
return Status.PREEMPTED;
|
: server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
|
||||||
}
|
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf,
|
||||||
} catch (InterruptedIOException iioe) {
|
p, sequenceIdChecker, splitLogWorkerCoordination, factory)) {
|
||||||
LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
|
return Status.PREEMPTED;
|
||||||
return Status.RESIGNED;
|
}
|
||||||
} catch (IOException e) {
|
} catch (InterruptedIOException iioe) {
|
||||||
if (e instanceof FileNotFoundException) {
|
LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
|
||||||
// A wal file may not exist anymore. Nothing can be recovered so move on
|
return Status.RESIGNED;
|
||||||
LOG.warn("WAL {} does not exist anymore", filename, e);
|
} catch (IOException e) {
|
||||||
return Status.DONE;
|
if (e instanceof FileNotFoundException) {
|
||||||
}
|
// A wal file may not exist anymore. Nothing can be recovered so move on
|
||||||
Throwable cause = e.getCause();
|
LOG.warn("WAL {} does not exist anymore", filename, e);
|
||||||
if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
|
|
||||||
|| cause instanceof ConnectException
|
|
||||||
|| cause instanceof SocketTimeoutException)) {
|
|
||||||
LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
|
|
||||||
+ "resigning", e);
|
|
||||||
return Status.RESIGNED;
|
|
||||||
} else if (cause instanceof InterruptedException) {
|
|
||||||
LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
|
|
||||||
return Status.RESIGNED;
|
|
||||||
}
|
|
||||||
LOG.warn("log splitting of " + filename + " failed, returning error", e);
|
|
||||||
return Status.ERR;
|
|
||||||
}
|
|
||||||
return Status.DONE;
|
return Status.DONE;
|
||||||
}
|
}
|
||||||
});
|
Throwable cause = e.getCause();
|
||||||
|
if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
|
||||||
|
|| cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
|
||||||
|
LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
|
||||||
|
+ "resigning",
|
||||||
|
e);
|
||||||
|
return Status.RESIGNED;
|
||||||
|
} else if (cause instanceof InterruptedException) {
|
||||||
|
LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
|
||||||
|
return Status.RESIGNED;
|
||||||
|
}
|
||||||
|
LOG.warn("log splitting of " + filename + " failed, returning error", e);
|
||||||
|
return Status.ERR;
|
||||||
|
}
|
||||||
|
return Status.DONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -0,0 +1,109 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.executor.EventType;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
|
||||||
|
import org.apache.hadoop.hbase.util.KeyLocker;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This callable is used to do the real split WAL task. It is called by
|
||||||
|
* {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} from master and executed
|
||||||
|
* by executor service which is in charge of executing the events of EventType.RS_LOG_REPLAY
|
||||||
|
*
|
||||||
|
* When execute this callable, it will call SplitLogWorker.splitLog() to split the WAL.
|
||||||
|
* If the return value is SplitLogWorker.TaskExecutor.Status.DONE, it means the task is successful
|
||||||
|
* and it will return null to end the call. Otherwise it will throw an exception and let
|
||||||
|
* {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} to handle this problem.
|
||||||
|
*
|
||||||
|
* This class is to replace the zk-based WAL splitting related code, {@link SplitLogWorker},
|
||||||
|
* {@link org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination} and
|
||||||
|
* {@link org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination} can be removed after
|
||||||
|
* we switch to procedure-based WAL splitting.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SplitWALCallable implements RSProcedureCallable {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class);
|
||||||
|
|
||||||
|
private String walPath;
|
||||||
|
private Exception initError;
|
||||||
|
private HRegionServer rs;
|
||||||
|
private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
|
||||||
|
private volatile Lock splitWALLock = null;
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(byte[] parameter, HRegionServer rs) {
|
||||||
|
try {
|
||||||
|
this.rs = rs;
|
||||||
|
MasterProcedureProtos.SplitWALParameter param =
|
||||||
|
MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
|
||||||
|
this.walPath = param.getWalPath();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
LOG.error("parse proto buffer of split WAL request failed ", e);
|
||||||
|
initError = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventType getEventType() {
|
||||||
|
return EventType.RS_LOG_REPLAY;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
if (initError != null) {
|
||||||
|
throw initError;
|
||||||
|
}
|
||||||
|
//grab a lock
|
||||||
|
splitWALLock = splitWALLocks.acquireLock(walPath);
|
||||||
|
try{
|
||||||
|
splitWal();
|
||||||
|
LOG.info("split WAL {} succeed.", walPath);
|
||||||
|
} catch (IOException e){
|
||||||
|
LOG.warn("failed to split WAL {}.", walPath, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
splitWALLock.unlock();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getWalPath() {
|
||||||
|
return this.walPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void splitWal() throws IOException {
|
||||||
|
SplitLogWorker.TaskExecutor.Status status =
|
||||||
|
SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory);
|
||||||
|
if (status != SplitLogWorker.TaskExecutor.Status.DONE) {
|
||||||
|
throw new IOException("Split WAL " + walPath + " failed at server ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -18,6 +18,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
|
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
|
||||||
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
|
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
|
||||||
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
|
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
|
||||||
@ -143,7 +144,7 @@ public abstract class AbstractTestDLS {
|
|||||||
conf.setInt("zookeeper.recovery.retry", 0);
|
conf.setInt("zookeeper.recovery.retry", 0);
|
||||||
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
|
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
|
||||||
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
|
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
|
||||||
conf.setInt("hbase.regionserver.wal.max.splitters", 3);
|
conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
|
||||||
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
||||||
conf.set("hbase.wal.provider", getWalProvider());
|
conf.set("hbase.wal.provider", getWalProvider());
|
||||||
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
||||||
|
@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotEquals;
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -47,12 +49,16 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
|||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category({ MasterTests.class, LargeTests.class })
|
@Category({ MasterTests.class, LargeTests.class })
|
||||||
public class TestRestartCluster {
|
public class TestRestartCluster {
|
||||||
|
|
||||||
@ -63,6 +69,9 @@ public class TestRestartCluster {
|
|||||||
private static final Logger LOG = LoggerFactory.getLogger(TestRestartCluster.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestRestartCluster.class);
|
||||||
private HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
private HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@Parameterized.Parameter
|
||||||
|
public boolean splitWALCoordinatedByZK;
|
||||||
|
|
||||||
private static final TableName[] TABLES = {
|
private static final TableName[] TABLES = {
|
||||||
TableName.valueOf("restartTableOne"),
|
TableName.valueOf("restartTableOne"),
|
||||||
TableName.valueOf("restartTableTwo"),
|
TableName.valueOf("restartTableTwo"),
|
||||||
@ -70,6 +79,13 @@ public class TestRestartCluster {
|
|||||||
};
|
};
|
||||||
private static final byte[] FAMILY = Bytes.toBytes("family");
|
private static final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK);
|
||||||
|
UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
|
||||||
|
splitWALCoordinatedByZK);
|
||||||
|
}
|
||||||
|
|
||||||
@After public void tearDown() throws Exception {
|
@After public void tearDown() throws Exception {
|
||||||
UTIL.shutdownMiniCluster();
|
UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
@ -304,4 +320,9 @@ public class TestRestartCluster {
|
|||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection coordinatedByZK() {
|
||||||
|
return Arrays.asList(false, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master;
|
|||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||||
@ -45,6 +48,8 @@ import org.junit.Rule;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -53,6 +58,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
|||||||
/**
|
/**
|
||||||
* Tests the restarting of everything as done during rolling restarts.
|
* Tests the restarting of everything as done during rolling restarts.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category({MasterTests.class, LargeTests.class})
|
@Category({MasterTests.class, LargeTests.class})
|
||||||
public class TestRollingRestart {
|
public class TestRollingRestart {
|
||||||
|
|
||||||
@ -65,6 +71,9 @@ public class TestRollingRestart {
|
|||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
@Parameterized.Parameter
|
||||||
|
public boolean splitWALCoordinatedByZK;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicRollingRestart() throws Exception {
|
public void testBasicRollingRestart() throws Exception {
|
||||||
|
|
||||||
@ -78,6 +87,8 @@ public class TestRollingRestart {
|
|||||||
// Start the cluster
|
// Start the cluster
|
||||||
log("Starting cluster");
|
log("Starting cluster");
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
|
||||||
|
splitWALCoordinatedByZK);
|
||||||
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||||
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
StartMiniClusterOption option = StartMiniClusterOption.builder()
|
||||||
.numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
|
.numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
|
||||||
@ -87,7 +98,8 @@ public class TestRollingRestart {
|
|||||||
cluster.waitForActiveAndReadyMaster();
|
cluster.waitForActiveAndReadyMaster();
|
||||||
|
|
||||||
// Create a table with regions
|
// Create a table with regions
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
final TableName tableName =
|
||||||
|
TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-"));
|
||||||
byte [] family = Bytes.toBytes("family");
|
byte [] family = Bytes.toBytes("family");
|
||||||
log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
|
log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
|
||||||
Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
|
Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
|
||||||
@ -282,5 +294,9 @@ public class TestRollingRestart {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection coordinatedByZK() {
|
||||||
|
return Arrays.asList(false, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,383 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
|
||||||
|
public class TestSplitWALManager {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestSplitWALManager.class);
|
||||||
|
|
||||||
|
private static HBaseTestingUtility TEST_UTIL;
|
||||||
|
private HMaster master;
|
||||||
|
private SplitWALManager splitWALManager;
|
||||||
|
private TableName TABLE_NAME;
|
||||||
|
private byte[] FAMILY;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
|
||||||
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
splitWALManager = master.getSplitWALManager();
|
||||||
|
TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
|
||||||
|
FAMILY = Bytes.toBytes("test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireAndRelease() throws Exception {
|
||||||
|
List<FakeServerProcedure> testProcedures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
testProcedures.add(new FakeServerProcedure(
|
||||||
|
TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
|
||||||
|
}
|
||||||
|
ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
|
||||||
|
Assert.assertNotNull(server);
|
||||||
|
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
|
||||||
|
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
|
||||||
|
|
||||||
|
Exception e = null;
|
||||||
|
try {
|
||||||
|
splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
|
||||||
|
} catch (ProcedureSuspendedException suspendException) {
|
||||||
|
e = suspendException;
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(e);
|
||||||
|
Assert.assertTrue(e instanceof ProcedureSuspendedException);
|
||||||
|
|
||||||
|
splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
|
||||||
|
.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
|
||||||
|
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddNewServer() throws Exception {
|
||||||
|
List<FakeServerProcedure> testProcedures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
testProcedures.add(new FakeServerProcedure(
|
||||||
|
TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
|
||||||
|
}
|
||||||
|
ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
|
||||||
|
Assert.assertNotNull(server);
|
||||||
|
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
|
||||||
|
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
|
||||||
|
|
||||||
|
Exception e = null;
|
||||||
|
try {
|
||||||
|
splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
|
||||||
|
} catch (ProcedureSuspendedException suspendException) {
|
||||||
|
e = suspendException;
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(e);
|
||||||
|
Assert.assertTrue(e instanceof ProcedureSuspendedException);
|
||||||
|
|
||||||
|
JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
|
||||||
|
newServer.waitForServerOnline();
|
||||||
|
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateSplitWALProcedures() throws Exception {
|
||||||
|
TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
// load table
|
||||||
|
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
|
||||||
|
ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
|
||||||
|
Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
|
||||||
|
AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
|
||||||
|
// Test splitting meta wal
|
||||||
|
FileStatus[] wals =
|
||||||
|
TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
|
||||||
|
Assert.assertEquals(1, wals.length);
|
||||||
|
List<Procedure> testProcedures =
|
||||||
|
splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
|
||||||
|
Assert.assertEquals(1, testProcedures.size());
|
||||||
|
ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
|
||||||
|
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
|
||||||
|
|
||||||
|
// Test splitting wal
|
||||||
|
wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
|
||||||
|
Assert.assertEquals(1, wals.length);
|
||||||
|
testProcedures =
|
||||||
|
splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
|
||||||
|
Assert.assertEquals(1, testProcedures.size());
|
||||||
|
ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
|
||||||
|
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireAndReleaseSplitWALWorker() throws Exception {
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
|
||||||
|
List<FakeServerProcedure> testProcedures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
FakeServerProcedure procedure =
|
||||||
|
new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
|
||||||
|
testProcedures.add(procedure);
|
||||||
|
ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
|
||||||
|
HConstants.NO_NONCE);
|
||||||
|
}
|
||||||
|
TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
|
||||||
|
FakeServerProcedure failedProcedure =
|
||||||
|
new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
|
||||||
|
ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
|
||||||
|
HConstants.NO_NONCE);
|
||||||
|
TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
|
||||||
|
Assert.assertFalse(failedProcedure.isWorkerAcquired());
|
||||||
|
// let one procedure finish and release worker
|
||||||
|
testProcedures.get(0).countDown();
|
||||||
|
TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
|
||||||
|
Assert.assertTrue(testProcedures.get(0).isSuccess());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetWALsToSplit() throws Exception {
|
||||||
|
TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
// load table
|
||||||
|
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
|
||||||
|
ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
|
||||||
|
List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
|
||||||
|
Assert.assertEquals(1, metaWals.size());
|
||||||
|
List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
|
||||||
|
Assert.assertEquals(1, wals.size());
|
||||||
|
ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
||||||
|
.map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
|
||||||
|
.get();
|
||||||
|
metaWals = splitWALManager.getWALsToSplit(testServer, true);
|
||||||
|
Assert.assertEquals(0, metaWals.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplitLogs() throws Exception {
|
||||||
|
TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
// load table
|
||||||
|
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
|
||||||
|
ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
|
||||||
|
ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
||||||
|
.map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
|
||||||
|
.get();
|
||||||
|
List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
|
||||||
|
Assert.assertEquals(1, procedures.size());
|
||||||
|
ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
|
||||||
|
Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
|
||||||
|
|
||||||
|
procedures = splitWALManager.splitWALs(metaServer, true);
|
||||||
|
Assert.assertEquals(1, procedures.size());
|
||||||
|
ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
|
||||||
|
Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
|
||||||
|
Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerReloadWhenMasterRestart() throws Exception {
|
||||||
|
List<FakeServerProcedure> testProcedures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
FakeServerProcedure procedure =
|
||||||
|
new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
|
||||||
|
testProcedures.add(procedure);
|
||||||
|
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
|
||||||
|
HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||||
|
}
|
||||||
|
TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
|
||||||
|
// Kill master
|
||||||
|
TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
|
||||||
|
TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
|
||||||
|
// restart master
|
||||||
|
TEST_UTIL.getHBaseCluster().startMaster();
|
||||||
|
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
|
||||||
|
this.master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
|
||||||
|
FakeServerProcedure failedProcedure =
|
||||||
|
new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
|
||||||
|
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
|
||||||
|
HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||||
|
TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
|
||||||
|
Assert.assertFalse(failedProcedure.isWorkerAcquired());
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
testProcedures.get(i).countDown();
|
||||||
|
}
|
||||||
|
failedProcedure.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class FakeServerProcedure
|
||||||
|
extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
|
||||||
|
implements ServerProcedureInterface {
|
||||||
|
|
||||||
|
private ServerName serverName;
|
||||||
|
private ServerName worker;
|
||||||
|
private CountDownLatch barrier = new CountDownLatch(1);
|
||||||
|
private boolean triedToAcquire = false;
|
||||||
|
|
||||||
|
public FakeServerProcedure() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public FakeServerProcedure(ServerName serverName) {
|
||||||
|
this.serverName = serverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ServerName getServerName() {
|
||||||
|
return serverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMetaTableRegion() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerOperationType getServerOperationType() {
|
||||||
|
return SPLIT_WAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Flow executeFromState(MasterProcedureEnv env,
|
||||||
|
MasterProcedureProtos.SplitWALState state)
|
||||||
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||||
|
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
|
||||||
|
switch (state) {
|
||||||
|
case ACQUIRE_SPLIT_WAL_WORKER:
|
||||||
|
triedToAcquire = true;
|
||||||
|
worker = splitWALManager.acquireSplitWALWorker(this);
|
||||||
|
setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case DISPATCH_WAL_TO_WORKER:
|
||||||
|
barrier.await();
|
||||||
|
setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case RELEASE_SPLIT_WORKER:
|
||||||
|
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
|
||||||
|
return Flow.NO_MORE_STATE;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isWorkerAcquired() {
|
||||||
|
return worker != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTriedToAcquire() {
|
||||||
|
return triedToAcquire;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void countDown() {
|
||||||
|
this.barrier.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MasterProcedureProtos.SplitWALState getState(int stateId) {
|
||||||
|
return MasterProcedureProtos.SplitWALState.forNumber(stateId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getStateId(MasterProcedureProtos.SplitWALState state) {
|
||||||
|
return state.getNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MasterProcedureProtos.SplitWALState getInitialState() {
|
||||||
|
return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean holdLock(MasterProcedureEnv env) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean abort(MasterProcedureEnv env) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
MasterProcedureProtos.SplitWALData.Builder builder =
|
||||||
|
MasterProcedureProtos.SplitWALData.newBuilder();
|
||||||
|
builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
|
||||||
|
serializer.serialize(builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||||
|
MasterProcedureProtos.SplitWALData data =
|
||||||
|
serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
|
||||||
|
serverName = ProtobufUtil.toServerName(data.getCrashedServer());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
@ -44,9 +46,13 @@ import org.junit.Before;
|
|||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameter;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category({MasterTests.class, LargeTests.class})
|
@Category({MasterTests.class, LargeTests.class})
|
||||||
public class TestServerCrashProcedure {
|
public class TestServerCrashProcedure {
|
||||||
|
|
||||||
@ -58,6 +64,9 @@ public class TestServerCrashProcedure {
|
|||||||
|
|
||||||
protected HBaseTestingUtility util;
|
protected HBaseTestingUtility util;
|
||||||
|
|
||||||
|
@Parameter
|
||||||
|
public boolean splitWALCoordinatedByZK;
|
||||||
|
|
||||||
private ProcedureMetrics serverCrashProcMetrics;
|
private ProcedureMetrics serverCrashProcMetrics;
|
||||||
private long serverCrashSubmittedCount = 0;
|
private long serverCrashSubmittedCount = 0;
|
||||||
private long serverCrashFailedCount = 0;
|
private long serverCrashFailedCount = 0;
|
||||||
@ -67,6 +76,10 @@ public class TestServerCrashProcedure {
|
|||||||
conf.set("hbase.balancer.tablesOnMaster", "none");
|
conf.set("hbase.balancer.tablesOnMaster", "none");
|
||||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
||||||
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3);
|
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3);
|
||||||
|
conf.setBoolean("hbase.split.writer.creation.bounded", true);
|
||||||
|
conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 8);
|
||||||
|
LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK);
|
||||||
|
conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@ -173,7 +186,8 @@ public class TestServerCrashProcedure {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConcurrentSCPForSameServer() throws Exception {
|
public void testConcurrentSCPForSameServer() throws Exception {
|
||||||
final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer");
|
final TableName tableName =
|
||||||
|
TableName.valueOf("testConcurrentSCPForSameServer-" + splitWALCoordinatedByZK);
|
||||||
try (Table t = createTable(tableName)) {
|
try (Table t = createTable(tableName)) {
|
||||||
// Load the table with a bit of data so some logs to split and some edits in each region.
|
// Load the table with a bit of data so some logs to split and some edits in each region.
|
||||||
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
|
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
|
||||||
@ -222,4 +236,9 @@ public class TestServerCrashProcedure {
|
|||||||
serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount();
|
serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount();
|
||||||
serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount();
|
serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection coordinatedByZK() {
|
||||||
|
return Arrays.asList(false, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,133 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.SplitWALManager;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestSplitWALProcedure {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestSplitWALProcedure.class);
|
||||||
|
|
||||||
|
private static HBaseTestingUtility TEST_UTIL;
|
||||||
|
private HMaster master;
|
||||||
|
private TableName TABLE_NAME;
|
||||||
|
private SplitWALManager splitWALManager;
|
||||||
|
private byte[] FAMILY;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
|
||||||
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
splitWALManager = master.getSplitWALManager();
|
||||||
|
TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALProcedure"));
|
||||||
|
FAMILY = Bytes.toBytes("test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
|
||||||
|
master.getMasterProcedureExecutor(), false);
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandleDeadWorker() throws Exception {
|
||||||
|
Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
TEST_UTIL.loadTable(table, FAMILY);
|
||||||
|
}
|
||||||
|
HRegionServer testServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
|
||||||
|
List<FileStatus> wals = splitWALManager.getWALsToSplit(testServer.getServerName(), false);
|
||||||
|
Assert.assertEquals(1, wals.size());
|
||||||
|
TEST_UTIL.getHBaseCluster().killRegionServer(testServer.getServerName());
|
||||||
|
TEST_UTIL.waitFor(30000, () -> master.getProcedures().stream()
|
||||||
|
.anyMatch(procedure -> procedure instanceof SplitWALProcedure));
|
||||||
|
Procedure splitWALProcedure = master.getProcedures().stream()
|
||||||
|
.filter(procedure -> procedure instanceof SplitWALProcedure).findAny().get();
|
||||||
|
Assert.assertNotNull(splitWALProcedure);
|
||||||
|
TEST_UTIL.waitFor(5000, () -> ((SplitWALProcedure) splitWALProcedure).getWorker() != null);
|
||||||
|
TEST_UTIL.getHBaseCluster()
|
||||||
|
.killRegionServer(((SplitWALProcedure) splitWALProcedure).getWorker());
|
||||||
|
ProcedureTestingUtility.waitProcedure(masterPE, splitWALProcedure.getProcId());
|
||||||
|
Assert.assertTrue(splitWALProcedure.isSuccess());
|
||||||
|
ProcedureTestingUtility.waitAllProcedures(masterPE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMasterRestart() throws Exception {
|
||||||
|
Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
TEST_UTIL.loadTable(table, FAMILY);
|
||||||
|
}
|
||||||
|
HRegionServer testServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
|
||||||
|
List<FileStatus> wals = splitWALManager.getWALsToSplit(testServer.getServerName(), false);
|
||||||
|
Assert.assertEquals(1, wals.size());
|
||||||
|
SplitWALProcedure splitWALProcedure =
|
||||||
|
new SplitWALProcedure(wals.get(0).getPath().toString(), testServer.getServerName());
|
||||||
|
long pid = ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(),
|
||||||
|
splitWALProcedure, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||||
|
TEST_UTIL.waitFor(5000, () -> splitWALProcedure.getWorker() != null);
|
||||||
|
// Kill master
|
||||||
|
TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
|
||||||
|
TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
|
||||||
|
// restart master
|
||||||
|
TEST_UTIL.getHBaseCluster().startMaster();
|
||||||
|
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
|
||||||
|
this.master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), pid);
|
||||||
|
Optional<Procedure<?>> procedure =
|
||||||
|
master.getProcedures().stream().filter(p -> p.getProcId() == pid).findAny();
|
||||||
|
// make sure procedure is successful and wal is deleted
|
||||||
|
Assert.assertTrue(procedure.isPresent());
|
||||||
|
Assert.assertTrue(procedure.get().isSuccess());
|
||||||
|
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals.get(0).getPath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.CoreMatchers.not;
|
import static org.hamcrest.CoreMatchers.not;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
@ -449,7 +450,7 @@ public class TestSplitLogWorker {
|
|||||||
final ServerName RS = ServerName.valueOf("rs,1,1");
|
final ServerName RS = ServerName.valueOf("rs,1,1");
|
||||||
final int maxTasks = 3;
|
final int maxTasks = 3;
|
||||||
Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||||
testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
|
testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
|
||||||
RegionServerServices mockedRS = getRegionServer(RS);
|
RegionServerServices mockedRS = getRegionServer(RS);
|
||||||
for (int i = 0; i < maxTasks; i++) {
|
for (int i = 0; i < maxTasks; i++) {
|
||||||
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
|
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
|
||||||
@ -485,7 +486,7 @@ public class TestSplitLogWorker {
|
|||||||
final ServerName RS2 = ServerName.valueOf("rs,1,2");
|
final ServerName RS2 = ServerName.valueOf("rs,1,2");
|
||||||
final int maxTasks = 3;
|
final int maxTasks = 3;
|
||||||
Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||||
testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
|
testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
|
||||||
RegionServerServices mockedRS = getRegionServer(RS);
|
RegionServerServices mockedRS = getRegionServer(RS);
|
||||||
|
|
||||||
// create two RS nodes
|
// create two RS nodes
|
||||||
|
Loading…
x
Reference in New Issue
Block a user