HBASE-24632 Enable procedure-based log splitting as default in hbase3 Add deprecation of 'classic' zk-based WAL splitter. (#2156)

Also fix three bugs:

 * We were trying to delete a non-empty directory; we weren't accounting
 for meta WALs left behind where hbase:meta had (successfully) moved off
 the server
 * We were deleting split WALs rather than archiving them.
 * We were not handling corrupt files.

Deprecations and removal of tests of the old system.
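In outline, the second fix swaps a delete for an archive once a WAL has been fully split. A minimal sketch, assuming the SplitWALManager field names that appear in the diff below (not the verbatim patch):

// Before: a fully-split WAL was deleted outright.
fs.delete(new Path(wal), false);
// After: it is moved to the WAL archive dir (oldWALs), where the usual
// cleaner chores dispose of it once nothing needs it any longer.
WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);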

Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
Michael Stack 2020-07-29 07:37:28 -07:00 committed by GitHub
parent 56f32ea895
commit 345b77a4ca
24 changed files with 393 additions and 464 deletions

View File

@ -1447,9 +1447,18 @@ public final class HConstants {
public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
"hbase.client.fast.fail.interceptor.impl";
/**
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter; see SplitWALManager.
*/
@Deprecated
public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated";
public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true;
/**
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0.
*/
@Deprecated
public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = false;
public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters";
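With the default flipped to false, the procedure-based splitter now runs out of the box. An operator who needs the deprecated zk-coordinated splitter back could set the flag explicitly; a hedged sketch, not a recommendation:

// Revert to the deprecated zk-coordinated WAL splitting (slated for removal in 4.0.0).
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, true);
// Equivalent hbase-site.xml property: hbase.split.wal.zk.coordinated = true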

View File

@ -25,9 +25,14 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Counters kept by the distributed WAL split log process.
* Used by master and regionserver packages.
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter, see SplitWALManager
*/
@Deprecated
@InterfaceAudience.Private
public class SplitLogCounters {
private SplitLogCounters() {}
// SplitLogManager counters
public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder();
public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder();

View File

@ -31,7 +31,10 @@ import org.apache.hadoop.hbase.util.Bytes;
* Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside of
* this class. Used by regionserver and master packages.
* <p>Immutable
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter, see SplitWALManager
*/
@Deprecated
@InterfaceAudience.Private
public class SplitLogTask {
private final ServerName originServer;

View File

@ -40,8 +40,11 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* Methods required for task life cycle: <BR>
* {@link #checkTaskStillAvailable(String)} Check that task is still there <BR>
* {@link #checkTasks()} check for unassigned tasks and resubmit them
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter, see SplitWALManager
*/
@InterfaceAudience.Private
@Deprecated
public interface SplitLogManagerCoordination {
/**
* Detail class that shares data between coordination and split log manager

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,16 +17,14 @@
*/
package org.apache.hadoop.hbase.coordination;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@ -44,7 +41,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* <p>
* Important methods for WALSplitterHandler: <BR>
* splitting task has completed.
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter, see SplitWALManager
*/
@Deprecated
@InterfaceAudience.Private
public interface SplitLogWorkerCoordination {

View File

@ -25,7 +25,10 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter (see SplitWALManager) which doesn't use this zk-based coordinator.
*/
@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ZkCoordinatedStateManager implements CoordinatedStateManager {
protected ZKWatcher watcher;

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
@ -2436,6 +2437,12 @@ public class MasterRpcServices extends RSRpcServices implements
@Override
public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
ReportProcedureDoneRequest request) throws ServiceException {
// Check Master is up and ready for duty before progressing. Remote side will keep trying.
try {
this.master.checkServiceStarted();
} catch (ServerNotRunningYetException snrye) {
throw new ServiceException(snrye);
}
request.getResultList().forEach(result -> {
if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
master.remoteProcedureCompleted(result.getProcId());

View File

@ -1,5 +1,4 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@ -55,6 +53,9 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
public class MasterWalManager {
private static final Logger LOG = LoggerFactory.getLogger(MasterWalManager.class);
/**
* Filter *in* WAL files that are for the hbase:meta Region.
*/
final static PathFilter META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
@ -62,6 +63,9 @@ public class MasterWalManager {
}
};
/**
* Filter *out* WAL files that are for the hbase:meta Region; i.e. return user-space WALs only.
*/
@VisibleForTesting
public final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
@ -81,10 +85,19 @@ public class MasterWalManager {
// The Path to the old logs dir
private final Path oldLogDir;
private final Path rootDir;
// create the split log lock
private final Lock splitLogLock = new ReentrantLock();
/**
* Superseded by {@link SplitWALManager}; i.e. procedure-based WAL splitting rather than
* 'classic' zk-coordinated WAL splitting.
* @deprecated since 2.3.0 and 3.0.0 to be removed in 4.0.0; replaced by {@link SplitWALManager}.
* @see SplitWALManager
*/
@Deprecated
private final SplitLogManager splitLogManager;
// Is the filesystem ok?
@ -102,7 +115,6 @@ public class MasterWalManager {
this.rootDir = rootDir;
this.services = services;
this.splitLogManager = new SplitLogManager(services, conf);
this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}
@ -204,7 +216,7 @@ public class MasterWalManager {
*/
@Deprecated
public Set<ServerName> getFailedServersFromLogFolders() throws IOException {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
boolean retrySplitting = !conf.getBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY,
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
Set<ServerName> serverNames = new HashSet<>();
@ -361,11 +373,13 @@ public class MasterWalManager {
}
/**
* For meta region open and closed normally on a server, it may leave some meta
* WAL in the server's wal dir. Since meta region is no long on this server,
* The SCP won't split those meta wals, just leaving them there. So deleting
* the wal dir will fail since the dir is not empty. Actually We can safely achive those
* meta log and Archiving the meta log and delete the dir.
* The hbase:meta region may OPEN and CLOSE without issue on a server and then move elsewhere.
* On CLOSE, the WAL for the hbase:meta table may not be archived yet (the WAL is only needed if
* hbase:meta did not close cleanly). Since the meta region is no longer on this server,
* the ServerCrashProcedure won't split these leftover hbase:meta WALs, just leaving them in
* the WAL splitting dir. If we try to delete the WAL splitting dir for the server, the delete
* will fail since the dir is not empty. We can safely archive these hbase:meta WALs; then the
* WAL dir can be deleted.
* @param serverName the server to archive meta log
*/
public void archiveMetaLog(final ServerName serverName) {
@ -396,6 +410,4 @@ public class MasterWalManager {
LOG.warn("Failed archiving meta log for server " + serverName, ie);
}
}
}
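The javadoc above describes the fix in prose; here is a minimal sketch of archiving the leftover hbase:meta WALs, assuming MasterWalManager's META_FILTER and oldLogDir fields (not the verbatim method body):

// Archive hbase:meta WALs left in the dead server's '-splitting' dir so the
// dir empties out and the subsequent delete can succeed.
Path logDir = new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
for (FileStatus wal : fs.listStatus(splitDir, META_FILTER)) {
  WALSplitUtil.moveWAL(fs, wal.getPath(), new Path(oldLogDir, wal.getPath().getName()));
}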

View File

@ -159,6 +159,9 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
@Override
public Map<String,Entry<Long,Long>> getTableSpaceUtilization() {
if (master == null) {
return Collections.emptyMap();
}
QuotaObserverChore quotaChore = master.getQuotaObserverChore();
if (quotaChore == null) {
return Collections.emptyMap();

View File

@ -86,7 +86,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* again. If a task is resubmitted then there is a risk that old "delete task"
* can delete the re-submission.
* @see SplitWALManager for an alternate implementation based on Procedures.
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter, see SplitWALManager.
*/
@Deprecated
@InterfaceAudience.Private
public class SplitLogManager {
private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,35 +16,36 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -78,15 +79,17 @@ public class SplitWALManager {
private final Path rootDir;
private final FileSystem fs;
private final Configuration conf;
private final Path walArchiveDir;
public SplitWALManager(MasterServices master) {
public SplitWALManager(MasterServices master) throws IOException {
this.master = master;
this.conf = master.getConfiguration();
this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
this.rootDir = master.getMasterFileSystem().getWALRootDir();
// TODO: This should be the WAL FS, not the Master FS?
this.fs = master.getMasterFileSystem().getFileSystem();
this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}
public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
@ -117,14 +120,24 @@ public class SplitWALManager {
return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
}
public void deleteSplitWAL(String wal) throws IOException {
fs.delete(new Path(wal), false);
/**
* Archive processed WAL
*/
public void archive(String wal) throws IOException {
WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
}
public void deleteWALDir(ServerName serverName) throws IOException {
Path splitDir = getWALSplitDir(serverName);
if (!fs.delete(splitDir, false)) {
LOG.warn("Failed delete {}", splitDir);
try {
if (!fs.delete(splitDir, false)) {
LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true));
}
} catch (PathIsNotEmptyDirectoryException e) {
FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir);
LOG.warn("PathIsNotEmptyDirectoryException {}",
Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList()));
throw e;
}
}
@ -197,7 +210,11 @@ public class SplitWALManager {
this.maxSplitTasks = maxSplitTasks;
this.master = master;
this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
this.master.getServerManager().registerListener(this);
// ServerManager might be null in a test context where we are mocking; allow for this
ServerManager sm = this.master.getServerManager();
if (sm != null) {
sm.registerListener(this);
}
}
public synchronized Optional<ServerName> acquire() {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,10 +16,8 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -47,7 +45,6 @@ import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
@ -154,8 +151,8 @@ public class ServerCrashProcedure
break;
case SERVER_CRASH_SPLIT_META_LOGS:
if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
splitMetaLogs(env);
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
zkCoordinatedSplitMetaLogs(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
} else {
am.getRegionStates().metaLogSplitting(serverName);
@ -164,8 +161,7 @@ public class ServerCrashProcedure
}
break;
case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
if(isSplittingDone(env, true)){
cleanupSplitDir(env);
if (isSplittingDone(env, true)) {
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
am.getRegionStates().metaLogSplit(serverName);
} else {
@ -195,7 +191,7 @@ public class ServerCrashProcedure
case SERVER_CRASH_SPLIT_LOGS:
if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
splitLogs(env);
zkCoordinatedSplitLogs(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
} else {
am.getRegionStates().logSplitting(this.serverName);
@ -256,19 +252,27 @@ public class ServerCrashProcedure
private void cleanupSplitDir(MasterProcedureEnv env) {
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
try {
if (!this.carryingMeta) {
// If we are NOT carrying hbase:meta, check if any left-over hbase:meta WAL files from an
// old hbase:meta tenancy on this server; clean these up if any before trying to remove the
// WAL directory of this server or we will fail. See archiveMetaLog comment for more details
// on this condition.
env.getMasterServices().getMasterWalManager().archiveMetaLog(this.serverName);
}
splitWALManager.deleteWALDir(serverName);
} catch (IOException e) {
LOG.warn("Remove WAL directory of server {} failed, ignore...", serverName, e);
LOG.warn("Remove WAL directory for {} failed, ignore...{}", serverName, e.getMessage());
}
}
private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
LOG.debug("check if splitting WALs of {} done? isMeta: {}", serverName, splitMeta);
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
try {
return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0;
int wals = splitWALManager.getWALsToSplit(serverName, splitMeta).size();
LOG.debug("Check if {} WAL splitting is done? wals={}, meta={}", serverName, wals, splitMeta);
return wals == 0;
} catch (IOException e) {
LOG.warn("get filelist of serverName {} failed, retry...", serverName, e);
LOG.warn("Get WALs of {} failed, retry...", serverName, e);
return false;
}
}
@ -293,7 +297,12 @@ public class ServerCrashProcedure
return hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri);
}
private void splitMetaLogs(MasterProcedureEnv env) throws IOException {
/**
* Split hbase:meta logs using 'classic' zk-based coordination.
* Superseded by procedure-based WAL splitting.
* @see #createSplittingWalProcedures(MasterProcedureEnv, boolean)
*/
private void zkCoordinatedSplitMetaLogs(MasterProcedureEnv env) throws IOException {
LOG.debug("Splitting meta WALs {}", this);
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
@ -303,7 +312,12 @@ public class ServerCrashProcedure
LOG.debug("Done splitting meta WALs {}", this);
}
private void splitLogs(final MasterProcedureEnv env) throws IOException {
/**
* Split logs using 'classic' zk-based coordination.
* Superseded by procedure-based WAL splitting.
* @see #createSplittingWalProcedures(MasterProcedureEnv, boolean)
*/
private void zkCoordinatedSplitLogs(final MasterProcedureEnv env) throws IOException {
LOG.debug("Splitting WALs {}", this);
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
@ -333,14 +347,12 @@ public class ServerCrashProcedure
currentRunningState = getCurrentState();
}
int childrenLatch = getChildrenLatch();
status.setStatus(msg + " current State " + currentRunningState
+ (childrenLatch > 0 ? "; remaining num of running child procedures = " + childrenLatch
: ""));
status.setStatus(msg + " current State " + currentRunningState + (childrenLatch > 0?
"; remaining num of running child procedures = " + childrenLatch: ""));
}
@Override
protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
throws IOException {
protected void rollbackState(MasterProcedureEnv env, ServerCrashState state) throws IOException {
// Can't rollback.
throw new UnsupportedOperationException("unhandled state=" + state);
}
@ -424,7 +436,8 @@ public class ServerCrashProcedure
int size = state.getRegionsOnCrashedServerCount();
if (size > 0) {
this.regionsOnCrashedServer = new ArrayList<>(size);
for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri: state.getRegionsOnCrashedServerList()) {
for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri:
state.getRegionsOnCrashedServerList()) {
this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri));
}
}

View File

@ -96,7 +96,7 @@ public class SplitWALRemoteProcedure extends ServerRemoteProcedure
protected void complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
try {
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
env.getMasterServices().getSplitWALManager().archive(walPath);
} catch (IOException e) {
LOG.warn("Failed split of {}; ignore...", walPath, e);
}

View File

@ -66,7 +66,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
the absence of a global lock there is an unavoidable race here - a worker might have just finished
* its task when it is stripped of its ownership. Here we rely on the idempotency of the log
* splitting task for correctness
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter, see SplitWALRemoteProcedure
*/
@Deprecated
@InterfaceAudience.Private
public class SplitLogWorker implements Runnable {
@ -181,8 +184,8 @@ public class SplitLogWorker implements Runnable {
SplitLogWorkerCoordination splitLogWorkerCoordination =
server.getCoordinatedStateManager() == null ? null
: server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf, p,
sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf,
p, sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {

View File

@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
/**
* Handles the splitting of a WAL.
* Used by the zk-based distributed log splitting. Created by ZKSplitLogWorkerCoordination.
*/
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter, see SplitWALManager
*/
@Deprecated
@InterfaceAudience.Private
public class WALSplitterHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(WALSplitterHandler.class);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@ -57,9 +56,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@ -87,9 +84,6 @@ public final class WALSplitUtil {
* the splitLogFile() part. If the master crashes then this function might get called multiple
* times.
* <p>
* @param logfile
* @param conf
* @throws IOException
*/
public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
Path walDir = CommonFSUtils.getWALRootDir(conf);
@ -100,20 +94,9 @@ public final class WALSplitUtil {
} else {
walPath = new Path(walDir, logfile);
}
finishSplitLogFile(walDir, oldLogDir, walPath, conf);
}
static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<>();
List<Path> corruptedLogs = new ArrayList<>();
FileSystem walFS = walDir.getFileSystem(conf);
if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
corruptedLogs.add(walPath);
} else {
processedLogs.add(walPath);
}
archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS);
archive(walPath, corrupt, oldLogDir, walFS, conf);
Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
walFS.delete(stagingDir, true);
}
@ -122,40 +105,40 @@ public final class WALSplitUtil {
* Moves processed logs to the oldLogDir after successful processing; moves corrupted logs (any
* log that couldn't be successfully parsed) to the corrupt dir (.corrupt) for later investigation.
*/
private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
final Path corruptDir =
new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
corruptDir);
}
if (!walFS.mkdirs(corruptDir)) {
LOG.info("Unable to mkdir {}", corruptDir);
}
walFS.mkdirs(oldWALDir);
// this method can get restarted or called multiple times for archiving
// the same log files.
for (Path corruptedWAL : corruptedWALs) {
Path p = new Path(corruptDir, corruptedWAL.getName());
if (walFS.exists(corruptedWAL)) {
if (!walFS.rename(corruptedWAL, p)) {
LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
} else {
LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
}
static void archive(final Path wal, final boolean corrupt, final Path oldWALDir,
final FileSystem walFS, final Configuration conf) throws IOException {
Path dir;
Path target;
if (corrupt) {
dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir);
}
target = new Path(dir, wal.getName());
} else {
dir = oldWALDir;
target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal);
}
mkdir(walFS, dir);
moveWAL(walFS, wal, target);
}
for (Path p : processedWALs) {
Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
if (walFS.exists(p)) {
if (!CommonFSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
LOG.warn("Unable to move {} to {}", p, newPath);
} else {
LOG.info("Archived processed log {} to {}", p, newPath);
}
private static void mkdir(FileSystem fs, Path dir) throws IOException {
if (!fs.mkdirs(dir)) {
LOG.warn("Failed mkdir {}", dir);
}
}
/**
* Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir.
* WAL may have already been moved; makes allowance.
*/
public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException {
if (fs.exists(p)) {
if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) {
LOG.warn("Failed move of {} to {}", p, targetDir);
} else {
LOG.info("Moved {} to {}", p, targetDir);
}
}
}
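A hedged usage sketch of the new archive path (splitWal is an assumed variable naming a WAL that has just been split successfully):

// Move the processed WAL into the archive. moveWAL tolerates the WAL having
// already been moved (the exists() check above), so a retry after a master
// restart is safe.
Path oldWALDir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem walFS = oldWALDir.getFileSystem(conf);
WALSplitUtil.moveWAL(walFS, splitWal, new Path(oldWALDir, splitWal.getName()));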
@ -172,7 +155,6 @@ public final class WALSplitUtil {
* @param tmpDirName of the directory used to sideline old recovered edits file
* @param conf configuration
* @return Path to file into which to dump split log edits.
* @throws IOException
*/
@SuppressWarnings("deprecation")
@VisibleForTesting
@ -217,7 +199,7 @@ public final class WALSplitUtil {
/**
* Get the completed recovered edits file path, renaming it to be by last edit in the file from
* its first edit. Then we could use the name to skip recovered edits when doing
* {@link HRegion#replayRecoveredEditsIfAny}.
* HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask).
* @return dstPath take file's last edit log seq num as the name
*/
static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
@ -304,7 +286,6 @@ public final class WALSplitUtil {
* @param walFS WAL FileSystem used to retrieving split edits files.
* @param regionDir WAL region dir to look for recovered edits files under.
* @return Files in passed <code>regionDir</code> as a sorted set.
* @throws IOException
*/
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
final Path regionDir) throws IOException {
@ -350,7 +331,6 @@ public final class WALSplitUtil {
* @param fs the file system used to rename bad edits file.
* @param edits Edits file to move aside.
* @return The name of the moved aside file.
* @throws IOException
*/
public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
throws IOException {
@ -453,9 +433,9 @@ public final class WALSplitUtil {
}
private final ClientProtos.MutationProto.MutationType type;
public final Mutation mutation;
public final long nonceGroup;
public final long nonce;
@SuppressWarnings("checkstyle:VisibilityModifier") public final Mutation mutation;
@SuppressWarnings("checkstyle:VisibilityModifier") public final long nonceGroup;
@SuppressWarnings("checkstyle:VisibilityModifier") public final long nonce;
@Override
public int compareTo(final MutationReplay d) {
@ -484,12 +464,9 @@ public final class WALSplitUtil {
/**
* This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &amp;
* WALEdit from the passed in WALEntry
* @param entry
* @param cells
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
* extracted from the passed in WALEntry.
* @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
* @throws IOException
*/
public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
@ -517,7 +494,9 @@ public final class WALSplitUtil {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
if (val != null) val.add(cell);
if (val != null) {
val.add(cell);
}
boolean isNewRowOrType =
previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
@ -576,8 +555,6 @@ public final class WALSplitUtil {
* @param tableName the table name
* @param encodedRegionName the encoded region name
* @param familyName the column family name
* @param seqId the sequence id which used to generate file name
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
* @return Path to recovered.hfiles directory of the region's column family.
*/
static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -30,6 +29,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -63,24 +62,26 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import javax.validation.constraints.Null;
/**
* Split RegionServer WAL files. Splits the WAL into new files,
* one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
* See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
* {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
* LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
* entry-point.
* Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or
* use static helper methods.
*/
@InterfaceAudience.Private
public class WALSplitter {
private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors";
/** By default we retry errors in splitting, rather than skipping. */
/**
* By default we retry errors in splitting, rather than skipping.
*/
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
// Parameters for split process
protected final Path walDir;
protected final Path walRootDir;
protected final FileSystem walFS;
protected final Configuration conf;
final Path rootDir;
@ -100,8 +101,6 @@ public class WALSplitter {
private final WALFactory walFactory;
private MonitoredTask status;
// For checking the latest flushed sequence id
protected final LastSequenceId sequenceIdChecker;
@ -132,17 +131,28 @@ public class WALSplitter {
public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
public final static String SPLIT_WAL_WRITER_THREADS =
"hbase.regionserver.hlog.splitlog.writer.threads";
"hbase.regionserver.hlog.splitlog.writer.threads";
private final int numWriterThreads;
private final long bufferSize;
private final boolean splitWriterCreationBounded;
private final boolean hfile;
private final boolean skipErrors;
WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
FileSystem walFS, Path rootDir, FileSystem rootFS) {
this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null);
}
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName =
conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir;
this.walRootDir = walRootDir;
this.walFS = walFS;
this.rootDir = rootDir;
this.rootFS = rootFS;
@ -150,32 +160,17 @@ public class WALSplitter {
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
this.rsServices = rsServices;
this.walFactory = factory;
PipelineController controller = new PipelineController();
this.tmpDirName =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
// if we limit the number of writers opened for sinking recovered edits
boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
if (splitToHFile) {
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
} else if (splitWriterCreationBounded) {
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
} else {
entryBuffers = new EntryBuffers(controller, bufferSize);
outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
}
this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT);
}
WALFactory getWalFactory(){
WALFactory getWalFactory() {
return this.walFactory;
}
@ -193,6 +188,9 @@ public class WALSplitter {
/**
* Splits a WAL file.
* Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests.
* Not used by new procedure-based WAL splitter.
*
* @return false if it is interrupted by the progress-able.
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
@ -201,9 +199,12 @@ public class WALSplitter {
RegionServerServices rsServices) throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
splitLogWorkerCoordination, rsServices);
return s.splitLogFile(logfile, reporter);
WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
splitLogWorkerCoordination, rsServices);
// splitWAL returns a data structure with whether split is finished and if the file is corrupt.
// We don't need to propagate corruption flag here because it is propagated by the
// SplitLogWorkerCoordination.
return splitter.splitWAL(logfile, reporter).isFinished();
}
/**
@ -214,85 +215,123 @@ public class WALSplitter {
* @return List of output files created by the split.
*/
@VisibleForTesting
public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS,
Configuration conf, final WALFactory factory) throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
final FileStatus[] logfiles =
SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS);
final FileStatus[] wals =
SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null);
List<Path> splits = new ArrayList<>();
if (ArrayUtils.isNotEmpty(logfiles)) {
for (FileStatus logfile : logfiles) {
WALSplitter s =
new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
splits.addAll(s.outputSink.splits);
if (ArrayUtils.isNotEmpty(wals)) {
for (FileStatus wal: wals) {
SplitWALResult splitWALResult = splitter.splitWAL(wal, null);
if (splitWALResult.isFinished()) {
WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf);
if (splitter.outputSink.splits != null) {
splits.addAll(splitter.outputSink.splits);
}
}
}
}
if (!walFS.delete(logDir, true)) {
throw new IOException("Unable to delete src dir: " + logDir);
if (!walFS.delete(walsDir, true)) {
throw new IOException("Unable to delete src dir " + walsDir);
}
return splits;
}
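A hedged example of driving this static helper, e.g. from a test (the '-splitting' dir name and the walFactory instance are assumed):

// Split every WAL under a dead server's splitting dir, archiving inputs as we go.
Path walRootDir = CommonFSUtils.getWALRootDir(conf);
Path walsDir = new Path(walRootDir, "WALs/host1,16020,1596000000000-splitting");
Path archiveDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
List<Path> recoveredEdits =
  WALSplitter.split(walRootDir, walsDir, archiveDir, walRootDir.getFileSystem(conf), conf, walFactory);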
/**
* WAL splitting implementation, splits one log file.
* @param logfile should be an actual log file.
* Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable).
* Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for
* whether the WAL is corrupt.
*/
static final class SplitWALResult {
private final boolean finished;
private final boolean corrupt;
private SplitWALResult(boolean finished, boolean corrupt) {
this.finished = finished;
this.corrupt = corrupt;
}
public boolean isFinished() {
return finished;
}
public boolean isCorrupt() {
return corrupt;
}
}
/**
* Setup the output sinks and entry buffers ahead of splitting WAL.
*/
private void createOutputSinkAndEntryBuffers() {
PipelineController controller = new PipelineController();
if (this.hfile) {
this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller,
this.entryBuffers, this.numWriterThreads);
} else if (this.splitWriterCreationBounded) {
this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller,
this.entryBuffers, this.numWriterThreads);
} else {
this.entryBuffers = new EntryBuffers(controller, this.bufferSize);
this.outputSink = new RecoveredEditsOutputSink(this, controller,
this.entryBuffers, this.numWriterThreads);
}
}
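Which sink gets built is decided by the two booleans read in the constructor; a sketch of the selection, using the config constants referenced above (assumed to be members of this class):

// WAL_SPLIT_TO_HFILE            -> BoundedEntryBuffers + BoundedRecoveredHFilesOutputSink
// SPLIT_WRITER_CREATION_BOUNDED -> BoundedEntryBuffers + BoundedRecoveredEditsOutputSink
// neither set                   -> EntryBuffers + RecoveredEditsOutputSink (unbounded writers)
conf.setBoolean(WALSplitter.WAL_SPLIT_TO_HFILE, false);
conf.setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);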
/**
* WAL splitting implementation, splits one WAL file.
* @param walStatus should be for an actual WAL file.
*/
@VisibleForTesting
boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
Preconditions.checkState(status == null);
Preconditions.checkArgument(logfile.isFile(),
"passed in file status is for something other than a regular file.");
boolean isCorrupted = false;
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
SPLIT_SKIP_ERRORS_DEFAULT);
SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
Path wal = walStatus.getPath();
Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString());
boolean corrupt = false;
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
Path logPath = logfile.getPath();
boolean outputSinkStarted = false;
boolean progressFailed = false;
boolean cancelled = false;
int editsCount = 0;
int editsSkipped = 0;
status = TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() + "into a temporary staging area.");
MonitoredTask status =
TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.");
status.enableStatusJournal(true);
Reader logFileReader = null;
this.fileBeingSplit = logfile;
Reader walReader = null;
this.fileBeingSplit = walStatus;
long startTS = EnvironmentEdgeManager.currentTime();
long length = walStatus.getLen();
String lengthStr = StringUtils.humanSize(length);
createOutputSinkAndEntryBuffers();
try {
long logLength = logfile.getLen();
LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
logLength);
status.setStatus("Opening log file " + logPath);
if (reporter != null && !reporter.progress()) {
progressFailed = true;
return false;
String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)";
LOG.info(logStr);
status.setStatus(logStr);
if (cancel != null && !cancel.progress()) {
cancelled = true;
return new SplitWALResult(false, corrupt);
}
logFileReader = getReader(logfile, skipErrors, reporter);
if (logFileReader == null) {
LOG.warn("Nothing to split in WAL={}", logPath);
return true;
walReader = getReader(walStatus, this.skipErrors, cancel);
if (walReader == null) {
LOG.warn("Nothing in {}; empty?", wal);
return new SplitWALResult(true, corrupt);
}
long openCost = EnvironmentEdgeManager.currentTime() - startTS;
LOG.info("Open WAL={} cost {} ms", logPath, openCost);
LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
outputSink.setReporter(reporter);
outputSink.setReporter(cancel);
outputSink.setStatus(status);
outputSink.startWriterThreads();
outputSinkStarted = true;
Entry entry;
Long lastFlushedSequenceId = -1L;
startTS = EnvironmentEdgeManager.currentTime();
while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
String encodedRegionNameAsStr = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) {
if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
encodedRegionNameAsStr))) {
@ -301,8 +340,7 @@ public class WALSplitter {
// region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
// will get skipped by the seqId check below.
// See more details at https://issues.apache.org/jira/browse/HBASE-24189
LOG.info("{} no longer available in the FS. Skipping all edits for this region.",
encodedRegionNameAsStr);
LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
lastFlushedSequenceId = Long.MAX_VALUE;
} else {
if (sequenceIdChecker != null) {
@ -315,7 +353,7 @@ public class WALSplitter {
regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+ TextFormat.shortDebugString(ids));
}
}
@ -344,9 +382,9 @@ public class WALSplitter {
String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
+ " edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr);
if (reporter != null && !reporter.progress()) {
progressFailed = true;
return false;
if (cancel != null && !cancel.progress()) {
cancelled = true;
return new SplitWALResult(false, corrupt);
}
}
}
@ -355,68 +393,64 @@ public class WALSplitter {
iie.initCause(ie);
throw iie;
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
if (splitLogWorkerCoordination != null) {
LOG.warn("Could not parse, corrupt WAL={}", wal, e);
// If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update
// zk. Otherwise, it is the newer procedure-based WAL split which has no zk component.
if (this.splitLogWorkerCoordination != null) {
// Some tests pass in a csm of null.
splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
} else {
// for tests only
ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
}
isCorrupted = true;
corrupt = true;
} catch (IOException e) {
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
throw e;
} finally {
final String log = "Finishing writing output logs and closing down";
final String log = "Finishing writing output for " + wal + " so closing down";
LOG.debug(log);
status.setStatus(log);
try {
if (null != logFileReader) {
logFileReader.close();
if (null != walReader) {
walReader.close();
}
} catch (IOException exception) {
LOG.warn("Could not close WAL reader", exception);
LOG.warn("Could not close {} reader", wal, exception);
}
try {
if (outputSinkStarted) {
// Set progress_failed to true as the immediate following statement will reset its value
// when close() throws exception, progress_failed has the right value
progressFailed = true;
progressFailed = outputSink.close() == null;
// Set cancelled to true as the immediate following statement will reset its value.
// If close() throws an exception, cancelled will have the right value
cancelled = true;
cancelled = outputSink.close() == null;
}
} finally {
long processCost = EnvironmentEdgeManager.currentTime() - startTS;
// See if length got updated post lease recovery
String msg = "Processed " + editsCount + " edits across " +
outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
" ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost +
" ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr +
", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
LOG.info(msg);
status.markComplete(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
status.prettyPrintJournal());
LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
}
}
}
return !progressFailed;
return new SplitWALResult(!cancelled, corrupt);
}
private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
throws IOException {
Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
return this.rootFS.exists(regionDirPath);
private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException {
return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region));
}
/**
* Create a new {@link Reader} for reading logs to split.
* @return Returns null if file has length zero or file can't be found.
*/
private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel)
throws IOException, CorruptedLogFileException {
Path path = file.getPath();
long length = file.getLen();
Path path = walStatus.getPath();
long length = walStatus.getLen();
Reader in;
// Check for possibly empty file. With appends, currently Hadoop reports a
@ -427,9 +461,9 @@ public class WALSplitter {
}
try {
RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter);
RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel);
try {
in = getReader(path, reporter);
in = getReader(path, cancel);
} catch (EOFException e) {
if (length <= 0) {
// TODO should we ignore an empty, not-last log file if skip.errors
@ -451,8 +485,8 @@ public class WALSplitter {
if (!skipErrors || e instanceof InterruptedIOException) {
throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
}
throw new CorruptedLogFileException("skipErrors=true Could not open wal "
+ path + " ignoring", e);
throw new CorruptedLogFileException("skipErrors=true; could not open " + path +
", skipping", e);
}
return in;
}
@ -463,14 +497,14 @@ public class WALSplitter {
return in.next();
} catch (EOFException eof) {
// truncated files are expected if a RS crashes (see HBASE-2643)
LOG.info("EOF from wal {}. Continuing.", path);
LOG.info("EOF from {}; continuing.", path);
return null;
} catch (IOException e) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (e.getCause() != null && (e.getCause() instanceof ParseException
|| e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
LOG.warn("Parse exception from wal {}. Continuing", path, e);
LOG.warn("Parse exception from {}; continuing", path, e);
return null;
}
if (!skipErrors) {
@ -493,7 +527,7 @@ public class WALSplitter {
* Create a new {@link Reader} for reading logs to split.
* @return new Reader instance, caller should close
*/
protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
return walFactory.createReader(walFS, curLogFile, reporter);
}

View File

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,17 +19,9 @@ package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -43,18 +34,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.StartMiniClusterOption;
@ -67,7 +54,6 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@ -77,12 +63,10 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.AfterClass;
@ -93,7 +77,6 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@ -176,83 +159,6 @@ public abstract class AbstractTestDLS {
ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
}
@Test
public void testRecoveredEdits() throws Exception {
conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
startCluster(NUM_RS);
int numLogLines = 10000;
SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
// turn off load balancing to prevent regions from moving around otherwise
// they will consume recovered.edits
master.balanceSwitch(false);
FileSystem fs = master.getMasterFileSystem().getFileSystem();
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
Path rootdir = CommonFSUtils.getRootDir(conf);
int numRegions = 50;
try (Table t = installTable(numRegions)) {
List<RegionInfo> regions = null;
HRegionServer hrs = null;
for (int i = 0; i < NUM_RS; i++) {
hrs = rsts.get(i).getRegionServer();
regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
// At least one RS will have >= to average number of regions.
if (regions.size() >= numRegions / NUM_RS) {
break;
}
}
Path logDir = new Path(rootdir,
AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
LOG.info("#regions = " + regions.size());
Iterator<RegionInfo> it = regions.iterator();
while (it.hasNext()) {
RegionInfo region = it.next();
if (region.getTable().getNamespaceAsString()
.equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
it.remove();
}
}
makeWAL(hrs, regions, numLogLines, 100);
slm.splitLogDistributed(logDir);
int count = 0;
for (RegionInfo hri : regions) {
@SuppressWarnings("deprecation")
Path editsdir = WALSplitUtil
.getRegionDirRecoveredEditsDir(CommonFSUtils.getWALRegionDir(conf,
tableName, hri.getEncodedName()));
LOG.debug("Checking edits dir " + editsdir);
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
if (WALSplitUtil.isSequenceIdFile(p)) {
return false;
}
return true;
}
});
LOG.info("Files {}", Arrays.stream(files).map(f -> f.getPath().toString()).
collect(Collectors.joining(",")));
assertTrue("Edits dir should have more than a one file", files.length > 1);
for (int i = 0; i < files.length; i++) {
int c = countWAL(files[i].getPath(), fs, conf);
count += c;
}
LOG.info(count + " edits in " + files.length + " recovered edits files.");
}
// check that the log file is moved
assertFalse(fs.exists(logDir));
assertEquals(numLogLines, count);
}
}
@Test
public void testMasterStartsUpWithLogSplittingWork() throws Exception {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
@ -303,71 +209,6 @@ public abstract class AbstractTestDLS {
}
}
/**
* The original intention of this test was to force an abort of a region server and to make sure
* that the failure path in the region servers is properly evaluated. But it is difficult to
* ensure that the region server doesn't finish the log splitting before it aborts. Also, there
* is now a code path where the master preempts the region server when the master detects that
* the region server has aborted.
* @throws Exception
*/
// Was marked flaky before Distributed Log Replay cleanup.
@Test
public void testWorkerAbort() throws Exception {
LOG.info("testWorkerAbort");
startCluster(3);
int numLogLines = 10000;
SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
FileSystem fs = master.getMasterFileSystem().getFileSystem();
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
HRegionServer hrs = findRSToKill(false);
Path rootdir = CommonFSUtils.getRootDir(conf);
final Path logDir = new Path(rootdir,
AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
try (Table t = installTable(40)) {
makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
new Thread() {
@Override
public void run() {
try {
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
} catch (InterruptedException e) {
}
for (RegionServerThread rst : rsts) {
rst.getRegionServer().abort("testing");
break;
}
}
}.start();
FileStatus[] logfiles = fs.listStatus(logDir);
TaskBatch batch = new TaskBatch();
slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
// like waitForCounter, but waits for any one of several counters to go up
long curt = System.currentTimeMillis();
long waitTime = 80000;
long endt = curt + waitTime;
while (curt < endt) {
if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
tot_wkr_preempt_task.sum()) == 0) {
Thread.sleep(100);
curt = System.currentTimeMillis();
} else {
assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
tot_wkr_preempt_task.sum()));
return;
}
}
fail("none of the following counters went up in " + waitTime + " milliseconds - " +
"tot_wkr_task_resigned, tot_wkr_task_err, " +
"tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
}
}
@Test
public void testThreeRSAbort() throws Exception {
LOG.info("testThreeRSAbort");
@ -411,6 +252,11 @@ public abstract class AbstractTestDLS {
@Test
public void testDelayedDeleteOnFailure() throws Exception {
if (!this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
// This test depends on zk coordination...
return;
}
LOG.info("testDelayedDeleteOnFailure");
startCluster(1);
final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
@ -504,8 +350,9 @@ public abstract class AbstractTestDLS {
NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
LOG.debug("Verifying only catalog region is assigned\n");
if (regions.size() != 1) {
for (String oregion : regions)
for (String oregion : regions) {
LOG.debug("Region still online: " + oregion);
}
}
assertEquals(1 + existingRegions, regions.size());
LOG.debug("Enabling table\n");

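The guard added to testDelayedDeleteOnFailure above silently returns when the procedure-based
splitter is active, since the test exercises zk-only behavior. A minimal sketch of the same gate
written with JUnit's Assume (a hypothetical variant, not what this patch does), which reports the
test as skipped rather than passed:

  import static org.junit.Assume.assumeTrue;

  // Skip, rather than no-op, when zk-coordinated splitting is disabled (the new default).
  assumeTrue("Requires zk-coordinated WAL splitting",
      conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
          HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK));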
View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,9 +16,9 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.mockito.ArgumentMatchers.any;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.SplitWALManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
@ -60,9 +61,7 @@ import org.apache.zookeeper.KeeperException;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
@ -79,6 +78,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrEx
public class MockMasterServices extends MockNoopMasterServices {
private final MasterFileSystem fileSystemManager;
private final MasterWalManager walManager;
private final SplitWALManager splitWALManager;
private final AssignmentManager assignmentManager;
private final TableStateManager tableStateManager;
@ -100,6 +100,10 @@ public class MockMasterServices extends MockNoopMasterServices {
Superusers.initialize(conf);
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
this.splitWALManager =
conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)?
null: new SplitWALManager(this);
// Mock an AM.
this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) {
@Override
@ -358,4 +362,8 @@ public class MockMasterServices extends MockNoopMasterServices {
}
return builder.build();
}
@Override public SplitWALManager getSplitWALManager() {
return splitWALManager;
}
}

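The mock above mirrors the master's wiring: a SplitWALManager is created only when zk coordination
is disabled, so a null manager signals the legacy path. A hedged caller-side sketch (hypothetical
code, assuming only the getter shown above):

  // Choose the splitting path by whether a SplitWALManager was created at startup.
  SplitWALManager mgr = master.getSplitWALManager();
  if (mgr == null) {
    // deprecated zk-coordinated splitting, driven by SplitLogManager
  } else {
    // procedure-based splitting, driven through SplitWALManager
  }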
View File

@ -77,7 +77,7 @@ public class TestCleanupMetaWAL {
Path walPath = new Path(fs.getWALRootDir(), HConstants.HREGION_LOGDIR_NAME);
for (FileStatus status : CommonFSUtils.listStatus(fs.getFileSystem(), walPath)) {
if (status.getPath().toString().contains(SPLITTING_EXT)) {
fail("Should not have splitting wal dir here:" + status);
fail("Splitting WAL dir should have been cleaned up: " + status);
}
}
}

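The sharpened failure message above asserts that no "-splitting" directory outlives cleanup once
its WALs are fully split. For illustration, a hedged sketch of archiving, rather than deleting, a
finished WAL (splitWal and fs are hypothetical here; the real logic lives in the split machinery,
not in this test):

  // Move the fully split WAL under oldWALs instead of deleting it outright.
  Path oldWALs = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME);
  if (!fs.rename(splitWal, new Path(oldWALs, splitWal.getName()))) {
    LOG.warn("Failed to archive " + splitWal);
  }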
View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.wal;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -84,7 +83,7 @@ public class TestWALReaderOnSecureWAL {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
conf.setBoolean("hbase.hlog.split.skip.errors", true);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
}
@ -168,21 +167,14 @@ public class TestWALReaderOnSecureWAL {
wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
assertFalse(true);
} catch (IOException ioe) {
// expected IOE
System.out.println("Expected ioe " + ioe.getMessage());
}
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = CommonFSUtils.getRootDir(conf);
try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
assertTrue(fs.exists(file));
// assertFalse("log splitting should have failed", true);
} catch (IOException ioe) {
assertTrue("WAL should have been sidelined", false);
}
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
WALSplitter.SplitWALResult swr = s.splitWAL(listStatus[0], null);
assertTrue(swr.isCorrupt());
wals.close();
}
@ -219,7 +211,7 @@ public class TestWALReaderOnSecureWAL {
Path rootdir = CommonFSUtils.getRootDir(conf);
try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
s.splitLogFile(listStatus[0], null);
s.splitWAL(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
assertTrue(!fs.exists(file));

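The rewritten block above targets the new WALSplitter.splitWAL entry point, which reports
corruption through a SplitWALResult instead of making callers probe for a "corrupt" marker file.
A rough usage sketch, assuming only the constructor and result type shown in this diff (walFile is
a hypothetical FileStatus for the WAL under split):

  WALSplitter splitter = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs);
  WALSplitter.SplitWALResult result = splitter.splitWAL(walFile, null);
  if (result.isCorrupt()) {
    // the WAL was sidelined as corrupt; any recovered edits may be partial
  }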
View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
@ -91,12 +91,10 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@ -105,18 +103,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
*/
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALSplit.class);
{
// Uncomment the following lines if more verbosity is needed for
// debugging (see HBASE-12285 for details).
//((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);
private static Configuration conf;
@ -143,7 +132,6 @@ public class TestWALSplit {
private static final byte[] VALUE = Bytes.toBytes("v1");
private static final String WAL_FILE_PREFIX = "wal.dat.";
private static List<String> REGIONS = new ArrayList<>();
private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
private static String ROBBER;
private static String ZOMBIE;
private static String [] GROUP = new String [] {"supergroup"};
@ -223,8 +211,6 @@ public class TestWALSplit {
/**
* Simulates splitting a WAL out from under a regionserver that is still trying to write it.
* Ensures we do not lose edits.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
@ -553,7 +539,7 @@ public class TestWALSplit {
@Test
public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(Integer.MAX_VALUE);
corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
Corruptions.APPEND_GARBAGE, true);
@ -562,7 +548,7 @@ public class TestWALSplit {
@Test
public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(Integer.MAX_VALUE);
corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
@ -571,7 +557,7 @@ public class TestWALSplit {
@Test
public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(Integer.MAX_VALUE);
corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
@ -587,7 +573,7 @@ public class TestWALSplit {
@Test
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
.asList(FaultyProtobufLogReader.FailureType.values()).stream()
.filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
@ -654,14 +640,14 @@ public class TestWALSplit {
@Test (expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
}
@Test
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
try {
splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
} catch (IOException e) {
@ -673,7 +659,7 @@ public class TestWALSplit {
private void ignoreCorruption(final Corruptions corruption, final int entryCount,
final int expectedCount) throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
final String REGION = "region__1";
REGIONS.clear();
@ -698,7 +684,7 @@ public class TestWALSplit {
in.close();
// should not have stored the EOF files as corrupt
FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
FileStatus[] archivedLogs = fs.exists(CORRUPTDIR)? fs.listStatus(CORRUPTDIR): new FileStatus[0];
assertEquals(0, archivedLogs.length);
}
@ -717,7 +703,7 @@ public class TestWALSplit {
@Test
public void testLogsGetArchivedAfterSplit() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
generateWALs(-1);
useDifferentDFSClient();
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
@ -793,7 +779,7 @@ public class TestWALSplit {
@Test
public void testIOEOnOutputThread() throws Exception {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
generateWALs(-1);
useDifferentDFSClient();
@ -847,7 +833,7 @@ public class TestWALSplit {
t.setDaemon(true);
t.start();
try {
logSplitter.splitLogFile(logfiles[largestLogFile], null);
logSplitter.splitWAL(logfiles[largestLogFile], null);
fail("Didn't throw!");
} catch (IOException ioe) {
assertTrue(ioe.toString().contains("Injected"));
@ -944,7 +930,7 @@ public class TestWALSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
null, wals, null);
Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@ -1002,9 +988,7 @@ public class TestWALSplit {
makeRegionDirs(regions);
// Create a splitter that reads and writes the data without touching disk
WALSplitter logSplitter =
new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
/* Produce a mock writer that doesn't write anywhere */
@Override
protected Writer createWriter(Path logfile) throws IOException {
@ -1039,8 +1023,8 @@ public class TestWALSplit {
/* Produce a mock reader that generates fake entries */
@Override
protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
throws IOException {
protected Reader getReader(FileStatus file, boolean skipErrors,
CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
Reader mockReader = Mockito.mock(Reader.class);
Mockito.doAnswer(new Answer<Entry>() {
int index = 0;
@ -1064,7 +1048,7 @@ public class TestWALSplit {
}
};
logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
logSplitter.splitWAL(fs.getFileStatus(logPath), null);
// Verify number of written edits per region
Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
@ -1119,7 +1103,7 @@ public class TestWALSplit {
@Test
public void testSplitLogFileFirstLineCorruptionLog()
throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(1, 10, -1);
FileStatus logfile = fs.listStatus(WALDIR)[0];
@ -1175,7 +1159,7 @@ public class TestWALSplit {
}
};
try{
logSplitter.splitLogFile(logfiles[0], null);
logSplitter.splitWAL(logfiles[0], null);
} catch (IOException e) {
LOG.info(e.toString(), e);
fail("Throws IOException when spliting "

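Throughout TestWALSplit, the bare "hbase.hlog.split.skip.errors" string gives way to the
WALSplitter.SPLIT_SKIP_ERRORS_KEY constant. A minimal configuration sketch, assuming only the
constant shown in this diff:

  Configuration conf = HBaseConfiguration.create();
  // true: corrupt WALs are moved aside and splitting continues;
  // false: a corrupt WAL fails the whole split with an IOException.
  conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);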
View File

@ -221,8 +221,8 @@ public final class MetaTableLocator {
LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
return;
}
LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}", replicaId,
serverName);
LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId,
serverName, state);
// Make the MetaRegionServer pb and then get its bytes and save this as
// the znode content.
MetaRegionServer pbrsr = MetaRegionServer.newBuilder()

View File

@ -32,7 +32,10 @@ import org.slf4j.LoggerFactory;
/**
* Common methods and attributes used by SplitLogManager and SplitLogWorker running distributed
* splitting of WAL logs.
* @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
* distributed WAL splitter; see SplitWALManager.
*/
@Deprecated
@InterfaceAudience.Private
public final class ZKSplitLog {
private static final Logger LOG = LoggerFactory.getLogger(ZKSplitLog.class);