HBASE-20456 Support removing a ReplicationSourceShipper for a special wal group

This commit is contained in:
zhangduo 2018-04-24 22:01:21 +08:00
parent 66cced16dc
commit b281328228
13 changed files with 163 additions and 50 deletions

View File

@ -682,6 +682,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
protected void doShutdown() throws IOException {
waitForSafePoint();
closeWriter(this.writer);
this.writer = null;
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -144,15 +143,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
void tryFinish() {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
if (allTasksDone) {
if (workerThreads.isEmpty()) {
this.getSourceMetrics().clear();
manager.removeRecoveredSource(this);
LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
}
manager.finishRecoveredSource(this);
}
}

View File

@ -47,13 +47,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
this.replicationQueues = queueStorage;
}
@Override
protected void noMoreData() {
LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
}
@Override
protected void postFinish() {
source.tryFinish();

View File

@ -62,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@ -120,6 +121,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private long defaultBandwidth;
private long currentBandwidth;
private WALFileLengthProvider walFileLengthProvider;
@VisibleForTesting
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
@ -190,6 +192,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(log);
queues.put(logPrefix, queue);
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
@ -197,8 +202,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(logPrefix, queue);
}
}
} else {
queue.put(log);
}
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold
int queueSize = queue.size();
@ -633,5 +640,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
return queueStorage;
}
void removeWorker(ReplicationSourceShipper worker) {
workerThreads.remove(worker.walGroupId, worker);
}
}

View File

@ -444,12 +444,25 @@ public class ReplicationSourceManager implements ReplicationListener {
* Clear the metrics and related replication queue of the specified old source
* @param src source to clear
*/
void removeRecoveredSource(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getQueueId());
this.oldsources.remove(src);
private boolean removeRecoveredSource(ReplicationSourceInterface src) {
if (!this.oldsources.remove(src)) {
return false;
}
LOG.info("Done with the recovered queue {}", src.getQueueId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsByIdRecoveredQueues.remove(src.getQueueId());
return true;
}
void finishRecoveredSource(ReplicationSourceInterface src) {
synchronized (oldsources) {
if (!removeRecoveredSource(src)) {
return;
}
}
LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
src.getStats());
}
/**

View File

@ -51,13 +51,13 @@ public class ReplicationSourceShipper extends Thread {
public enum WorkerState {
RUNNING,
STOPPED,
FINISHED, // The worker is done processing a recovered queue
FINISHED, // The worker is done processing a queue
}
private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
private final ReplicationSourceInterface source;
private final ReplicationSource source;
// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
@ -74,7 +74,7 @@ public class ReplicationSourceShipper extends Thread {
protected final int maxRetriesMultiplier;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
PriorityBlockingQueue<Path> queue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
@ -99,7 +99,7 @@ public class ReplicationSourceShipper extends Thread {
}
try {
WALEntryBatch entryBatch = entryReader.take();
// the NO_MORE_DATA instance has no path so do not all shipEdits
// the NO_MORE_DATA instance has no path so do not call shipEdits
if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
noMoreData();
} else {
@ -114,12 +114,20 @@ public class ReplicationSourceShipper extends Thread {
if (!isFinished()) {
setWorkerState(WorkerState.STOPPED);
} else {
source.removeWorker(this);
postFinish();
}
}
// To be implemented by recovered shipper
protected void noMoreData() {
private void noMoreData() {
if (source.isRecovered()) {
LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
} else {
LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId());
}
setWorkerState(WorkerState.FINISHED);
}
// To be implemented by recovered shipper

View File

@ -143,7 +143,7 @@ class ReplicationSourceWALReader extends Thread {
entryBatchQueue.put(batch);
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch(entryStream.getCurrentPath());
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
}
}
@ -227,10 +227,11 @@ class ReplicationSourceWALReader extends Thread {
return batch;
}
private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
private void handleEmptyWALEntryBatch() throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
if (source.isRecovered()) {
// we're done with queue recovery, shut ourself down
if (logQueue.isEmpty()) {
// we're done with current queue, either this is a recovered queue, or it is the special group
// for a sync replication peer and the peer has been transited to DA or S state.
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);

View File

@ -304,7 +304,8 @@ class WALEntryStream implements Closeable {
return true;
}
} else {
// no more files in queue, this could only happen for recovered queue.
// no more files in queue, this could happen for recovered queue, or for a wal group of a sync
// replication peer which has already been transited to DA or S.
setCurrentPath(null);
}
return false;

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
@ -247,26 +248,30 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
if (walName == null) {
throw new IllegalArgumentException("The WAL path couldn't be null");
}
final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]);
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(walName.getName());
if (matcher.matches()) {
return Long.parseLong(matcher.group(2));
} else {
throw new IllegalArgumentException(walName.getName() + " is not a valid wal file name");
}
}
/**
* Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for
* description.
*/
private static final Pattern pattern =
Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*");
private static final Pattern WAL_FILE_NAME_PATTERN =
Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?");
/**
* A WAL file name is of the format: &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}
* &lt;file-creation-timestamp&gt;[.meta]. provider-name is usually made up of a server-name and a
* provider-id
* &lt;file-creation-timestamp&gt;[.&lt;suffix&gt;]. provider-name is usually made up of a
* server-name and a provider-id
* @param filename name of the file to validate
* @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise
*/
public static boolean validateWALFilename(String filename) {
return pattern.matcher(filename).matches();
return WAL_FILE_NAME_PATTERN.matcher(filename).matches();
}
/**
@ -517,10 +522,15 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* log_prefix.filenumber.log_suffix
* @param name Name of the WAL to parse
* @return prefix of the log
* @throws IllegalArgumentException if the name passed in is not a valid wal file name
* @see AbstractFSWAL#getCurrentFileName()
*/
public static String getWALPrefixFromWALName(String name) {
int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
return name.substring(0, endIndex);
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
if (matcher.matches()) {
return matcher.group(1);
} else {
throw new IllegalArgumentException(name + " is not a valid wal file name");
}
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@ -113,8 +113,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
// Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then
// back to A, the log prefix will be changed. This is used to simplify the implementation for
// replication source, where we do not need to consider that a terminated shipper could be added
// back.
private String getLogPrefix(String peerId) {
return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId;
return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId;
}
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
@ -250,7 +254,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to, int stage) {
if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) {
if (from == SyncReplicationState.ACTIVE) {
if (stage == 0) {
Lock lock = createLock.acquireLock(peerId);
try {

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
@ -393,8 +394,8 @@ public abstract class TestReplicationSourceManager {
// populate some znodes in the peer znode
SortedSet<String> files = new TreeSet<>();
String group = "testgroup";
String file1 = group + ".log1";
String file2 = group + ".log2";
String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1";
String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2";
files.add(file1);
files.add(file2);
for (String file : files) {

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Testcase for HBASE-20456.
*/
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationShipperQuit extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationShipperQuit.class);
@Test
public void testShipperQuitWhenDA() throws Exception {
// set to serial replication
UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
.newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
.newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
DualAsyncFSWAL wal =
(DualAsyncFSWAL) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
String walGroupId =
AbstractFSWALProvider.getWALPrefixFromWALName(wal.getCurrentFileName().getName());
ReplicationSourceShipper shipper =
((ReplicationSource) ((Replication) rs.getReplicationSourceService()).getReplicationManager()
.getSource(PEER_ID)).workerThreads.get(walGroupId);
assertFalse(shipper.isFinished());
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
writeAndVerifyReplication(UTIL1, UTIL2, 100, 200);
ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
.getReplicationManager().getSource(PEER_ID);
// the peer is serial so here we can make sure that the previous wals have already been
// replicated, and finally the shipper should be removed from the worker pool
UTIL1.waitFor(10000, () -> !source.workerThreads.containsKey(walGroupId));
assertTrue(shipper.isFinished());
}
}

View File

@ -413,9 +413,7 @@ public class TestWALEntryStream {
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(5, batch.getNbEntries());
// Actually this should be true but we haven't handled this yet since for a normal queue the
// last one is always open... Not a big deal for now.
assertFalse(batch.isEndOfFile());
assertTrue(batch.isEndOfFile());
assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
}