diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 006f4528675..29121651533 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1918,7 +1918,6 @@ public class HRegionServer extends Thread implements */ private void setupWALAndReplication() throws IOException { WALFactory factory = new WALFactory(conf, serverName.toString(), (Server)this); - // TODO Replication make assumptions here based on the default filesystem impl Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); @@ -1935,8 +1934,7 @@ public class HRegionServer extends Thread implements throw new IOException("Can not create wal directory " + logDir); } // Instantiate replication if replication enabled. Pass it the log directories. - createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, - factory.getWALProvider()); + createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); this.walFactory = factory; } @@ -3077,12 +3075,11 @@ public class HRegionServer extends Thread implements * Load the replication executorService objects, if any */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, - FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { + FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { return; } - // read in the name of the source replication class from the config file. String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); @@ -3095,19 +3092,19 @@ public class HRegionServer extends Thread implements // only one object. if (sourceClassname.equals(sinkClassname)) { server.replicationSourceHandler = newReplicationInstance(sourceClassname, - ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; } else { server.replicationSourceHandler = newReplicationInstance(sourceClassname, - ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); server.replicationSinkHandler = newReplicationInstance(sinkClassname, - ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); } } private static T newReplicationInstance(String classname, Class xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, - Path oldLogDir, WALProvider walProvider) throws IOException { + Path oldLogDir, WALFactory walFactory) throws IOException { final Class clazz; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); @@ -3116,7 +3113,7 @@ public class HRegionServer extends Thread implements throw new IOException("Could not find class for " + classname); } T service = ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, walFs, logDir, oldLogDir, walProvider); + service.initialize(server, walFs, logDir, oldLogDir, walFactory); return service; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index e9bbaea8ae4..33b3321755f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.yetus.audience.InterfaceAudience; /** @@ -32,14 +32,11 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public interface ReplicationService { - /** * Initializes the replication service object. - * @param walProvider can be null if not initialized inside a live region server environment, for - * example, {@code ReplicationSyncUp}. */ - void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) - throws IOException; + void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory) + throws IOException; /** * Start replication services. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 10ddd0c4946..136dcd40012 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; @@ -85,7 +86,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer @Override public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, - WALProvider walProvider) throws IOException { + WALFactory walFactory) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); this.isReplicationForBulkLoadDataEnabled = @@ -123,8 +124,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } this.globalMetricsSource = CompatibilitySingletonFactory .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); - this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, - this.server, fs, logDir, oldLogDir, clusterId, + WALProvider walProvider = walFactory.getWALProvider(); + this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, + replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), globalMetricsSource); if (walProvider != null) { @@ -187,7 +189,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer /** * If replication is enabled and this cluster is a master, * it starts - * @throws IOException */ @Override public void startReplicationService() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 5e3a09f5e34..a8fd64048e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool { System.out.println("Start Replication Server start"); Replication replication = new Replication(); - replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); + replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, + new WALFactory(conf, "test", null)); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init().get(); while (manager.activeFailoverTaskCount() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index e76c2220076..b017a896579 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 1e40457e674..5f7a1c299ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -182,7 +182,8 @@ public abstract class TestReplicationSourceManager { logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME); replication = new Replication(); - replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); + replication.initialize(new DummyServer(), fs, logDir, oldLogDir, + new WALFactory(conf, "test", null)); managerOfCluster = getManagerFromCluster(); if (managerOfCluster != null) { // After replication procedure, we need to add peer by hand (other than by receiving