HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space
Pass WALFactory to Replication instead of WALProvider. WALFactory has all WALProviders in it, not just the user-space WALProvider. Do this so ReplicationService has access to all WALProviders in the Server (To be exploited by the follow-on patch in HBASE-25055) Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
97979436f8
commit
b5a242f42a
@ -245,7 +245,7 @@ public class HRegionServer extends Thread implements
|
||||
/**
|
||||
* For testing only! Set to true to skip notifying region assignment to master .
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@SuppressWarnings("checkstyle:VisibilityModifier") @VisibleForTesting
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
|
||||
public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
|
||||
|
||||
@ -1915,8 +1915,7 @@ public class HRegionServer extends Thread implements
|
||||
throw new IOException("Can not create wal directory " + logDir);
|
||||
}
|
||||
// Instantiate replication if replication enabled. Pass it the log directories.
|
||||
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
|
||||
factory.getWALProvider());
|
||||
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
|
||||
this.walFactory = factory;
|
||||
}
|
||||
|
||||
@ -3056,12 +3055,11 @@ public class HRegionServer extends Thread implements
|
||||
* Load the replication executorService objects, if any
|
||||
*/
|
||||
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
|
||||
FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
|
||||
FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
|
||||
if ((server instanceof HMaster) &&
|
||||
(!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
|
||||
return;
|
||||
}
|
||||
|
||||
// read in the name of the source replication class from the config file.
|
||||
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
|
||||
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
|
||||
@ -3074,19 +3072,19 @@ public class HRegionServer extends Thread implements
|
||||
// only one object.
|
||||
if (sourceClassname.equals(sinkClassname)) {
|
||||
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
|
||||
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
|
||||
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
|
||||
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
|
||||
} else {
|
||||
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
|
||||
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
|
||||
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
|
||||
server.replicationSinkHandler = newReplicationInstance(sinkClassname,
|
||||
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
|
||||
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T extends ReplicationService> T newReplicationInstance(String classname,
|
||||
Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
|
||||
Path oldLogDir, WALProvider walProvider) throws IOException {
|
||||
Path oldLogDir, WALFactory walFactory) throws IOException {
|
||||
final Class<? extends T> clazz;
|
||||
try {
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
@ -3095,7 +3093,7 @@ public class HRegionServer extends Thread implements
|
||||
throw new IOException("Could not find class for " + classname);
|
||||
}
|
||||
T service = ReflectionUtils.newInstance(clazz, conf);
|
||||
service.initialize(server, walFs, logDir, oldLogDir, walProvider);
|
||||
service.initialize(server, walFs, logDir, oldLogDir, walFactory);
|
||||
return service;
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
@ -32,14 +32,11 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ReplicationService {
|
||||
|
||||
/**
|
||||
* Initializes the replication service object.
|
||||
* @param walProvider can be null if not initialized inside a live region server environment, for
|
||||
* example, {@code ReplicationSyncUp}.
|
||||
*/
|
||||
void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider)
|
||||
throws IOException;
|
||||
void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Start replication services.
|
||||
|
@ -1,4 +1,4 @@
|
||||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -42,16 +42,16 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationTracker;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||
|
||||
/**
|
||||
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
|
||||
*/
|
||||
@ -85,7 +85,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
||||
|
||||
@Override
|
||||
public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
|
||||
WALProvider walProvider) throws IOException {
|
||||
WALFactory walFactory) throws IOException {
|
||||
this.server = server;
|
||||
this.conf = this.server.getConfiguration();
|
||||
this.isReplicationForBulkLoadDataEnabled =
|
||||
@ -123,8 +123,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
||||
}
|
||||
this.globalMetricsSource = CompatibilitySingletonFactory
|
||||
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
|
||||
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
|
||||
this.server, fs, logDir, oldLogDir, clusterId,
|
||||
WALProvider walProvider = walFactory.getWALProvider();
|
||||
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
|
||||
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
|
||||
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
|
||||
globalMetricsSource);
|
||||
if (walProvider != null) {
|
||||
@ -174,7 +175,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
||||
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
|
||||
* directory required for replicating hfiles
|
||||
* @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
|
||||
@ -187,7 +187,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
||||
/**
|
||||
* If replication is enabled and this cluster is a master,
|
||||
* it starts
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void startReplicationService() throws IOException {
|
||||
|
@ -1,4 +1,4 @@
|
||||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool {
|
||||
|
||||
System.out.println("Start Replication Server start");
|
||||
Replication replication = new Replication();
|
||||
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
|
||||
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
|
||||
new WALFactory(conf, "test"));
|
||||
ReplicationSourceManager manager = replication.getReplicationManager();
|
||||
manager.init().get();
|
||||
while (manager.activeFailoverTaskCount() > 0) {
|
||||
|
@ -1,5 +1,4 @@
|
||||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -21,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -182,7 +182,8 @@ public abstract class TestReplicationSourceManager {
|
||||
logDir = new Path(utility.getDataTestDir(),
|
||||
HConstants.HREGION_LOGDIR_NAME);
|
||||
replication = new Replication();
|
||||
replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
|
||||
replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
|
||||
new WALFactory(conf, "test"));
|
||||
managerOfCluster = getManagerFromCluster();
|
||||
if (managerOfCluster != null) {
|
||||
// After replication procedure, we need to add peer by hand (other than by receiving
|
||||
|
Loading…
x
Reference in New Issue
Block a user