HBASE-18130 Refactor ReplicationSource

Guanghao Zhang 2017-05-28 16:39:59 +08:00
parent db8ce0566d
commit 123086edad
10 changed files with 930 additions and 543 deletions


@ -0,0 +1,182 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
/**
* Class that handles the recovered source of a replication stream, which is transferred from
* a dead region server. This will be closed when all logs are pushed to the peer cluster.
*/
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {
private static final Log LOG = LogFactory.getLog(RecoveredReplicationSource.class);
private String actualPeerId;
@Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
clusterId, replicationEndpoint, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@Override
protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
final RecoveredReplicationSourceShipperThread worker =
new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this,
this.replicationQueues);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(
startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
}
}
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
pathsLoop: for (Path path : queue) {
if (fs.exists(path)) { // still in same location, don't need to do anything
newPaths.add(path);
continue;
}
// Path changed - try to find the right path.
hasPathChanged = true;
if (stopper instanceof ReplicationSyncUp.DummyServer) {
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flushing data
// from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists
Path newPath = getReplSyncUpPath(path);
newPaths.add(newPath);
continue;
} else {
// See if Path exists in the dead RS folder (there could be a chain of failures
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
final Path walDir = FSUtils.getWALRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
final Path deadRsDirectory =
new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
for (Path possibleLogLocation : locs) {
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
if (manager.getFs().exists(possibleLogLocation)) {
// We found the right new location
LOG.info("Log " + path + " still exists at " + possibleLogLocation);
newPaths.add(possibleLogLocation);
continue pathsLoop;
}
}
}
// didn't find a new location
LOG.error(
String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
newPaths.add(path);
}
}
if (hasPathChanged) {
if (newPaths.size() != queue.size()) { // this shouldn't happen
LOG.error("Recovery queue size is incorrect");
throw new IOException("Recovery queue size error");
}
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
for (Path path : newPaths) {
queue.add(path);
}
}
}
// N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
// area rather than to the wal area for a particular region server.
private Path getReplSyncUpPath(Path path) throws IOException {
FileStatus[] rss = fs.listStatus(manager.getLogDir());
for (FileStatus rs : rss) {
Path p = rs.getPath();
FileStatus[] logs = fs.listStatus(p);
for (FileStatus log : logs) {
p = new Path(p, log.getPath().getName());
if (p.getName().equals(path.getName())) {
LOG.info("Log " + p.getName() + " found at " + p);
return p;
}
}
}
LOG.error("Didn't find path for: " + path.getName());
return path;
}
public void tryFinish() {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100); // wait a short while for other worker threads to fully exit
boolean allOtherTaskDone = true;
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.isActive()) {
allOtherTaskDone = false;
break;
}
}
if (allOtherTaskDone) {
manager.closeRecoveredQueue(this);
LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
+ getStats());
}
}
}
@Override
public String getPeerId() {
return this.actualPeerId;
}
}
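
For orientation, the peer id this recovered source reports is whatever ReplicationQueueInfo parses out of the recovered queue's znode name, which also carries the chain of dead servers that locateRecoveredPaths() probes. A minimal sketch of that parsing (assuming org.apache.hadoop.hbase.replication.ReplicationQueueInfo is imported, and using a hypothetical znode of the peerId + "-" + rsZNode form described in the shipper code):

// Hypothetical recovered-queue znode: original peer "1" plus a dead region server's znode.
ReplicationQueueInfo info =
    new ReplicationQueueInfo("1-regionserver1.example.com,16020,1496000000000");
String peerId = info.getPeerId();                        // "1", what getPeerId() above returns
boolean recovered = info.isQueueRecovered();             // true, so the factory picks this class
List<String> deadServers = info.getDeadRegionServers();  // servers probed by locateRecoveredPaths()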


@ -0,0 +1,147 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;
/**
* Used by a {@link RecoveredReplicationSource}.
*/
@InterfaceAudience.Private
public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread {
private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class);
protected final RecoveredReplicationSource source;
private final ReplicationQueues replicationQueues;
public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
ReplicationQueues replicationQueues) {
super(conf, walGroupId, queue, source);
this.source = source;
this.replicationQueues = replicationQueues;
}
@Override
public void run() {
// Loop until we close down
while (isActive()) {
int sleepMultiplier = 1;
// Sleep until replication is enabled again
if (!source.isPeerEnabled()) {
if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
while (entryReader == null) {
if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
sleepMultiplier)) {
sleepMultiplier++;
}
}
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
if (entryBatch.getWalEntries().isEmpty()
&& entryBatch.getLastSeqIds().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ source.getPeerClusterZnode());
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerRunning(false);
continue;
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}
source.tryFinish();
}
@Override
public long getStartPosition() {
long startPosition = getRecoveredQueueStartPos();
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
source.locateRecoveredPaths(queue);
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
numRetries++;
}
}
return startPosition;
}
// If this is a recovered queue, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos() {
long startPosition = 0;
String peerClusterZnode = source.getPeerClusterZnode();
try {
startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
this.queue.peek().getName());
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ startPosition);
}
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
}
return startPosition;
}
@Override
protected void updateLogPosition(long lastReadPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
lastReadPosition, true, false);
lastLoggedPosition = lastReadPosition;
}
private void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
} else {
LOG.error("Closing worker for wal group " + this.walGroupId
+ " because an error occurred: " + reason, cause);
}
entryReader.interrupt();
Threads.shutdown(entryReader, sleepForRetries);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
}
}


@ -18,9 +18,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
@ -42,19 +39,14 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@ -66,11 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@ -96,33 +84,30 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
private ReplicationQueues replicationQueues;
protected int queueSizePerGroup;
protected ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
private Configuration conf;
private ReplicationQueueInfo replicationQueueInfo;
protected Configuration conf;
protected ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
String actualPeerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
protected ReplicationSourceManager manager;
// Should we stop everything?
private Stoppable stopper;
protected Stoppable stopper;
// How long should we sleep for each retry
private long sleepForRetries;
private FileSystem fs;
protected FileSystem fs;
// id of this cluster
private UUID clusterId;
// id of the other cluster
private UUID peerClusterId;
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// total number of edits we replicated
private AtomicLong totalReplicatedOperations = new AtomicLong(0);
// The znode we currently play with
private String peerClusterZnode;
protected String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
@ -139,7 +124,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
new ConcurrentHashMap<>();
private AtomicLong totalBufferUsed;
@ -182,8 +168,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
this.actualPeerId = replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
@ -213,15 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
} else {
LOG.debug("Starting up worker for wal group " + logPrefix);
worker.startup();
}
tryStartNewShipperThread(logPrefix, queue);
}
}
queue.put(log);
@ -262,15 +238,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
private void uninitialize() {
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
if (replicationEndpoint.state() == Service.State.STARTING
|| replicationEndpoint.state() == Service.State.RUNNING) {
replicationEndpoint.stopAndWait();
}
}
@Override
public void run() {
// mark we are running now
@ -322,15 +289,98 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup();
tryStartNewShipperThread(walGroupId, queue);
}
}
protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf,
walGroupId, queue, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue,
worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
}
}
protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName,
String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager,
replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
Threads.setDaemonThreadRunning(walReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
getUncaughtExceptionHandler());
return walReader;
}
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
return new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
stopper.stop("Unexpected exception in " + t.getName());
}
};
}
@Override
public ReplicationEndpoint getReplicationEndpoint() {
return this.replicationEndpoint;
}
@Override
public ReplicationSourceManager getSourceManager() {
return this.manager;
}
@Override
public void tryThrottle(int batchSize) throws InterruptedException {
checkBandwidthChangeAndResetThrottler();
if (throttler.isEnabled()) {
long sleepTicks = throttler.getNextSleepInterval(batchSize);
if (sleepTicks > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
}
Thread.sleep(sleepTicks);
// reset throttler's cycle start tick when sleep for throttling occurs
throttler.resetStartTick();
}
}
}
private void checkBandwidthChangeAndResetThrottler() {
long peerBandwidth = getCurrentBandwidth();
if (peerBandwidth != currentBandwidth) {
currentBandwidth = peerBandwidth;
throttler.setBandwidth((double) currentBandwidth / 10.0);
LOG.info("ReplicationSource : " + peerId
+ " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
}
}
private long getCurrentBandwidth() {
ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
// user can set peer bandwidth to 0 to use default bandwidth
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}
private void uninitialize() {
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
if (replicationEndpoint.state() == Service.State.STARTING
|| replicationEndpoint.state() == Service.State.RUNNING) {
replicationEndpoint.stopAndWait();
}
}
@ -358,7 +408,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
*
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
@Override
public boolean isPeerEnabled() {
return this.replicationPeers.getStatusOfPeer(this.peerId);
}
@ -428,7 +479,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
@Override
public String getPeerClusterId() {
public String getPeerId() {
return this.peerId;
}
@ -441,7 +492,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return null;
}
private boolean isSourceActive() {
@Override
public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
@ -492,467 +544,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* Get Replication Source Metrics
* @return sourceMetrics
*/
@Override
public MetricsSource getSourceMetrics() {
return this.metrics;
}
private long getCurrentBandwidth() {
ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
// user can set peer bandwidth to 0 to use default bandwidth
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}
// This thread reads entries from a queue and ships them.
// Entries are placed onto the queue by ReplicationSourceWALReaderThread
public class ReplicationSourceShipperThread extends Thread {
ReplicationSourceInterface source;
String walGroupId;
PriorityBlockingQueue<Path> queue;
ReplicationQueueInfo replicationQueueInfo;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path currentPath;
// Indicates whether this particular worker is running
private boolean workerRunning = true;
ReplicationSourceWALReaderThread entryReader;
// Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(
new CacheLoader<String, Boolean>() {
@Override
public Boolean load(String key) throws Exception {
return false;
}
}
);
public ReplicationSourceShipperThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSourceInterface source) {
this.walGroupId = walGroupId;
this.queue = queue;
this.replicationQueueInfo = replicationQueueInfo;
this.source = source;
}
@Override
public void run() {
// Loop until we close down
while (isWorkerActive()) {
int sleepMultiplier = 1;
// Sleep until replication is enabled again
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
while (entryReader == null) {
if (sleepForRetries("Replication WAL entry reader thread not initialized",
sleepMultiplier)) {
sleepMultiplier++;
}
if (sleepMultiplier == maxRetriesMultiplier) {
LOG.warn("Replication WAL entry reader thread not initialized");
}
}
try {
WALEntryBatch entryBatch = entryReader.take();
for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
waitingUntilCanPush(entry);
}
shipEdits(entryBatch);
releaseBufferQuota((int) entryBatch.getHeapSize());
if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
&& entryBatch.getLastSeqIds().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerRunning(false);
continue;
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}
if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allOtherTaskDone = true;
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (!worker.equals(this) && worker.isAlive()) {
allOtherTaskDone = false;
break;
}
}
if (allOtherTaskDone) {
manager.closeRecoveredQueue(this.source);
LOG.info("Finished recovering queue " + peerClusterZnode
+ " with the following stats: " + getStats());
}
}
}
}
private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
String key = entry.getKey();
long seq = entry.getValue();
boolean deleteKey = false;
if (seq <= 0) {
// There is a REGION_CLOSE marker, we can not continue skipping after this entry.
deleteKey = true;
seq = -seq;
}
if (!canSkipWaitingSet.getUnchecked(key)) {
try {
manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
} catch (IOException e) {
LOG.error("waitUntilCanBePushed fail", e);
stopper.stop("waitUntilCanBePushed fail");
} catch (InterruptedException e) {
LOG.warn("waitUntilCanBePushed interrupted", e);
Thread.currentThread().interrupt();
}
canSkipWaitingSet.put(key, true);
}
if (deleteKey) {
canSkipWaitingSet.invalidate(key);
}
}
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerClusterZnode.split("-")[0];
}
List<Cell> cells = edit.getCells();
int totalCells = cells.size();
for (int i = 0; i < totalCells; i++) {
Cell cell = cells.get(i);
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
List<StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
List<String> storeFileList = stores.get(j).getStoreFileList();
manager.cleanUpHFileRefs(peerId, storeFileList);
metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
}
}
}
}
private void checkBandwidthChangeAndResetThrottler() {
long peerBandwidth = getCurrentBandwidth();
if (peerBandwidth != currentBandwidth) {
currentBandwidth = peerBandwidth;
throttler.setBandwidth((double) currentBandwidth / 10.0);
LOG.info("ReplicationSource : " + peerId
+ " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
}
}
/**
* Do the shipping logic
*/
protected void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
currentPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
// Save positions to meta table before zk.
updateSerialRepPositions(entryBatch.getLastSeqIds());
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
}
return;
}
int currentSize = (int) entryBatch.getHeapSize();
while (isWorkerActive()) {
try {
checkBandwidthChangeAndResetThrottler();
if (throttler.isEnabled()) {
long sleepTicks = throttler.getNextSleepInterval(currentSize);
if (sleepTicks > 0) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
}
Thread.sleep(sleepTicks);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping for throttling control");
Thread.currentThread().interrupt();
// current thread might be interrupted to terminate
// directly go back to while() for confirm this
continue;
}
// reset throttler's cycle start tick when sleep for throttling occurs
throttler.resetStartTick();
}
}
// create replicateContext here, so the entries can be GC'd upon return from this call
// stack
ReplicationEndpoint.ReplicateContext replicateContext =
new ReplicationEndpoint.ReplicateContext();
replicateContext.setEntries(entries).setSize(currentSize);
replicateContext.setWalGroupId(walGroupId);
long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = replicationEndpoint.replicate(replicateContext);
long endTimeNs = System.nanoTime();
if (!replicated) {
continue;
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
if (this.lastLoggedPosition != lastReadPosition) {
//Clean up hfile references
int size = entries.size();
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
// Save positions to meta table before zk.
updateSerialRepPositions(entryBatch.getLastSeqIds());
//Log and clean up WAL logs
updateLogPosition(lastReadPosition);
}
if (throttler.isEnabled()) {
throttler.addPushSize(currentSize);
}
totalReplicatedEdits.addAndGet(entries.size());
totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
// FIXME check relationship between wal group and overall
metrics.shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles());
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId);
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
+ totalReplicatedOperations + " operations in "
+ ((endTimeNs - startTimeNs) / 1000000) + " ms");
}
break;
} catch (Exception ex) {
LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
+ org.apache.hadoop.util.StringUtils.stringifyException(ex));
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
}
private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
try {
MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
lastPositionsForSerialScope);
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
stopper.stop("updateReplicationPositions fail");
}
}
private void updateLogPosition(long lastReadPosition) {
manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
}
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+ getCurrentPath(), e);
stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
}
};
Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
+ peerClusterZnode, handler);
workerThreads.put(walGroupId, this);
long startPosition = 0;
if (this.replicationQueueInfo.isQueueRecovered()) {
startPosition = getRecoveredQueueStartPos(startPosition);
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
locateRecoveredPaths();
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
numRetries++;
}
}
}
startWALReaderThread(n, handler, startPosition);
}
// If this is a recovered queue, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos(long startPosition) {
try {
startPosition =
(replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ startPosition);
}
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
}
return startPosition;
}
// start a background thread to read and batch entries
private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHandler handler,
long startPosition) {
ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
startPosition, fs, conf, readerFilter, metrics);
Threads.setDaemonThreadRunning(entryReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
handler);
}
// Loops through the recovered queue and tries to find the location of each log
// this is necessary because the logs may have moved before recovery was initiated
private void locateRecoveredPaths() throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
pathsLoop: for (Path path : queue) {
if (fs.exists(path)) { // still in same location, don't need to do anything
newPaths.add(path);
continue;
}
// Path changed - try to find the right path.
hasPathChanged = true;
if (stopper instanceof ReplicationSyncUp.DummyServer) {
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
Path newPath = getReplSyncUpPath(path);
newPaths.add(newPath);
continue;
} else {
// See if Path exists in the dead RS folder (there could be a chain of failures
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
final Path walDir = FSUtils.getWALRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
final Path deadRsDirectory =
new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
for (Path possibleLogLocation : locs) {
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
if (manager.getFs().exists(possibleLogLocation)) {
// We found the right new location
LOG.info("Log " + path + " still exists at " + possibleLogLocation);
newPaths.add(possibleLogLocation);
continue pathsLoop;
}
}
}
// didn't find a new location
LOG.error(
String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
newPaths.add(path);
}
}
if (hasPathChanged) {
if (newPaths.size() != queue.size()) { // this shouldn't happen
LOG.error("Recovery queue size is incorrect");
throw new IOException("Recovery queue size error");
}
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
for (Path path : newPaths) {
queue.add(path);
}
}
}
// N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
// area rather than to the wal area for a particular region server.
private Path getReplSyncUpPath(Path path) throws IOException {
FileStatus[] rss = fs.listStatus(manager.getLogDir());
for (FileStatus rs : rss) {
Path p = rs.getPath();
FileStatus[] logs = fs.listStatus(p);
for (FileStatus log : logs) {
p = new Path(p, log.getPath().getName());
if (p.getName().equals(path.getName())) {
LOG.info("Log " + p.getName() + " found at " + p);
return p;
}
}
}
LOG.error("Didn't find path for: " + path.getName());
return path;
}
public Path getCurrentPath() {
return this.currentPath;
}
public long getCurrentPosition() {
return this.lastLoggedPosition;
}
private boolean isWorkerActive() {
return !stopper.isStopped() && workerRunning && !isInterrupted();
}
private void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
} else {
LOG.error("Closing worker for wal group " + this.walGroupId
+ " because an error occurred: " + reason, cause);
}
entryReader.interrupt();
Threads.shutdown(entryReader, sleepForRetries);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
}
public void setWorkerRunning(boolean workerRunning) {
entryReader.setReaderRunning(workerRunning);
this.workerRunning = workerRunning;
}
private void releaseBufferQuota(int size) {
totalBufferUsed.addAndGet(-size);
@Override
public void postShipEdits(List<Entry> entries, int batchSize) {
if (throttler.isEnabled()) {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
totalBufferUsed.addAndGet(-batchSize);
}
}


@ -0,0 +1,55 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
/**
* Constructs a {@link ReplicationSourceInterface}
*/
@InterfaceAudience.Private
public class ReplicationSourceFactory {
private static final Log LOG = LogFactory.getLog(ReplicationSourceFactory.class);
static ReplicationSourceInterface create(Configuration conf, String peerId) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
ReplicationSourceInterface src;
try {
String defaultReplicationSourceImpl =
isQueueRecovered ? RecoveredReplicationSource.class.getCanonicalName()
: ReplicationSource.class.getCanonicalName();
@SuppressWarnings("rawtypes")
Class c = Class.forName(
conf.get("replication.replicationsource.implementation", defaultReplicationSourceImpl));
src = (ReplicationSourceInterface) c.newInstance();
} catch (Exception e) {
LOG.warn("Passed replication source implementation throws errors, "
+ "defaulting to ReplicationSource",
e);
src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource();
}
return src;
}
}
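
A minimal usage sketch of the factory (hypothetical peer ids, HBaseConfiguration imported for brevity; create() is package-private, so the real caller is ReplicationSourceManager in the same package, as shown further down in this commit):

// A plain peer id yields a normal ReplicationSource, unless
// replication.replicationsource.implementation names a custom implementation.
Configuration conf = HBaseConfiguration.create();
ReplicationSourceInterface normal = ReplicationSourceFactory.create(conf, "1");
// A recovered-queue id (peer id plus dead server znode) yields a RecoveredReplicationSource.
ReplicationSourceInterface recovered =
    ReplicationSourceFactory.create(conf, "1-regionserver1.example.com,16020,1496000000000");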


@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Interface that defines a replication source
@ -65,10 +66,15 @@ public interface ReplicationSourceInterface {
void enqueueLog(Path log);
/**
* Get the current log that's replicated
* @return the current log
* Add hfile names to the queue to be replicated.
* @param tableName Name of the table these files belong to
* @param family Name of the family these files belong to
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue for replication}
* @throws ReplicationException If failed to add hfile references
*/
Path getCurrentPath();
void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException;
/**
* Start the replication
@ -88,6 +94,12 @@ public interface ReplicationSourceInterface {
*/
void terminate(String reason, Exception cause);
/**
* Get the current log that's replicated
* @return the current log
*/
Path getCurrentPath();
/**
* Get the id that the source is replicating to
*
@ -98,9 +110,9 @@ public interface ReplicationSourceInterface {
/**
* Get the id that the source is replicating to.
*
* @return peer cluster id
* @return peer id
*/
String getPeerClusterId();
String getPeerId();
/**
* Get a string representation of the current statistics
@ -110,14 +122,41 @@ public interface ReplicationSourceInterface {
String getStats();
/**
* Add hfile names to the queue to be replicated.
* @param tableName Name of the table these files belongs to
* @param family Name of the family these files belong to
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue for replication}
* @throws ReplicationException If failed to add hfile references
* @return peer enabled or not
*/
void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException;
boolean isPeerEnabled();
/**
* @return active or not
*/
boolean isSourceActive();
/**
* @return metrics of this replication source
*/
MetricsSource getSourceMetrics();
/**
* @return the replication endpoint used by this replication source
*/
ReplicationEndpoint getReplicationEndpoint();
/**
* @return the replication source manager
*/
ReplicationSourceManager getSourceManager();
/**
* Try to throttle when the peer is configured with a bandwidth limit
* @param batchSize the size of the entries that will be pushed
* @throws InterruptedException
*/
void tryThrottle(int batchSize) throws InterruptedException;
/**
* Call this after the shipper thread ships some entries to the peer cluster.
* @param entries the entries that were pushed
* @param batchSize the total size of the pushed entries
*/
void postShipEdits(List<Entry> entries, int batchSize);
}


@ -464,17 +464,8 @@ public class ReplicationSourceManager implements ReplicationListener {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
ReplicationSourceInterface src;
try {
@SuppressWarnings("rawtypes")
Class c = Class.forName(conf.get("replication.replicationsource.implementation",
ReplicationSource.class.getCanonicalName()));
src = (ReplicationSourceInterface) c.newInstance();
} catch (Exception e) {
LOG.warn("Passed replication source implementation throws errors, " +
"defaulting to ReplicationSource", e);
src = new ReplicationSource();
}
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);
ReplicationEndpoint replicationEndpoint = null;
try {
@ -575,7 +566,7 @@ public class ReplicationSourceManager implements ReplicationListener {
synchronized (oldsources) {
// First close all the recovered sources for this peer
for (ReplicationSourceInterface src : oldsources) {
if (id.equals(src.getPeerClusterId())) {
if (id.equals(src.getPeerId())) {
oldSourcesToDelete.add(src);
}
}
@ -591,7 +582,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronize on replicationPeers to avoid adding source for the to-be-removed peer
synchronized (this.replicationPeers) {
for (ReplicationSourceInterface src : this.sources) {
if (id.equals(src.getPeerClusterId())) {
if (id.equals(src.getPeerId())) {
srcToRemove.add(src);
}
}
@ -752,7 +743,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {
if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
closeRecoveredQueue(src);
continue;
@ -834,11 +825,11 @@ public class ReplicationSourceManager implements ReplicationListener {
public String getStats() {
StringBuffer stats = new StringBuffer();
for (ReplicationSourceInterface source : sources) {
stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n");
}
for (ReplicationSourceInterface oldSource : oldsources) {
stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId()+": ");
stats.append(oldSource.getStats()+ "\n");
}
return stats.toString();


@ -0,0 +1,336 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
* This thread reads entries from a queue and ships them. Entries are placed onto the queue by
* ReplicationSourceWALReaderThread
*/
@InterfaceAudience.Private
public class ReplicationSourceShipperThread extends Thread {
private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
protected final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
protected final ReplicationSourceInterface source;
// Last position in the log that we sent to ZooKeeper
protected long lastLoggedPosition = -1;
// Path of the current log
protected volatile Path currentPath;
// Indicates whether this particular worker is running
private boolean workerRunning = true;
protected ReplicationSourceWALReaderThread entryReader;
// How long should we sleep for each retry
protected final long sleepForRetries;
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
// Use guava cache to set ttl for each key
private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(
new CacheLoader<String, Boolean>() {
@Override
public Boolean load(String key) throws Exception {
return false;
}
}
);
public ReplicationSourceShipperThread(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
this.source = source;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
}
@Override
public void run() {
// Loop until we close down
while (isActive()) {
int sleepMultiplier = 1;
// Sleep until replication is enabled again
if (!source.isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
while (entryReader == null) {
if (sleepForRetries("Replication WAL entry reader thread not initialized",
sleepMultiplier)) {
sleepMultiplier++;
}
}
try {
WALEntryBatch entryBatch = entryReader.take();
for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
waitingUntilCanPush(entry);
}
shipEdits(entryBatch);
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}
}
/**
* Do the shipping logic
*/
protected void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
currentPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
// Save positions to meta table before zk.
updateSerialRepPositions(entryBatch.getLastSeqIds());
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
walGroupId);
}
return;
}
int currentSize = (int) entryBatch.getHeapSize();
while (isActive()) {
try {
try {
source.tryThrottle(currentSize);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping for throttling control");
Thread.currentThread().interrupt();
// current thread might be interrupted to terminate
// directly go back to while() for confirm this
continue;
}
// create replicateContext here, so the entries can be GC'd upon return from this call
// stack
ReplicationEndpoint.ReplicateContext replicateContext =
new ReplicationEndpoint.ReplicateContext();
replicateContext.setEntries(entries).setSize(currentSize);
replicateContext.setWalGroupId(walGroupId);
long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
long endTimeNs = System.nanoTime();
if (!replicated) {
continue;
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
if (this.lastLoggedPosition != lastReadPosition) {
//Clean up hfile references
int size = entries.size();
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
// Save positions to meta table before zk.
updateSerialRepPositions(entryBatch.getLastSeqIds());
//Log and clean up WAL logs
updateLogPosition(lastReadPosition);
}
source.postShipEdits(entries, currentSize);
// FIXME check relationship between wal group and overall
source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
entryBatch.getNbHFiles());
source.getSourceMetrics().setAgeOfLastShippedOp(
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
+ " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
}
break;
} catch (Exception ex) {
LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
+ org.apache.hadoop.util.StringUtils.stringifyException(ex));
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
}
private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
String key = entry.getKey();
long seq = entry.getValue();
boolean deleteKey = false;
if (seq <= 0) {
// There is a REGION_CLOSE marker, we can not continue skipping after this entry.
deleteKey = true;
seq = -seq;
}
if (!canSkipWaitingSet.getUnchecked(key)) {
try {
source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
} catch (IOException e) {
LOG.error("waitUntilCanBePushed fail", e);
throw new RuntimeException("waitUntilCanBePushed fail");
} catch (InterruptedException e) {
LOG.warn("waitUntilCanBePushed interrupted", e);
Thread.currentThread().interrupt();
}
canSkipWaitingSet.put(key, true);
}
if (deleteKey) {
canSkipWaitingSet.invalidate(key);
}
}
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerId.split("-")[0];
}
List<Cell> cells = edit.getCells();
int totalCells = cells.size();
for (int i = 0; i < totalCells; i++) {
Cell cell = cells.get(i);
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
List<StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
List<String> storeFileList = stores.get(j).getStoreFileList();
source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
}
}
}
}
protected void updateLogPosition(long lastReadPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
lastReadPosition, false, false);
lastLoggedPosition = lastReadPosition;
}
private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
try {
MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
source.getPeerId(), lastPositionsForSerialScope);
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
throw new RuntimeException("updateReplicationPositions fail");
}
}
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
+ source.getPeerClusterZnode(), handler);
}
public PriorityBlockingQueue<Path> getLogQueue() {
return this.queue;
}
public Path getCurrentPath() {
return this.currentPath;
}
public long getCurrentPosition() {
return this.lastLoggedPosition;
}
public void setWALReader(ReplicationSourceWALReaderThread entryReader) {
this.entryReader = entryReader;
}
public long getStartPosition() {
return 0;
}
protected boolean isActive() {
return source.isSourceActive() && workerRunning && !isInterrupted();
}
public void setWorkerRunning(boolean workerRunning) {
entryReader.setReaderRunning(workerRunning);
this.workerRunning = workerRunning;
}
/**
* Do the sleeping logic
* @param msg Why we sleep
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
public boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
}
}
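
A quick worked example of the retry backoff encoded above, under the defaults read in the constructor (1000 ms base, multiplier capped at 300): the multiplier grows by one per consecutive failed attempt, so waits go 1 s, 2 s, 3 s, ... and level off at 300 s, the "5 minutes" noted in the constructor comment. Equivalently, for a hypothetical attempt counter n:

// Sketch only; n counts consecutive retries of the same condition.
long sleepMs = 1000L * Math.min(n, 300);   // 1000, 2000, 3000, ... capped at 300000 ms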


@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;


@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
@ -82,7 +83,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
public String getPeerClusterId() {
public String getPeerId() {
String[] parts = peerClusterId.split("-", 2);
return parts.length != 1 ?
parts[0] : peerClusterId;
@ -98,4 +99,37 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
throws ReplicationException {
return;
}
@Override
public boolean isPeerEnabled() {
return true;
}
@Override
public boolean isSourceActive() {
return true;
}
@Override
public MetricsSource getSourceMetrics() {
return null;
}
@Override
public ReplicationEndpoint getReplicationEndpoint() {
return null;
}
@Override
public ReplicationSourceManager getSourceManager() {
return manager;
}
@Override
public void tryThrottle(int batchSize) throws InterruptedException {
}
@Override
public void postShipEdits(List<Entry> entries, int batchSize) {
}
}


@ -462,7 +462,7 @@ public abstract class TestReplicationSourceManager {
// Make sure that the replication source was not initialized
List<ReplicationSourceInterface> sources = manager.getSources();
for (ReplicationSourceInterface source : sources) {
assertNotEquals("FakePeer", source.getPeerClusterId());
assertNotEquals("FakePeer", source.getPeerId());
}
// Create a replication queue for the fake peer