HBASE-20050 Reimplement updateReplicationPositions logic in serial replication based on the newly introduced replication storage layer

Signed-off-by: zhangduo <zhangduo@apache.org>
huzheng 2018-02-28 16:25:24 +08:00 committed by zhangduo
parent 39c1ddc6e3
commit 1d11cdb26c
5 changed files with 146 additions and 16 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -63,9 +64,19 @@ public interface ReplicationQueueStorage {
* @param queueId a String that identifies the queue
* @param fileName name of the WAL
* @param position the current position in the file
* @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
*/
void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
throws ReplicationException;
void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException;
/**
* Read the max sequence id of the specific region for a given peer. For serial replication, we
* need the max sequence id to decide whether we can push the next entries.
* @param encodedRegionName the encoded region name
* @param peerId peer id
* @return the max sequence id of the specific region for a given peer.
*/
long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
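For orientation, a minimal usage sketch of the two methods above, assuming a ReplicationQueueStorage instance and made-up server, queue and region names (none of this is part of the patch): persist the WAL position together with the per-region sequence ids in one call, then read a region's last pushed sequence id back.

// Illustrative sketch only; the names and values are assumptions, not part of this commit.
void recordAndQuery(ReplicationQueueStorage storage, ServerName rs) throws ReplicationException {
  // For a non-recovered queue the queue id equals the peer id, here "1".
  storage.setWALPosition(rs, "1", "wal.1519800000000", 354L,
    ImmutableMap.of("dd04e76a6966d4ffa908ed0586764767", 1000L));
  // Later, ask how far replication has already pushed for that region and peer.
  long lastPushed = storage.getLastSequenceId("dd04e76a6966d4ffa908ed0586764767", "1");
}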
/**
* Get the current position for a specific WAL in a given queue for a given regionserver.

View File

@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -85,6 +87,10 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
"zookeeper.znode.replication.regions";
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
/**
* The name of the znode that contains all replication queues
*/
@@ -95,6 +101,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
*/
private final String hfileRefsZNode;
private final String regionsZNode;
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
@@ -103,6 +111,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
}
private String getRsNode(ServerName serverName) {
@@ -121,6 +131,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
return getFileNode(getQueueNode(serverName, queueId), fileName);
}
/**
* Putting all regions directly under the /hbase/replication/regions znode would lead to too many
* children, because of the huge number of regions in a real production environment. So here we
* use a hash of the encoded region name to distribute the znodes across multiple parent znodes. <br>
* The final znode path will be of the following form:
*
* <pre>
* /hbase/replication/regions/254/dd04e76a6966d4ffa908ed0586764767-100
* </pre>
*
* Here 254 is the hash of the encoded region name and 100 is the peer id.
* @param encodedRegionName the encoded region name.
* @param peerId peer id for replication.
* @return ZNode path to persist the max sequence id that we've pushed for the given region and
* peer.
*/
private String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
int hash = encodedRegionName.hashCode() & 0x0000FFFF;
String hashPath = ZNodePaths.joinZNode(regionsZNode, String.valueOf(hash));
return ZNodePaths.joinZNode(hashPath, String.format("%s-%s", encodedRegionName, peerId));
}
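As a quick illustration of the bucketing described above, the fragment below (not part of the patch) builds the same kind of path by hand; the region name and peer id are the javadoc's example values and the prefix assumes the default regions znode.

// Illustrative fragment only, assuming the default "regions" znode name.
String encodedRegionName = "dd04e76a6966d4ffa908ed0586764767";
String peerId = "100";
int bucket = encodedRegionName.hashCode() & 0x0000FFFF; // at most 65536 buckets
String path = "/hbase/replication/regions/" + bucket + "/" + encodedRegionName + "-" + peerId;
// yields /hbase/replication/regions/<bucket>/dd04e76a6966d4ffa908ed0586764767-100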
@Override
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
try {
@@ -137,8 +169,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
try {
ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException e) {
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
}
@@ -157,16 +189,56 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
throws ReplicationException {
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {
try {
ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName),
ZKUtil.positionToByteArray(position));
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
List<ZKUtilOp> listOfOps = new ArrayList<>();
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
ZKUtil.positionToByteArray(position)));
// Persist the max sequence id(s) of regions for serial replication atomically.
if (lastSeqIds != null && lastSeqIds.size() > 0) {
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
/*
* Make sure the path
* /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc of
* multiOrSequential() says, if a NodeExistsException is received, all operations will fail, so
* we create the path here first. There is no need to add this creation to listOfOps, because we
* only need the file position and the sequence ids to be updated atomically.
*/
ZKUtil.createWithParents(zookeeper, path);
// Persist the max sequence id of region to zookeeper.
listOfOps
.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
}
}
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
}
}
@Override
public long getLastSequenceId(String encodedRegionName, String peerId)
throws ReplicationException {
byte[] data;
try {
data =
ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Failed to get the last sequence id(region="
+ encodedRegionName + ", peerId=" + peerId + ")");
}
try {
return ZKUtil.parseWALPositionFrom(data);
} catch (DeserializationException de) {
LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
+ "), data=" + Bytes.toStringBinary(data));
}
return HConstants.NO_SEQNUM;
}
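To show what the last sequence id is used for, here is a rough sketch (not from this patch) of the check a serial replication source could make before shipping an entry; the helper name is hypothetical and the real decision also involves region open/close barriers.

// Hypothetical helper, simplified: the stored value is the max sequence id already pushed for
// the region and peer, so an entry at or below it has been replicated and can be skipped.
boolean alreadyPushed(ReplicationQueueStorage storage, String encodedRegionName, String peerId,
    long entrySeqId) throws ReplicationException {
  long lastPushed = storage.getLastSequenceId(encodedRegionName, peerId);
  // HConstants.NO_SEQNUM means nothing has been recorded yet for this region and peer.
  return lastPushed != HConstants.NO_SEQNUM && entrySeqId <= lastPushed;
}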
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)

View File

@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -26,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Pair;
@@ -35,6 +38,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/**
* White box testing for replication state interfaces. Implementations should extend this class, and
* initialize the interfaces properly.
@@ -122,7 +127,7 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
rqs.setWALPosition(server3, "qId5", "filename4", 354L);
rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
@@ -270,6 +275,47 @@ public abstract class TestReplicationStateBasic {
assertNumberOfPeers(2);
}
private String getFileName(String base, int i) {
return String.format(base + "-%04d", i);
}
@Test
public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
assertTrue(rqs.getAllQueues(serverName1).isEmpty());
String queue1 = "1";
String region0 = "region0", region1 = "region1";
for (int i = 0; i < 10; i++) {
rqs.addWAL(serverName1, queue1, getFileName("file1", i));
}
List<String> queueIds = rqs.getAllQueues(serverName1);
assertEquals(1, queueIds.size());
assertThat(queueIds, hasItems("1"));
List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
assertEquals(10, wals1.size());
for (int i = 0; i < 10; i++) {
assertThat(wals1, hasItems(getFileName("file1", i)));
}
for (int i = 0; i < 10; i++) {
assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
}
assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
for (int i = 0; i < 10; i++) {
rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
}
for (int i = 0; i < 10; i++) {
assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
}
assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
}
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
// first check whether the value was changed in the store; if it wasn't, fail right away
if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {

View File

@@ -127,7 +127,7 @@ public class TestZKReplicationQueueStorage {
List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
assertEquals(10, wals1.size());
assertEquals(10, wals1.size());
assertEquals(10, wals2.size());
for (int i = 0; i < 10; i++) {
assertThat(wals1, hasItems(getFileName("file1", i)));
assertThat(wals2, hasItems(getFileName("file2", i)));
@@ -136,8 +136,9 @@ public class TestZKReplicationQueueStorage {
for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100);
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10);
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
null);
}
for (int i = 0; i < 10; i++) {

View File

@@ -482,8 +482,8 @@ public class ReplicationSourceManager implements ReplicationListener {
public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
boolean queueRecovered) {
String fileName = log.getName();
abortWhenFail(
() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, position));
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
position, null));
cleanOldLogs(fileName, queueId, queueRecovered);
}
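Note that logPositionAndCleanOldLogs still passes null for lastSeqIds at this point. Purely as an illustration of how that argument could eventually be filled in, the sketch below aggregates the per-region max sequence id from a batch of shipped WAL entries; the shippedEntries variable is an assumption and this code is not part of the commit.

// Illustrative sketch, not part of this commit: collect the highest sequence id seen per region
// in a shipped batch, to hand to setWALPosition() instead of null.
Map<String, Long> lastSeqIds = new HashMap<>();
for (WAL.Entry entry : shippedEntries) {
  String encodedRegionName = Bytes.toString(entry.getKey().getEncodedRegionName());
  lastSeqIds.merge(encodedRegionName, entry.getKey().getSequenceId(), Math::max);
}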