HBASE-19073 Cleanup CoordinatedStateManager

- Merged BaseCSM class into CSM interface
- Removed config hbase.coordinated.state.manager.class
- Since state manager is not pluggable anymore, we don't need start/stop/initialize to setup unknown classes. Our internal ZkCSM now requires Server in constructor itself. Makes the dependency clearer too.
- Removed CSM from HRegionServer and HMaster constructor. Although it's a step back from dependency injection, but it's more consistent with our current (not good)  pattern where we initialize everything in the ctor itself.

Change-Id: Ifca06bb354adec5b11ea1bad4707e014410491fc
This commit is contained in:
Apekshit Sharma 2017-10-20 17:20:17 -07:00
parent 9f2f2db91c
commit 3ce7ab3c70
46 changed files with 161 additions and 408 deletions

View File

@ -26,12 +26,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
@ -95,10 +95,7 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager {
// setup the default procedure coordinator // setup the default procedure coordinator
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
BaseCoordinatedStateManager coordManager = CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(master);
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory
.getCoordinatedStateManager(master.getConfiguration());
coordManager.initialize(master);
ProcedureCoordinatorRpcs comms = ProcedureCoordinatorRpcs comms =
coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);

View File

@ -24,12 +24,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@ -159,12 +159,8 @@ public class LogRollRegionServerProcedureManager extends RegionServerProcedureMa
+ " setting"); + " setting");
return; return;
} }
BaseCoordinatedStateManager coordManager = CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(rss);
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory. this.memberRpcs = coordManager
getCoordinatedStateManager(rss.getConfiguration());
coordManager.initialize(rss);
this.memberRpcs =
coordManager
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
// read in the backup handler configuration properties // read in the backup handler configuration properties

View File

@ -1239,10 +1239,6 @@ public final class HConstants {
public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10; public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
/** Config for pluggable consensus provider */
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";
/** Configuration key for SplitLog manager timeout */ /** Configuration key for SplitLog manager timeout */
public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout"; public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout";
@ -1328,7 +1324,7 @@ public final class HConstants {
public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY = public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
"hbase.canary.write.table.check.period"; "hbase.canary.write.table.check.period";
public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = "hbase.canary.read.raw.enabled"; public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = "hbase.canary.read.raw.enabled";
/** /**

View File

@ -17,7 +17,14 @@
*/ */
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/** /**
* Implementations of this interface will keep and return to clients * Implementations of this interface will keep and return to clients
@ -28,31 +35,27 @@ import org.apache.yetus.audience.InterfaceAudience;
* For each coarse-grained area of operations there will be a separate * For each coarse-grained area of operations there will be a separate
* interface with implementation, providing API for relevant operations * interface with implementation, providing API for relevant operations
* requiring coordination. * requiring coordination.
*
* Property hbase.coordinated.state.manager.class in hbase-site.xml controls
* which provider to use.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface CoordinatedStateManager { public interface CoordinatedStateManager {
/**
* Method to retrieve coordination for split log worker
*/
SplitLogWorkerCoordination getSplitLogWorkerCoordination();
/** /**
* Initialize coordinated state management service. * Method to retrieve coordination for split log manager
* @param server server instance to run within.
*/ */
void initialize(Server server); SplitLogManagerCoordination getSplitLogManagerCoordination();
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
*/
ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
throws IOException;
/** /**
* Starts service. * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
*/ */
void start(); ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException;
/**
* Stops service.
*/
void stop();
/**
* @return instance of Server coordinated state manager runs within
*/
Server getServer();
} }

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Creates instance of {@link CoordinatedStateManager}
* based on configuration.
*/
@InterfaceAudience.Private
public final class CoordinatedStateManagerFactory {
/**
* Private to keep this class from being accidentally instantiated.
*/
private CoordinatedStateManagerFactory(){}
/**
* Creates consensus provider from the given configuration.
* @param conf Configuration
* @return Implementation of {@link CoordinatedStateManager}
*/
public static CoordinatedStateManager getCoordinatedStateManager(Configuration conf) {
Class<? extends CoordinatedStateManager> coordinatedStateMgrKlass =
conf.getClass(HConstants.HBASE_COORDINATED_STATE_MANAGER_CLASS,
ZkCoordinatedStateManager.class, CoordinatedStateManager.class);
return ReflectionUtils.newInstance(coordinatedStateMgrKlass, conf);
}
}

View File

@ -173,14 +173,8 @@ public class LocalHBaseCluster {
// Create each regionserver with its own Configuration instance so each has // Create each regionserver with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in // its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager). // the guts of ConnectionManager).
// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.RegionServerThread rst =
JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
.getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index); .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
this.regionThreads.add(rst); this.regionThreads.add(rst);
@ -208,13 +202,7 @@ public class LocalHBaseCluster {
// Create each master with its own Configuration instance so each has // Create each master with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in // its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager. // the guts of ConnectionManager.
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index); (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
this.masterThreads.add(mt); this.masterThreads.add(mt);
return mt; return mt;

View File

@ -1,76 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Server;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.zookeeper.KeeperException;
/**
* Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
* Defines methods to retrieve coordination objects for relevant areas. CoordinatedStateManager
* reference returned from Server interface has to be casted to this type to
* access those methods.
*/
@InterfaceAudience.Private
public abstract class BaseCoordinatedStateManager implements CoordinatedStateManager {
@Override
public void initialize(Server server) {
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public Server getServer() {
return null;
}
/**
* Method to retrieve coordination for split log worker
*/
public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination();
/**
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
*/
public abstract ProcedureCoordinatorRpcs
getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
*/
public abstract ProcedureMemberRpcs
getProcedureMemberRpcs(String procType) throws KeeperException;
}

View File

@ -58,7 +58,7 @@ public interface SplitLogManagerCoordination {
/** /**
* Detail class that shares data between coordination and split log manager * Detail class that shares data between coordination and split log manager
*/ */
public static class SplitLogManagerDetails { class SplitLogManagerDetails {
final private ConcurrentMap<String, Task> tasks; final private ConcurrentMap<String, Task> tasks;
final private MasterServices master; final private MasterServices master;
final private Set<String> failedDeletions; final private Set<String> failedDeletions;
@ -156,7 +156,7 @@ public interface SplitLogManagerCoordination {
* @throws InterruptedIOException * @throws InterruptedIOException
* @throws IOException in case of failure * @throws IOException in case of failure
*/ */
void setRecoveryMode(boolean b) throws InterruptedIOException, IOException; void setRecoveryMode(boolean b) throws IOException;
/** /**
* Removes known stale servers * Removes known stale servers

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter;
@ -101,14 +102,14 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
private boolean isDrainingDone = false; private boolean isDrainingDone = false;
public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager, public ZKSplitLogManagerCoordination(Configuration conf, ZooKeeperWatcher watcher) {
ZooKeeperWatcher watcher) {
super(watcher); super(watcher);
this.conf = conf;
taskFinisher = new TaskFinisher() { taskFinisher = new TaskFinisher() {
@Override @Override
public Status finish(ServerName workerName, String logfile) { public Status finish(ServerName workerName, String logfile) {
try { try {
WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration()); WALSplitter.finishSplitLogFile(logfile, conf);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Could not finish splitting of log file " + logfile, e); LOG.warn("Could not finish splitting of log file " + logfile, e);
return Status.ERR; return Status.ERR;
@ -116,7 +117,6 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
return Status.DONE; return Status.DONE;
} }
}; };
this.conf = manager.getServer().getConfiguration();
} }
@Override @Override
@ -1122,6 +1122,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
/** /**
* Temporary function that is used by unit tests only * Temporary function that is used by unit tests only
*/ */
@VisibleForTesting
public void setIgnoreDeleteForTesting(boolean b) { public void setIgnoreDeleteForTesting(boolean b) {
ignoreZKDeleteForTesting = b; ignoreZKDeleteForTesting = b;
} }

View File

@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.coordination;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@ -33,24 +36,16 @@ import org.apache.zookeeper.KeeperException;
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}. * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { public class ZkCoordinatedStateManager implements CoordinatedStateManager {
protected Server server;
protected ZooKeeperWatcher watcher; protected ZooKeeperWatcher watcher;
protected SplitLogWorkerCoordination splitLogWorkerCoordination; protected SplitLogWorkerCoordination splitLogWorkerCoordination;
protected SplitLogManagerCoordination splitLogManagerCoordination; protected SplitLogManagerCoordination splitLogManagerCoordination;
@Override public ZkCoordinatedStateManager(Server server) {
public void initialize(Server server) {
this.server = server;
this.watcher = server.getZooKeeper(); this.watcher = server.getZooKeeper();
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher); splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(server.getServerName(), watcher);
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher); splitLogManagerCoordination = new ZKSplitLogManagerCoordination(server.getConfiguration(),
watcher);
}
@Override
public Server getServer() {
return server;
} }
@Override @Override

View File

@ -90,13 +90,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
protected final AtomicInteger tasksInProgress = new AtomicInteger(0); protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
private int maxConcurrentTasks = 0; private int maxConcurrentTasks = 0;
private final ZkCoordinatedStateManager manager; private final ServerName serverName;
public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager, public ZkSplitLogWorkerCoordination(ServerName serverName, ZooKeeperWatcher watcher) {
ZooKeeperWatcher watcher) {
super(watcher); super(watcher);
manager = zkCoordinatedStateManager; this.serverName = serverName;
} }
/** /**
@ -185,7 +183,6 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
// currentTask can change but that's ok // currentTask can change but that's ok
String taskpath = currentTask; String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) { if (taskpath != null && taskpath.equals(path)) {
ServerName serverName = manager.getServer().getServerName();
// have to compare data. cannot compare version because then there // have to compare data. cannot compare version because then there
// will be race with attemptToOwnTask() // will be race with attemptToOwnTask()
// cannot just check whether the node has been transitioned to // cannot just check whether the node has been transitioned to

View File

@ -468,13 +468,12 @@ public class HMaster extends HRegionServer implements MasterServices {
* #finishActiveMasterInitialization(MonitoredTask) after * #finishActiveMasterInitialization(MonitoredTask) after
* the master becomes the active one. * the master becomes the active one.
*/ */
public HMaster(final Configuration conf, CoordinatedStateManager csm) public HMaster(final Configuration conf)
throws IOException, KeeperException { throws IOException, KeeperException {
super(conf, csm); super(conf);
try { try {
this.rsFatals = new MemoryBoundedLogMessageBuffer( this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
LOG.info("hbase.rootdir=" + getRootDir() + LOG.info("hbase.rootdir=" + getRootDir() +
", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
@ -2828,11 +2827,10 @@ public class HMaster extends HRegionServer implements MasterServices {
* @return HMaster instance. * @return HMaster instance.
*/ */
public static HMaster constructMaster(Class<? extends HMaster> masterClass, public static HMaster constructMaster(Class<? extends HMaster> masterClass,
final Configuration conf, final CoordinatedStateManager cp) { final Configuration conf) {
try { try {
Constructor<? extends HMaster> c = Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class); return c.newInstance(conf);
return c.newInstance(conf, cp);
} catch(Exception e) { } catch(Exception e) {
Throwable error = e; Throwable error = e;
if (e instanceof InvocationTargetException && if (e instanceof InvocationTargetException &&

View File

@ -29,8 +29,6 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MasterNotRunningException;
@ -230,9 +228,7 @@ public class HMasterCommandLine extends ServerCommandLine {
waitOnMasterThreads(cluster); waitOnMasterThreads(cluster);
} else { } else {
logProcessInfo(getConf()); logProcessInfo(getConf());
CoordinatedStateManager csm = HMaster master = HMaster.constructMaster(masterClass, conf);
CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
HMaster master = HMaster.constructMaster(masterClass, conf, csm);
if (master.isStopped()) { if (master.isStopped()) {
LOG.info("Won't bring the Master up as a shutdown is requested"); LOG.info("Won't bring the Master up as a shutdown is requested");
return 1; return 1;
@ -302,9 +298,9 @@ public class HMasterCommandLine extends ServerCommandLine {
public static class LocalHMaster extends HMaster { public static class LocalHMaster extends HMaster {
private MiniZooKeeperCluster zkcluster = null; private MiniZooKeeperCluster zkcluster = null;
public LocalHMaster(Configuration conf, CoordinatedStateManager csm) public LocalHMaster(Configuration conf)
throws IOException, KeeperException, InterruptedException { throws IOException, KeeperException, InterruptedException {
super(conf, csm); super(conf);
} }
@Override @Override

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -153,8 +152,7 @@ public class SplitLogManager {
} }
private SplitLogManagerCoordination getSplitLogManagerCoordination() { private SplitLogManagerCoordination getSplitLogManagerCoordination() {
return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
.getSplitLogManagerCoordination();
} }
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {

View File

@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -92,8 +91,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient; import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@ -505,7 +504,7 @@ public class HRegionServer extends HasThread implements
protected final RSRpcServices rpcServices; protected final RSRpcServices rpcServices;
protected BaseCoordinatedStateManager csm; protected CoordinatedStateManager csm;
/** /**
* Configuration manager is used to register/deregister and notify the configuration observers * Configuration manager is used to register/deregister and notify the configuration observers
@ -533,21 +532,13 @@ public class HRegionServer extends HasThread implements
private final boolean masterless; private final boolean masterless;
static final String MASTERLESS_CONFIG_NAME = "hbase.masterless"; static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
/**
* Starts a HRegionServer at the default location.
*/
public HRegionServer(Configuration conf) throws IOException, InterruptedException {
this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}
/** /**
* Starts a HRegionServer at the default location * Starts a HRegionServer at the default location
*
* @param csm implementation of CoordinatedStateManager to be used
*/ */
// Don't start any services or managers in here in the Consructor. // Don't start any services or managers in here in the Constructor.
// Defer till after we register with the Master as much as possible. See #startServices. // Defer till after we register with the Master as much as possible. See #startServices.
public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException { public HRegionServer(Configuration conf) throws IOException {
super("RegionServer"); // thread name super("RegionServer"); // thread name
try { try {
this.startcode = System.currentTimeMillis(); this.startcode = System.currentTimeMillis();
@ -642,9 +633,7 @@ public class HRegionServer extends HasThread implements
// If no master in cluster, skip trying to track one or look for a cluster status. // If no master in cluster, skip trying to track one or look for a cluster status.
if (!this.masterless) { if (!this.masterless) {
this.csm = (BaseCoordinatedStateManager) csm; this.csm = new ZkCoordinatedStateManager(this);
this.csm.initialize(this);
this.csm.start();
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
masterAddressTracker.start(); masterAddressTracker.start();
@ -2924,7 +2913,7 @@ public class HRegionServer extends HasThread implements
} }
@Override @Override
public BaseCoordinatedStateManager getCoordinatedStateManager() { public CoordinatedStateManager getCoordinatedStateManager() {
return csm; return csm;
} }
@ -3020,11 +3009,11 @@ public class HRegionServer extends HasThread implements
*/ */
public static HRegionServer constructRegionServer( public static HRegionServer constructRegionServer(
Class<? extends HRegionServer> regionServerClass, Class<? extends HRegionServer> regionServerClass,
final Configuration conf2, CoordinatedStateManager cp) { final Configuration conf2) {
try { try {
Constructor<? extends HRegionServer> c = regionServerClass Constructor<? extends HRegionServer> c = regionServerClass
.getConstructor(Configuration.class, CoordinatedStateManager.class); .getConstructor(Configuration.class);
return c.newInstance(conf2, cp); return c.newInstance(conf2);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Failed construction of " + "Regionserver: " throw new RuntimeException("Failed construction of " + "Regionserver: "
+ regionServerClass.toString(), e); + regionServerClass.toString(), e);

View File

@ -23,10 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.util.ServerCommandLine; import org.apache.hadoop.hbase.util.ServerCommandLine;
/** /**
@ -52,7 +50,6 @@ public class HRegionServerCommandLine extends ServerCommandLine {
private int start() throws Exception { private int start() throws Exception {
Configuration conf = getConf(); Configuration conf = getConf();
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
try { try {
// If 'local', don't start a region server here. Defer to // If 'local', don't start a region server here. Defer to
// LocalHBaseCluster. It manages 'local' clusters. // LocalHBaseCluster. It manages 'local' clusters.
@ -61,7 +58,7 @@ public class HRegionServerCommandLine extends ServerCommandLine {
+ HConstants.CLUSTER_DISTRIBUTED + " is false"); + HConstants.CLUSTER_DISTRIBUTED + " is false");
} else { } else {
logProcessInfo(getConf()); logProcessInfo(getConf());
HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf, cp); HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf);
hrs.start(); hrs.start();
hrs.join(); hrs.join();
if (hrs.isAborted()) { if (hrs.isAborted()) {

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
@ -45,8 +44,8 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
/** /**
* This worker is spawned in every regionserver, including master. The Worker waits for log * This worker is spawned in every regionserver, including master. The Worker waits for log
* splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager} * splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager}
* running in the master and races with other workers in other serves to acquire those tasks. * running in the master and races with other workers in other serves to acquire those tasks.
* The coordination is done via coordination engine. * The coordination is done via coordination engine.
* <p> * <p>
* If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task. * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task.
@ -75,9 +74,7 @@ public class SplitLogWorker implements Runnable {
TaskExecutor splitTaskExecutor) { TaskExecutor splitTaskExecutor) {
this.server = server; this.server = server;
this.conf = conf; this.conf = conf;
this.coordination = this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
.getSplitLogWorkerCoordination();
this.server = server; this.server = server;
coordination.init(server, conf, splitTaskExecutor, this); coordination.init(server, conf, splitTaskExecutor, this);
} }
@ -102,7 +99,9 @@ public class SplitLogWorker implements Runnable {
// encountered a bad non-retry-able persistent error. // encountered a bad non-retry-able persistent error.
try { try {
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) { fs, conf, p, sequenceIdChecker,
server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
server.getConnection(), mode, factory)) {
return Status.PREEMPTED; return Status.PREEMPTED;
} }
} catch (InterruptedIOException iioe) { } catch (InterruptedIOException iioe) {
@ -186,7 +185,7 @@ public class SplitLogWorker implements Runnable {
* acquired by a {@link SplitLogWorker}. Since there isn't a water-tight * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
* guarantee that two workers will not be executing the same task therefore it * guarantee that two workers will not be executing the same task therefore it
* is better to have workers prepare the task and then have the * is better to have workers prepare the task and then have the
* {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
* SplitLogManager.TaskFinisher * SplitLogManager.TaskFinisher
*/ */
public interface TaskExecutor { public interface TaskExecutor {

View File

@ -72,23 +72,18 @@ public class JVMClusterUtil {
* Creates a {@link RegionServerThread}. * Creates a {@link RegionServerThread}.
* Call 'start' on the returned thread to make it run. * Call 'start' on the returned thread to make it run.
* @param c Configuration to use. * @param c Configuration to use.
* @param cp consensus provider to use
* @param hrsc Class to create. * @param hrsc Class to create.
* @param index Used distinguishing the object returned. * @param index Used distinguishing the object returned.
* @throws IOException * @throws IOException
* @return Region server added. * @return Region server added.
*/ */
public static JVMClusterUtil.RegionServerThread createRegionServerThread( public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
final Configuration c, CoordinatedStateManager cp, final Class<? extends HRegionServer> hrsc, final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
final int index)
throws IOException {
HRegionServer server; HRegionServer server;
try { try {
Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class,
CoordinatedStateManager.class);
ctor.setAccessible(true); ctor.setAccessible(true);
server = ctor.newInstance(c, cp); server = ctor.newInstance(c);
} catch (InvocationTargetException ite) { } catch (InvocationTargetException ite) {
Throwable target = ite.getTargetException(); Throwable target = ite.getTargetException();
throw new RuntimeException("Failed construction of RegionServer: " + throw new RuntimeException("Failed construction of RegionServer: " +
@ -124,20 +119,16 @@ public class JVMClusterUtil {
* Creates a {@link MasterThread}. * Creates a {@link MasterThread}.
* Call 'start' on the returned thread to make it run. * Call 'start' on the returned thread to make it run.
* @param c Configuration to use. * @param c Configuration to use.
* @param cp consensus provider to use
* @param hmc Class to create. * @param hmc Class to create.
* @param index Used distinguishing the object returned. * @param index Used distinguishing the object returned.
* @throws IOException * @throws IOException
* @return Master added. * @return Master added.
*/ */
public static JVMClusterUtil.MasterThread createMasterThread( public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
final Configuration c, CoordinatedStateManager cp, final Class<? extends HMaster> hmc, final Class<? extends HMaster> hmc, final int index) throws IOException {
final int index)
throws IOException {
HMaster server; HMaster server;
try { try {
server = hmc.getConstructor(Configuration.class, CoordinatedStateManager.class). server = hmc.getConstructor(Configuration.class).newInstance(c);
newInstance(c, cp);
} catch (InvocationTargetException ite) { } catch (InvocationTargetException ite) {
Throwable target = ite.getTargetException(); Throwable target = ite.getTargetException();
throw new RuntimeException("Failed construction of Master: " + throw new RuntimeException("Failed construction of Master: " +

View File

@ -18,6 +18,9 @@
*/ */
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@ -82,7 +85,6 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
@ -146,7 +148,8 @@ public class WALSplitter {
private Map<TableName, TableState> tableStatesCache = private Map<TableName, TableState> tableStatesCache =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private BaseCoordinatedStateManager csm; private SplitLogWorkerCoordination splitLogWorkerCoordination;
private Connection connection;
private final WALFactory walFactory; private final WALFactory walFactory;
private MonitoredTask status; private MonitoredTask status;
@ -177,7 +180,8 @@ public class WALSplitter {
@VisibleForTesting @VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker, FileSystem fs, LastSequenceId idChecker,
CoordinatedStateManager csm, RecoveryMode mode) { SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
RecoveryMode mode) {
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@ -185,7 +189,9 @@ public class WALSplitter {
this.rootDir = rootDir; this.rootDir = rootDir;
this.fs = fs; this.fs = fs;
this.sequenceIdChecker = idChecker; this.sequenceIdChecker = idChecker;
this.csm = (BaseCoordinatedStateManager)csm; this.splitLogWorkerCoordination = splitLogWorkerCoordination;
this.connection = connection;
this.walFactory = factory; this.walFactory = factory;
this.controller = new PipelineController(); this.controller = new PipelineController();
@ -199,7 +205,7 @@ public class WALSplitter {
this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode); this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (csm != null && this.distributedLogReplay) { if (this.splitLogWorkerCoordination != null && this.distributedLogReplay) {
outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads); outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
} else { } else {
if (this.distributedLogReplay) { if (this.distributedLogReplay) {
@ -217,20 +223,14 @@ public class WALSplitter {
* <p> * <p>
* If the log file has N regions then N recovered.edits files will be produced. * If the log file has N regions then N recovered.edits files will be produced.
* <p> * <p>
* @param rootDir
* @param logfile
* @param fs
* @param conf
* @param reporter
* @param idChecker
* @param cp coordination state manager
* @return false if it is interrupted by the progress-able. * @return false if it is interrupted by the progress-able.
* @throws IOException
*/ */
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException { SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode); RecoveryMode mode, final WALFactory factory) throws IOException {
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
splitLogWorkerCoordination, connection, mode);
return s.splitLogFile(logfile, reporter); return s.splitLogFile(logfile, reporter);
} }
@ -246,7 +246,7 @@ public class WALSplitter {
List<Path> splits = new ArrayList<>(); List<Path> splits = new ArrayList<>();
if (logfiles != null && logfiles.length > 0) { if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) { for (FileStatus logfile: logfiles) {
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, null,
RecoveryMode.LOG_SPLITTING); RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) { if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@ -317,9 +317,8 @@ public class WALSplitter {
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) { if (lastFlushedSequenceId == null) {
if (this.distributedLogReplay) { if (this.distributedLogReplay) {
RegionStoreSequenceIds ids = RegionStoreSequenceIds ids = splitLogWorkerCoordination.getRegionFlushedSequenceId(
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, failedServerName, encodedRegionNameAsStr);
encodedRegionNameAsStr);
if (ids != null) { if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId(); lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -377,10 +376,9 @@ public class WALSplitter {
throw iie; throw iie;
} catch (CorruptedLogFileException e) { } catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted WAL=" + logPath, e); LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
if (this.csm != null) { if (splitLogWorkerCoordination != null) {
// Some tests pass in a csm of null. // Some tests pass in a csm of null.
this.csm.getSplitLogWorkerCoordination().markCorrupted(rootDir, splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
logfile.getPath().getName(), fs);
} else { } else {
// for tests only // for tests only
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
@ -1952,7 +1950,7 @@ public class WALSplitter {
// retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
// update the value for the region // update the value for the region
RegionStoreSequenceIds ids = RegionStoreSequenceIds ids =
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, splitLogWorkerCoordination.getRegionFlushedSequenceId(failedServerName,
loc.getRegionInfo().getEncodedName()); loc.getRegionInfo().getEncodedName());
if (ids != null) { if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId(); lastFlushedSequenceId = ids.getLastFlushedSequenceId();
@ -2185,15 +2183,14 @@ public class WALSplitter {
} }
private boolean isTableDisabledOrDisabling(TableName tableName) { private boolean isTableDisabledOrDisabling(TableName tableName) {
if (csm == null) if (connection == null)
return false; // we can't get state without CoordinatedStateManager return false; // we can't get state without CoordinatedStateManager
if (tableName.isSystemTable()) if (tableName.isSystemTable())
return false; // assume that system tables never can be disabled return false; // assume that system tables never can be disabled
TableState tableState = tableStatesCache.get(tableName); TableState tableState = tableStatesCache.get(tableName);
if (tableState == null) { if (tableState == null) {
try { try {
tableState = tableState = MetaTableAccessor.getTableState(connection, tableName);
MetaTableAccessor.getTableState(csm.getServer().getConnection(), tableName);
if (tableState != null) if (tableState != null)
tableStatesCache.put(tableName, tableState); tableStatesCache.put(tableName, tableState);
} catch (IOException e) { } catch (IOException e) {

View File

@ -109,9 +109,9 @@ public class MiniHBaseCluster extends HBaseCluster {
private Thread shutdownThread = null; private Thread shutdownThread = null;
private User user = null; private User user = null;
public MiniHBaseClusterRegionServer(Configuration conf, CoordinatedStateManager cp) public MiniHBaseClusterRegionServer(Configuration conf)
throws IOException, InterruptedException { throws IOException, InterruptedException {
super(conf, cp); super(conf);
this.user = User.getCurrent(); this.user = User.getCurrent();
} }

View File

@ -66,10 +66,9 @@ public class TestLocalHBaseCluster {
* running in local mode. * running in local mode.
*/ */
public static class MyHMaster extends HMaster { public static class MyHMaster extends HMaster {
public MyHMaster(Configuration conf, CoordinatedStateManager cp) public MyHMaster(Configuration conf) throws IOException, KeeperException,
throws IOException, KeeperException,
InterruptedException { InterruptedException {
super(conf, cp); super(conf);
} }
public int echo(int val) { public int echo(int val) {
@ -82,9 +81,8 @@ public class TestLocalHBaseCluster {
*/ */
public static class MyHRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { public static class MyHRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public MyHRegionServer(Configuration conf, CoordinatedStateManager cp) throws IOException, public MyHRegionServer(Configuration conf) throws IOException, InterruptedException {
InterruptedException { super(conf);
super(conf, cp);
} }
public int echo(int val) { public int echo(int val) {

View File

@ -44,9 +44,8 @@ import java.io.IOException;
private static class TestMockRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { private static class TestMockRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public TestMockRegionServer(Configuration conf, CoordinatedStateManager cp) public TestMockRegionServer(Configuration conf) throws IOException, InterruptedException {
throws IOException, InterruptedException { super(conf);
super(conf, cp);
} }
protected int movedRegionCleanerPeriod() { protected int movedRegionCleanerPeriod() {

View File

@ -141,9 +141,9 @@ public class TestClientScannerRPCTimeout {
} }
private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer { private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
public RegionServerWithScanTimeout(Configuration conf, CoordinatedStateManager cp) public RegionServerWithScanTimeout(Configuration conf)
throws IOException, InterruptedException { throws IOException, InterruptedException {
super(conf, cp); super(conf);
} }
protected RSRpcServices createRpcServices() throws IOException { protected RSRpcServices createRpcServices() throws IOException {
@ -168,7 +168,7 @@ public class TestClientScannerRPCTimeout {
throws ServiceException { throws ServiceException {
if (request.hasScannerId()) { if (request.hasScannerId()) {
ScanResponse scanResponse = super.scan(controller, request); ScanResponse scanResponse = super.scan(controller, request);
if (this.tableScannerId == request.getScannerId() && if (this.tableScannerId == request.getScannerId() &&
(sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) { (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
try { try {
LOG.info("SLEEPING " + (rpcTimeout + 500)); LOG.info("SLEEPING " + (rpcTimeout + 500));

View File

@ -198,9 +198,9 @@ public class TestMetaCache {
public static class RegionServerWithFakeRpcServices extends HRegionServer { public static class RegionServerWithFakeRpcServices extends HRegionServer {
private FakeRSRpcServices rsRpcServices; private FakeRSRpcServices rsRpcServices;
public RegionServerWithFakeRpcServices(Configuration conf, CoordinatedStateManager cp) public RegionServerWithFakeRpcServices(Configuration conf)
throws IOException, InterruptedException { throws IOException, InterruptedException {
super(conf, cp); super(conf);
} }
@Override @Override

View File

@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@ -1151,8 +1150,8 @@ public class TestDistributedLogSplitting {
out.write(Bytes.toBytes("corrupted bytes")); out.write(Bytes.toBytes("corrupted bytes"));
out.close(); out.close();
ZKSplitLogManagerCoordination coordination = ZKSplitLogManagerCoordination coordination =
(ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
.getCoordinatedStateManager()).getSplitLogManagerCoordination(); .getSplitLogManagerCoordination();
coordination.setIgnoreDeleteForTesting(true); coordination.setIgnoreDeleteForTesting(true);
executor = Executors.newSingleThreadExecutor(); executor = Executors.newSingleThreadExecutor();
Runnable runnable = new Runnable() { Runnable runnable = new Runnable() {

View File

@ -26,8 +26,6 @@ import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -68,10 +66,9 @@ public class TestHMasterRPCException {
conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 2000); conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
testUtil.startMiniZKCluster(); testUtil.startMiniZKCluster();
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
ZooKeeperWatcher watcher = testUtil.getZooKeeperWatcher(); ZooKeeperWatcher watcher = testUtil.getZooKeeperWatcher();
ZKUtil.createWithParents(watcher, watcher.znodePaths.masterAddressZNode, Bytes.toBytes("fake:123")); ZKUtil.createWithParents(watcher, watcher.znodePaths.masterAddressZNode, Bytes.toBytes("fake:123"));
master = new HMaster(conf, cp); master = new HMaster(conf);
rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
} }

View File

@ -51,9 +51,8 @@ public class TestMasterMetrics {
private static HBaseTestingUtility TEST_UTIL; private static HBaseTestingUtility TEST_UTIL;
public static class MyMaster extends HMaster { public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp) throws IOException, public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
KeeperException, InterruptedException { super(conf);
super(conf, cp);
} }
/* /*
@Override @Override

View File

@ -33,8 +33,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.CoordinatedStateException; import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
@ -47,7 +45,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -133,9 +130,7 @@ public class TestMasterNoCluster {
@Test @Test
public void testStopDuringStart() public void testStopDuringStart()
throws IOException, KeeperException, InterruptedException { throws IOException, KeeperException, InterruptedException {
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager( HMaster master = new HMaster(TESTUTIL.getConfiguration());
TESTUTIL.getConfiguration());
HMaster master = new HMaster(TESTUTIL.getConfiguration(), cp);
master.start(); master.start();
// Immediately have it stop. We used hang in assigning meta. // Immediately have it stop. We used hang in assigning meta.
master.stopMaster(); master.stopMaster();
@ -148,7 +143,7 @@ public class TestMasterNoCluster {
* @throws IOException * @throws IOException
* @throws KeeperException * @throws KeeperException
* @throws InterruptedException * @throws InterruptedException
* @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException * @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException
*/ */
@Ignore @Test // Disabled since HBASE-18511. Reenable when master can carry regions. @Ignore @Test // Disabled since HBASE-18511. Reenable when master can carry regions.
public void testFailover() throws Exception { public void testFailover() throws Exception {
@ -186,15 +181,13 @@ public class TestMasterNoCluster {
// and get notification on transitions. We need to fake out any rpcs the // and get notification on transitions. We need to fake out any rpcs the
// master does opening/closing regions. Also need to fake out the address // master does opening/closing regions. Also need to fake out the address
// of the 'remote' mocked up regionservers. // of the 'remote' mocked up regionservers.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
// Insert a mock for the connection, use TESTUTIL.getConfiguration rather than // Insert a mock for the connection, use TESTUTIL.getConfiguration rather than
// the conf from the master; the conf will already have an ClusterConnection // the conf from the master; the conf will already have an ClusterConnection
// associate so the below mocking of a connection will fail. // associate so the below mocking of a connection will fail.
final ClusterConnection mockedConnection = HConnectionTestingUtility.getMockedConnectionAndDecorate( final ClusterConnection mockedConnection = HConnectionTestingUtility.getMockedConnectionAndDecorate(
TESTUTIL.getConfiguration(), rs0, rs0, rs0.getServerName(), TESTUTIL.getConfiguration(), rs0, rs0, rs0.getServerName(),
HRegionInfo.FIRST_META_REGIONINFO); HRegionInfo.FIRST_META_REGIONINFO);
HMaster master = new HMaster(conf, cp) { HMaster master = new HMaster(conf) {
InetAddress getRemoteInetAddress(final int port, final long serverStartCode) InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
throws UnknownHostException { throws UnknownHostException {
// Return different address dependent on port passed. // Return different address dependent on port passed.
@ -262,9 +255,7 @@ public class TestMasterNoCluster {
final ServerName deadServer = ServerName.valueOf("test.sample", 1, 100); final ServerName deadServer = ServerName.valueOf("test.sample", 1, 100);
final MockRegionServer rs0 = new MockRegionServer(conf, newServer); final MockRegionServer rs0 = new MockRegionServer(conf, newServer);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager( HMaster master = new HMaster(conf) {
TESTUTIL.getConfiguration());
HMaster master = new HMaster(conf, cp) {
@Override @Override
MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) { MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
return new MasterMetaBootstrap(this, status) { return new MasterMetaBootstrap(this, status) {

View File

@ -124,10 +124,9 @@ public class TestMetaShutdownHandler {
public static class MyRegionServer extends MiniHBaseClusterRegionServer { public static class MyRegionServer extends MiniHBaseClusterRegionServer {
public MyRegionServer(Configuration conf, CoordinatedStateManager cp) public MyRegionServer(Configuration conf) throws IOException, KeeperException,
throws IOException, KeeperException,
InterruptedException { InterruptedException {
super(conf, cp); super(conf);
} }
@Override @Override

View File

@ -39,7 +39,6 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -48,7 +47,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
@ -57,6 +55,7 @@ import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
@ -105,8 +104,7 @@ public class TestSplitLogManager {
public DummyMasterServices(ZooKeeperWatcher zkw, Configuration conf) { public DummyMasterServices(ZooKeeperWatcher zkw, Configuration conf) {
super(conf); super(conf);
this.zkw = zkw; this.zkw = zkw;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); cm = new ZkCoordinatedStateManager(this);
cm.initialize(this);
} }
@Override @Override

View File

@ -39,9 +39,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequ
public class OOMERegionServer extends HRegionServer { public class OOMERegionServer extends HRegionServer {
private List<Put> retainer = new ArrayList<>(); private List<Put> retainer = new ArrayList<>();
public OOMERegionServer(HBaseConfiguration conf, CoordinatedStateManager cp) public OOMERegionServer(HBaseConfiguration conf) throws IOException, InterruptedException {
throws IOException, InterruptedException { super(conf);
super(conf, cp);
} }
public void put(byte [] regionName, Put put) public void put(byte [] regionName, Put put)

View File

@ -27,8 +27,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.LoadBalancer;
@ -75,11 +73,9 @@ public class TestClusterId {
TEST_UTIL.startMiniDFSCluster(1); TEST_UTIL.startMiniDFSCluster(1);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
//start region server, needs to be separate //start region server, needs to be separate
//so we get an unset clusterId //so we get an unset clusterId
rst = JVMClusterUtil.createRegionServerThread(conf,cp, rst = JVMClusterUtil.createRegionServerThread(conf, HRegionServer.class, 0);
HRegionServer.class, 0);
rst.start(); rst.start();
//Make sure RS is in blocking state //Make sure RS is in blocking state
Thread.sleep(10000); Thread.sleep(10000);
@ -92,7 +88,7 @@ public class TestClusterId {
assertNotNull(clusterId); assertNotNull(clusterId);
assertEquals(clusterId, rst.getRegionServer().getClusterId()); assertEquals(clusterId, rst.getRegionServer().getClusterId());
} }
@Test @Test
public void testRewritingClusterIdToPB() throws Exception { public void testRewritingClusterIdToPB() throws Exception {
TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniZKCluster();
@ -115,6 +111,6 @@ public class TestClusterId {
int expected = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration())? 2: 1; int expected = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration())? 2: 1;
assertEquals(expected, master.getServerManager().getOnlineServersList().size()); assertEquals(expected, master.getServerManager().getOnlineServersList().size());
} }
} }

View File

@ -75,10 +75,6 @@ public class TestCompactionInDeadRegionServer {
super(conf); super(conf);
} }
public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException {
super(conf, csm);
}
@Override @Override
protected void tryRegionServerReport(long reportStartTime, long reportEndTime) protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
throws IOException { throws IOException {

View File

@ -31,11 +31,9 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Get; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Get;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
@ -65,8 +63,7 @@ public class TestPriorityRpc {
conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf); final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.getDataTestDir(this.getClass().getName()); TEST_UTIL.getDataTestDir(this.getClass().getName());
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf);
regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp);
priority = regionServer.rpcServices.getPriority(); priority = regionServer.rpcServices.getPriority();
} }

View File

@ -191,9 +191,9 @@ public class TestRSKilledWhenInitializing {
* notices and so removes the region from its set of online regionservers. * notices and so removes the region from its set of online regionservers.
*/ */
static class RegisterAndDieRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { static class RegisterAndDieRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public RegisterAndDieRegionServer(Configuration conf, CoordinatedStateManager cp) public RegisterAndDieRegionServer(Configuration conf)
throws IOException, InterruptedException { throws IOException, InterruptedException {
super(conf, cp); super(conf);
} }
@Override @Override

View File

@ -518,10 +518,8 @@ public class TestRegionMergeTransactionOnCluster {
// Make it public so that JVMClusterUtil can access it. // Make it public so that JVMClusterUtil can access it.
public static class MyMaster extends HMaster { public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp) public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
throws IOException, KeeperException, super(conf);
InterruptedException {
super(conf, cp);
} }
@Override @Override

View File

@ -71,7 +71,7 @@ public class TestRegionServerHostname {
TEST_UTIL.getConfiguration().set(HRegionServer.RS_HOSTNAME_KEY, invalidHostname); TEST_UTIL.getConfiguration().set(HRegionServer.RS_HOSTNAME_KEY, invalidHostname);
HRegionServer hrs = null; HRegionServer hrs = null;
try { try {
hrs = new HRegionServer(TEST_UTIL.getConfiguration(), null); hrs = new HRegionServer(TEST_UTIL.getConfiguration());
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
assertTrue(iae.getMessage(), assertTrue(iae.getMessage(),
iae.getMessage().contains("Failed resolve of " + invalidHostname) || iae.getMessage().contains("Failed resolve of " + invalidHostname) ||

View File

@ -164,10 +164,9 @@ public class TestRegionServerReportForDuty {
private boolean rpcStubCreatedFlag = false; private boolean rpcStubCreatedFlag = false;
private boolean masterChanged = false; private boolean masterChanged = false;
public MyRegionServer(Configuration conf, CoordinatedStateManager cp) public MyRegionServer(Configuration conf) throws IOException, KeeperException,
throws IOException, KeeperException,
InterruptedException { InterruptedException {
super(conf, cp); super(conf);
} }
@Override @Override

View File

@ -397,11 +397,6 @@ public class TestScannerHeartbeatMessages {
super(conf); super(conf);
} }
public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException {
super(conf, csm);
}
@Override @Override
protected RSRpcServices createRpcServices() throws IOException { protected RSRpcServices createRpcServices() throws IOException {
return new HeartbeatRPCServices(this); return new HeartbeatRPCServices(this);

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@ -88,8 +88,7 @@ public class TestSplitLogWorker {
public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
this.zkw = zkw; this.zkw = zkw;
this.conf = conf; this.conf = conf;
cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); cm = new ZkCoordinatedStateManager(this);
cm.initialize(this);
} }
@Override @Override

View File

@ -951,10 +951,8 @@ public class TestSplitTransactionOnCluster {
// Make it public so that JVMClusterUtil can access it. // Make it public so that JVMClusterUtil can access it.
public static class MyMaster extends HMaster { public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, CoordinatedStateManager cp) public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
throws IOException, KeeperException, super(conf);
InterruptedException {
super(conf, cp);
} }
@Override @Override

View File

@ -906,7 +906,7 @@ public abstract class AbstractTestWALReplay {
assertNotNull(listStatus); assertNotNull(listStatus);
assertTrue(listStatus.length > 0); assertTrue(listStatus.length > 0);
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, mode, wals); this.fs, this.conf, null, null, null, null, mode, wals);
FileStatus[] listStatus1 = this.fs.listStatus( FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
"recovered.edits")), new PathFilter() { "recovered.edits")), new PathFilter() {
@ -1059,9 +1059,9 @@ public abstract class AbstractTestWALReplay {
first = fs.getFileStatus(smallFile); first = fs.getFileStatus(smallFile);
second = fs.getFileStatus(largeFile); second = fs.getFileStatus(largeFile);
} }
WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, null,
RecoveryMode.LOG_SPLITTING, wals); RecoveryMode.LOG_SPLITTING, wals);
WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, null,
RecoveryMode.LOG_SPLITTING, wals); RecoveryMode.LOG_SPLITTING, wals);
WAL wal = createWAL(this.conf, hbaseRootDir, logName); WAL wal = createWAL(this.conf, hbaseRootDir, logName);
region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);

View File

@ -277,11 +277,6 @@ public class TestReplicationSource {
super(conf); super(conf);
} }
public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException, InterruptedException {
super(conf, csm);
}
@Override @Override
protected void stopServiceThreads() { protected void stopServiceThreads() {
// Add a delay before service threads are shutdown. // Add a delay before service threads are shutdown.

View File

@ -134,7 +134,7 @@ public class TestWALReaderOnSecureWAL {
wal.sync(); wal.sync();
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
wal.shutdown(); wal.shutdown();
return walPath; return walPath;
} finally { } finally {
// restore the cell codec class // restore the cell codec class
@ -182,11 +182,11 @@ public class TestWALReaderOnSecureWAL {
} }
FileStatus[] listStatus = fs.listStatus(walPath.getParent()); FileStatus[] listStatus = fs.listStatus(walPath.getParent());
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
Path rootdir = FSUtils.getRootDir(conf); Path rootdir = FSUtils.getRootDir(conf);
try { try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode); WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, null, mode);
s.splitLogFile(listStatus[0], null); s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt"); "corrupt");
@ -229,11 +229,11 @@ public class TestWALReaderOnSecureWAL {
} }
FileStatus[] listStatus = fs.listStatus(walPath.getParent()); FileStatus[] listStatus = fs.listStatus(walPath.getParent());
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
Path rootdir = FSUtils.getRootDir(conf); Path rootdir = FSUtils.getRootDir(conf);
try { try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode); WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, null, mode);
s.splitLogFile(listStatus[0], null); s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt"); "corrupt");

View File

@ -372,8 +372,7 @@ public class TestWALSplit {
} }
/** /**
* @throws IOException * {@see https://issues.apache.org/jira/browse/HBASE-3020}
* @see https://issues.apache.org/jira/browse/HBASE-3020
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testRecoveredEditsPathForMeta() throws IOException { public void testRecoveredEditsPathForMeta() throws IOException {
@ -805,7 +804,7 @@ public class TestWALSplit {
assertTrue("There should be some log greater than size 0.", 0 < largestSize); assertTrue("There should be some log greater than size 0.", 0 < largestSize);
// Set up a splitter that will throw an IOE on the output side // Set up a splitter that will throw an IOE on the output side
WALSplitter logSplitter = new WALSplitter(wals, WALSplitter logSplitter = new WALSplitter(wals,
conf, HBASEDIR, fs, null, null, this.mode) { conf, HBASEDIR, fs, null, null, null, this.mode) {
@Override @Override
protected Writer createWriter(Path logfile) throws IOException { protected Writer createWriter(Path logfile) throws IOException {
Writer mockWriter = Mockito.mock(Writer.class); Writer mockWriter = Mockito.mock(Writer.class);
@ -932,7 +931,7 @@ public class TestWALSplit {
try { try {
conf.setInt("hbase.splitlog.report.period", 1000); conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile( boolean ret = WALSplitter.splitLogFile(
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals); HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode, wals);
assertFalse("Log splitting should failed", ret); assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0); assertTrue(count.get() > 0);
} catch (IOException e) { } catch (IOException e) {
@ -991,7 +990,7 @@ public class TestWALSplit {
// Create a splitter that reads and writes the data without touching disk // Create a splitter that reads and writes the data without touching disk
WALSplitter logSplitter = new WALSplitter(wals, WALSplitter logSplitter = new WALSplitter(wals,
localConf, HBASEDIR, fs, null, null, this.mode) { localConf, HBASEDIR, fs, null, null, null, this.mode) {
/* Produce a mock writer that doesn't write anywhere */ /* Produce a mock writer that doesn't write anywhere */
@Override @Override
@ -1123,8 +1122,7 @@ public class TestWALSplit {
} }
/** /**
* @throws IOException * {@see https://issues.apache.org/jira/browse/HBASE-4862}
* @see https://issues.apache.org/jira/browse/HBASE-4862
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
@ -1142,7 +1140,7 @@ public class TestWALSplit {
logfiles != null && logfiles.length > 0); logfiles != null && logfiles.length > 0);
WALSplitter logSplitter = new WALSplitter(wals, WALSplitter logSplitter = new WALSplitter(wals,
conf, HBASEDIR, fs, null, null, this.mode) { conf, HBASEDIR, fs, null, null, null, this.mode) {
@Override @Override
protected Writer createWriter(Path logfile) protected Writer createWriter(Path logfile)
throws IOException { throws IOException {

View File

@ -2024,17 +2024,6 @@ A comma-separated list of
.Default .Default
`` ``
[[hbase.coordinated.state.manager.class]]
*`hbase.coordinated.state.manager.class`*::
+
.Description
Fully qualified name of class implementing coordinated state manager.
+
.Default
`org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager`
[[hbase.regionserver.storefile.refresh.period]] [[hbase.regionserver.storefile.refresh.period]]
*`hbase.regionserver.storefile.refresh.period`*:: *`hbase.regionserver.storefile.refresh.period`*::
+ +