diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f14da2f6a17..8abede5b272 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1911,8 +1911,7 @@ public class HRegionServer extends Thread implements throw new IOException("Can not create wal directory " + logDir); } // Instantiate replication if replication enabled. Pass it the log directories. - createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, - factory.getWALProvider()); + createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); } this.walFactory = factory; } @@ -3063,7 +3062,7 @@ public class HRegionServer extends Thread implements * Load the replication executorService objects, if any */ private static void createNewReplicationInstance(Configuration conf, HRegionServer server, - FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { + FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { // read in the name of the source replication class from the config file. String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); @@ -3076,19 +3075,19 @@ public class HRegionServer extends Thread implements // only one object. if (sourceClassname.equals(sinkClassname)) { server.replicationSourceHandler = newReplicationInstance(sourceClassname, - ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; } else { server.replicationSourceHandler = newReplicationInstance(sourceClassname, - ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); server.replicationSinkHandler = newReplicationInstance(sinkClassname, - ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); } } private static T newReplicationInstance(String classname, Class xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, - Path oldLogDir, WALProvider walProvider) throws IOException { + Path oldLogDir, WALFactory walFactory) throws IOException { final Class clazz; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); @@ -3097,7 +3096,7 @@ public class HRegionServer extends Thread implements throw new IOException("Could not find class for " + classname); } T service = ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, walFs, logDir, oldLogDir, walProvider); + service.initialize(server, walFs, logDir, oldLogDir, walFactory); return service; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index e9bbaea8ae4..33b3321755f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; -import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.yetus.audience.InterfaceAudience; /** @@ -32,14 +32,11 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public interface ReplicationService { - /** * Initializes the replication service object. - * @param walProvider can be null if not initialized inside a live region server environment, for - * example, {@code ReplicationSyncUp}. */ - void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) - throws IOException; + void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory) + throws IOException; /** * Start replication services. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 195877bf5f3..d8a696c7172 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.yetus.audience.InterfaceAudience; @@ -89,7 +90,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer @Override public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, - WALProvider walProvider) throws IOException { + WALFactory walFactory) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); this.isReplicationForBulkLoadDataEnabled = @@ -128,6 +129,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); this.globalMetricsSource = CompatibilitySingletonFactory .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + WALProvider walProvider = walFactory.getWALProvider(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), @@ -198,7 +200,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace * directory required for replicating hfiles * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory - * @throws IOException */ @Override public void replicateLogEntries(List entries, CellScanner cells, @@ -211,7 +212,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer /** * If replication is enabled and this cluster is a master, * it starts - * @throws IOException */ @Override public void startReplicationService() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 98490f137db..b04c7eb75f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool { System.out.println("Start Replication Server start"); Replication replication = new Replication(); - replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); + replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, + new WALFactory(conf, "test", false)); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init().get(); while (manager.activeFailoverTaskCount() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 6e1692a9a2b..455b2729815 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 8e38114fa0a..4abb00fee03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -194,7 +194,8 @@ public abstract class TestReplicationSourceManager { logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME); remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME); replication = new Replication(); - replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); + replication.initialize(new DummyServer(), fs, logDir, oldLogDir, + new WALFactory(conf, "test", false)); managerOfCluster = getManagerFromCluster(); if (managerOfCluster != null) { // After replication procedure, we need to add peer by hand (other than by receiving