HBASE-19073 Cleanup CoordinatedStateManager

- Merged BaseCSM class into CSM interface
- Removed config hbase.coordinated.state.manager.class
- Since state manager is not pluggable anymore, we don't need start/stop/initialize to setup unknown classes. Our internal ZkCSM now requires Server in constructor itself. Makes the dependency clearer too.
- Removed CSM from HRegionServer and HMaster constructor. Although it's a step back from dependency injection, but it's more consistent with our current (not good)  pattern where we initialize everything in the ctor itself.

Change-Id: Ifca06bb354adec5b11ea1bad4707e014410491fc
This commit is contained in:
Apekshit Sharma 2017-10-20 17:20:17 -07:00
parent eee3b0180e
commit dd70cc3081
46 changed files with 161 additions and 408 deletions

View File

@ -26,12 +26,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices;
@ -95,10 +95,7 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager {
// setup the default procedure coordinator
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
BaseCoordinatedStateManager coordManager =
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory
.getCoordinatedStateManager(master.getConfiguration());
coordManager.initialize(master);
CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(master);
ProcedureCoordinatorRpcs comms =
coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);

View File

@ -24,12 +24,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@ -159,12 +159,8 @@ public class LogRollRegionServerProcedureManager extends RegionServerProcedureMa
+ " setting");
return;
}
BaseCoordinatedStateManager coordManager =
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory.
getCoordinatedStateManager(rss.getConfiguration());
coordManager.initialize(rss);
this.memberRpcs =
coordManager
CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(rss);
this.memberRpcs = coordManager
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
// read in the backup handler configuration properties

View File

@ -1209,10 +1209,6 @@ public final class HConstants {
public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
/** Config for pluggable consensus provider */
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";
/** Configuration key for SplitLog manager timeout */
public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout";
@ -1298,7 +1294,7 @@ public final class HConstants {
public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
"hbase.canary.write.table.check.period";
public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = "hbase.canary.read.raw.enabled";
/**

View File

@ -17,7 +17,14 @@
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/**
* Implementations of this interface will keep and return to clients
@ -28,31 +35,27 @@ import org.apache.yetus.audience.InterfaceAudience;
* For each coarse-grained area of operations there will be a separate
* interface with implementation, providing API for relevant operations
* requiring coordination.
*
* Property hbase.coordinated.state.manager.class in hbase-site.xml controls
* which provider to use.
*/
@InterfaceAudience.Private
public interface CoordinatedStateManager {
/**
* Method to retrieve coordination for split log worker
*/
SplitLogWorkerCoordination getSplitLogWorkerCoordination();
/**
* Initialize coordinated state management service.
* @param server server instance to run within.
* Method to retrieve coordination for split log manager
*/
void initialize(Server server);
SplitLogManagerCoordination getSplitLogManagerCoordination();
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
*/
ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
throws IOException;
/**
* Starts service.
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
*/
void start();
ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException;
/**
* Stops service.
*/
void stop();
/**
* @return instance of Server coordinated state manager runs within
*/
Server getServer();
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Creates instance of {@link CoordinatedStateManager}
* based on configuration.
*/
@InterfaceAudience.Private
public final class CoordinatedStateManagerFactory {
/**
* Private to keep this class from being accidentally instantiated.
*/
private CoordinatedStateManagerFactory(){}
/**
* Creates consensus provider from the given configuration.
* @param conf Configuration
* @return Implementation of {@link CoordinatedStateManager}
*/
public static CoordinatedStateManager getCoordinatedStateManager(Configuration conf) {
Class<? extends CoordinatedStateManager> coordinatedStateMgrKlass =
conf.getClass(HConstants.HBASE_COORDINATED_STATE_MANAGER_CLASS,
ZkCoordinatedStateManager.class, CoordinatedStateManager.class);
return ReflectionUtils.newInstance(coordinatedStateMgrKlass, conf);
}
}

View File

@ -173,14 +173,8 @@ public class LocalHBaseCluster {
// Create each regionserver with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager).
// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
JVMClusterUtil.RegionServerThread rst =
JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf
JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
.getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
this.regionThreads.add(rst);
@ -208,13 +202,7 @@ public class LocalHBaseCluster {
// Create each master with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager.
// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
this.masterThreads.add(mt);
return mt;

View File

@ -1,76 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Server;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.zookeeper.KeeperException;
/**
* Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
* Defines methods to retrieve coordination objects for relevant areas. CoordinatedStateManager
* reference returned from Server interface has to be casted to this type to
* access those methods.
*/
@InterfaceAudience.Private
public abstract class BaseCoordinatedStateManager implements CoordinatedStateManager {
@Override
public void initialize(Server server) {
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public Server getServer() {
return null;
}
/**
* Method to retrieve coordination for split log worker
*/
public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination();
/**
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
*/
public abstract ProcedureCoordinatorRpcs
getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
*/
public abstract ProcedureMemberRpcs
getProcedureMemberRpcs(String procType) throws KeeperException;
}

View File

@ -58,7 +58,7 @@ public interface SplitLogManagerCoordination {
/**
* Detail class that shares data between coordination and split log manager
*/
public static class SplitLogManagerDetails {
class SplitLogManagerDetails {
final private ConcurrentMap<String, Task> tasks;
final private MasterServices master;
final private Set<String> failedDeletions;
@ -156,7 +156,7 @@ public interface SplitLogManagerCoordination {
* @throws InterruptedIOException
* @throws IOException in case of failure
*/
void setRecoveryMode(boolean b) throws InterruptedIOException, IOException;
void setRecoveryMode(boolean b) throws IOException;
/**
* Removes known stale servers

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
@ -101,14 +102,14 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
private boolean isDrainingDone = false;
public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
ZooKeeperWatcher watcher) {
public ZKSplitLogManagerCoordination(Configuration conf, ZooKeeperWatcher watcher) {
super(watcher);
this.conf = conf;
taskFinisher = new TaskFinisher() {
@Override
public Status finish(ServerName workerName, String logfile) {
try {
WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
WALSplitter.finishSplitLogFile(logfile, conf);
} catch (IOException e) {
LOG.warn("Could not finish splitting of log file " + logfile, e);
return Status.ERR;
@ -116,7 +117,6 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
return Status.DONE;
}
};
this.conf = manager.getServer().getConfiguration();
}
@Override
@ -1122,6 +1122,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
/**
* Temporary function that is used by unit tests only
*/
@VisibleForTesting
public void setIgnoreDeleteForTesting(boolean b) {
ignoreZKDeleteForTesting = b;
}

View File

@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@ -33,24 +36,16 @@ import org.apache.zookeeper.KeeperException;
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
protected Server server;
public class ZkCoordinatedStateManager implements CoordinatedStateManager {
protected ZooKeeperWatcher watcher;
protected SplitLogWorkerCoordination splitLogWorkerCoordination;
protected SplitLogManagerCoordination splitLogManagerCoordination;
@Override
public void initialize(Server server) {
this.server = server;
public ZkCoordinatedStateManager(Server server) {
this.watcher = server.getZooKeeper();
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher);
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher);
}
@Override
public Server getServer() {
return server;
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(server.getServerName(), watcher);
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(server.getConfiguration(),
watcher);
}
@Override

View File

@ -90,13 +90,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
private int maxConcurrentTasks = 0;
private final ZkCoordinatedStateManager manager;
private final ServerName serverName;
public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager,
ZooKeeperWatcher watcher) {
public ZkSplitLogWorkerCoordination(ServerName serverName, ZooKeeperWatcher watcher) {
super(watcher);
manager = zkCoordinatedStateManager;
this.serverName = serverName;
}
/**
@ -185,7 +183,6 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
// currentTask can change but that's ok
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
ServerName serverName = manager.getServer().getServerName();
// have to compare data. cannot compare version because then there
// will be race with attemptToOwnTask()
// cannot just check whether the node has been transitioned to

View File

@ -468,13 +468,12 @@ public class HMaster extends HRegionServer implements MasterServices {
* #finishActiveMasterInitialization(MonitoredTask) after
* the master becomes the active one.
*/
public HMaster(final Configuration conf, CoordinatedStateManager csm)
public HMaster(final Configuration conf)
throws IOException, KeeperException {
super(conf, csm);
super(conf);
try {
this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
LOG.info("hbase.rootdir=" + getRootDir() +
", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
@ -2828,11 +2827,10 @@ public class HMaster extends HRegionServer implements MasterServices {
* @return HMaster instance.
*/
public static HMaster constructMaster(Class<? extends HMaster> masterClass,
final Configuration conf, final CoordinatedStateManager cp) {
final Configuration conf) {
try {
Constructor<? extends HMaster> c =
masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
return c.newInstance(conf, cp);
Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
return c.newInstance(conf);
} catch(Exception e) {
Throwable error = e;
if (e instanceof InvocationTargetException &&

View File

@ -29,8 +29,6 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MasterNotRunningException;
@ -230,9 +228,7 @@ public class HMasterCommandLine extends ServerCommandLine {
waitOnMasterThreads(cluster);
} else {
logProcessInfo(getConf());
CoordinatedStateManager csm =
CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
HMaster master = HMaster.constructMaster(masterClass, conf, csm);
HMaster master = HMaster.constructMaster(masterClass, conf);
if (master.isStopped()) {
LOG.info("Won't bring the Master up as a shutdown is requested");
return 1;
@ -302,9 +298,9 @@ public class HMasterCommandLine extends ServerCommandLine {
public static class LocalHMaster extends HMaster {
private MiniZooKeeperCluster zkcluster = null;
public LocalHMaster(Configuration conf, CoordinatedStateManager csm)
public LocalHMaster(Configuration conf)
throws IOException, KeeperException, InterruptedException {
super(conf, csm);
super(conf);
}
@Override

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -153,8 +152,7 @@ public class SplitLogManager {
}
private SplitLogManagerCoordination getSplitLogManagerCoordination() {
return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination();
return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
}
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {

View File

@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -92,8 +91,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@ -505,7 +504,7 @@ public class HRegionServer extends HasThread implements
protected final RSRpcServices rpcServices;
protected BaseCoordinatedStateManager csm;
protected CoordinatedStateManager csm;
/**
* Configuration manager is used to register/deregister and notify the configuration observers
@ -533,21 +532,13 @@ public class HRegionServer extends HasThread implements
private final boolean masterless;
static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
/**
* Starts a HRegionServer at the default location.
*/
public HRegionServer(Configuration conf) throws IOException, InterruptedException {
this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}
/**
* Starts a HRegionServer at the default location
*
* @param csm implementation of CoordinatedStateManager to be used
*/
// Don't start any services or managers in here in the Consructor.
// Don't start any services or managers in here in the Constructor.
// Defer till after we register with the Master as much as possible. See #startServices.
public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException {
public HRegionServer(Configuration conf) throws IOException {
super("RegionServer"); // thread name
try {
this.startcode = System.currentTimeMillis();
@ -642,9 +633,7 @@ public class HRegionServer extends HasThread implements
// If no master in cluster, skip trying to track one or look for a cluster status.
if (!this.masterless) {
this.csm = (BaseCoordinatedStateManager) csm;
this.csm.initialize(this);
this.csm.start();
this.csm = new ZkCoordinatedStateManager(this);
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
masterAddressTracker.start();
@ -2924,7 +2913,7 @@ public class HRegionServer extends HasThread implements
}
@Override
public BaseCoordinatedStateManager getCoordinatedStateManager() {
public CoordinatedStateManager getCoordinatedStateManager() {
return csm;
}
@ -3020,11 +3009,11 @@ public class HRegionServer extends HasThread implements
*/
public static HRegionServer constructRegionServer(
Class<? extends HRegionServer> regionServerClass,
final Configuration conf2, CoordinatedStateManager cp) {
final Configuration conf2) {
try {
Constructor<? extends HRegionServer> c = regionServerClass
.getConstructor(Configuration.class, CoordinatedStateManager.class);
return c.newInstance(conf2, cp);
.getConstructor(Configuration.class);
return c.newInstance(conf2);
} catch (Exception e) {
throw new RuntimeException("Failed construction of " + "Regionserver: "
+ regionServerClass.toString(), e);

View File

@ -23,10 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.util.ServerCommandLine;
/**
@ -52,7 +50,6 @@ public class HRegionServerCommandLine extends ServerCommandLine {
private int start() throws Exception {
Configuration conf = getConf();
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
try {
// If 'local', don't start a region server here. Defer to
// LocalHBaseCluster. It manages 'local' clusters.
@ -61,7 +58,7 @@ public class HRegionServerCommandLine extends ServerCommandLine {
+ HConstants.CLUSTER_DISTRIBUTED + " is false");
} else {
logProcessInfo(getConf());
HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf, cp);
HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf);
hrs.start();
hrs.join();
if (hrs.isAborted()) {

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.wal.WALFactory;
@ -45,8 +44,8 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
/**
* This worker is spawned in every regionserver, including master. The Worker waits for log
* splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager}
* running in the master and races with other workers in other serves to acquire those tasks.
* splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager}
* running in the master and races with other workers in other serves to acquire those tasks.
* The coordination is done via coordination engine.
* <p>
* If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task.
@ -75,9 +74,7 @@ public class SplitLogWorker implements Runnable {
TaskExecutor splitTaskExecutor) {
this.server = server;
this.conf = conf;
this.coordination =
((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
.getSplitLogWorkerCoordination();
this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
this.server = server;
coordination.init(server, conf, splitTaskExecutor, this);
}
@ -102,7 +99,9 @@ public class SplitLogWorker implements Runnable {
// encountered a bad non-retry-able persistent error.
try {
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
fs, conf, p, sequenceIdChecker,
server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
server.getConnection(), mode, factory)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@ -186,7 +185,7 @@ public class SplitLogWorker implements Runnable {
* acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
* guarantee that two workers will not be executing the same task therefore it
* is better to have workers prepare the task and then have the
* {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
* {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
* SplitLogManager.TaskFinisher
*/
public interface TaskExecutor {

View File

@ -72,23 +72,18 @@ public class JVMClusterUtil {
* Creates a {@link RegionServerThread}.
* Call 'start' on the returned thread to make it run.
* @param c Configuration to use.
* @param cp consensus provider to use
* @param hrsc Class to create.
* @param index Used distinguishing the object returned.
* @throws IOException
* @return Region server added.
*/
public static JVMClusterUtil.RegionServerThread createRegionServerThread(
final Configuration c, CoordinatedStateManager cp, final Class<? extends HRegionServer> hrsc,
final int index)
throws IOException {
public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
HRegionServer server;
try {
Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class,
CoordinatedStateManager.class);
Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
ctor.setAccessible(true);
server = ctor.newInstance(c, cp);
server = ctor.newInstance(c);
} catch (InvocationTargetException ite) {
Throwable target = ite.getTargetException();
throw new RuntimeException("Failed construction of RegionServer: " +
@ -124,20 +119,16 @@ public class JVMClusterUtil {
* Creates a {@link MasterThread}.
* Call 'start' on the returned thread to make it run.
* @param c Configuration to use.
* @param cp consensus provider to use
* @param hmc Class to create.
* @param index Used distinguishing the object returned.
* @throws IOException
* @return Master added.
*/
public static JVMClusterUtil.MasterThread createMasterThread(
final Configuration c, CoordinatedStateManager cp, final Class<? extends HMaster> hmc,
final int index)
throws IOException {
public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
final Class<? extends HMaster> hmc, final int index) throws IOException {
HMaster server;
try {
server = hmc.getConstructor(Configuration.class, CoordinatedStateManager.class).
newInstance(c, cp);
server = hmc.getConstructor(Configuration.class).newInstance(c);
} catch (InvocationTargetException ite) {
Throwable target = ite.getTargetException();
throw new RuntimeException("Failed construction of Master: " +

View File

@ -18,6 +18,9 @@
*/
package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@ -82,7 +85,6 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
@ -146,7 +148,8 @@ public class WALSplitter {
private Map<TableName, TableState> tableStatesCache =
new ConcurrentHashMap<>();
private BaseCoordinatedStateManager csm;
private SplitLogWorkerCoordination splitLogWorkerCoordination;
private Connection connection;
private final WALFactory walFactory;
private MonitoredTask status;
@ -177,7 +180,8 @@ public class WALSplitter {
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker,
CoordinatedStateManager csm, RecoveryMode mode) {
SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
RecoveryMode mode) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@ -185,7 +189,9 @@ public class WALSplitter {
this.rootDir = rootDir;
this.fs = fs;
this.sequenceIdChecker = idChecker;
this.csm = (BaseCoordinatedStateManager)csm;
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
this.connection = connection;
this.walFactory = factory;
this.controller = new PipelineController();
@ -199,7 +205,7 @@ public class WALSplitter {
this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (csm != null && this.distributedLogReplay) {
if (this.splitLogWorkerCoordination != null && this.distributedLogReplay) {
outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
} else {
if (this.distributedLogReplay) {
@ -217,20 +223,14 @@ public class WALSplitter {
* <p>
* If the log file has N regions then N recovered.edits files will be produced.
* <p>
* @param rootDir
* @param logfile
* @param fs
* @param conf
* @param reporter
* @param idChecker
* @param cp coordination state manager
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
RecoveryMode mode, final WALFactory factory) throws IOException {
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
splitLogWorkerCoordination, connection, mode);
return s.splitLogFile(logfile, reporter);
}
@ -246,7 +246,7 @@ public class WALSplitter {
List<Path> splits = new ArrayList<>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, null,
RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@ -317,9 +317,8 @@ public class WALSplitter {
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) {
if (this.distributedLogReplay) {
RegionStoreSequenceIds ids =
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
encodedRegionNameAsStr);
RegionStoreSequenceIds ids = splitLogWorkerCoordination.getRegionFlushedSequenceId(
failedServerName, encodedRegionNameAsStr);
if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
@ -377,10 +376,9 @@ public class WALSplitter {
throw iie;
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
if (this.csm != null) {
if (splitLogWorkerCoordination != null) {
// Some tests pass in a csm of null.
this.csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
logfile.getPath().getName(), fs);
splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
} else {
// for tests only
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
@ -1952,7 +1950,7 @@ public class WALSplitter {
// retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
// update the value for the region
RegionStoreSequenceIds ids =
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
splitLogWorkerCoordination.getRegionFlushedSequenceId(failedServerName,
loc.getRegionInfo().getEncodedName());
if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
@ -2185,15 +2183,14 @@ public class WALSplitter {
}
private boolean isTableDisabledOrDisabling(TableName tableName) {
if (csm == null)
if (connection == null)
return false; // we can't get state without CoordinatedStateManager
if (tableName.isSystemTable())
return false; // assume that system tables never can be disabled
TableState tableState = tableStatesCache.get(tableName);
if (tableState == null) {
try {
tableState =
MetaTableAccessor.getTableState(csm.getServer().getConnection(), tableName);
tableState = MetaTableAccessor.getTableState(connection, tableName);
if (tableState != null)
tableStatesCache.put(tableName, tableState);
} catch (IOException e) {

View File

@ -109,9 +109,9 @@ public class MiniHBaseCluster extends HBaseCluster {
private Thread shutdownThread = null;
private User user = null;
public MiniHBaseClusterRegionServer(Configuration conf, CoordinatedStateManager cp)
public MiniHBaseClusterRegionServer(Configuration conf)
throws IOException, InterruptedException {
super(conf, cp);
super(conf);
this.user = User.getCurrent();
}

View File

@ -66,10 +66,9 @@ public class TestLocalHBaseCluster {
* running in local mode.
*/
public static class MyHMaster extends HMaster {
public MyHMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
public MyHMaster(Configuration conf) throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
super(conf);
}
public int echo(int val) {
@ -82,9 +81,8 @@ public class TestLocalHBaseCluster {
*/
public static class MyHRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public MyHRegionServer(Configuration conf, CoordinatedStateManager cp) throws IOException,
InterruptedException {
super(conf, cp);
public MyHRegionServer(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
public int echo(int val) {

View File

@ -44,9 +44,8 @@ import java.io.IOException;
private static class TestMockRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public TestMockRegionServer(Configuration conf, CoordinatedStateManager cp)
throws IOException, InterruptedException {
super(conf, cp);
public TestMockRegionServer(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
protected int movedRegionCleanerPeriod() {

View File

@ -141,9 +141,9 @@ public class TestClientScannerRPCTimeout {
}
private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
public RegionServerWithScanTimeout(Configuration conf, CoordinatedStateManager cp)
public RegionServerWithScanTimeout(Configuration conf)
throws IOException, InterruptedException {
super(conf, cp);
super(conf);
}
protected RSRpcServices createRpcServices() throws IOException {
@ -168,7 +168,7 @@ public class TestClientScannerRPCTimeout {
throws ServiceException {
if (request.hasScannerId()) {
ScanResponse scanResponse = super.scan(controller, request);
if (this.tableScannerId == request.getScannerId() &&
if (this.tableScannerId == request.getScannerId() &&
(sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
try {
LOG.info("SLEEPING " + (rpcTimeout + 500));

View File

@ -198,9 +198,9 @@ public class TestMetaCache {
public static class RegionServerWithFakeRpcServices extends HRegionServer {
private FakeRSRpcServices rsRpcServices;
public RegionServerWithFakeRpcServices(Configuration conf, CoordinatedStateManager cp)
public RegionServerWithFakeRpcServices(Configuration conf)
throws IOException, InterruptedException {
super(conf, cp);
super(conf);
}
@Override

View File

@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@ -1151,8 +1150,8 @@ public class TestDistributedLogSplitting {
out.write(Bytes.toBytes("corrupted bytes"));
out.close();
ZKSplitLogManagerCoordination coordination =
(ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
.getCoordinatedStateManager()).getSplitLogManagerCoordination();
(ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
.getSplitLogManagerCoordination();
coordination.setIgnoreDeleteForTesting(true);
executor = Executors.newSingleThreadExecutor();
Runnable runnable = new Runnable() {

View File

@ -26,8 +26,6 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
@ -68,10 +66,9 @@ public class TestHMasterRPCException {
conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
testUtil.startMiniZKCluster();
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
ZooKeeperWatcher watcher = testUtil.getZooKeeperWatcher();
ZKUtil.createWithParents(watcher, watcher.znodePaths.masterAddressZNode, Bytes.toBytes("fake:123"));
master = new HMaster(conf, cp);
master = new HMaster(conf);
rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
}

View File

@ -51,9 +51,8 @@ public class TestMasterMetrics {
private static HBaseTestingUtility TEST_UTIL;
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp) throws IOException,
KeeperException, InterruptedException {
super(conf, cp);
public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
super(conf);
}
/*
@Override

View File

@ -33,8 +33,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -47,7 +45,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -133,9 +130,7 @@ public class TestMasterNoCluster {
@Test
public void testStopDuringStart()
throws IOException, KeeperException, InterruptedException {
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
HMaster master = new HMaster(TESTUTIL.getConfiguration(), cp);
HMaster master = new HMaster(TESTUTIL.getConfiguration());
master.start();
// Immediately have it stop. We used hang in assigning meta.
master.stopMaster();
@ -148,7 +143,7 @@ public class TestMasterNoCluster {
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
* @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException
* @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException
*/
@Ignore @Test // Disabled since HBASE-18511. Reenable when master can carry regions.
public void testFailover() throws Exception {
@ -186,15 +181,13 @@ public class TestMasterNoCluster {
// and get notification on transitions. We need to fake out any rpcs the
// master does opening/closing regions. Also need to fake out the address
// of the 'remote' mocked up regionservers.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
// Insert a mock for the connection, use TESTUTIL.getConfiguration rather than
// the conf from the master; the conf will already have an ClusterConnection
// associate so the below mocking of a connection will fail.
final ClusterConnection mockedConnection = HConnectionTestingUtility.getMockedConnectionAndDecorate(
TESTUTIL.getConfiguration(), rs0, rs0, rs0.getServerName(),
HRegionInfo.FIRST_META_REGIONINFO);
HMaster master = new HMaster(conf, cp) {
HMaster master = new HMaster(conf) {
InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
throws UnknownHostException {
// Return different address dependent on port passed.
@ -262,9 +255,7 @@ public class TestMasterNoCluster {
final ServerName deadServer = ServerName.valueOf("test.sample", 1, 100);
final MockRegionServer rs0 = new MockRegionServer(conf, newServer);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
HMaster master = new HMaster(conf, cp) {
HMaster master = new HMaster(conf) {
@Override
MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
return new MasterMetaBootstrap(this, status) {

View File

@ -124,10 +124,9 @@ public class TestMetaShutdownHandler {
public static class MyRegionServer extends MiniHBaseClusterRegionServer {
public MyRegionServer(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
public MyRegionServer(Configuration conf) throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
super(conf);
}
@Override

View File

@ -39,7 +39,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log;
@ -48,7 +47,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -57,6 +55,7 @@ import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
@ -105,8 +104,7 @@ public class TestSplitLogManager {
public DummyMasterServices(ZooKeeperWatcher zkw, Configuration conf) {
super(conf);
this.zkw = zkw;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
cm.initialize(this);
cm = new ZkCoordinatedStateManager(this);
}
@Override

View File

@ -39,9 +39,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequ
public class OOMERegionServer extends HRegionServer {
private List<Put> retainer = new ArrayList<>();
public OOMERegionServer(HBaseConfiguration conf, CoordinatedStateManager cp)
throws IOException, InterruptedException {
super(conf, cp);
public OOMERegionServer(HBaseConfiguration conf) throws IOException, InterruptedException {
super(conf);
}
public void put(byte [] regionName, Put put)

View File

@ -27,8 +27,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.LoadBalancer;
@ -75,11 +73,9 @@ public class TestClusterId {
TEST_UTIL.startMiniDFSCluster(1);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
//start region server, needs to be separate
//so we get an unset clusterId
rst = JVMClusterUtil.createRegionServerThread(conf,cp,
HRegionServer.class, 0);
rst = JVMClusterUtil.createRegionServerThread(conf, HRegionServer.class, 0);
rst.start();
//Make sure RS is in blocking state
Thread.sleep(10000);
@ -92,7 +88,7 @@ public class TestClusterId {
assertNotNull(clusterId);
assertEquals(clusterId, rst.getRegionServer().getClusterId());
}
@Test
public void testRewritingClusterIdToPB() throws Exception {
TEST_UTIL.startMiniZKCluster();
@ -115,6 +111,6 @@ public class TestClusterId {
int expected = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration())? 2: 1;
assertEquals(expected, master.getServerManager().getOnlineServersList().size());
}
}

View File

@ -75,10 +75,6 @@ public class TestCompactionInDeadRegionServer {
super(conf);
}
public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException {
super(conf, csm);
}
@Override
protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
throws IOException {

View File

@ -31,11 +31,9 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Get;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
@ -65,8 +63,7 @@ public class TestPriorityRpc {
conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.getDataTestDir(this.getClass().getName());
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp);
regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf);
priority = regionServer.rpcServices.getPriority();
}

View File

@ -191,9 +191,9 @@ public class TestRSKilledWhenInitializing {
* notices and so removes the region from its set of online regionservers.
*/
static class RegisterAndDieRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public RegisterAndDieRegionServer(Configuration conf, CoordinatedStateManager cp)
public RegisterAndDieRegionServer(Configuration conf)
throws IOException, InterruptedException {
super(conf, cp);
super(conf);
}
@Override

View File

@ -518,10 +518,8 @@ public class TestRegionMergeTransactionOnCluster {
// Make it public so that JVMClusterUtil can access it.
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
super(conf);
}
@Override

View File

@ -71,7 +71,7 @@ public class TestRegionServerHostname {
TEST_UTIL.getConfiguration().set(HRegionServer.RS_HOSTNAME_KEY, invalidHostname);
HRegionServer hrs = null;
try {
hrs = new HRegionServer(TEST_UTIL.getConfiguration(), null);
hrs = new HRegionServer(TEST_UTIL.getConfiguration());
} catch (IllegalArgumentException iae) {
assertTrue(iae.getMessage(),
iae.getMessage().contains("Failed resolve of " + invalidHostname) ||

View File

@ -164,10 +164,9 @@ public class TestRegionServerReportForDuty {
private boolean rpcStubCreatedFlag = false;
private boolean masterChanged = false;
public MyRegionServer(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
public MyRegionServer(Configuration conf) throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
super(conf);
}
@Override

View File

@ -397,11 +397,6 @@ public class TestScannerHeartbeatMessages {
super(conf);
}
public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException {
super(conf, csm);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new HeartbeatRPCServices(this);

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@ -88,8 +88,7 @@ public class TestSplitLogWorker {
public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
this.zkw = zkw;
this.conf = conf;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
cm.initialize(this);
cm = new ZkCoordinatedStateManager(this);
}
@Override

View File

@ -951,10 +951,8 @@ public class TestSplitTransactionOnCluster {
// Make it public so that JVMClusterUtil can access it.
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
super(conf);
}
@Override

View File

@ -906,7 +906,7 @@ public abstract class AbstractTestWALReplay {
assertNotNull(listStatus);
assertTrue(listStatus.length > 0);
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, mode, wals);
this.fs, this.conf, null, null, null, null, mode, wals);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
"recovered.edits")), new PathFilter() {
@ -1059,9 +1059,9 @@ public abstract class AbstractTestWALReplay {
first = fs.getFileStatus(smallFile);
second = fs.getFileStatus(largeFile);
}
WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null,
WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, null,
RecoveryMode.LOG_SPLITTING, wals);
WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null,
WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, null,
RecoveryMode.LOG_SPLITTING, wals);
WAL wal = createWAL(this.conf, hbaseRootDir, logName);
region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);

View File

@ -277,11 +277,6 @@ public class TestReplicationSource {
super(conf);
}
public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException, InterruptedException {
super(conf, csm);
}
@Override
protected void stopServiceThreads() {
// Add a delay before service threads are shutdown.

View File

@ -134,7 +134,7 @@ public class TestWALReaderOnSecureWAL {
wal.sync();
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
wal.shutdown();
return walPath;
} finally {
// restore the cell codec class
@ -182,11 +182,11 @@ public class TestWALReaderOnSecureWAL {
}
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
Path rootdir = FSUtils.getRootDir(conf);
try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, null, mode);
s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
@ -229,11 +229,11 @@ public class TestWALReaderOnSecureWAL {
}
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
Path rootdir = FSUtils.getRootDir(conf);
try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, null, mode);
s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");

View File

@ -372,8 +372,7 @@ public class TestWALSplit {
}
/**
* @throws IOException
* @see https://issues.apache.org/jira/browse/HBASE-3020
* {@see https://issues.apache.org/jira/browse/HBASE-3020}
*/
@Test (timeout=300000)
public void testRecoveredEditsPathForMeta() throws IOException {
@ -805,7 +804,7 @@ public class TestWALSplit {
assertTrue("There should be some log greater than size 0.", 0 < largestSize);
// Set up a splitter that will throw an IOE on the output side
WALSplitter logSplitter = new WALSplitter(wals,
conf, HBASEDIR, fs, null, null, this.mode) {
conf, HBASEDIR, fs, null, null, null, this.mode) {
@Override
protected Writer createWriter(Path logfile) throws IOException {
Writer mockWriter = Mockito.mock(Writer.class);
@ -932,7 +931,7 @@ public class TestWALSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile(
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals);
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode, wals);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@ -991,7 +990,7 @@ public class TestWALSplit {
// Create a splitter that reads and writes the data without touching disk
WALSplitter logSplitter = new WALSplitter(wals,
localConf, HBASEDIR, fs, null, null, this.mode) {
localConf, HBASEDIR, fs, null, null, null, this.mode) {
/* Produce a mock writer that doesn't write anywhere */
@Override
@ -1123,8 +1122,7 @@ public class TestWALSplit {
}
/**
* @throws IOException
* @see https://issues.apache.org/jira/browse/HBASE-4862
* {@see https://issues.apache.org/jira/browse/HBASE-4862}
*/
@Test (timeout=300000)
public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
@ -1142,7 +1140,7 @@ public class TestWALSplit {
logfiles != null && logfiles.length > 0);
WALSplitter logSplitter = new WALSplitter(wals,
conf, HBASEDIR, fs, null, null, this.mode) {
conf, HBASEDIR, fs, null, null, null, this.mode) {
@Override
protected Writer createWriter(Path logfile)
throws IOException {

View File

@ -2024,17 +2024,6 @@ A comma-separated list of
.Default
``
[[hbase.coordinated.state.manager.class]]
*`hbase.coordinated.state.manager.class`*::
+
.Description
Fully qualified name of class implementing coordinated state manager.
+
.Default
`org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager`
[[hbase.regionserver.storefile.refresh.period]]
*`hbase.regionserver.storefile.refresh.period`*::
+