diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index f6f00c55254..5b4b6fb4bd6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -981,10 +981,12 @@ public final class HConstants { */ public static final String REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service"; - public static final String - REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service"; public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT = "org.apache.hadoop.hbase.replication.regionserver.Replication"; + public static final String + REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service"; + public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT = + "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl"; public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled"; public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f14da2f6a17..cd90fb87d9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -301,6 +301,7 @@ public class HRegionServer extends Thread implements // Replication services. If no replication, this handler will be null. private ReplicationSourceService replicationSourceHandler; private ReplicationSinkService replicationSinkHandler; + private boolean sameReplicationSourceAndSink; // Compactions public CompactSplit compactSplitThread; @@ -1390,20 +1391,32 @@ public class HRegionServer extends Thread implements serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); } } - // for the replicationLoad purpose. Only need to get from one executorService - // either source or sink will get the same info - ReplicationSourceService rsources = getReplicationSourceService(); - if (rsources != null) { + if (sameReplicationSourceAndSink && replicationSourceHandler != null) { // always refresh first to get the latest value - ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); + ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad(); if (rLoad != null) { serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); - for (ClusterStatusProtos.ReplicationLoadSource rLS : - rLoad.getReplicationLoadSourceEntries()) { + for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad + .getReplicationLoadSourceEntries()) { serverLoad.addReplLoadSource(rLS); } - + } + } else { + if (replicationSourceHandler != null) { + ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad(); + if (rLoad != null) { + for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad + .getReplicationLoadSourceEntries()) { + serverLoad.addReplLoadSource(rLS); + } + } + } + if (replicationSinkHandler != null) { + ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad(); + if (rLoad != null) { + serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); + } } } @@ -1921,8 +1934,7 @@ public class HRegionServer extends Thread implements * Start up replication source and sink handlers. */ private void startReplicationService() throws IOException { - if (this.replicationSourceHandler == this.replicationSinkHandler && - this.replicationSourceHandler != null) { + if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { this.replicationSourceHandler.startReplicationService(); } else { if (this.replicationSourceHandler != null) { @@ -2628,9 +2640,10 @@ public class HRegionServer extends Thread implements if (this.compactSplitThread != null) { this.compactSplitThread.join(); } - if (this.executorService != null) this.executorService.shutdown(); - if (this.replicationSourceHandler != null && - this.replicationSourceHandler == this.replicationSinkHandler) { + if (this.executorService != null) { + this.executorService.shutdown(); + } + if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) { this.replicationSourceHandler.stopReplicationService(); } else { if (this.replicationSourceHandler != null) { @@ -3070,7 +3083,7 @@ public class HRegionServer extends Thread implements // read in the name of the sink replication class from the config file. String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, - HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); + HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT); // If both the sink and the source class names are the same, then instantiate // only one object. @@ -3078,11 +3091,13 @@ public class HRegionServer extends Thread implements server.replicationSourceHandler = newReplicationInstance(sourceClassname, ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; + server.sameReplicationSourceAndSink = true; } else { server.replicationSourceHandler = newReplicationInstance(sourceClassname, ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider); server.replicationSinkHandler = newReplicationInstance(sinkClassname, ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider); + server.sameReplicationSourceAndSink = false; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java new file mode 100644 index 00000000000..9b0e3f79fe0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; + +@InterfaceAudience.Private +public class ReplicationSinkServiceImpl implements ReplicationSinkService { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class); + + private Configuration conf; + + private Server server; + + private ReplicationSink replicationSink; + + // ReplicationLoad to access replication metrics + private ReplicationLoad replicationLoad; + + private int statsPeriod; + + @Override + public void replicateLogEntries(List entries, CellScanner cells, + String replicationClusterId, String sourceBaseNamespaceDirPath, + String sourceHFileArchiveDirPath) throws IOException { + this.replicationSink.replicateEntries(entries, cells, replicationClusterId, + sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath); + } + + @Override + public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir, + WALProvider walProvider) throws IOException { + this.server = server; + this.conf = server.getConfiguration(); + this.statsPeriod = + this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); + this.replicationLoad = new ReplicationLoad(); + } + + @Override + public void startReplicationService() throws IOException { + this.replicationSink = new ReplicationSink(this.conf); + this.server.getChoreService().scheduleChore( + new ReplicationStatisticsChore("ReplicationSinkStatistics", server, statsPeriod)); + } + + @Override + public void stopReplicationService() { + if (this.replicationSink != null) { + this.replicationSink.stopReplicationSinkServices(); + } + } + + @Override + public ReplicationLoad refreshAndGetReplicationLoad() { + if (replicationLoad == null) { + return null; + } + // always build for latest data + replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics()); + return replicationLoad; + } + + private final class ReplicationStatisticsChore extends ScheduledChore { + + ReplicationStatisticsChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected void chore() { + printStats(replicationSink.getStats()); + } + + private void printStats(String stats) { + if (!stats.isEmpty()) { + LOG.info(stats); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 195877bf5f3..33975edb590 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -22,18 +22,15 @@ import java.util.ArrayList; import java.util.List; import java.util.OptionalLong; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; @@ -51,15 +48,11 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; - /** * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. */ @InterfaceAudience.Private -public class Replication implements ReplicationSourceService, ReplicationSinkService { +public class Replication implements ReplicationSourceService { private static final Logger LOG = LoggerFactory.getLogger(Replication.class); private boolean isReplicationForBulkLoadDataEnabled; @@ -68,13 +61,10 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private ReplicationPeers replicationPeers; private ReplicationTracker replicationTracker; private Configuration conf; - private ReplicationSink replicationSink; private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider; // Hosting server private Server server; - /** Statistics thread schedule pool */ - private ScheduledExecutorService scheduleThreadPool; - private int statsThreadPeriod; + private int statsPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; private MetricsReplicationGlobalSourceSource globalMetricsSource; @@ -94,11 +84,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer this.conf = this.server.getConfiguration(); this.isReplicationForBulkLoadDataEnabled = ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf); - this.scheduleThreadPool = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder() - .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d") - .setDaemon(true) - .build()); if (this.isReplicationForBulkLoadDataEnabled) { if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) { @@ -154,9 +139,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0)); } } - this.statsThreadPeriod = + this.statsPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); - LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod); this.replicationLoad = new ReplicationLoad(); this.peerProcedureHandler = @@ -173,39 +157,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer */ @Override public void stopReplicationService() { - join(); - } - - /** - * Join with the replication threads - */ - public void join() { this.replicationManager.join(); - if (this.replicationSink != null) { - this.replicationSink.stopReplicationSinkServices(); - } - scheduleThreadPool.shutdown(); - } - - /** - * Carry on the list of log entries down to the sink - * @param entries list of entries to replicate - * @param cells The data -- the cells -- that entries describes (the entries do not - * contain the Cells we are replicating; they are passed here on the side in this - * CellScanner). - * @param replicationClusterId Id which will uniquely identify source cluster FS client - * configurations in the replication configuration directory - * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace - * directory required for replicating hfiles - * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory - * @throws IOException - */ - @Override - public void replicateLogEntries(List entries, CellScanner cells, - String replicationClusterId, String sourceBaseNamespaceDirPath, - String sourceHFileArchiveDirPath) throws IOException { - this.replicationSink.replicateEntries(entries, cells, replicationClusterId, - sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath); } /** @@ -216,10 +168,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer @Override public void startReplicationService() throws IOException { this.replicationManager.init(); - this.replicationSink = new ReplicationSink(this.conf); - this.scheduleThreadPool.scheduleAtFixedRate( - new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), - statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); + this.server.getChoreService().scheduleChore( + new ReplicationStatisticsChore("ReplicationSourceStatistics", server, statsPeriod)); LOG.info("{} started", this.server.toString()); } @@ -244,21 +194,15 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer /** * Statistics task. Periodically prints the cache statistics to the log. */ - private final static class ReplicationStatisticsTask implements Runnable { + private final class ReplicationStatisticsChore extends ScheduledChore { - private final ReplicationSink replicationSink; - private final ReplicationSourceManager replicationManager; - - public ReplicationStatisticsTask(ReplicationSink replicationSink, - ReplicationSourceManager replicationManager) { - this.replicationManager = replicationManager; - this.replicationSink = replicationSink; + ReplicationStatisticsChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); } @Override - public void run() { - printStats(this.replicationManager.getStats()); - printStats(this.replicationSink.getStats()); + protected void chore() { + printStats(replicationManager.getStats()); } private void printStats(String stats) { @@ -274,17 +218,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer return null; } // always build for latest data - buildReplicationLoad(); - return this.replicationLoad; - } - - private void buildReplicationLoad() { List allSources = new ArrayList<>(); allSources.addAll(this.replicationManager.getSources()); allSources.addAll(this.replicationManager.getOldSources()); - // get sink - MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); - this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics); + this.replicationLoad.buildReplicationLoad(allSources, null); + return this.replicationLoad; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index e011e0af737..6fb21dcfbcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -36,7 +36,6 @@ public class ReplicationLoad { // Empty load instance. public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad(); - private MetricsSink sinkMetrics; private List replicationLoadSourceEntries; private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink; @@ -49,21 +48,22 @@ public class ReplicationLoad { /** * buildReplicationLoad * @param sources List of ReplicationSource instances for which metrics should be reported - * @param skMetrics + * @param sinkMetrics metrics of the replication sink */ public void buildReplicationLoad(final List sources, - final MetricsSink skMetrics) { - this.sinkMetrics = skMetrics; + final MetricsSink sinkMetrics) { - // build the SinkLoad - ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild = + if (sinkMetrics != null) { + // build the SinkLoad + ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild = ClusterStatusProtos.ReplicationLoadSink.newBuilder(); - rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp()); - rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp()); - rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp()); - rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps()); - this.replicationLoadSink = rLoadSinkBuild.build(); + rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp()); + rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp()); + rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp()); + rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps()); + this.replicationLoadSink = rLoadSinkBuild.build(); + } this.replicationLoadSourceEntries = new ArrayList<>(); for (ReplicationSourceInterface source : sources) { @@ -157,5 +157,4 @@ public class ReplicationLoad { public String toString() { return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString(); } - }