HBASE-24574 Procedure V2 - Distributed WAL Splitting => LOGGING (#1912)
hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
  Don't register as a chore on construction if there is no coordination state manager instance (there is no instance when using the procv2 WAL splitter).
hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
  Edit logs.
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
  Add proc name rather than rely on default behavior. Add detail to the toString.
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  Factoring.
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
  Print the maxLogs... we don't do it anywhere.
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
  Utility method to strip prefix from wal path.
Signed-off-by: Guanghao Zhang <zghao@apache.org>
Signed-off-by: tianjingyun <tianjy@apache.org>
This commit is contained in:
parent
bd79c4065c
commit
aadd2bb1a1
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -16,14 +16,12 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
|
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
|
||||||
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
|
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
|
||||||
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
|
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
|
||||||
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
|
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
|
||||||
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
|
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
|
||||||
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
|
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -57,7 +55,6 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -88,6 +85,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
* completed (either with success or with error) it will not be submitted
|
* completed (either with success or with error) it will not be submitted
|
||||||
* again. If a task is resubmitted then there is a risk that old "delete task"
|
* again. If a task is resubmitted then there is a risk that old "delete task"
|
||||||
* can delete the re-submission.
|
* can delete the re-submission.
|
||||||
|
* @see SplitWALManager for an alternate implementation based on Procedures.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SplitLogManager {
|
public class SplitLogManager {
|
||||||
|
@ -121,21 +119,28 @@ public class SplitLogManager {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.server = master;
|
this.server = master;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
// If no CoordinatedStateManager, skip registering as a chore service (The
|
||||||
|
// CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL
|
||||||
|
// splitting. It is null if we are configured to use procedure-based distributed WAL
|
||||||
|
// splitting.)
|
||||||
|
if (server.getCoordinatedStateManager() != null) {
|
||||||
this.choreService =
|
this.choreService =
|
||||||
new ChoreService(master.getServerName().toShortString() + ".splitLogManager.");
|
new ChoreService(master.getServerName().toShortString() + ".splitLogManager.");
|
||||||
if (server.getCoordinatedStateManager() != null) {
|
|
||||||
SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
|
SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
|
||||||
Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
|
Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
|
||||||
SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
|
SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
|
||||||
coordination.setDetails(details);
|
coordination.setDetails(details);
|
||||||
coordination.init();
|
coordination.init();
|
||||||
}
|
|
||||||
this.unassignedTimeout =
|
this.unassignedTimeout =
|
||||||
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
|
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
|
||||||
this.timeoutMonitor =
|
this.timeoutMonitor =
|
||||||
new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
|
new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
|
||||||
master);
|
master);
|
||||||
choreService.scheduleChore(timeoutMonitor);
|
this.choreService.scheduleChore(timeoutMonitor);
|
||||||
|
} else {
|
||||||
|
this.choreService = null;
|
||||||
|
this.timeoutMonitor = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private SplitLogManagerCoordination getSplitLogManagerCoordination() {
|
private SplitLogManagerCoordination getSplitLogManagerCoordination() {
|
||||||
|
@ -560,7 +565,9 @@ public class SplitLogManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void chore() {
|
protected void chore() {
|
||||||
if (server.getCoordinatedStateManager() == null) return;
|
if (server.getCoordinatedStateManager() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int resubmitted = 0;
|
int resubmitted = 0;
|
||||||
int unassigned = 0;
|
int unassigned = 0;
|
||||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
* {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
|
* {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
|
||||||
* {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed
|
* {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed
|
||||||
* after we switch to procedure-based WAL splitting.
|
* after we switch to procedure-based WAL splitting.
|
||||||
|
* @see SplitLogManager for the original distributed split WAL manager.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SplitWALManager {
|
public class SplitWALManager {
|
||||||
|
@ -96,7 +97,7 @@ public class SplitWALManager {
|
||||||
// 2. create corresponding procedures
|
// 2. create corresponding procedures
|
||||||
return createSplitWALProcedures(splittingFiles, crashedServer);
|
return createSplitWALProcedures(splittingFiles, crashedServer);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("failed to create procedures for splitting logs of {}", crashedServer, e);
|
LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -106,7 +107,7 @@ public class SplitWALManager {
|
||||||
List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
|
List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
|
||||||
FileStatus[] fileStatuses =
|
FileStatus[] fileStatuses =
|
||||||
SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
|
SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
|
||||||
LOG.info("size of WALs of {} is {}, isMeta: {}", serverName, fileStatuses.length, splitMeta);
|
LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.length, splitMeta);
|
||||||
return Lists.newArrayList(fileStatuses);
|
return Lists.newArrayList(fileStatuses);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,7 +123,9 @@ public class SplitWALManager {
|
||||||
|
|
||||||
public void deleteWALDir(ServerName serverName) throws IOException {
|
public void deleteWALDir(ServerName serverName) throws IOException {
|
||||||
Path splitDir = getWALSplitDir(serverName);
|
Path splitDir = getWALSplitDir(serverName);
|
||||||
fs.delete(splitDir, false);
|
if (!fs.delete(splitDir, false)) {
|
||||||
|
LOG.warn("Failed delete {}", splitDir);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isSplitWALFinished(String walPath) throws IOException {
|
public boolean isSplitWALFinished(String walPath) throws IOException {
|
||||||
|
@ -138,17 +141,17 @@ public class SplitWALManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* try to acquire an worker from online servers which is executring
|
* Acquire a split WAL worker
|
||||||
* @param procedure split WAL task
|
* @param procedure split WAL task
|
||||||
* @return an available region server which could execute this task
|
* @return an available region server which could execute this task
|
||||||
* @throws ProcedureSuspendedException if there is no available worker,
|
* @throws ProcedureSuspendedException if there is no available worker,
|
||||||
* it will throw this exception to let the procedure wait
|
* it will throw this exception to make the procedure wait.
|
||||||
*/
|
*/
|
||||||
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
|
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
|
||||||
throws ProcedureSuspendedException {
|
throws ProcedureSuspendedException {
|
||||||
Optional<ServerName> worker = splitWorkerAssigner.acquire();
|
Optional<ServerName> worker = splitWorkerAssigner.acquire();
|
||||||
LOG.debug("acquired a worker {} to split a WAL", worker);
|
|
||||||
if (worker.isPresent()) {
|
if (worker.isPresent()) {
|
||||||
|
LOG.debug("Acquired split WAL worker={}", worker.get());
|
||||||
return worker.get();
|
return worker.get();
|
||||||
}
|
}
|
||||||
splitWorkerAssigner.suspend(procedure);
|
splitWorkerAssigner.suspend(procedure);
|
||||||
|
@ -162,7 +165,7 @@ public class SplitWALManager {
|
||||||
* @param scheduler scheduler which is to wake up the procedure event
|
* @param scheduler scheduler which is to wake up the procedure event
|
||||||
*/
|
*/
|
||||||
public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
|
public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
|
||||||
LOG.debug("release a worker {} to split a WAL", worker);
|
LOG.debug("Release split WAL worker={}", worker);
|
||||||
splitWorkerAssigner.release(worker);
|
splitWorkerAssigner.release(worker);
|
||||||
splitWorkerAssigner.wake(scheduler);
|
splitWorkerAssigner.wake(scheduler);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1565,7 +1565,8 @@ public class AssignmentManager {
|
||||||
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
|
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
|
||||||
carryingMeta = isCarryingMeta(serverName);
|
carryingMeta = isCarryingMeta(serverName);
|
||||||
if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
|
if (!force && serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
|
||||||
LOG.info("Skip adding SCP for {} (meta={}) -- running?", serverNode, carryingMeta);
|
LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) -- running?",
|
||||||
|
serverNode, carryingMeta);
|
||||||
return Procedure.NO_PROC_ID;
|
return Procedure.NO_PROC_ID;
|
||||||
} else {
|
} else {
|
||||||
MasterProcedureEnv mpe = procExec.getEnvironment();
|
MasterProcedureEnv mpe = procExec.getEnvironment();
|
||||||
|
@ -1586,7 +1587,8 @@ public class AssignmentManager {
|
||||||
pid = procExec.submitProcedure(
|
pid = procExec.submitProcedure(
|
||||||
new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
|
new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta));
|
||||||
}
|
}
|
||||||
LOG.info("Scheduled SCP pid={} for {} (carryingMeta={}){}.", pid, serverName, carryingMeta,
|
LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.",
|
||||||
|
pid, serverName, carryingMeta,
|
||||||
serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
|
serverNode == null? "": " " + serverNode.toString() + ", oldState=" + oldState);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master.assignment;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.util.RetryCounter;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
|
||||||
|
@ -365,4 +363,20 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
|
||||||
protected void afterReplay(MasterProcedureEnv env) {
|
protected void afterReplay(MasterProcedureEnv env) {
|
||||||
getParent(env).attachRemoteProc(this);
|
getParent(env).attachRemoteProc(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public String getProcName() {
|
||||||
|
return getClass().getSimpleName() + " " + region.getEncodedName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override protected void toStringClassDetails(StringBuilder builder) {
|
||||||
|
builder.append(getProcName());
|
||||||
|
if (targetServer != null) {
|
||||||
|
builder.append(", server=");
|
||||||
|
builder.append(this.targetServer);
|
||||||
|
}
|
||||||
|
if (this.retryCounter != null) {
|
||||||
|
builder.append(", retry=");
|
||||||
|
builder.append(this.retryCounter);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -32,9 +32,7 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperat
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||||
|
|
||||||
|
@ -94,11 +92,12 @@ public abstract class RegionTransitionProcedure extends Procedure<MasterProcedur
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void toStringClassDetails(final StringBuilder sb) {
|
public void toStringClassDetails(final StringBuilder sb) {
|
||||||
sb.append(getClass().getSimpleName());
|
sb.append(getProcName());
|
||||||
sb.append(" table=");
|
}
|
||||||
sb.append(getTableName());
|
|
||||||
sb.append(", region=");
|
@Override public String getProcName() {
|
||||||
sb.append(getRegionInfo() == null ? null : getRegionInfo().getEncodedName());
|
RegionInfo r = getRegionInfo();
|
||||||
|
return getClass().getSimpleName() + " " + getTableName() + (r != null? r.getEncodedName(): "");
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionStateNode getRegionState(final MasterProcedureEnv env) {
|
public RegionStateNode getRegionState(final MasterProcedureEnv env) {
|
||||||
|
|
|
@ -381,15 +381,17 @@ public class ServerCrashProcedure
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void toStringClassDetails(StringBuilder sb) {
|
public void toStringClassDetails(StringBuilder sb) {
|
||||||
sb.append(getClass().getSimpleName());
|
sb.append(getProcName());
|
||||||
sb.append(" server=");
|
|
||||||
sb.append(serverName);
|
|
||||||
sb.append(", splitWal=");
|
sb.append(", splitWal=");
|
||||||
sb.append(shouldSplitWal);
|
sb.append(shouldSplitWal);
|
||||||
sb.append(", meta=");
|
sb.append(", meta=");
|
||||||
sb.append(carryingMeta);
|
sb.append(carryingMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public String getProcName() {
|
||||||
|
return getClass().getSimpleName() + " " + this.serverName;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -17,9 +16,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.master.SplitWALManager;
|
import org.apache.hadoop.hbase.master.SplitWALManager;
|
||||||
|
@ -33,7 +30,6 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||||
|
@ -192,6 +188,30 @@ public class SplitWALProcedure
|
||||||
if(worker != null){
|
if(worker != null){
|
||||||
env.getMasterServices().getSplitWALManager().addUsedSplitWALWorker(worker);
|
env.getMasterServices().getSplitWALManager().addUsedSplitWALWorker(worker);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override protected void toStringClassDetails(StringBuilder builder) {
|
||||||
|
builder.append(getProcName());
|
||||||
|
if (this.worker != null) {
|
||||||
|
builder.append(", worker=");
|
||||||
|
builder.append(this.worker);
|
||||||
|
}
|
||||||
|
if (this.retryCounter != null) {
|
||||||
|
builder.append(", retry=");
|
||||||
|
builder.append(this.retryCounter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getProcName() {
|
||||||
|
return getClass().getSimpleName() + " " + getWALNameFromStrPath(getWAL());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Return the WAL filename when given a Path-as-a-string; i.e. return the last path
|
||||||
|
* component only.
|
||||||
|
*/
|
||||||
|
static String getWALNameFromStrPath(String path) {
|
||||||
|
int slashIndex = path.lastIndexOf('/');
|
||||||
|
return slashIndex != -1? path.substring(slashIndex + 1): path;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -31,14 +30,14 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A remote procedure which is used to send split WAL request to region server.
|
* A remote procedure which is used to send split WAL request to region server.
|
||||||
* it will return null if the task is succeed or return a DoNotRetryIOException
|
* It will return null if the task succeeded or return a DoNotRetryIOException.
|
||||||
* {@link SplitWALProcedure} will help handle the situation that encounter
|
* {@link SplitWALProcedure} will help handle the situation that encounters
|
||||||
* DoNotRetryIOException. Otherwise it will retry until succeed.
|
* DoNotRetryIOException. Otherwise it will retry until success.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SplitWALRemoteProcedure extends ServerRemoteProcedure
|
public class SplitWALRemoteProcedure extends ServerRemoteProcedure
|
||||||
|
@ -96,20 +95,18 @@ public class SplitWALRemoteProcedure extends ServerRemoteProcedure
|
||||||
@Override
|
@Override
|
||||||
protected void complete(MasterProcedureEnv env, Throwable error) {
|
protected void complete(MasterProcedureEnv env, Throwable error) {
|
||||||
if (error == null) {
|
if (error == null) {
|
||||||
LOG.info("split WAL {} on {} succeeded", walPath, targetServer);
|
|
||||||
try {
|
try {
|
||||||
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
|
env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("remove WAL {} failed, ignore...", walPath, e);
|
LOG.warn("Failed split of {}; ignore...", walPath, e);
|
||||||
}
|
}
|
||||||
succ = true;
|
succ = true;
|
||||||
} else {
|
} else {
|
||||||
if (error instanceof DoNotRetryIOException) {
|
if (error instanceof DoNotRetryIOException) {
|
||||||
LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server",
|
LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error);
|
||||||
walPath, targetServer, error);
|
|
||||||
succ = true;
|
succ = true;
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("split WAL {} failed, retry...", walPath, error);
|
LOG.warn("Failed split of {}, retry...", walPath, error);
|
||||||
succ = false;
|
succ = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -134,4 +131,16 @@ public class SplitWALRemoteProcedure extends ServerRemoteProcedure
|
||||||
public ServerOperationType getServerOperationType() {
|
public ServerOperationType getServerOperationType() {
|
||||||
return ServerOperationType.SPLIT_WAL_REMOTE;
|
return ServerOperationType.SPLIT_WAL_REMOTE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override protected void toStringClassDetails(StringBuilder builder) {
|
||||||
|
builder.append(getProcName());
|
||||||
|
if (this.targetServer != null) {
|
||||||
|
builder.append(", worker=");
|
||||||
|
builder.append(this.targetServer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getProcName() {
|
||||||
|
return getClass().getSimpleName() + " " + SplitWALProcedure.getWALNameFromStrPath(getWAL());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class SplitWALCallable implements RSProcedureCallable {
|
||||||
MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
|
MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
|
||||||
this.walPath = param.getWalPath();
|
this.walPath = param.getWalPath();
|
||||||
} catch (InvalidProtocolBufferException e) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
LOG.error("parse proto buffer of split WAL request failed ", e);
|
LOG.error("Parse proto buffer of split WAL request failed ", e);
|
||||||
initError = e;
|
initError = e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -84,9 +84,9 @@ public class SplitWALCallable implements RSProcedureCallable {
|
||||||
splitWALLock = splitWALLocks.acquireLock(walPath);
|
splitWALLock = splitWALLocks.acquireLock(walPath);
|
||||||
try{
|
try{
|
||||||
splitWal();
|
splitWal();
|
||||||
LOG.info("split WAL {} succeed.", walPath);
|
LOG.info("Successful split of {}", walPath);
|
||||||
} catch (IOException e){
|
} catch (IOException e){
|
||||||
LOG.warn("failed to split WAL {}.", walPath, e);
|
LOG.warn("Failed split of {}.", walPath, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
|
|
@ -459,7 +459,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
|
|
||||||
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
|
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
|
||||||
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
|
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
|
||||||
walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
|
walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir +
|
||||||
|
", maxLogs=" + this.maxLogs);
|
||||||
this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
|
this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
|
||||||
DEFAULT_SLOW_SYNC_TIME_MS));
|
DEFAULT_SLOW_SYNC_TIME_MS));
|
||||||
this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
|
this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
|
||||||
|
|
Loading…
Reference in New Issue