HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
parent 000e64abda
commit 650350a749
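As context for the diff below: ReplicationSyncUp is a Hadoop Tool driven through ToolRunner (the test base changed in this commit invokes it exactly that way), so the new -f flag can also be passed programmatically. A minimal sketch, assuming HBase is on the classpath and the configuration points at the source cluster; the SyncUpRunner class name is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.util.ToolRunner;

public class SyncUpRunner {
  public static void main(String[] args) throws Exception {
    // Assumed to carry the HBase/ZooKeeper settings of the source cluster.
    Configuration conf = HBaseConfiguration.create();
    // "-f" forces a new run even if the info file left behind by a failed run still exists.
    int exit = ToolRunner.run(conf, new ReplicationSyncUp(), new String[] { "-f" });
    System.exit(exit);
  }
}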
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
     }
   }
 
-  private void writeInfoFile(FileSystem fs) throws IOException {
+  private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException {
     // Record the info of this run. Currently only record the time we run the job. We will use this
     // timestamp to clean up the data for last sequence ids and hfile refs in replication queue
     // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
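The new isForce parameter is forwarded as the overwrite argument of FileSystem#create in the next hunk, which is what makes a non-forced rerun fail fast on a leftover info file while -f replaces it. A self-contained sketch of that behaviour, using the local filesystem and a hypothetical path rather than the real INFO_DIR/INFO_FILE location:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateOverwriteSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path info = new Path("/tmp/syncup-info-sketch"); // hypothetical location for the sketch
    fs.delete(info, false); // start from a clean slate
    try (FSDataOutputStream out = fs.create(info, false)) { // first run: overwrite=false succeeds
      out.write('1');
    }
    try (FSDataOutputStream out = fs.create(info, false)) { // rerun without -f: file exists, create throws
      out.write('2');
    } catch (java.io.IOException expected) {
      System.out.println("non-forced rerun refused: " + expected.getMessage());
    }
    try (FSDataOutputStream out = fs.create(info, true)) { // rerun with -f: overwrite the leftover file
      out.write('3');
    }
  }
}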
@@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured implements Tool {
       new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
     String json = JsonMapper.writeObjectAsString(info);
     Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
-    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
+    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) {
       out.write(Bytes.toBytes(json));
     }
   }
 
+  private static boolean parseOpts(String args[]) {
+    LinkedList<String> argv = new LinkedList<>();
+    argv.addAll(Arrays.asList(args));
+    String cmd = null;
+    while ((cmd = argv.poll()) != null) {
+      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
+        printUsageAndExit(null, 0);
+      }
+      if (cmd.equals("-f")) {
+        return true;
+      }
+      if (!argv.isEmpty()) {
+        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+      }
+    }
+    return false;
+  }
+
+  private static void printUsageAndExit(final String message, final int exitCode) {
+    printUsage(message);
+    System.exit(exitCode);
+  }
+
+  private static void printUsage(final String message) {
+    if (message != null && message.length() > 0) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
+    System.err.println(" <OPTIONS> [-D<property=value>]*");
+    System.err.println();
+    System.err.println("General Options:");
+    System.err.println(" -h|--h|--help Show this help and exit.");
+    System.err
+      .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
+        + "See HBASE-27623 for details.");
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     Abortable abortable = new Abortable() {
@@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
         return abort;
       }
     };
+    boolean isForce = parseOpts(args);
     Configuration conf = getConf();
     try (ZKWatcher zkw = new ZKWatcher(conf,
       "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
@@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
       Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
 
       System.out.println("Start Replication Server");
-      writeInfoFile(fs);
+      writeInfoFile(fs, isForce);
       Replication replication = new Replication();
       // use offline table replication queue storage
       getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
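Taken together, the flag flows as args -> parseOpts(args) -> writeInfoFile(fs, isForce) -> fs.create(..., isForce). Purely for illustration, the accepted inputs of the parser above can be restated outside the class like this (help handling simplified to a return, and the error path to an exception, where the real tool prints usage and calls System.exit):

import java.util.Arrays;
import java.util.LinkedList;

public class ParseOptsSketch {
  // Mirrors the parseOpts loop above; returns true only when "-f" is seen.
  static boolean parse(String[] args) {
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    String cmd;
    while ((cmd = argv.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        return false; // the real tool prints usage and exits here
      }
      if (cmd.equals("-f")) {
        return true;
      }
      if (!argv.isEmpty()) {
        throw new IllegalArgumentException("Unrecognized option/command: " + cmd); // real tool exits with -1
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(parse(new String[0]));         // false: plain run, info file must not already exist
    System.out.println(parse(new String[] { "-f" })); // true: force a new run
  }
}

The remaining hunks below touch the sync-up test classes.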
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -300,4 +302,38 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
     assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
       rowCountHt2TargetAtPeer1);
   }
+
+  /**
+   * test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details.
+   */
+  @Test
+  public void testStartANewSyncUpToolAfterFailed() throws Exception {
+    // Start syncUpTool for the first time with non-force mode,
+    // let's assume that this will fail in sync data,
+    // this does not affect our test results
+    syncUp(UTIL1);
+    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
+    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
+    Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
+    FileSystem fs = UTIL1.getTestFileSystem();
+    assertTrue(fs.exists(replicationInfoPath));
+    FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);
+
+    // Start syncUpTool for the second time with non-force mode,
+    // startup will fail because replication info file already exists
+    try {
+      syncUp(UTIL1);
+    } catch (Exception e) {
+      assertTrue("e should be a FileAlreadyExistsException",
+        (e instanceof FileAlreadyExistsException));
+    }
+    FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
+    assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());
+
+    // Start syncUpTool for the third time with force mode,
+    // startup will success and create a new replication info file
+    syncUp(UTIL1, new String[] { "-f" });
+    FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
+    assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
+  }
 }
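For debugging a failed run, the JSON that writeInfoFile records can be read back from the same location the test checks above. A hedged sketch; the helper class and method names are illustrative, and it assumes the INFO_DIR and INFO_FILE constants are visible from the caller's package, as they are to the test above:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public final class SyncUpInfoReader {
  // Returns the raw JSON written by ReplicationSyncUp#writeInfoFile, or throws if no run left one behind.
  static String readInfoFile(FileSystem fs, Configuration conf) throws IOException {
    Path infoDir = new Path(CommonFSUtils.getRootDir(conf), ReplicationSyncUp.INFO_DIR);
    Path infoFile = new Path(infoDir, ReplicationSyncUp.INFO_FILE);
    try (FSDataInputStream in = fs.open(infoFile);
      BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      return reader.lines().collect(Collectors.joining("\n"));
    }
  }
}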
@@ -136,8 +136,11 @@ public abstract class TestReplicationSyncUpToolBase {
   }
 
   final void syncUp(HBaseTestingUtil util) throws Exception {
-    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
-      new String[0]);
+    syncUp(util, new String[0]);
   }
 
+  final void syncUp(HBaseTestingUtil util, String[] args) throws Exception {
+    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args);
+  }
+
   // Utilities that manager shutdown / restart of source / sink clusters. They take care of