HBASE-19999 Remove the SYNC_REPLICATION_ENABLED flag

This commit is contained in:
Guanghao Zhang 2018-03-09 11:30:25 +08:00 committed by zhangduo
parent 183b8d0581
commit c7d1085fa2
7 changed files with 38 additions and 19 deletions

View File

@ -37,8 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationUtils {
public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled";
public static final String REPLICATION_ATTR_NAME = "__rep__";
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";

View File

@ -1804,10 +1804,8 @@ public class HRegionServer extends HasThread implements
private void setupWALAndReplication() throws IOException {
boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
(!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf));
if (isMasterNoTableOrSystemTableOnly) {
conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false);
}
WALFactory factory = new WALFactory(conf, serverName.toString());
WALFactory factory =
new WALFactory(conf, serverName.toString(), !isMasterNoTableOrSystemTableOnly);
if (!isMasterNoTableOrSystemTableOnly) {
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
@ -1931,11 +1929,8 @@ public class HRegionServer extends HasThread implements
}
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) {
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 2));
}
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1));
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
@ -67,7 +68,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final WALProvider provider;
private SyncReplicationPeerInfoProvider peerInfoProvider;
private SyncReplicationPeerInfoProvider peerInfoProvider =
new DefaultSyncReplicationPeerInfoProvider();
private WALFactory factory;
@ -235,4 +237,19 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
safeClose(peerId2WAL.remove(peerId));
}
}
private static class DefaultSyncReplicationPeerInfoProvider
implements SyncReplicationPeerInfoProvider {
@Override
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
return Optional.empty();
}
@Override
public boolean checkState(RegionInfo info,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
return false;
}
}
}

View File

@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@ -151,6 +150,21 @@ public class WALFactory {
* to make a directory
*/
public WALFactory(Configuration conf, String factoryId) throws IOException {
// default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
// for HMaster or HRegionServer which take system table only. See HBASE-19999
this(conf, factoryId, true);
}
/**
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.
* @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
* to make a directory
* @param enableSyncReplicationWALProvider whether wrap the wal provider to a
* {@link SyncReplicationWALProvider}
*/
public WALFactory(Configuration conf, String factoryId, boolean enableSyncReplicationWALProvider)
throws IOException {
// until we've moved reader/writer construction down into providers, this initialization must
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
@ -162,7 +176,7 @@ public class WALFactory {
// end required early initialization
if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) {
if (enableSyncReplicationWALProvider) {
provider = new SyncReplicationWALProvider(provider);
}
provider.init(this, conf, null);

View File

@ -84,7 +84,6 @@ public class TestSyncReplication {
private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
util.setZkCluster(ZK_UTIL.getZkCluster());
Configuration conf = util.getConfiguration();
conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
conf.setInt("replication.source.size.capacity", 102400);
conf.setLong("replication.source.sleepforretries", 100);

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -102,7 +101,6 @@ public class TestRecoverStandbyProcedure {
@BeforeClass
public static void setupCluster() throws Exception {
UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
UTIL.startMiniCluster(RS_NUMBER);
UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
conf = UTIL.getConfiguration();

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -94,7 +93,6 @@ public class TestSyncReplicationWALProvider {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
UTIL.startMiniDFSCluster(3);
FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());