HBASE-20050 Reimplement updateReplicationPositions logic in serial replication based on the newly introduced replication storage layer
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
c415ddb38c
commit
99d3edfc82
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
|
||||
|
@ -63,9 +64,19 @@ public interface ReplicationQueueStorage {
|
|||
* @param queueId a String that identifies the queue
|
||||
* @param fileName name of the WAL
|
||||
* @param position the current position in the file
|
||||
* @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
|
||||
*/
|
||||
void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
|
||||
throws ReplicationException;
|
||||
void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
|
||||
Map<String, Long> lastSeqIds) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Read the max sequence id of the specific region for a given peer. For serial replication, we
|
||||
* need the max sequence id to decide whether we can push the next entries.
|
||||
* @param encodedRegionName the encoded region name
|
||||
* @param peerId peer id
|
||||
* @return the max sequence id of the specific region for a given peer.
|
||||
*/
|
||||
long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Get the current position for a specific WAL in a given queue for a given regionserver.
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
@ -86,6 +88,10 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
"zookeeper.znode.replication.hfile.refs";
|
||||
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
|
||||
|
||||
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
|
||||
"zookeeper.znode.replication.regions";
|
||||
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
|
||||
|
||||
/**
|
||||
* The name of the znode that contains all replication queues
|
||||
*/
|
||||
|
@ -96,6 +102,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
*/
|
||||
private final String hfileRefsZNode;
|
||||
|
||||
private final String regionsZNode;
|
||||
|
||||
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
|
||||
super(zookeeper, conf);
|
||||
|
||||
|
@ -104,6 +112,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
|
||||
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
|
||||
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
|
||||
this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
|
||||
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
|
||||
}
|
||||
|
||||
private String getRsNode(ServerName serverName) {
|
||||
|
@ -122,6 +132,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
return getFileNode(getQueueNode(serverName, queueId), fileName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Putting all regions under the /hbase/replication/regions znode would lead to too many children because
|
||||
* of the huge number of regions in a real production environment. So here we use a hash of the encoded
|
||||
* region name to distribute the znode into multiple znodes. <br>
|
||||
* So the final znode path will be format like this:
|
||||
*
|
||||
* <pre>
|
||||
* /hbase/replication/regions/254/dd04e76a6966d4ffa908ed0586764767-100
|
||||
* </pre>
|
||||
*
|
||||
* Here 254 is the hash of the encoded region name and 100 is the peer id.
|
||||
* @param encodedRegionName the encoded region name.
|
||||
* @param peerId peer id for replication.
|
||||
* @return ZNode path to persist the max sequence id that we've pushed for the given region and
|
||||
* peer.
|
||||
*/
|
||||
private String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
|
||||
int hash = encodedRegionName.hashCode() & 0x0000FFFF;
|
||||
String hashPath = ZNodePaths.joinZNode(regionsZNode, String.valueOf(hash));
|
||||
return ZNodePaths.joinZNode(hashPath, String.format("%s-%s", encodedRegionName, peerId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
|
||||
try {
|
||||
|
@ -138,8 +170,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
try {
|
||||
ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName +
|
||||
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
|
||||
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
|
||||
+ ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,17 +190,57 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
|
||||
throws ReplicationException {
|
||||
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
|
||||
Map<String, Long> lastSeqIds) throws ReplicationException {
|
||||
try {
|
||||
ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName),
|
||||
ZKUtil.positionToByteArray(position));
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
||||
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
|
||||
ZKUtil.positionToByteArray(position)));
|
||||
// Persist the max sequence id(s) of regions for serial replication atomically.
|
||||
if (lastSeqIds != null && lastSeqIds.size() > 0) {
|
||||
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
|
||||
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
|
||||
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
|
||||
/*
|
||||
* Ensure the existence of the path
|
||||
* /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
|
||||
* multiOrSequential() method says, if a NodeExistsException is received, all operations will
|
||||
* fail. So create the path here; in fact, there is no need to add this operation to listOfOps,
|
||||
* because we only need to make sure that the file position and sequence ids are updated atomically.
|
||||
*/
|
||||
ZKUtil.createWithParents(zookeeper, path);
|
||||
// Persist the max sequence id of region to zookeeper.
|
||||
listOfOps
|
||||
.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
|
||||
}
|
||||
}
|
||||
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to set log position (serverName=" + serverName +
|
||||
", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
|
||||
throw new ReplicationException("Failed to set log position (serverName=" + serverName
|
||||
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastSequenceId(String encodedRegionName, String peerId)
|
||||
throws ReplicationException {
|
||||
byte[] data;
|
||||
try {
|
||||
data =
|
||||
ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId));
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throw new ReplicationException("Failed to get the last sequence id(region="
|
||||
+ encodedRegionName + ", peerId=" + peerId + ")");
|
||||
}
|
||||
try {
|
||||
return ZKUtil.parseWALPositionFrom(data);
|
||||
} catch (DeserializationException de) {
|
||||
LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
|
||||
+ "), data=" + Bytes.toStringBinary(data));
|
||||
}
|
||||
return HConstants.NO_SEQNUM;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWALPosition(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException {
|
||||
|
|
|
@ -17,8 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.hasItems;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -26,6 +28,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -35,6 +38,8 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
|
||||
/**
|
||||
* White box testing for replication state interfaces. Implementations should extend this class, and
|
||||
* initialize the interfaces properly.
|
||||
|
@ -122,7 +127,7 @@ public abstract class TestReplicationStateBasic {
|
|||
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
|
||||
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
|
||||
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
|
||||
rqs.setWALPosition(server3, "qId5", "filename4", 354L);
|
||||
rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
|
||||
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
|
||||
|
||||
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
|
||||
|
@ -270,6 +275,47 @@ public abstract class TestReplicationStateBasic {
|
|||
assertNumberOfPeers(2);
|
||||
}
|
||||
|
||||
private String getFileName(String base, int i) {
|
||||
return String.format(base + "-%04d", i);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
|
||||
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
|
||||
assertTrue(rqs.getAllQueues(serverName1).isEmpty());
|
||||
String queue1 = "1";
|
||||
String region0 = "region0", region1 = "region1";
|
||||
for (int i = 0; i < 10; i++) {
|
||||
rqs.addWAL(serverName1, queue1, getFileName("file1", i));
|
||||
}
|
||||
List<String> queueIds = rqs.getAllQueues(serverName1);
|
||||
assertEquals(1, queueIds.size());
|
||||
assertThat(queueIds, hasItems("1"));
|
||||
|
||||
List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
|
||||
assertEquals(10, wals1.size());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertThat(wals1, hasItems(getFileName("file1", i)));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
|
||||
}
|
||||
assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
|
||||
assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
|
||||
ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
|
||||
}
|
||||
assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
|
||||
assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
|
||||
}
|
||||
|
||||
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
|
||||
// we can first check if the value was changed in the store, if it wasn't then fail right away
|
||||
if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
|
||||
|
|
|
@ -127,7 +127,7 @@ public class TestZKReplicationQueueStorage {
|
|||
List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
|
||||
List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
|
||||
assertEquals(10, wals1.size());
|
||||
assertEquals(10, wals1.size());
|
||||
assertEquals(10, wals2.size());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertThat(wals1, hasItems(getFileName("file1", i)));
|
||||
assertThat(wals2, hasItems(getFileName("file2", i)));
|
||||
|
@ -136,8 +136,9 @@ public class TestZKReplicationQueueStorage {
|
|||
for (int i = 0; i < 10; i++) {
|
||||
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
|
||||
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
|
||||
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100);
|
||||
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10);
|
||||
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
|
||||
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
|
||||
null);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
|
|
@ -483,8 +483,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
|
||||
boolean queueRecovered) {
|
||||
String fileName = log.getName();
|
||||
abortWhenFail(
|
||||
() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, position));
|
||||
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
|
||||
position, null));
|
||||
cleanOldLogs(fileName, queueId, queueRecovered);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue