HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space
Pass WALFactory to Replication instead of WALProvider. WALFactory has all WALProviders in it, not just the user-space WALProvider. Do this so ReplicationService has access to all WALProviders in the Server (To be exploited by the follow-on patch in HBASE-25055)
This commit is contained in:
parent
55399a0320
commit
7964d2ec6d
|
@ -1943,8 +1943,7 @@ public class HRegionServer extends Thread implements
|
||||||
throw new IOException("Can not create wal directory " + logDir);
|
throw new IOException("Can not create wal directory " + logDir);
|
||||||
}
|
}
|
||||||
// Instantiate replication if replication enabled. Pass it the log directories.
|
// Instantiate replication if replication enabled. Pass it the log directories.
|
||||||
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
|
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
|
||||||
factory.getWALProvider());
|
|
||||||
}
|
}
|
||||||
this.walFactory = factory;
|
this.walFactory = factory;
|
||||||
}
|
}
|
||||||
|
@ -3096,7 +3095,7 @@ public class HRegionServer extends Thread implements
|
||||||
* Load the replication executorService objects, if any
|
* Load the replication executorService objects, if any
|
||||||
*/
|
*/
|
||||||
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
|
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
|
||||||
FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
|
FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
|
||||||
// read in the name of the source replication class from the config file.
|
// read in the name of the source replication class from the config file.
|
||||||
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
|
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
|
||||||
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
|
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
|
||||||
|
@ -3109,21 +3108,21 @@ public class HRegionServer extends Thread implements
|
||||||
// only one object.
|
// only one object.
|
||||||
if (sourceClassname.equals(sinkClassname)) {
|
if (sourceClassname.equals(sinkClassname)) {
|
||||||
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
|
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
|
||||||
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
|
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
|
||||||
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
|
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
|
||||||
server.sameReplicationSourceAndSink = true;
|
server.sameReplicationSourceAndSink = true;
|
||||||
} else {
|
} else {
|
||||||
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
|
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
|
||||||
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
|
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
|
||||||
server.replicationSinkHandler = newReplicationInstance(sinkClassname,
|
server.replicationSinkHandler = newReplicationInstance(sinkClassname,
|
||||||
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
|
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
|
||||||
server.sameReplicationSourceAndSink = false;
|
server.sameReplicationSourceAndSink = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends ReplicationService> T newReplicationInstance(String classname,
|
private static <T extends ReplicationService> T newReplicationInstance(String classname,
|
||||||
Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
|
Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
|
||||||
Path oldLogDir, WALProvider walProvider) throws IOException {
|
Path oldLogDir, WALFactory walFactory) throws IOException {
|
||||||
final Class<? extends T> clazz;
|
final Class<? extends T> clazz;
|
||||||
try {
|
try {
|
||||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||||
|
@ -3132,7 +3131,7 @@ public class HRegionServer extends Thread implements
|
||||||
throw new IOException("Could not find class for " + classname);
|
throw new IOException("Could not find class for " + classname);
|
||||||
}
|
}
|
||||||
T service = ReflectionUtils.newInstance(clazz, conf);
|
T service = ReflectionUtils.newInstance(clazz, conf);
|
||||||
service.initialize(server, walFs, logDir, oldLogDir, walProvider);
|
service.initialize(server, walFs, logDir, oldLogDir, walFactory);
|
||||||
return service;
|
return service;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
|
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -32,13 +32,10 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface ReplicationService {
|
public interface ReplicationService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the replication service object.
|
* Initializes the replication service object.
|
||||||
* @param walProvider can be null if not initialized inside a live region server environment, for
|
|
||||||
* example, {@code ReplicationSyncUp}.
|
|
||||||
*/
|
*/
|
||||||
void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider)
|
void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||||
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
|
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -81,7 +82,7 @@ public class Replication implements ReplicationSourceService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
|
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
|
||||||
WALProvider walProvider) throws IOException {
|
WALFactory walFactory) throws IOException {
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.conf = this.server.getConfiguration();
|
this.conf = this.server.getConfiguration();
|
||||||
this.isReplicationForBulkLoadDataEnabled =
|
this.isReplicationForBulkLoadDataEnabled =
|
||||||
|
@ -115,6 +116,7 @@ public class Replication implements ReplicationSourceService {
|
||||||
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
|
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
|
||||||
this.globalMetricsSource = CompatibilitySingletonFactory
|
this.globalMetricsSource = CompatibilitySingletonFactory
|
||||||
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
|
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
|
||||||
|
WALProvider walProvider = walFactory.getWALProvider();
|
||||||
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
|
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
|
||||||
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
|
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
|
||||||
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
|
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
|
||||||
|
@ -165,7 +167,6 @@ public class Replication implements ReplicationSourceService {
|
||||||
/**
|
/**
|
||||||
* If replication is enabled and this cluster is a master,
|
* If replication is enabled and this cluster is a master,
|
||||||
* it starts
|
* it starts
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void startReplicationService() throws IOException {
|
public void startReplicationService() throws IOException {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool {
|
||||||
|
|
||||||
System.out.println("Start Replication Server start");
|
System.out.println("Start Replication Server start");
|
||||||
Replication replication = new Replication();
|
Replication replication = new Replication();
|
||||||
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
|
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
|
||||||
|
new WALFactory(conf, "test", false));
|
||||||
ReplicationSourceManager manager = replication.getReplicationManager();
|
ReplicationSourceManager manager = replication.getReplicationManager();
|
||||||
manager.init().get();
|
manager.init().get();
|
||||||
while (manager.activeFailoverTaskCount() > 0) {
|
while (manager.activeFailoverTaskCount() > 0) {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
|
|
@ -194,7 +194,8 @@ public abstract class TestReplicationSourceManager {
|
||||||
logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
|
logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
|
||||||
remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
|
remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
|
||||||
replication = new Replication();
|
replication = new Replication();
|
||||||
replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
|
replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
|
||||||
|
new WALFactory(conf, "test", false));
|
||||||
managerOfCluster = getManagerFromCluster();
|
managerOfCluster = getManagerFromCluster();
|
||||||
if (managerOfCluster != null) {
|
if (managerOfCluster != null) {
|
||||||
// After replication procedure, we need to add peer by hand (other than by receiving
|
// After replication procedure, we need to add peer by hand (other than by receiving
|
||||||
|
|
Loading…
Reference in New Issue