HBASE-16040 Remove configuration "hbase.replication"
This commit is contained in:
parent
f4cec2e202
commit
68c1b34dbc
|
@ -113,11 +113,6 @@ public class ReplicationAdmin implements Closeable {
|
|||
* @throws RuntimeException if replication isn't enabled.
|
||||
*/
|
||||
public ReplicationAdmin(Configuration conf) throws IOException {
|
||||
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT)) {
|
||||
throw new RuntimeException("hbase.replication isn't true, please " +
|
||||
"enable it in order to use replication");
|
||||
}
|
||||
this.connection = ConnectionFactory.createConnection(conf);
|
||||
try {
|
||||
zkw = createZooKeeperWatcher();
|
||||
|
|
|
@ -838,10 +838,6 @@ public final class HConstants {
|
|||
/*
|
||||
* cluster replication constants.
|
||||
*/
|
||||
public static final String
|
||||
REPLICATION_ENABLE_KEY = "hbase.replication";
|
||||
public static final boolean
|
||||
REPLICATION_ENABLE_DEFAULT = true;
|
||||
public static final String
|
||||
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
|
||||
public static final String
|
||||
|
|
|
@ -1518,8 +1518,6 @@ possible configurations would overwhelm and obscure the important.
|
|||
have region replication > 1. If this is enabled once, disabling this replication also
|
||||
requires disabling the replication peer using shell or ReplicationAdmin java class.
|
||||
Replication to secondary region replicas works over standard inter-cluster replication.
|
||||
So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication"
|
||||
to true for this feature to work.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
|
|
|
@ -98,7 +98,6 @@ public class IntegrationTestRegionReplicaReplication extends IntegrationTestInge
|
|||
|
||||
// enable async wal replication to region replicas for unit tests
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
|
||||
|
||||
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB
|
||||
conf.setInt("hbase.hstore.blockingStoreFiles", 100);
|
||||
|
|
|
@ -267,10 +267,6 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
if (!doCommandLine(args)) {
|
||||
return null;
|
||||
}
|
||||
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT)) {
|
||||
throw new IOException("Replication needs to be enabled to verify it.");
|
||||
}
|
||||
conf.set(NAME+".peerId", peerId);
|
||||
conf.set(NAME+".tableName", tableName);
|
||||
conf.setLong(NAME+".startTime", startTime);
|
||||
|
|
|
@ -2651,12 +2651,6 @@ public class HRegionServer extends HasThread implements
|
|||
static private void createNewReplicationInstance(Configuration conf,
|
||||
HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
|
||||
|
||||
// If replication is not enabled, then return immediately.
|
||||
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// read in the name of the source replication class from the config file.
|
||||
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
|
||||
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
|
||||
|
|
|
@ -117,12 +117,11 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
|
|||
@Override
|
||||
public void setConf(Configuration config) {
|
||||
// If either replication or replication of bulk load hfiles is disabled, keep all members null
|
||||
if (!(config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT) && config.getBoolean(
|
||||
if (!(config.getBoolean(
|
||||
HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
|
||||
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
|
||||
LOG.warn(HConstants.REPLICATION_ENABLE_KEY
|
||||
+ " is not enabled so allowing all hfile references to be deleted. Better to remove "
|
||||
LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
|
||||
+ " is not enabled. Better to remove "
|
||||
+ ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
|
||||
+ " configuration.");
|
||||
return;
|
||||
|
|
|
@ -91,12 +91,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
|
|||
|
||||
@Override
|
||||
public void setConf(Configuration config) {
|
||||
// If replication is disabled, keep all members null
|
||||
if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT)) {
|
||||
LOG.warn("Not configured - allowing all wals to be deleted");
|
||||
return;
|
||||
}
|
||||
// Make my own Configuration. Then I'll have my own connection to zk that
|
||||
// I can close myself when comes time.
|
||||
Configuration conf = new Configuration(config);
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
|
||||
import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -72,7 +71,6 @@ public class Replication extends WALActionsListener.Base implements
|
|||
ReplicationSourceService, ReplicationSinkService {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(Replication.class);
|
||||
private boolean replication;
|
||||
private boolean replicationForBulkLoadData;
|
||||
private ReplicationSourceManager replicationManager;
|
||||
private ReplicationQueues replicationQueues;
|
||||
|
@ -110,7 +108,6 @@ public class Replication extends WALActionsListener.Base implements
|
|||
final Path logDir, final Path oldLogDir) throws IOException {
|
||||
this.server = server;
|
||||
this.conf = this.server.getConfiguration();
|
||||
this.replication = isReplication(this.conf);
|
||||
this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
|
||||
this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
|
||||
new ThreadFactoryBuilder()
|
||||
|
@ -125,49 +122,34 @@ public class Replication extends WALActionsListener.Base implements
|
|||
+ " is set to true.");
|
||||
}
|
||||
}
|
||||
if (replication) {
|
||||
try {
|
||||
this.replicationQueues =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
|
||||
server.getZooKeeper()));
|
||||
this.replicationQueues.init(this.server.getServerName().toString());
|
||||
this.replicationPeers =
|
||||
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
|
||||
this.replicationPeers.init();
|
||||
this.replicationTracker =
|
||||
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
|
||||
this.conf, this.server, this.server);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Failed replication handler create", e);
|
||||
}
|
||||
UUID clusterId = null;
|
||||
try {
|
||||
clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
|
||||
} catch (KeeperException ke) {
|
||||
throw new IOException("Could not read cluster id", ke);
|
||||
}
|
||||
this.replicationManager =
|
||||
new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
|
||||
conf, this.server, fs, logDir, oldLogDir, clusterId);
|
||||
this.statsThreadPeriod =
|
||||
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
|
||||
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
|
||||
this.replicationLoad = new ReplicationLoad();
|
||||
} else {
|
||||
this.replicationManager = null;
|
||||
this.replicationQueues = null;
|
||||
this.replicationPeers = null;
|
||||
this.replicationTracker = null;
|
||||
this.replicationLoad = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c Configuration to look at
|
||||
* @return True if replication is enabled.
|
||||
*/
|
||||
public static boolean isReplication(final Configuration c) {
|
||||
return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
try {
|
||||
this.replicationQueues =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
|
||||
server.getZooKeeper()));
|
||||
this.replicationQueues.init(this.server.getServerName().toString());
|
||||
this.replicationPeers =
|
||||
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
|
||||
this.replicationPeers.init();
|
||||
this.replicationTracker =
|
||||
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
|
||||
this.conf, this.server, this.server);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Failed replication handler create", e);
|
||||
}
|
||||
UUID clusterId = null;
|
||||
try {
|
||||
clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
|
||||
} catch (KeeperException ke) {
|
||||
throw new IOException("Could not read cluster id", ke);
|
||||
}
|
||||
this.replicationManager =
|
||||
new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
|
||||
conf, this.server, fs, logDir, oldLogDir, clusterId);
|
||||
this.statsThreadPeriod =
|
||||
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
|
||||
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
|
||||
this.replicationLoad = new ReplicationLoad();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -196,11 +178,9 @@ public class Replication extends WALActionsListener.Base implements
|
|||
* Join with the replication threads
|
||||
*/
|
||||
public void join() {
|
||||
if (this.replication) {
|
||||
this.replicationManager.join();
|
||||
if (this.replicationSink != null) {
|
||||
this.replicationSink.stopReplicationSinkServices();
|
||||
}
|
||||
this.replicationManager.join();
|
||||
if (this.replicationSink != null) {
|
||||
this.replicationSink.stopReplicationSinkServices();
|
||||
}
|
||||
scheduleThreadPool.shutdown();
|
||||
}
|
||||
|
@ -221,10 +201,8 @@ public class Replication extends WALActionsListener.Base implements
|
|||
public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
|
||||
String replicationClusterId, String sourceBaseNamespaceDirPath,
|
||||
String sourceHFileArchiveDirPath) throws IOException {
|
||||
if (this.replication) {
|
||||
this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
|
||||
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
|
||||
}
|
||||
this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
|
||||
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -233,17 +211,15 @@ public class Replication extends WALActionsListener.Base implements
|
|||
* @throws IOException
|
||||
*/
|
||||
public void startReplicationService() throws IOException {
|
||||
if (this.replication) {
|
||||
try {
|
||||
this.replicationManager.init();
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
this.replicationSink = new ReplicationSink(this.conf, this.server);
|
||||
this.scheduleThreadPool.scheduleAtFixedRate(
|
||||
new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
|
||||
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
|
||||
try {
|
||||
this.replicationManager.init();
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
this.replicationSink = new ReplicationSink(this.conf, this.server);
|
||||
this.scheduleThreadPool.scheduleAtFixedRate(
|
||||
new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
|
||||
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -335,9 +311,6 @@ public class Replication extends WALActionsListener.Base implements
|
|||
* @param conf
|
||||
*/
|
||||
public static void decorateMasterConfiguration(Configuration conf) {
|
||||
if (!isReplication(conf)) {
|
||||
return;
|
||||
}
|
||||
String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
|
||||
String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
|
||||
if (!plugins.contains(cleanerClass)) {
|
||||
|
|
|
@ -47,8 +47,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
|
|||
* have region replication > 1. If this is enabled once, disabling this replication also
|
||||
* requires disabling the replication peer using shell or ReplicationAdmin java class.
|
||||
* Replication to secondary region replicas works over standard inter-cluster replication.·
|
||||
* So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication"·
|
||||
* to true for this feature to work.
|
||||
*/
|
||||
public static final String REGION_REPLICA_REPLICATION_CONF_KEY
|
||||
= "hbase.region.replica.replication.enabled";
|
||||
|
|
|
@ -76,7 +76,6 @@ public class TestReplicationAdmin {
|
|||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
admin = new ReplicationAdmin(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,6 @@ public class TestReplicationAdminWithTwoDifferentZKClusters {
|
|||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
utility1 = new HBaseTestingUtility(conf1);
|
||||
utility1.startMiniCluster();
|
||||
admin = new ReplicationAdmin(conf1);
|
||||
|
|
|
@ -92,7 +92,6 @@ public class TestLogsCleaner {
|
|||
// set TTL
|
||||
long ttl = 10000;
|
||||
conf.setLong("hbase.master.logcleaner.ttl", ttl);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
Replication.decorateMasterConfiguration(conf);
|
||||
Server server = new DummyServer();
|
||||
ReplicationQueues repQueues =
|
||||
|
|
|
@ -96,7 +96,6 @@ public class TestClusterId {
|
|||
TEST_UTIL.startMiniZKCluster();
|
||||
TEST_UTIL.startMiniDFSCluster(1);
|
||||
TEST_UTIL.createRootDir();
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.replication", true);
|
||||
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
|
||||
FileSystem fs = rootDir.getFileSystem(TEST_UTIL.getConfiguration());
|
||||
Path filePath = new Path(rootDir, HConstants.CLUSTER_ID_FILE_NAME);
|
||||
|
|
|
@ -108,7 +108,6 @@ public class TestRegionReplicaFailover {
|
|||
Configuration conf = HTU.getConfiguration();
|
||||
// Up the handlers; this test needs more than usual.
|
||||
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
|
||||
conf.setInt("replication.stats.thread.period.seconds", 5);
|
||||
|
|
|
@ -117,8 +117,6 @@ public class TestMasterReplication {
|
|||
baseConfiguration.setLong("replication.source.sleepforretries", 100);
|
||||
baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
|
||||
baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
|
||||
baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
|
||||
baseConfiguration.set("hbase.replication.source.fs.conf.provider",
|
||||
TestSourceFSConfigurationProvider.class.getCanonicalName());
|
||||
|
@ -429,29 +427,6 @@ public class TestMasterReplication {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Test RSRpcServices#replicateWALEntry when replication is disabled. This is to simulate
|
||||
* HBASE-14840
|
||||
*/
|
||||
@Test(timeout = 180000, expected = ServiceException.class)
|
||||
public void testReplicateWALEntryWhenReplicationIsDisabled() throws Exception {
|
||||
LOG.info("testSimplePutDelete");
|
||||
baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
|
||||
Table[] htables = null;
|
||||
try {
|
||||
startMiniClusters(1);
|
||||
createTableOnClusters(table);
|
||||
htables = getHTablesOnClusters(tableName);
|
||||
|
||||
HRegionServer rs = utilities[0].getRSForFirstRegionInTable(tableName);
|
||||
RSRpcServices rsrpc = new RSRpcServices(rs);
|
||||
rsrpc.replicateWALEntry(null, null);
|
||||
} finally {
|
||||
close(htables);
|
||||
shutDownMiniClusters();
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
configurations = null;
|
||||
|
|
|
@ -88,7 +88,6 @@ public class TestMultiSlaveReplication {
|
|||
conf1.setLong("replication.source.sleepforretries", 100);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 10);
|
||||
conf1.setLong("hbase.master.logcleaner.ttl", 10);
|
||||
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf1.setBoolean("dfs.support.append", true);
|
||||
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
|
|
|
@ -102,7 +102,6 @@ public class TestPerTableCFReplication {
|
|||
conf1.setLong("replication.source.sleepforretries", 100);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 10);
|
||||
conf1.setLong("hbase.master.logcleaner.ttl", 10);
|
||||
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf1.setBoolean("dfs.support.append", true);
|
||||
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
|
|
|
@ -97,7 +97,6 @@ public class TestReplicationBase {
|
|||
conf1.setLong("hbase.master.logcleaner.ttl", 10);
|
||||
conf1.setInt("zookeeper.recovery.retry", 1);
|
||||
conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
|
||||
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf1.setBoolean("dfs.support.append", true);
|
||||
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||
conf1.setInt("replication.stats.thread.period.seconds", 5);
|
||||
|
@ -120,7 +119,6 @@ public class TestReplicationBase {
|
|||
conf2 = HBaseConfiguration.create(conf1);
|
||||
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||
conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf2.setBoolean("dfs.support.append", true);
|
||||
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
|
||||
|
|
|
@ -89,7 +89,6 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
conf.setLong("hbase.master.logcleaner.ttl", 10);
|
||||
conf.setInt("zookeeper.recovery.retry", 1);
|
||||
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
|
||||
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||
conf.setInt("replication.stats.thread.period.seconds", 5);
|
||||
|
|
|
@ -98,7 +98,6 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
|
|||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
Configuration conf = HTU.getConfiguration();
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
|
||||
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
|
||||
|
||||
|
|
|
@ -116,8 +116,6 @@ public class TestReplicationSink {
|
|||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
|
||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
|
||||
TestSourceFSConfigurationProvider.class.getCanonicalName());
|
||||
|
||||
|
|
|
@ -54,8 +54,6 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
|
|||
conf = HBaseConfiguration.create();
|
||||
conf.set("replication.replicationsource.implementation",
|
||||
ReplicationSourceDummy.class.getCanonicalName());
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf.setLong("replication.sleep.before.failover", 2000);
|
||||
conf.setInt("replication.source.maxretriesmultiplier", 10);
|
||||
utility = new HBaseTestingUtility(conf);
|
||||
|
|
|
@ -43,8 +43,6 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS
|
|||
conf = HBaseConfiguration.create();
|
||||
conf.set("replication.replicationsource.implementation",
|
||||
ReplicationSourceDummy.class.getCanonicalName());
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
|
||||
HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf.setLong("replication.sleep.before.failover", 2000);
|
||||
conf.setInt("replication.source.maxretriesmultiplier", 10);
|
||||
|
||||
|
|
|
@ -93,7 +93,6 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit
|
|||
conf.setInt("replication.stats.thread.period.seconds", 5);
|
||||
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
setVisibilityLabelServiceImpl(conf, ExpAsStringVisibilityLabelServiceImpl.class);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
|
||||
VisibilityTestUtil.enableVisiblityLabels(conf);
|
||||
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
|
||||
|
@ -124,7 +123,6 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit
|
|||
conf1.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
conf1.setBoolean("dfs.support.append", true);
|
||||
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
|
||||
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
TestCoprocessorForTagsAtSink.class.getName());
|
||||
|
|
|
@ -143,7 +143,6 @@ public class TestVisibilityLabelsReplication {
|
|||
conf.setInt("replication.stats.thread.period.seconds", 5);
|
||||
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
setVisibilityLabelServiceImpl(conf);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
|
||||
VisibilityTestUtil.enableVisiblityLabels(conf);
|
||||
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
|
||||
|
@ -173,7 +172,6 @@ public class TestVisibilityLabelsReplication {
|
|||
conf1.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
conf1.setBoolean("dfs.support.append", true);
|
||||
conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
|
||||
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
|
||||
conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
|
||||
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
TestCoprocessorForTagsAtSink.class.getName());
|
||||
|
|
|
@ -709,43 +709,37 @@ module Hbase
|
|||
puts(" %s" % [ server ])
|
||||
end
|
||||
elsif format == "replication"
|
||||
#check whether replication is enabled or not
|
||||
if (!@admin.getConfiguration().getBoolean(org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_KEY,
|
||||
org.apache.hadoop.hbase.HConstants::REPLICATION_ENABLE_DEFAULT))
|
||||
puts("Please enable replication first.")
|
||||
else
|
||||
puts("version %s" % [ status.getHBaseVersion() ])
|
||||
puts("%d live servers" % [ status.getServersSize() ])
|
||||
for server in status.getServers()
|
||||
sl = status.getLoad(server)
|
||||
rSinkString = " SINK :"
|
||||
rSourceString = " SOURCE:"
|
||||
rLoadSink = sl.getReplicationLoadSink()
|
||||
rSinkString << " AgeOfLastAppliedOp=" + rLoadSink.getAgeOfLastAppliedOp().to_s
|
||||
rSinkString << ", TimeStampsOfLastAppliedOp=" +
|
||||
(java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp())).toString()
|
||||
rLoadSourceList = sl.getReplicationLoadSourceList()
|
||||
index = 0
|
||||
while index < rLoadSourceList.size()
|
||||
rLoadSource = rLoadSourceList.get(index)
|
||||
rSourceString << " PeerID=" + rLoadSource.getPeerID()
|
||||
rSourceString << ", AgeOfLastShippedOp=" + rLoadSource.getAgeOfLastShippedOp().to_s
|
||||
rSourceString << ", SizeOfLogQueue=" + rLoadSource.getSizeOfLogQueue().to_s
|
||||
rSourceString << ", TimeStampsOfLastShippedOp=" +
|
||||
(java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp())).toString()
|
||||
rSourceString << ", Replication Lag=" + rLoadSource.getReplicationLag().to_s
|
||||
index = index + 1
|
||||
end
|
||||
puts(" %s:" %
|
||||
[ server.getHostname() ])
|
||||
if type.casecmp("SOURCE") == 0
|
||||
puts("%s" % rSourceString)
|
||||
elsif type.casecmp("SINK") == 0
|
||||
puts("%s" % rSinkString)
|
||||
else
|
||||
puts("%s" % rSourceString)
|
||||
puts("%s" % rSinkString)
|
||||
end
|
||||
puts("version %s" % [ status.getHBaseVersion() ])
|
||||
puts("%d live servers" % [ status.getServersSize() ])
|
||||
for server in status.getServers()
|
||||
sl = status.getLoad(server)
|
||||
rSinkString = " SINK :"
|
||||
rSourceString = " SOURCE:"
|
||||
rLoadSink = sl.getReplicationLoadSink()
|
||||
rSinkString << " AgeOfLastAppliedOp=" + rLoadSink.getAgeOfLastAppliedOp().to_s
|
||||
rSinkString << ", TimeStampsOfLastAppliedOp=" +
|
||||
(java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp())).toString()
|
||||
rLoadSourceList = sl.getReplicationLoadSourceList()
|
||||
index = 0
|
||||
while index < rLoadSourceList.size()
|
||||
rLoadSource = rLoadSourceList.get(index)
|
||||
rSourceString << " PeerID=" + rLoadSource.getPeerID()
|
||||
rSourceString << ", AgeOfLastShippedOp=" + rLoadSource.getAgeOfLastShippedOp().to_s
|
||||
rSourceString << ", SizeOfLogQueue=" + rLoadSource.getSizeOfLogQueue().to_s
|
||||
rSourceString << ", TimeStampsOfLastShippedOp=" +
|
||||
(java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp())).toString()
|
||||
rSourceString << ", Replication Lag=" + rLoadSource.getReplicationLag().to_s
|
||||
index = index + 1
|
||||
end
|
||||
puts(" %s:" %
|
||||
[ server.getHostname() ])
|
||||
if type.casecmp("SOURCE") == 0
|
||||
puts("%s" % rSourceString)
|
||||
elsif type.casecmp("SINK") == 0
|
||||
puts("%s" % rSinkString)
|
||||
else
|
||||
puts("%s" % rSourceString)
|
||||
puts("%s" % rSinkString)
|
||||
end
|
||||
end
|
||||
elsif format == "simple"
|
||||
|
|
|
@ -2564,7 +2564,7 @@ Instead you can change the number of region replicas per table to increase or de
|
|||
<name>hbase.region.replica.replication.enabled</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
Whether asynchronous WAL replication to the secondary region replicas is enabled or not. If this is enabled, a replication peer named "region_replica_replication" will be created which will tail the logs and replicate the mutations to region replicas for tables that have region replication > 1. If this is enabled once, disabling this replication also requires disabling the replication peer using shell or ReplicationAdmin java class. Replication to secondary region replicas works over standard inter-cluster replication. So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication"· to true for this feature to work.
|
||||
Whether asynchronous WAL replication to the secondary region replicas is enabled or not. If this is enabled, a replication peer named "region_replica_replication" will be created which will tail the logs and replicate the mutations to region replicas for tables that have region replication > 1. If this is enabled once, disabling this replication also requires disabling the replication peer using shell or ReplicationAdmin java class. Replication to secondary region replicas works over standard inter-cluster replication.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
|
|
|
@ -2063,8 +2063,7 @@ Fully qualified name of class implementing coordinated state manager.
|
|||
have region replication > 1. If this is enabled once, disabling this replication also
|
||||
requires disabling the replication peer using shell or ReplicationAdmin java class.
|
||||
Replication to secondary region replicas works over standard inter-cluster replication.
|
||||
So replication, if disabled explicitly, also has to be enabled by setting "hbase.replication"
|
||||
to true for this feature to work.
|
||||
|
||||
|
||||
+
|
||||
.Default
|
||||
|
|
|
@ -1648,11 +1648,6 @@ The following metrics are exposed at the global region server level and (since H
|
|||
| The name of the rs znode
|
||||
| rs
|
||||
|
||||
| hbase.replication
|
||||
| Whether replication is enabled or disabled on a given
|
||||
cluster
|
||||
| true
|
||||
|
||||
| replication.sleep.before.failover
|
||||
| How many milliseconds a worker should sleep before attempting to replicate
|
||||
a dead region server's WAL queues.
|
||||
|
|
Loading…
Reference in New Issue