HBASE-27218 Support rolling upgrading (#4808)

Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
Duo Zhang 2022-11-06 16:57:11 +08:00 committed by Duo Zhang
parent 5f95a914b6
commit 0d57ee147e
22 changed files with 1917 additions and 16 deletions

View File

@ -220,7 +220,11 @@ public class ZNodePaths {
* @param suffix ending of znode name * @param suffix ending of znode name
* @return result of properly joining prefix with suffix * @return result of properly joining prefix with suffix
*/ */
public static String joinZNode(String prefix, String suffix) { public static String joinZNode(String prefix, String... suffix) {
return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix; StringBuilder sb = new StringBuilder(prefix);
for (String s : suffix) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
}
return sb.toString();
} }
} }

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter; import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram; import org.apache.hadoop.hbase.metrics.Histogram;
@ -33,6 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/** /**
@ -1011,6 +1013,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
releaseLock(env); releaseLock(env);
} }
protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
throws ProcedureSuspendedException {
if (jitter) {
// 10% possible jitter
double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
timeoutMillis += add;
}
setTimeout(timeoutMillis);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}
@Override @Override
public int compareTo(final Procedure<TEnvironment> other) { public int compareTo(final Procedure<TEnvironment> other) {
return Long.compare(getProcId(), other.getProcId()); return Long.compare(getProcId(), other.getProcId());

View File

@ -722,3 +722,15 @@ enum AssignReplicationQueuesState {
message AssignReplicationQueuesStateData { message AssignReplicationQueuesStateData {
required ServerName crashed_server = 1; required ServerName crashed_server = 1;
} }
enum MigrateReplicationQueueFromZkToTableState {
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
}
message MigrateReplicationQueueFromZkToTableStateData {
repeated string disabled_peer_id = 1;
}

View File

@ -104,6 +104,16 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -184,4 +185,22 @@ public interface ReplicationQueueStorage {
* @return Whether the replication queue table exists * @return Whether the replication queue table exists
*/ */
boolean hasData() throws ReplicationException; boolean hasData() throws ReplicationException;
// the below 3 methods are used for migrating
/**
* Update the replication queue datas for a given region server.
*/
void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
throws ReplicationException;
/**
* Update last pushed sequence id for the given regions and peers.
*/
void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
throws ReplicationException;
/**
* Add the given hfile refs to the given peer.
*/
void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
} }

View File

@ -21,12 +21,14 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Set; import java.util.Set;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -74,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
private final TableName tableName; private final TableName tableName;
@FunctionalInterface
private interface TableCreator {
void create() throws IOException;
}
public TableReplicationQueueStorage(Connection conn, TableName tableName) { public TableReplicationQueueStorage(Connection conn, TableName tableName) {
this.conn = conn; this.conn = conn;
this.tableName = tableName; this.tableName = tableName;
@ -541,4 +538,60 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
throw new ReplicationException("failed to get replication queue table", e); throw new ReplicationException("failed to get replication queue table", e);
} }
} }
@Override
public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
throws ReplicationException {
List<Put> puts = new ArrayList<>();
for (ReplicationQueueData data : datas) {
if (data.getOffsets().isEmpty()) {
continue;
}
Put put = new Put(Bytes.toBytes(data.getId().toString()));
data.getOffsets().forEach((walGroup, offset) -> {
put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
});
puts.add(put);
}
try (Table table = conn.getTable(tableName)) {
table.put(puts);
} catch (IOException e) {
throw new ReplicationException("failed to batch update queues", e);
}
}
@Override
public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
throws ReplicationException {
Map<String, Put> peerId2Put = new HashMap<>();
for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
peerId2Put
.computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
.addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
}
try (Table table = conn.getTable(tableName)) {
table
.put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
} catch (IOException e) {
throw new ReplicationException("failed to batch update last pushed sequence ids", e);
}
}
@Override
public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
throws ReplicationException {
if (hfileRefs.isEmpty()) {
return;
}
Put put = new Put(Bytes.toBytes(peerId));
for (String ref : hfileRefs) {
put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
}
try (Table table = conn.getTable(tableName)) {
table.put(put);
} catch (IOException e) {
throw new ReplicationException("failed to batch update hfile references", e);
}
}
} }

View File

@ -0,0 +1,351 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
/**
* Just retain a small set of the methods for the old zookeeper based replication queue storage, for
* migrating.
*/
@InterfaceAudience.Private
public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase {
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
"zookeeper.znode.replication.regions";
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
/**
* The name of the znode that contains all replication queues
*/
private final String queuesZNode;
/**
* The name of the znode that contains queues of hfile references to be replicated
*/
private final String hfileRefsZNode;
private final String regionsZNode;
public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
}
public interface MigrationIterator<T> {
T next() throws Exception;
}
@SuppressWarnings("rawtypes")
private static final MigrationIterator EMPTY_ITER = new MigrationIterator() {
@Override
public Object next() {
return null;
}
};
public static final class ZkReplicationQueueData {
private final ReplicationQueueId queueId;
private final Map<String, Long> walOffsets;
public ZkReplicationQueueData(ReplicationQueueId queueId, Map<String, Long> walOffsets) {
this.queueId = queueId;
this.walOffsets = walOffsets;
}
public ReplicationQueueId getQueueId() {
return queueId;
}
public Map<String, Long> getWalOffsets() {
return walOffsets;
}
}
private String getRsNode(ServerName serverName) {
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
private String getQueueNode(ServerName serverName, String queueId) {
return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
}
private String getFileNode(String queueNode, String fileName) {
return ZNodePaths.joinZNode(queueNode, fileName);
}
private String getFileNode(ServerName serverName, String queueId, String fileName) {
return getFileNode(getQueueNode(serverName, queueId), fileName);
}
@SuppressWarnings("unchecked")
public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues()
throws KeeperException {
List<String> replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
if (replicators == null || replicators.isEmpty()) {
ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
return EMPTY_ITER;
}
Iterator<String> iter = replicators.iterator();
return new MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>>() {
private ServerName previousServerName;
@Override
public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception {
if (previousServerName != null) {
ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName));
}
if (!iter.hasNext()) {
ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
return null;
}
String replicator = iter.next();
ServerName serverName = ServerName.parseServerName(replicator);
previousServerName = serverName;
List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
if (queueIdList == null || queueIdList.isEmpty()) {
return Pair.newPair(serverName, Collections.emptyList());
}
List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size());
for (String queueIdStr : queueIdList) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr);
ReplicationQueueId queueId;
if (queueInfo.getDeadRegionServers().isEmpty()) {
queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId());
} else {
queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(),
queueInfo.getDeadRegionServers().get(0));
}
List<String> wals =
ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr));
ZkReplicationQueueData queueData;
if (wals == null || wals.isEmpty()) {
queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap());
} else {
Map<String, Long> walOffsets = new HashMap<>();
for (String wal : wals) {
byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal));
if (data == null || data.length == 0) {
walOffsets.put(wal, 0L);
} else {
walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data));
}
}
queueData = new ZkReplicationQueueData(queueId, walOffsets);
}
queueDataList.add(queueData);
}
return Pair.newPair(serverName, queueDataList);
}
};
}
public static final class ZkLastPushedSeqId {
private final String encodedRegionName;
private final String peerId;
private final long lastPushedSeqId;
ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) {
this.encodedRegionName = encodedRegionName;
this.peerId = peerId;
this.lastPushedSeqId = lastPushedSeqId;
}
public String getEncodedRegionName() {
return encodedRegionName;
}
public String getPeerId() {
return peerId;
}
public long getLastPushedSeqId() {
return lastPushedSeqId;
}
}
@SuppressWarnings("unchecked")
public MigrationIterator<List<ZkLastPushedSeqId>> listAllLastPushedSeqIds()
throws KeeperException {
List<String> level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
if (level1Prefixs == null || level1Prefixs.isEmpty()) {
ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
return EMPTY_ITER;
}
Iterator<String> level1Iter = level1Prefixs.iterator();
return new MigrationIterator<List<ZkLastPushedSeqId>>() {
private String level1Prefix;
private Iterator<String> level2Iter;
private String level2Prefix;
@Override
public List<ZkLastPushedSeqId> next() throws Exception {
for (;;) {
if (level2Iter == null || !level2Iter.hasNext()) {
if (!level1Iter.hasNext()) {
ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
return null;
}
if (level1Prefix != null) {
// this will also delete the previous level2Prefix which is under this level1Prefix
ZKUtil.deleteNodeRecursively(zookeeper,
ZNodePaths.joinZNode(regionsZNode, level1Prefix));
}
level1Prefix = level1Iter.next();
List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper,
ZNodePaths.joinZNode(regionsZNode, level1Prefix));
if (level2Prefixes != null) {
level2Iter = level2Prefixes.iterator();
// reset level2Prefix as we have switched level1Prefix, otherwise the below delete
// level2Prefix section will delete the znode with this level2Prefix under the new
// level1Prefix
level2Prefix = null;
}
} else {
if (level2Prefix != null) {
ZKUtil.deleteNodeRecursively(zookeeper,
ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
}
level2Prefix = level2Iter.next();
List<String> encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper,
ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) {
return Collections.emptyList();
}
List<ZkLastPushedSeqId> lastPushedSeqIds = new ArrayList<>();
for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) {
byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode,
level1Prefix, level2Prefix, encodedRegionNameAndPeerId));
long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data);
Iterator<String> iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator();
String encodedRegionName = level1Prefix + level2Prefix + iter.next();
String peerId = iter.next();
lastPushedSeqIds
.add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId));
}
return Collections.unmodifiableList(lastPushedSeqIds);
}
}
}
};
}
private String getHFileRefsPeerNode(String peerId) {
return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
}
/**
* Pair&lt;PeerId, List&lt;HFileRefs&gt;&gt;
*/
@SuppressWarnings("unchecked")
public MigrationIterator<Pair<String, List<String>>> listAllHFileRefs() throws KeeperException {
List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
if (peerIds == null || peerIds.isEmpty()) {
ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
return EMPTY_ITER;
}
Iterator<String> iter = peerIds.iterator();
return new MigrationIterator<Pair<String, List<String>>>() {
private String previousPeerId;
@Override
public Pair<String, List<String>> next() throws KeeperException {
if (previousPeerId != null) {
ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId));
}
if (!iter.hasNext()) {
ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
return null;
}
String peerId = iter.next();
List<String> refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId));
previousPeerId = peerId;
return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList());
}
};
}
public boolean hasData() throws KeeperException {
return ZKUtil.checkExists(zookeeper, queuesZNode) != -1
|| ZKUtil.checkExists(zookeeper, regionsZNode) != -1
|| ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1;
}
public void deleteAllData() throws KeeperException {
ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
String getQueuesZNode() {
return queuesZNode;
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
String getHfileRefsZNode() {
return hfileRefsZNode;
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
String getRegionsZNode() {
return regionsZNode;
}
}

View File

@ -0,0 +1,317 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
private ZKWatcher zk;
private ZKReplicationQueueStorageForMigration storage;
@Rule
public final TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniZKCluster();
}
@AfterClass
public static void tearDownAfterClass() throws IOException {
UTIL.shutdownMiniZKCluster();
}
@Before
public void setUp() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
zk = new ZKWatcher(conf, name.getMethodName(), null);
storage = new ZKReplicationQueueStorageForMigration(zk, conf);
}
@After
public void tearDown() throws Exception {
ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
Closeables.close(zk, true);
}
public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
String peerId, ServerName deadServer) throws KeeperException {
ZKWatcher zk = storage.zookeeper;
for (int i = 0; i < nServers; i++) {
ServerName sn =
ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
ZKUtil.createWithParents(zk, peerZNode);
for (int j = 0; j < i; j++) {
String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
}
String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
ZKUtil.createWithParents(zk, deadServerPeerZNode);
for (int j = 0; j < i; j++) {
String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
if (j > 0) {
ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
} else {
ZKUtil.createWithParents(zk, wal);
}
}
}
ZKUtil.createWithParents(zk,
ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
}
private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
String peerId) {
return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
}
public static Map<String, Set<String>> mockLastPushedSeqIds(
ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
ZKWatcher zk = storage.zookeeper;
Map<String, Set<String>> name2PeerIds = new HashMap<>();
byte[] bytes = new byte[32];
for (int i = 0; i < nRegions; i++) {
ThreadLocalRandom.current().nextBytes(bytes);
String encodeName = MD5Hash.getMD5AsHex(bytes);
String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
}
int addedEmptyZNodes = 0;
for (int i = 0; i < 256; i++) {
String level1ZNode =
ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
ZKUtil.createWithParents(zk, level1ZNode);
addedEmptyZNodes++;
if (addedEmptyZNodes <= emptyLevel2Count) {
ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
}
if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
break;
}
}
}
return name2PeerIds;
}
public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
throws KeeperException {
ZKWatcher zk = storage.zookeeper;
for (int i = 0; i < nPeers; i++) {
String peerId = "peer_" + i;
ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
for (int j = 0; j < i; j++) {
ZKUtil.createWithParents(zk,
ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
}
}
}
@Test
public void testDeleteAllData() throws Exception {
assertFalse(storage.hasData());
ZKUtil.createWithParents(zk, storage.getQueuesZNode());
assertTrue(storage.hasData());
storage.deleteAllData();
assertFalse(storage.hasData());
}
@Test
public void testEmptyIter() throws Exception {
ZKUtil.createWithParents(zk, storage.getQueuesZNode());
ZKUtil.createWithParents(zk, storage.getRegionsZNode());
ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
assertNull(storage.listAllQueues().next());
assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
assertNull(storage.listAllLastPushedSeqIds().next());
assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
assertNull(storage.listAllHFileRefs().next());
assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
}
@Test
public void testListAllQueues() throws Exception {
String peerId = "1";
ServerName deadServer =
ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
int nServers = 10;
mockQueuesData(storage, nServers, peerId, deadServer);
MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
storage.listAllQueues();
ServerName previousServerName = null;
for (int i = 0; i < nServers + 1; i++) {
Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
assertNotNull(pair);
if (previousServerName != null) {
assertEquals(-1, ZKUtil.checkExists(zk,
ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
}
ServerName sn = pair.getFirst();
previousServerName = sn;
if (sn.equals(deadServer)) {
assertThat(pair.getSecond(), empty());
} else {
assertEquals(2, pair.getSecond().size());
int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname())));
ZkReplicationQueueData data0 = pair.getSecond().get(0);
assertEquals(peerId, data0.getQueueId().getPeerId());
assertEquals(sn, data0.getQueueId().getServerName());
assertEquals(n, data0.getWalOffsets().size());
for (int j = 0; j < n; j++) {
assertEquals(j,
data0.getWalOffsets().get(
(data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
.intValue());
}
ZkReplicationQueueData data1 = pair.getSecond().get(1);
assertEquals(peerId, data1.getQueueId().getPeerId());
assertEquals(sn, data1.getQueueId().getServerName());
assertEquals(n, data1.getWalOffsets().size());
for (int j = 0; j < n; j++) {
assertEquals(j,
data1.getWalOffsets().get(
(data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
.intValue());
}
// the order of the returned result is undetermined
if (data0.getQueueId().getSourceServerName().isPresent()) {
assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
assertFalse(data1.getQueueId().getSourceServerName().isPresent());
} else {
assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
}
}
}
assertNull(iter.next());
assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
}
@Test
public void testListAllLastPushedSeqIds() throws Exception {
String peerId1 = "1";
String peerId2 = "2";
Map<String, Set<String>> name2PeerIds =
mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
int emptyListCount = 0;
for (;;) {
List<ZkLastPushedSeqId> list = iter.next();
if (list == null) {
break;
}
if (list.isEmpty()) {
emptyListCount++;
continue;
}
for (ZkLastPushedSeqId seqId : list) {
name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
if (seqId.getPeerId().equals(peerId1)) {
assertEquals(1, seqId.getLastPushedSeqId());
} else {
assertEquals(2, seqId.getLastPushedSeqId());
}
}
}
assertEquals(10, emptyListCount);
name2PeerIds.forEach((encodedRegionName, peerIds) -> {
assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
});
assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
}
@Test
public void testListAllHFileRefs() throws Exception {
int nPeers = 10;
mockHFileRefs(storage, nPeers);
MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
String previousPeerId = null;
for (int i = 0; i < nPeers; i++) {
Pair<String, List<String>> pair = iter.next();
if (previousPeerId != null) {
assertEquals(-1, ZKUtil.checkExists(zk,
ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
}
String peerId = pair.getFirst();
previousPeerId = peerId;
int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId)));
assertEquals(index, pair.getSecond().size());
}
assertNull(iter.next());
assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
}
}

View File

@ -102,6 +102,12 @@
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-replication</artifactId> <artifactId>hbase-replication</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-replication</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-balancer</artifactId> <artifactId>hbase-balancer</artifactId>

View File

@ -170,6 +170,7 @@ import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore; import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore;
@ -221,6 +222,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator; import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
@ -1058,6 +1060,17 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
this.balancer.initialize(); this.balancer.initialize();
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor()); this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
// try migrate replication data
ZKReplicationQueueStorageForMigration oldReplicationQueueStorage =
new ZKReplicationQueueStorageForMigration(zooKeeper, conf);
// check whether there are something to migrate and we haven't scheduled a migration procedure
// yet
if (
oldReplicationQueueStorage.hasData() && procedureExecutor.getProcedures().stream()
.allMatch(p -> !(p instanceof MigrateReplicationQueueFromZkToTableProcedure))
) {
procedureExecutor.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
}
// start up all service threads. // start up all service threads.
startupTaskGroup.addTask("Initializing master service threads"); startupTaskGroup.addTask("Initializing master service threads");
startServiceThreads(); startServiceThreads();

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure; import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure;
import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.Procedure;
@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/** /**
* Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
@ -266,6 +268,16 @@ public class ServerCrashProcedure extends
} }
break; break;
case SERVER_CRASH_CLAIM_REPLICATION_QUEUES: case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
if (
env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
.anyMatch(p -> !p.isFinished())
) {
LOG.info("There is a pending {}, will retry claim replication queue later",
MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName());
suspend(10_000, true);
return Flow.NO_MORE_STATE;
}
addChildProcedure(new AssignReplicationQueuesProcedure(serverName)); addChildProcedure(new AssignReplicationQueuesProcedure(serverName));
setNextState(ServerCrashState.SERVER_CRASH_FINISH); setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break; break;
@ -431,6 +443,13 @@ public class ServerCrashProcedure extends
env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName()); env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName());
} }
@Override
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(this);
return false;
}
@Override @Override
public void toStringClassDetails(StringBuilder sb) { public void toStringClassDetails(StringBuilder sb) {
sb.append(getProcName()); sb.append(getProcName());

View File

@ -98,10 +98,7 @@ public abstract class AbstractPeerNoLockProcedure<TState>
} }
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
backoffConsumer.accept(backoff); backoffConsumer.accept(backoff);
setTimeout(Math.toIntExact(backoff)); throw suspend(Math.toIntExact(backoff), false);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
} }
protected final void resetRetry() { protected final void resetRetry() {

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* A procedure for migrating replication queue data from zookeeper to hbase:replication table.
*/
@InterfaceAudience.Private
public class MigrateReplicationQueueFromZkToTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, MigrateReplicationQueueFromZkToTableState>
implements GlobalProcedureInterface {
private static final Logger LOG =
LoggerFactory.getLogger(MigrateReplicationQueueFromZkToTableProcedure.class);
private static final int MIN_MAJOR_VERSION = 3;
private List<String> disabledPeerIds;
private List<Future<?>> futures;
private ExecutorService executor;
@Override
public String getGlobalId() {
return getClass().getSimpleName();
}
private ExecutorService getExecutorService() {
if (executor == null) {
executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
.setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
}
return executor;
}
private void shutdownExecutorService() {
if (executor != null) {
executor.shutdown();
executor = null;
}
}
private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
long peerProcCount;
try {
peerProcCount = env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
} catch (IOException e) {
LOG.warn("failed to check peer procedure status", e);
throw suspend(5000, true);
}
if (peerProcCount > 0) {
LOG.info("There are still {} pending peer procedures, will sleep and check later",
peerProcCount);
throw suspend(10_000, true);
}
LOG.info("No pending peer procedures found, continue...");
}
@Override
protected Flow executeFromState(MasterProcedureEnv env,
MigrateReplicationQueueFromZkToTableState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
waitUntilNoPeerProcedure(env);
List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
if (peers.isEmpty()) {
LOG.info("No active replication peer found, delete old replication queue data and quit");
ZKReplicationQueueStorageForMigration oldStorage =
new ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(),
env.getMasterConfiguration());
try {
oldStorage.deleteAllData();
} catch (KeeperException e) {
LOG.warn("failed to delete old replication queue data, sleep and retry later", e);
suspend(10_000, true);
}
return Flow.NO_MORE_STATE;
}
// here we do not care the peers which have already been disabled, as later we do not need
// to enable them
disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
for (String peerId : disabledPeerIds) {
addChildProcedure(new DisablePeerProcedure(peerId));
}
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
if (futures != null) {
// wait until all futures done
long notDone = futures.stream().filter(f -> !f.isDone()).count();
if (notDone == 0) {
boolean succ = true;
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
succ = false;
LOG.warn("Failed to migrate", e);
}
}
if (succ) {
shutdownExecutorService();
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
return Flow.HAS_MORE_STATE;
}
// reschedule to retry migration again
futures = null;
} else {
LOG.info("There still {} pending migration tasks, will sleep and check later", notDone);
throw suspend(10_000, true);
}
}
try {
futures = env.getReplicationPeerManager()
.migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
} catch (IOException e) {
LOG.warn("failed to submit migration tasks", e);
throw suspend(10_000, true);
}
throw suspend(10_000, true);
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
long rsWithLowerVersion =
env.getMasterServices().getServerManager().getOnlineServers().values().stream()
.filter(sm -> VersionInfo.getMajorVersion(sm.getVersion()) < MIN_MAJOR_VERSION).count();
if (rsWithLowerVersion == 0) {
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
return Flow.HAS_MORE_STATE;
} else {
LOG.info("There are still {} region servers which have a major version less than {}, "
+ "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION);
throw suspend(10_000, true);
}
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
for (String peerId : disabledPeerIds) {
addChildProcedure(new EnablePeerProcedure(peerId));
}
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
@Override
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(this);
return false;
}
@Override
protected void rollbackState(MasterProcedureEnv env,
MigrateReplicationQueueFromZkToTableState state) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected MigrateReplicationQueueFromZkToTableState getState(int stateId) {
return MigrateReplicationQueueFromZkToTableState.forNumber(stateId);
}
@Override
protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
return state.getNumber();
}
@Override
protected MigrateReplicationQueueFromZkToTableState getInitialState() {
return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
MigrateReplicationQueueFromZkToTableStateData.Builder builder =
MigrateReplicationQueueFromZkToTableStateData.newBuilder();
if (disabledPeerIds != null) {
builder.addAllDisabledPeerId(disabledPeerIds);
}
serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
MigrateReplicationQueueFromZkToTableStateData data =
serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class);
disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList());
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -152,12 +154,36 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
} }
} }
private boolean shouldFailForMigrating(MasterProcedureEnv env) throws IOException {
long parentProcId = getParentProcId();
if (
parentProcId != Procedure.NO_PROC_ID && env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(parentProcId) instanceof MigrateReplicationQueueFromZkToTableProcedure
) {
// this is scheduled by MigrateReplicationQueueFromZkToTableProcedure, should not fail it
return false;
}
return env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
.anyMatch(p -> !p.isFinished());
}
@Override @Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, InterruptedException { throws ProcedureSuspendedException, InterruptedException {
switch (state) { switch (state) {
case PRE_PEER_MODIFICATION: case PRE_PEER_MODIFICATION:
try { try {
if (shouldFailForMigrating(env)) {
LOG.info("There is a pending {}, give up execution of {}",
MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
getClass().getName());
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
new DoNotRetryIOException("There is a pending "
+ MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
releaseLatch(env);
return Flow.NO_MORE_STATE;
}
checkPeerModificationEnabled(env); checkPeerModificationEnabled(env);
prePeerModification(env); prePeerModification(env);
} catch (IOException e) { } catch (IOException e) {

View File

@ -21,14 +21,18 @@ import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -40,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
@ -51,17 +56,24 @@ import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -116,7 +128,7 @@ public class ReplicationPeerManager implements ConfigurationObserver {
private final ZKWatcher zk; private final ZKWatcher zk;
@FunctionalInterface @FunctionalInterface
private interface ReplicationQueueStorageInitializer { interface ReplicationQueueStorageInitializer {
void initialize() throws IOException; void initialize() throws IOException;
} }
@ -151,6 +163,10 @@ public class ReplicationPeerManager implements ConfigurationObserver {
} }
} }
private void initializeQueueStorage() throws IOException {
queueStorageInitializer.initialize();
}
void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException { throws ReplicationException, IOException {
if (peerId.contains("-")) { if (peerId.contains("-")) {
@ -165,7 +181,7 @@ public class ReplicationPeerManager implements ConfigurationObserver {
} }
// lazy create table // lazy create table
queueStorageInitializer.initialize(); initializeQueueStorage();
// make sure that there is no queues with the same peer id. This may happen when we create a // make sure that there is no queues with the same peer id. This may happen when we create a
// peer with the same id with a old deleted peer. If the replication queues for the old peer // peer with the same id with a old deleted peer. If the replication queues for the old peer
// have not been cleaned up yet then we should not create the new peer, otherwise the old wal // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
@ -718,4 +734,88 @@ public class ReplicationPeerManager implements ConfigurationObserver {
this.conf = conf; this.conf = conf;
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
} }
private ReplicationQueueData convert(ZkReplicationQueueData zkData) {
Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>();
zkData.getWalOffsets().forEach((wal, offset) -> {
String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
groupOffsets.compute(walGroup, (k, oldOffset) -> {
if (oldOffset == null) {
return new ReplicationGroupOffset(wal, offset);
}
// we should record the first wal's offset
long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal());
long walTs = AbstractFSWALProvider.getTimestamp(wal);
if (walTs < oldWalTs) {
return new ReplicationGroupOffset(wal, offset);
}
return oldOffset;
});
});
return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets));
}
private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
throws Exception {
MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
oldQueueStorage.listAllQueues();
for (;;) {
Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
if (pair == null) {
return;
}
queueStorage.batchUpdateQueues(pair.getFirst(),
pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId()))
.map(this::convert).collect(Collectors.toList()));
}
}
private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
throws Exception {
MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
for (;;) {
List<ZkLastPushedSeqId> list = iter.next();
if (list == null) {
return;
}
queueStorage.batchUpdateLastSequenceIds(list.stream()
.filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList()));
}
}
private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage)
throws Exception {
MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs();
for (;;) {
Pair<String, List<String>> pair = iter.next();
if (pair == null) {
return;
}
if (peers.containsKey(pair.getFirst())) {
queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond());
}
}
}
/**
* Submit the migration tasks to the given {@code executor} and return the futures.
*/
List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor)
throws IOException {
// the replication queue table creation is asynchronous and will be triggered by addPeer, so
// here we need to manually initialize it since we will not call addPeer.
initializeQueueStorage();
ZKReplicationQueueStorageForMigration oldStorage =
new ZKReplicationQueueStorageForMigration(zookeeper, conf);
return Arrays.asList(executor.submit(() -> {
migrateQueues(oldStorage);
return null;
}), executor.submit(() -> {
migrateLastPushedSeqIds(oldStorage);
return null;
}), executor.submit(() -> {
migrateHFileRefs(oldStorage);
return null;
}));
}
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
@ -236,6 +237,19 @@ public class TransitPeerSyncReplicationStateProcedure
switch (state) { switch (state) {
case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION: case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try { try {
if (
env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
.anyMatch(p -> !p.isFinished())
) {
LOG.info("There is a pending {}, give up execution of {}",
MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
getClass().getSimpleName());
setFailure("master-transit-peer-sync-replication-state",
new DoNotRetryIOException("There is a pending "
+ MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
return Flow.NO_MORE_STATE;
}
checkPeerModificationEnabled(env); checkPeerModificationEnabled(env);
preTransit(env); preTransit(env);
} catch (IOException e) { } catch (IOException e) {

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@Category({ MasterTests.class, LargeTests.class })
public class TestMigrateReplicationQueue extends TestReplicationBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMigrateReplicationQueue.class);
private int disableAndInsert() throws Exception {
UTIL1.getAdmin().disableReplicationPeer(PEER_ID2);
return UTIL1.loadTable(htable1, famName);
}
private String getQueuesZNode() throws IOException {
Configuration conf = UTIL1.getConfiguration();
ZKWatcher zk = UTIL1.getZooKeeperWatcher();
String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
}
private void mockData() throws Exception {
// delete the replication queue table to simulate upgrading from an older version of hbase
TableName replicationQueueTableName = TableName
.valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
.getReplicationPeerManager().getQueueStorage().listAllQueues();
assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), queueDatas.size());
UTIL1.getAdmin().disableTable(replicationQueueTableName);
UTIL1.getAdmin().deleteTable(replicationQueueTableName);
// shutdown the hbase cluster
UTIL1.shutdownMiniHBaseCluster();
ZKWatcher zk = UTIL1.getZooKeeperWatcher();
String queuesZNode = getQueuesZNode();
for (ReplicationQueueData queueData : queueDatas) {
String replicatorZNode =
ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString());
String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId());
assertEquals(1, queueData.getOffsets().size());
ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values());
String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal());
ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset()));
}
}
@Test
public void testMigrate() throws Exception {
int count = disableAndInsert();
mockData();
restartSourceCluster(1);
UTIL1.waitFor(60000,
() -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny()
.map(Procedure::isSuccess).orElse(false));
TableName replicationQueueTableName = TableName
.valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName));
ZKWatcher zk = UTIL1.getZooKeeperWatcher();
assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
// wait until SCP finishes, which means we can finish the claim queue operation
UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
.getReplicationPeerManager().getQueueStorage().listAllQueues();
assertEquals(1, queueDatas.size());
// should have 1 recovered queue, as we haven't replicated anything out so there is no queue
// data for the new alive region server
assertTrue(queueDatas.get(0).getId().isRecovered());
assertEquals(1, queueDatas.get(0).getOffsets().size());
// the peer is still disabled, so no data has been replicated
assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2));
assertEquals(0, HBaseTestingUtil.countRows(htable2));
// enable peer, and make sure the replication can continue correctly
UTIL1.getAdmin().enableReplicationPeer(PEER_ID2);
waitForReplication(count, 100);
}
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionServerList;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
@Category({ MasterTests.class, MediumTests.class })
public class TestMigrateReplicationQueueFromZkToTableProcedure {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedure.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
public static final class HMasterForTest extends HMaster {
public HMasterForTest(Configuration conf) throws IOException {
super(conf);
}
@Override
protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
throws IOException {
setupClusterConnection();
return new ServerManagerForTest(master, storage);
}
}
private static final ConcurrentMap<ServerName, ServerMetrics> EXTRA_REGION_SERVERS =
new ConcurrentHashMap<>();
public static final class ServerManagerForTest extends ServerManager {
public ServerManagerForTest(MasterServices master, RegionServerList storage) {
super(master, storage);
}
@Override
public Map<ServerName, ServerMetrics> getOnlineServers() {
Map<ServerName, ServerMetrics> map = new HashMap<>(super.getOnlineServers());
map.putAll(EXTRA_REGION_SERVERS);
return map;
}
}
@BeforeClass
public static void setupCluster() throws Exception {
UTIL.startMiniCluster(
StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
}
@AfterClass
public static void cleanupTest() throws Exception {
UTIL.shutdownMiniCluster();
}
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
}
@After
public void tearDown() throws Exception {
Admin admin = UTIL.getAdmin();
for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
admin.removeReplicationPeer(pd.getPeerId());
}
}
private static CountDownLatch PEER_PROC_ARRIVE;
private static CountDownLatch PEER_PROC_RESUME;
public static final class FakePeerProcedure extends Procedure<MasterProcedureEnv>
implements PeerProcedureInterface {
private String peerId;
public FakePeerProcedure() {
}
public FakePeerProcedure(String peerId) {
this.peerId = peerId;
}
@Override
public String getPeerId() {
return peerId;
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.UPDATE_CONFIG;
}
@Override
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
PEER_PROC_ARRIVE.countDown();
PEER_PROC_RESUME.await();
return null;
}
@Override
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
protected boolean abort(MasterProcedureEnv env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
@Test
public void testWaitUntilNoPeerProcedure() throws Exception {
PEER_PROC_ARRIVE = new CountDownLatch(1);
PEER_PROC_RESUME = new CountDownLatch(1);
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
procExec.submitProcedure(new FakePeerProcedure("1"));
PEER_PROC_ARRIVE.await();
MigrateReplicationQueueFromZkToTableProcedure proc =
new MigrateReplicationQueueFromZkToTableProcedure();
procExec.submitProcedure(proc);
// make sure we will wait until there is no peer related procedures before proceeding
UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
// continue and make sure we can finish successfully
PEER_PROC_RESUME.countDown();
UTIL.waitFor(30000, () -> proc.isSuccess());
}
@Test
public void testDisablePeerAndWaitUpgrading() throws Exception {
String peerId = "2";
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
.setReplicateAllUserTables(true).build();
UTIL.getAdmin().addReplicationPeer(peerId, rpc);
// put a fake region server to simulate that there are still region servers with older version
ServerMetrics metrics = mock(ServerMetrics.class);
when(metrics.getVersion()).thenReturn("2.5.0");
EXTRA_REGION_SERVERS
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
MigrateReplicationQueueFromZkToTableProcedure proc =
new MigrateReplicationQueueFromZkToTableProcedure();
procExec.submitProcedure(proc);
// wait until we reach the wait upgrading state
UTIL.waitFor(30000,
() -> proc.getCurrentStateId()
== MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber()
&& proc.getState() == ProcedureState.WAITING_TIMEOUT);
// make sure the peer is disabled for migrating
assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
// the procedure should finish successfully
EXTRA_REGION_SERVERS.clear();
UTIL.waitFor(30000, () -> proc.isSuccess());
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestMigrateReplicationQueueFromZkToTableProcedureRecovery {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedureRecovery.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
@BeforeClass
public static void setupCluster() throws Exception {
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
UTIL.startMiniCluster(1);
}
@AfterClass
public static void cleanupTest() throws Exception {
UTIL.shutdownMiniCluster();
}
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
}
@Before
public void setup() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
}
@After
public void tearDown() throws Exception {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
}
private String getHFileRefsZNode() throws IOException {
Configuration conf = UTIL.getConfiguration();
ZKWatcher zk = UTIL.getZooKeeperWatcher();
String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
return ZNodePaths.joinZNode(replicationZNode,
conf.get(ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT));
}
@Test
public void testRecoveryAndDoubleExecution() throws Exception {
String peerId = "2";
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
.setReplicateAllUserTables(true).build();
UTIL.getAdmin().addReplicationPeer(peerId, rpc);
// here we only test a simple migration, more complicated migration will be tested in other UTs,
// such as TestMigrateReplicationQueue and TestReplicationPeerManagerMigrateFromZk
String hfileRefsZNode = getHFileRefsZNode();
String hfile = "hfile";
String hfileZNode = ZNodePaths.joinZNode(hfileRefsZNode, peerId, hfile);
ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), hfileZNode);
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
// Start the migration procedure && kill the executor
long procId = procExec.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
// Restart the executor and execute the step twice
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
// Validate the migration result
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
ReplicationQueueStorage queueStorage =
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
assertThat(hfiles, Matchers.<List<String>> both(hasItem(hfile)).and(hasSize(1)));
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@Category({ MasterTests.class, MediumTests.class })
public class TestReplicationPeerManagerMigrateQueuesFromZk {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerManagerMigrateQueuesFromZk.class);
private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static ExecutorService EXECUTOR;
ConcurrentMap<String, ReplicationPeerDescription> peers;
private ReplicationPeerStorage peerStorage;
private ReplicationQueueStorage queueStorage;
private ReplicationQueueStorageInitializer queueStorageInitializer;
private ReplicationPeerManager manager;
private int nServers = 10;
private int nPeers = 10;
private int nRegions = 100;
private ServerName deadServerName;
@Rule
public final TableNameTestRule tableNameRule = new TableNameTestRule();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(1);
EXECUTOR = Executors.newFixedThreadPool(3,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d")
.build());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
EXECUTOR.shutdownNow();
UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws IOException {
Configuration conf = UTIL.getConfiguration();
peerStorage = mock(ReplicationPeerStorage.class);
TableName tableName = tableNameRule.getTableName();
UTIL.getAdmin()
.createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class);
peers = new ConcurrentHashMap<>();
deadServerName =
ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
manager = new ReplicationPeerManager(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(),
peerStorage, queueStorage, peers, conf, "cluster", queueStorageInitializer);
}
private Map<String, Set<String>> prepareData() throws Exception {
ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration(
UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
TestZKReplicationQueueStorage.mockQueuesData(storage, 10, "peer_0", deadServerName);
Map<String, Set<String>> encodedName2PeerIds = TestZKReplicationQueueStorage
.mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10);
TestZKReplicationQueueStorage.mockHFileRefs(storage, 10);
return encodedName2PeerIds;
}
@Test
public void testNoPeers() throws Exception {
prepareData();
for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
future.get(1, TimeUnit.MINUTES);
}
// should have called initializer
verify(queueStorageInitializer).initialize();
// should have not migrated any data since there is no peer
try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName())) {
assertEquals(0, HBaseTestingUtil.countRows(table));
}
}
@Test
public void testMigrate() throws Exception {
Map<String, Set<String>> encodedName2PeerIds = prepareData();
// add all peers so we will migrate them all
for (int i = 0; i < nPeers; i++) {
// value is not used in this test, so just add a mock
peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
}
for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
future.get(1, TimeUnit.MINUTES);
}
// should have called initializer
verify(queueStorageInitializer).initialize();
List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
// there should be two empty queues so minus 2
assertEquals(2 * nServers - 2, queueDatas.size());
for (ReplicationQueueData queueData : queueDatas) {
assertEquals("peer_0", queueData.getId().getPeerId());
assertEquals(1, queueData.getOffsets().size());
String walGroup = queueData.getId().getServerWALsBelongTo().toString();
ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup);
assertEquals(0, offset.getOffset());
assertEquals(queueData.getId().getServerWALsBelongTo().toString() + ".0", offset.getWal());
}
// there is no method in ReplicationQueueStorage can list all the last pushed sequence ids
try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName());
ResultScanner scanner =
table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) {
for (int i = 0; i < 2; i++) {
Result result = scanner.next();
String peerId = Bytes.toString(result.getRow());
assertEquals(nRegions, result.size());
for (Cell cell : result.rawCells()) {
String encodedRegionName = Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength());
encodedName2PeerIds.get(encodedRegionName).remove(peerId);
long seqId =
Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
assertEquals(i + 1, seqId);
}
}
encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> {
assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
});
assertNull(scanner.next());
}
for (int i = 0; i < nPeers; i++) {
List<String> refs = queueStorage.getReplicableHFiles("peer_" + i);
assertEquals(i, refs.size());
Set<String> refsSet = new HashSet<>(refs);
for (int j = 0; j < i; j++) {
assertTrue(refsSet.remove("hfile-" + j));
}
assertThat(refsSet, empty());
}
}
}

View File

@ -216,7 +216,7 @@ public class TestReplicationBase {
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
} }
static void restartSourceCluster(int numSlaves) throws Exception { protected static void restartSourceCluster(int numSlaves) throws Exception {
Closeables.close(hbaseAdmin, true); Closeables.close(hbaseAdmin, true);
Closeables.close(htable1, true); Closeables.close(htable1, true);
UTIL1.shutdownMiniHBaseCluster(); UTIL1.shutdownMiniHBaseCluster();

View File

@ -1046,13 +1046,18 @@
<artifactId>hbase-hadoop-compat</artifactId> <artifactId>hbase-hadoop-compat</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
<type>test-jar</type> <type>test-jar</type>
<scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-replication</artifactId> <artifactId>hbase-replication</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-replication</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-balancer</artifactId> <artifactId>hbase-balancer</artifactId>