HBASE-25086 Refactor Replication: move the default ReplicationSinkService implementation out (#2444)

Signed-off-by: meiyi <myimeiyi@gmail.com>
This commit is contained in:
Guanghao Zhang 2020-09-24 17:25:34 +08:00 committed by GitHub
parent 56c7505f8f
commit 8828643bb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 173 additions and 104 deletions

View File

@ -981,10 +981,12 @@ public final class HConstants {
*/
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
public static final String
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.regionserver.Replication";
public static final String
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl";
public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */

View File

@ -301,6 +301,7 @@ public class HRegionServer extends Thread implements
// Replication services. If no replication, this handler will be null.
private ReplicationSourceService replicationSourceHandler;
private ReplicationSinkService replicationSinkHandler;
private boolean sameReplicationSourceAndSink;
// Compactions
public CompactSplit compactSplitThread;
@ -1390,20 +1391,32 @@ public class HRegionServer extends Thread implements
serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
}
}
// for the replicationLoad purpose. Only need to get from one executorService
// either source or sink will get the same info
ReplicationSourceService rsources = getReplicationSourceService();
if (rsources != null) {
if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
// always refresh first to get the latest value
ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
if (rLoad != null) {
serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
for (ClusterStatusProtos.ReplicationLoadSource rLS :
rLoad.getReplicationLoadSourceEntries()) {
for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
.getReplicationLoadSourceEntries()) {
serverLoad.addReplLoadSource(rLS);
}
}
} else {
if (replicationSourceHandler != null) {
ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
if (rLoad != null) {
for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
.getReplicationLoadSourceEntries()) {
serverLoad.addReplLoadSource(rLS);
}
}
}
if (replicationSinkHandler != null) {
ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
if (rLoad != null) {
serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
}
}
}
@ -1921,8 +1934,7 @@ public class HRegionServer extends Thread implements
* Start up replication source and sink handlers.
*/
private void startReplicationService() throws IOException {
if (this.replicationSourceHandler == this.replicationSinkHandler &&
this.replicationSourceHandler != null) {
if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else {
if (this.replicationSourceHandler != null) {
@ -2628,9 +2640,10 @@ public class HRegionServer extends Thread implements
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
}
if (this.executorService != null) this.executorService.shutdown();
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler == this.replicationSinkHandler) {
if (this.executorService != null) {
this.executorService.shutdown();
}
if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
this.replicationSourceHandler.stopReplicationService();
} else {
if (this.replicationSourceHandler != null) {
@ -3070,7 +3083,7 @@ public class HRegionServer extends Thread implements
// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
// If both the sink and the source class names are the same, then instantiate
// only one object.
@ -3078,11 +3091,13 @@ public class HRegionServer extends Thread implements
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
server.sameReplicationSourceAndSink = true;
} else {
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.replicationSinkHandler = newReplicationInstance(sinkClassname,
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.sameReplicationSourceAndSink = false;
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/**
 * {@link ReplicationSinkService} implementation that owns a {@link ReplicationSink} and
 * delegates incoming replicated WAL entries to it.
 * <p>
 * Lifecycle: {@link #initialize} captures the hosting server and its configuration,
 * {@link #startReplicationService()} creates the sink and schedules a statistics chore, and
 * {@link #stopReplicationService()} stops the sink. The sink does not exist until the service
 * has been started, so methods that use it guard against that window.
 */
@InterfaceAudience.Private
public class ReplicationSinkServiceImpl implements ReplicationSinkService {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);

  private Configuration conf;

  // Hosting server; also used as the Stoppable that ends the statistics chore
  private Server server;

  // Created in startReplicationService(); null before that
  private ReplicationSink replicationSink;

  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;

  // Interval between statistics log lines, in seconds (as read from the configuration)
  private int statsPeriod;

  /**
   * Hands the given entries to the underlying {@link ReplicationSink}.
   * @param entries list of WAL entries to replicate
   * @param cells scanner over the cells passed alongside <code>entries</code>
   * @param replicationClusterId forwarded unchanged to the sink
   * @param sourceBaseNamespaceDirPath forwarded unchanged to the sink
   * @param sourceHFileArchiveDirPath forwarded unchanged to the sink
   * @throws IOException if the service has not been started yet, or if the sink fails
   */
  @Override
  public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    ReplicationSink sink = this.replicationSink;
    if (sink == null) {
      // Report a meaningful error instead of a NullPointerException when called before start.
      throw new IOException("Replication sink service has not been started");
    }
    sink.replicateEntries(entries, cells, replicationClusterId, sourceBaseNamespaceDirPath,
      sourceHFileArchiveDirPath);
  }

  /**
   * Records the hosting server and reads the statistics period from its configuration. The
   * file system, directory and WAL provider arguments are not used by this implementation.
   * @param server the hosting server; supplies the configuration and the chore service
   */
  @Override
  public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
      WALProvider walProvider) throws IOException {
    this.server = server;
    this.conf = server.getConfiguration();
    this.statsPeriod =
      this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    this.replicationLoad = new ReplicationLoad();
  }

  /**
   * Creates the {@link ReplicationSink} and schedules the periodic statistics chore on the
   * hosting server's chore service.
   */
  @Override
  public void startReplicationService() throws IOException {
    this.replicationSink = new ReplicationSink(this.conf);
    // statsPeriod is configured in seconds, but the three-argument ScheduledChore constructor
    // takes its period in milliseconds. Convert, clamping to the int range.
    int periodMillis =
      (int) Math.min(Integer.MAX_VALUE, TimeUnit.SECONDS.toMillis(statsPeriod));
    this.server.getChoreService().scheduleChore(
      new ReplicationStatisticsChore("ReplicationSinkStatistics", server, periodMillis));
  }

  /**
   * Stops the sink if it was created; a no-op when the service was never started.
   */
  @Override
  public void stopReplicationService() {
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
  }

  /**
   * Rebuilds the replication load from the current sink metrics and returns it.
   * @return the refreshed load, or <code>null</code> when the service has not been initialized
   *         or started yet
   */
  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    ReplicationSink sink = this.replicationSink;
    if (replicationLoad == null || sink == null) {
      // No sink yet, so there are no metrics to report.
      return null;
    }
    // always build for latest data; this service has no sources, only sink metrics
    replicationLoad.buildReplicationLoad(Collections.emptyList(), sink.getSinkMetrics());
    return replicationLoad;
  }

  /**
   * Statistics chore. Periodically logs the sink's statistics string when it is non-empty.
   */
  private final class ReplicationStatisticsChore extends ScheduledChore {

    /**
     * @param name chore name
     * @param stopper the chore is tied to this stoppable
     * @param period execution period, in milliseconds
     */
    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
      super(name, stopper, period);
    }

    @Override
    protected void chore() {
      printStats(replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }
}

View File

@ -22,18 +22,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
@ -51,15 +48,11 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService, ReplicationSinkService {
public class Replication implements ReplicationSourceService {
private static final Logger LOG =
LoggerFactory.getLogger(Replication.class);
private boolean isReplicationForBulkLoadDataEnabled;
@ -68,13 +61,10 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private ReplicationPeers replicationPeers;
private ReplicationTracker replicationTracker;
private Configuration conf;
private ReplicationSink replicationSink;
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
private Server server;
/** Statistics thread schedule pool */
private ScheduledExecutorService scheduleThreadPool;
private int statsThreadPeriod;
private int statsPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
private MetricsReplicationGlobalSourceSource globalMetricsSource;
@ -94,11 +84,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
this.conf = this.server.getConfiguration();
this.isReplicationForBulkLoadDataEnabled =
ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
.setDaemon(true)
.build());
if (this.isReplicationForBulkLoadDataEnabled) {
if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
|| conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
@ -154,9 +139,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
}
}
this.statsThreadPeriod =
this.statsPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad();
this.peerProcedureHandler =
@ -173,39 +157,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
*/
@Override
public void stopReplicationService() {
join();
}
/**
* Join with the replication threads
*/
public void join() {
this.replicationManager.join();
if (this.replicationSink != null) {
this.replicationSink.stopReplicationSinkServices();
}
scheduleThreadPool.shutdown();
}
/**
* Carry on the list of log entries down to the sink
* @param entries list of entries to replicate
* @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
* contain the Cells we are replicating; they are passed here on the side in this
* CellScanner).
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
* directory required for replicating hfiles
* @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
* @throws IOException
*/
@Override
public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
String replicationClusterId, String sourceBaseNamespaceDirPath,
String sourceHFileArchiveDirPath) throws IOException {
this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
}
/**
@ -216,10 +168,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
this.server.getChoreService().scheduleChore(
new ReplicationStatisticsChore("ReplicationSourceStatistics", server, statsPeriod));
LOG.info("{} started", this.server.toString());
}
@ -244,21 +194,15 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
/**
* Statistics task. Periodically prints the cache statistics to the log.
*/
private final static class ReplicationStatisticsTask implements Runnable {
private final class ReplicationStatisticsChore extends ScheduledChore {
private final ReplicationSink replicationSink;
private final ReplicationSourceManager replicationManager;
public ReplicationStatisticsTask(ReplicationSink replicationSink,
ReplicationSourceManager replicationManager) {
this.replicationManager = replicationManager;
this.replicationSink = replicationSink;
ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
super(name, stopper, period);
}
@Override
public void run() {
printStats(this.replicationManager.getStats());
printStats(this.replicationSink.getStats());
protected void chore() {
printStats(replicationManager.getStats());
}
private void printStats(String stats) {
@ -274,17 +218,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
return null;
}
// always build for latest data
buildReplicationLoad();
return this.replicationLoad;
}
private void buildReplicationLoad() {
List<ReplicationSourceInterface> allSources = new ArrayList<>();
allSources.addAll(this.replicationManager.getSources());
allSources.addAll(this.replicationManager.getOldSources());
// get sink
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
this.replicationLoad.buildReplicationLoad(allSources, null);
return this.replicationLoad;
}
@Override

View File

@ -36,7 +36,6 @@ public class ReplicationLoad {
// Empty load instance.
public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
private MetricsSink sinkMetrics;
private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceEntries;
private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
@ -49,21 +48,22 @@ public class ReplicationLoad {
/**
* buildReplicationLoad
* @param sources List of ReplicationSource instances for which metrics should be reported
* @param skMetrics
* @param sinkMetrics metrics of the replication sink
*/
public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
final MetricsSink skMetrics) {
this.sinkMetrics = skMetrics;
final MetricsSink sinkMetrics) {
// build the SinkLoad
ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
if (sinkMetrics != null) {
// build the SinkLoad
ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
this.replicationLoadSink = rLoadSinkBuild.build();
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
this.replicationLoadSink = rLoadSinkBuild.build();
}
this.replicationLoadSourceEntries = new ArrayList<>();
for (ReplicationSourceInterface source : sources) {
@ -157,5 +157,4 @@ public class ReplicationLoad {
public String toString() {
return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
}
}