HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space

Pass WALFactory to Replication instead of WALProvider. WALFactory has all
WALProviders in it, not just the user-space WALProvider. Do this so
ReplicationService has access to all WALProviders in the Server (To be
exploited by the follow-on patch in HBASE-25055)
This commit is contained in:
stack 2020-09-18 17:29:23 -07:00
parent bace137c36
commit 2b1e8b306f
6 changed files with 23 additions and 26 deletions

View File

@ -1918,7 +1918,6 @@ public class HRegionServer extends Thread implements
*/ */
private void setupWALAndReplication() throws IOException { private void setupWALAndReplication() throws IOException {
WALFactory factory = new WALFactory(conf, serverName.toString(), (Server)this); WALFactory factory = new WALFactory(conf, serverName.toString(), (Server)this);
// TODO Replication make assumptions here based on the default filesystem impl // TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
@ -1935,8 +1934,7 @@ public class HRegionServer extends Thread implements
throw new IOException("Can not create wal directory " + logDir); throw new IOException("Can not create wal directory " + logDir);
} }
// Instantiate replication if replication enabled. Pass it the log directories. // Instantiate replication if replication enabled. Pass it the log directories.
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
factory.getWALProvider());
this.walFactory = factory; this.walFactory = factory;
} }
@ -3077,12 +3075,11 @@ public class HRegionServer extends Thread implements
* Load the replication executorService objects, if any * Load the replication executorService objects, if any
*/ */
private static void createNewReplicationInstance(Configuration conf, HRegionServer server, private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException { FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
if ((server instanceof HMaster) && if ((server instanceof HMaster) &&
(!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) { (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
return; return;
} }
// read in the name of the source replication class from the config file. // read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
@ -3095,19 +3092,19 @@ public class HRegionServer extends Thread implements
// only one object. // only one object.
if (sourceClassname.equals(sinkClassname)) { if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = newReplicationInstance(sourceClassname, server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
} else { } else {
server.replicationSourceHandler = newReplicationInstance(sourceClassname, server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
server.replicationSinkHandler = newReplicationInstance(sinkClassname, server.replicationSinkHandler = newReplicationInstance(sinkClassname,
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
} }
} }
private static <T extends ReplicationService> T newReplicationInstance(String classname, private static <T extends ReplicationService> T newReplicationInstance(String classname,
Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
Path oldLogDir, WALProvider walProvider) throws IOException { Path oldLogDir, WALFactory walFactory) throws IOException {
final Class<? extends T> clazz; final Class<? extends T> clazz;
try { try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@ -3116,7 +3113,7 @@ public class HRegionServer extends Thread implements
throw new IOException("Could not find class for " + classname); throw new IOException("Could not find class for " + classname);
} }
T service = ReflectionUtils.newInstance(clazz, conf); T service = ReflectionUtils.newInstance(clazz, conf);
service.initialize(server, walFs, logDir, oldLogDir, walProvider); service.initialize(server, walFs, logDir, oldLogDir, walFactory);
return service; return service;
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -32,14 +32,11 @@ import org.apache.yetus.audience.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface ReplicationService { public interface ReplicationService {
/** /**
* Initializes the replication service object. * Initializes the replication service object.
* @param walProvider can be null if not initialized inside a live region server environment, for
* example, {@code ReplicationSyncUp}.
*/ */
void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory)
throws IOException; throws IOException;
/** /**
* Start replication services. * Start replication services.

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -85,7 +86,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
@Override @Override
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
WALProvider walProvider) throws IOException { WALFactory walFactory) throws IOException {
this.server = server; this.server = server;
this.conf = this.server.getConfiguration(); this.conf = this.server.getConfiguration();
this.isReplicationForBulkLoadDataEnabled = this.isReplicationForBulkLoadDataEnabled =
@ -123,8 +124,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
} }
this.globalMetricsSource = CompatibilitySingletonFactory this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, WALProvider walProvider = walFactory.getWALProvider();
this.server, fs, logDir, oldLogDir, clusterId, this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
globalMetricsSource); globalMetricsSource);
if (walProvider != null) { if (walProvider != null) {
@ -187,7 +189,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
/** /**
* If replication is enabled and this cluster is a master, * If replication is enabled and this cluster is a master,
* it starts * it starts
* @throws IOException
*/ */
@Override @Override
public void startReplicationService() throws IOException { public void startReplicationService() throws IOException {

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool {
System.out.println("Start Replication Server start"); System.out.println("Start Replication Server start");
Replication replication = new Replication(); Replication replication = new Replication();
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
new WALFactory(conf, "test", null));
ReplicationSourceManager manager = replication.getReplicationManager(); ReplicationSourceManager manager = replication.getReplicationManager();
manager.init().get(); manager.init().get();
while (manager.activeFailoverTaskCount() > 0) { while (manager.activeFailoverTaskCount() > 0) {

View File

@ -1,5 +1,4 @@
/* /*
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information

View File

@ -182,7 +182,8 @@ public abstract class TestReplicationSourceManager {
logDir = new Path(utility.getDataTestDir(), logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME); HConstants.HREGION_LOGDIR_NAME);
replication = new Replication(); replication = new Replication();
replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
new WALFactory(conf, "test", null));
managerOfCluster = getManagerFromCluster(); managerOfCluster = getManagerFromCluster();
if (managerOfCluster != null) { if (managerOfCluster != null) {
// After replication procedure, we need to add peer by hand (other than by receiving // After replication procedure, we need to add peer by hand (other than by receiving