HBASE-19665 Add table based replication peers/queues storage back

huzheng 2018-03-05 19:45:45 +08:00
parent 641c87ddf9
commit 31978c31bb
14 changed files with 1019 additions and 27 deletions

ReplicationPeerStorage.java

@@ -42,7 +42,8 @@ public interface ReplicationPeerStorage {
   /**
    * Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}.
-   * @throws ReplicationException if there are errors accessing the storage service.
+   * @throws ReplicationException if there are errors accessing the storage service or peer does not
+   *           exist.
    */
   void setPeerState(String peerId, boolean enabled) throws ReplicationException;
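
Reviewer note (not part of the diff): this tightens the interface contract. Callers can now count on a ReplicationException for a missing peer instead of an implementation-specific error or a silent no-op, e.g., assuming an already-constructed ReplicationPeerStorage named peerStorage:

  try {
    peerStorage.setPeerState("nonexistent-peer", true);
  } catch (ReplicationException e) {
    // Expected: both the ZK and the new table based storage reject unknown peer ids.
  }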

ReplicationStorageFactory.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -29,6 +30,15 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public final class ReplicationStorageFactory {
 
+  public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl";
+  public static final String DEFAULT_REPLICATION_PEER_STORAGE_IMPL =
+      ZKReplicationPeerStorage.class.getName();
+
+  public static final String REPLICATION_QUEUE_STORAGE_IMPL =
+      "hbase.replication.queue.storage.impl";
+  public static final String DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL =
+      ZKReplicationQueueStorage.class.getName();
+
   private ReplicationStorageFactory() {
   }
@@ -36,7 +46,10 @@ public final class ReplicationStorageFactory {
    * Create a new {@link ReplicationPeerStorage}.
    */
   public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) {
-    return new ZKReplicationPeerStorage(zk, conf);
+    String peerStorageClass =
+        conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL);
+    return ReflectionUtils.instantiateWithCustomCtor(peerStorageClass,
+      new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf });
   }
 
   /**
@@ -44,6 +57,9 @@ public final class ReplicationStorageFactory {
    */
   public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk,
       Configuration conf) {
-    return new ZKReplicationQueueStorage(zk, conf);
+    String queueStorageClass =
+        conf.get(REPLICATION_QUEUE_STORAGE_IMPL, DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL);
+    return ReflectionUtils.instantiateWithCustomCtor(queueStorageClass,
+      new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf });
   }
 }
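
Reviewer note (not part of the diff): a minimal sketch of how the two new keys select an implementation, mirroring what the new UT below does; zk is assumed to be an existing ZKWatcher. With neither key set, the factory falls back to the ZK based defaults, so existing deployments are unaffected:

  Configuration conf = HBaseConfiguration.create();
  conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
    TableReplicationPeerStorage.class.getName());
  conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL,
    TableReplicationQueueStorage.class.getName());
  ReplicationPeerStorage peerStorage =
    ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
  ReplicationQueueStorage queueStorage =
    ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);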

ReplicationUtils.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.toByteArray;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -30,12 +32,19 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
 /**
  * Helper class for replication.
  */
 @InterfaceAudience.Private
 public final class ReplicationUtils {
 
+  public static final byte[] PEER_STATE_ENABLED_BYTES =
+      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
+  public static final byte[] PEER_STATE_DISABLED_BYTES =
+      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
+
   private ReplicationUtils() {
   }
@@ -173,4 +182,8 @@ public final class ReplicationUtils {
     return tableCFs != null && tableCFs.containsKey(tableName);
     }
   }
+
+  public static String parsePeerIdFromQueueId(String queueId) {
+    return new ReplicationQueueInfo(queueId).getPeerId();
+  }
 }
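
Reviewer note (not part of the diff): parsePeerIdFromQueueId centralizes the parsing that ZKReplicationQueueStorage previously inlined (see the change further down). For illustration, assuming the usual queue id layout, where a queue id is the bare peer id until claimQueue appends the source server name on failover:

  ReplicationUtils.parsePeerIdFromQueueId("1"); // returns "1"
  // After a failover the queue id carries the dead server's name appended by claimQueue;
  // the peer id still parses out as "1".
  ReplicationUtils.parsePeerIdFromQueueId("1-host1.example.org,16020,1518600000000"); // returns "1"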

TableReplicationPeerStorage.java

@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Table based replication peer storage.
*/
@InterfaceAudience.Private
public class TableReplicationPeerStorage extends TableReplicationStorageBase
implements ReplicationPeerStorage {
public TableReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) throws IOException {
super(zookeeper, conf);
}
@Override
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Put put = new Put(Bytes.toBytes(peerId));
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
ReplicationPeerConfigUtil.toByteArray(peerConfig));
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
table.put(put);
} catch (IOException e) {
throw new ReplicationException("Failed to add peer " + peerId, e);
}
}
@Override
public void removePeer(String peerId) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Delete delete = new Delete(Bytes.toBytes(peerId));
table.delete(delete);
} catch (IOException e) {
throw new ReplicationException("Failed to remove peer " + peerId, e);
}
}
// TODO: make this a single checkAndMutate operation.
private boolean peerExist(String peerId, Table table) throws IOException {
Get get = new Get(Bytes.toBytes(peerId));
get.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
return table.exists(get);
}
@Override
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
if (!peerExist(peerId, table)) {
throw new ReplicationException("Peer " + peerId + " does not exist.");
}
Put put = new Put(Bytes.toBytes(peerId));
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
table.put(put);
} catch (IOException e) {
throw new ReplicationException(
"Failed to set peer state, peerId=" + peerId + ", state=" + enabled, e);
}
}
@Override
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
if (!peerExist(peerId, table)) {
throw new ReplicationException("Peer " + peerId + " does not exist.");
}
Put put = new Put(Bytes.toBytes(peerId));
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
ReplicationPeerConfigUtil.toByteArray(peerConfig));
table.put(put);
} catch (IOException e) {
throw new ReplicationException("Failed to update peer configuration, peerId=" + peerId, e);
}
}
@Override
public List<String> listPeerIds() throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Scan scan = new Scan().addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
try (ResultScanner scanner = table.getScanner(scan)) {
List<String> peerIds = new ArrayList<>();
for (Result r : scanner) {
peerIds.add(Bytes.toString(r.getRow()));
}
return peerIds;
}
} catch (IOException e) {
throw new ReplicationException("Failed to list peers", e);
}
}
@Override
public boolean isPeerEnabled(String peerId) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
Result r = table.get(get);
if (r == null || r.isEmpty()) {
throw new ReplicationException("Peer " + peerId + " not found");
}
return Arrays.equals(PEER_STATE_ENABLED_BYTES, r.getValue(FAMILY_PEER, QUALIFIER_PEER_STATE));
} catch (IOException e) {
throw new ReplicationException("Failed to read the peer state, peerId=" + peerId, e);
}
}
@Override
public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG);
Result r = table.get(get);
if (r == null || r.isEmpty()) {
throw new ReplicationException("Peer " + peerId + " not found");
}
byte[] data = r.getValue(FAMILY_PEER, QUALIFIER_PEER_CONFIG);
if (data == null || data.length == 0) {
throw new ReplicationException(
"Replication peer config data shouldn't be empty, peerId=" + peerId);
}
try {
return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
throw new ReplicationException(
"Failed to parse replication peer config for peer with id=" + peerId, e);
}
} catch (IOException e) {
throw new ReplicationException(
"Failed to read the peer configuration in hbase:replication, peerId=" + peerId, e);
}
}
}
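
Reviewer note (not part of the diff): each peer is one row of hbase:replication keyed by the peer id, with peer:config holding the serialized ReplicationPeerConfig and peer:state holding the same ENABLED/DISABLED protobuf bytes as the ZK storage, so the two implementations stay byte-compatible. A hypothetical round trip, assuming the table already exists and zk/conf are at hand:

  ReplicationPeerStorage peers = new TableReplicationPeerStorage(zk, conf);
  peers.addPeer("1",
    ReplicationPeerConfig.newBuilder().setClusterKey("zk1:2181:/hbase2").build(), true);
  assert peers.isPeerEnabled("1");
  peers.setPeerState("1", false);          // rewrites peer:state to DISABLED
  ReplicationPeerConfig config = peers.getPeerConfig("1");
  peers.removePeer("1");                   // deletes the whole row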

TableReplicationQueueStorage.java

@@ -0,0 +1,522 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Table based replication queue storage.
*/
@InterfaceAudience.Private
public class TableReplicationQueueStorage extends TableReplicationStorageBase
implements ReplicationQueueStorage {
private static final Logger LOG = LoggerFactory.getLogger(TableReplicationQueueStorage.class);
public TableReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) throws IOException {
super(zookeeper, conf);
}
/**
* Serialize the {fileName, position} pair into a byte array.
*/
private static byte[] makeByteArray(String fileName, long position) {
byte[] data = new byte[Bytes.SIZEOF_INT + fileName.length() + Bytes.SIZEOF_LONG];
int pos = 0;
pos = Bytes.putInt(data, pos, fileName.length());
pos = Bytes.putBytes(data, pos, Bytes.toBytes(fileName), 0, fileName.length());
pos = Bytes.putLong(data, pos, position);
assert pos == data.length;
return data;
}
/**
* Deserialize the byte array into a {filename, position} pair.
*/
private static Pair<String, Long> parseFileNameAndPosition(byte[] data, int offset)
throws IOException {
if (data == null) {
throw new IOException("The byte array shouldn't be null");
}
int pos = offset;
int len = Bytes.toInt(data, pos, Bytes.SIZEOF_INT);
pos += Bytes.SIZEOF_INT;
if (pos + len > data.length) {
throw new IllegalArgumentException("offset (" + pos + ") + length (" + len + ") exceed the"
+ " capacity of the array: " + data.length);
}
String fileName = Bytes.toString(Bytes.copy(data, pos, len));
pos += len;
long position = Bytes.toLong(data, pos, Bytes.SIZEOF_LONG);
return new Pair<>(fileName, position);
}
@Override
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Delete delete = new Delete(getServerNameRowKey(serverName));
delete.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId));
// Delete all <fileName, position> pairs.
delete.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP);
table.delete(delete);
} catch (IOException e) {
throw new ReplicationException(
"Failed to remove wal from queue, serverName=" + serverName + ", queueId=" + queueId, e);
}
}
@Override
public void addWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Put put = new Put(getServerNameRowKey(serverName));
put.addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, HConstants.EMPTY_BYTE_ARRAY);
put.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId), HConstants.EMPTY_BYTE_ARRAY);
put.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), makeByteArray(fileName, 0L));
table.put(put);
} catch (IOException e) {
throw new ReplicationException("Failed to add wal to queue, serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName, e);
}
}
@Override
public void removeWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
.filter(w -> w.fileNameMatch(fileName)).findFirst();
if (walCell.isPresent()) {
Delete delete = new Delete(getServerNameRowKey(walCell.get().serverName))
.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), walCell.get().cellTimestamp);
table.delete(delete);
} else {
LOG.warn(fileName + " has already been deleted; skipping its removal");
}
} catch (IOException e) {
throw new ReplicationException("Failed to remove wal from queue, serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName, e);
}
}
@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
.filter(w -> w.fileNameMatch(fileName)).findFirst();
if (walCell.isPresent()) {
List<Put> puts = new ArrayList<>();
Put put = new Put(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL,
Bytes.toBytes(walCell.get().queueId), walCell.get().cellTimestamp,
makeByteArray(fileName, position));
puts.add(put);
// Update the last pushed sequence id for each region in a batch.
String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
if (lastSeqIds != null && lastSeqIds.size() > 0) {
for (Map.Entry<String, Long> e : lastSeqIds.entrySet()) {
Put regionPut = new Put(Bytes.toBytes(peerId)).addColumn(FAMILY_REGIONS,
getRegionQualifier(e.getKey()), Bytes.toBytes(e.getValue()));
puts.add(regionPut);
}
}
table.put(puts);
} else {
throw new ReplicationException("WAL file " + fileName + " does not found under queue "
+ queueId + " for server " + serverName);
}
} catch (IOException e) {
throw new ReplicationException(
"Failed to set wal position and last sequence ids, serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position,
e);
}
}
@Override
public long getLastSequenceId(String encodedRegionName, String peerId)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Get get = new Get(Bytes.toBytes(peerId));
get.addColumn(FAMILY_REGIONS, getRegionQualifier(encodedRegionName));
Result r = table.get(get);
if (r == null || r.listCells() == null) {
return HConstants.NO_SEQNUM;
}
return Bytes.toLong(r.getValue(FAMILY_REGIONS, getRegionQualifier(encodedRegionName)));
} catch (IOException e) {
throw new ReplicationException(
"Failed to get last sequence id, region=" + encodedRegionName + ", peerId=" + peerId, e);
}
}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
.filter(w -> w.fileNameMatch(fileName)).findFirst();
if (walCell.isPresent()) {
return walCell.get().position;
} else {
LOG.warn("WAL " + fileName + " does not found under queue " + queueId + " for server "
+ serverName);
return 0;
}
} catch (IOException e) {
throw new ReplicationException("Failed to get wal position. serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName, e);
}
}
/**
* Each cell in the wal:{queueId} column is parsed into a WALCell, which is friendlier for the
* upper layers to consume.
*/
private static final class WALCell {
ServerName serverName;
String queueId;
String wal;
long position;
long cellTimestamp; // timestamp of the cell
private WALCell(ServerName serverName, String queueId, String wal, long position,
long cellTimestamp) {
this.serverName = serverName;
this.queueId = queueId;
this.wal = wal;
this.position = position;
this.cellTimestamp = cellTimestamp;
}
public static WALCell create(Cell cell) throws IOException {
ServerName serverName = ServerName.parseServerName(
Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
String queueId = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
Pair<String, Long> fileAndPos =
parseFileNameAndPosition(cell.getValueArray(), cell.getValueOffset());
return new WALCell(serverName, queueId, fileAndPos.getFirst(), fileAndPos.getSecond(),
cell.getTimestamp());
}
public boolean fileNameMatch(String fileName) {
return StringUtils.equals(wal, fileName);
}
}
/**
* Parse the WALCell list from an HBase Result.
*/
private List<WALCell> result2WALCells(Result r) throws IOException {
List<WALCell> wals = new ArrayList<>();
if (r != null && r.listCells() != null && r.listCells().size() > 0) {
for (Cell cell : r.listCells()) {
wals.add(WALCell.create(cell));
}
}
return wals;
}
/**
* List all WALs for the specific region server and queueId.
*/
private List<WALCell> getWALsInQueue0(Table table, ServerName serverName, String queueId)
throws IOException {
Get get = new Get(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, Bytes.toBytes(queueId))
.readAllVersions();
return result2WALCells(table.get(get));
}
@Override
public List<String> getWALsInQueue(ServerName serverName, String queueId)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
return getWALsInQueue0(table, serverName, queueId).stream().map(p -> p.wal)
.collect(Collectors.toList());
} catch (IOException e) {
throw new ReplicationException(
"Failed to get wals in queue. serverName=" + serverName + ", queueId=" + queueId, e);
}
}
@Override
public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
List<String> queues = new ArrayList<>();
Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_QUEUE);
Result r = table.get(get);
if (r != null && r.listCells() != null && r.listCells().size() > 0) {
for (Cell c : r.listCells()) {
String queue =
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
queues.add(queue);
}
}
return queues;
} catch (IOException e) {
throw new ReplicationException("Failed to get all queues. serverName=" + serverName, e);
}
}
@Override
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
ServerName destServerName) throws ReplicationException {
LOG.info(
"Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
try (Table table = getReplicationMetaTable()) {
// Create an enabled region server for destination.
byte[] destServerNameRowKey = getServerNameRowKey(destServerName);
byte[] srcServerNameRowKey = getServerNameRowKey(sourceServerName);
Put put = new Put(destServerNameRowKey).addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED,
HConstants.EMPTY_BYTE_ARRAY);
table.put(put);
List<WALCell> wals = getWALsInQueue0(table, sourceServerName, queueId);
String newQueueId = queueId + "-" + sourceServerName;
// Remove the queue from the source region server if its WAL set is empty.
if (CollectionUtils.isEmpty(wals)) {
Delete delete =
new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId))
.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP);
table.delete(delete);
LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty");
return new Pair<>(newQueueId, Collections.emptySortedSet());
}
// Transfer all wals from source region server to destination region server in a batch.
List<Mutation> mutations = new ArrayList<>();
// a. Create queue for destination server.
mutations.add(new Put(destServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(newQueueId),
HConstants.EMPTY_BYTE_ARRAY));
SortedSet<String> logQueue = new TreeSet<>();
for (WALCell wal : wals) {
byte[] data = makeByteArray(wal.wal, wal.cellTimestamp);
// b. Add wal to destination server.
mutations.add(
new Put(destServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(newQueueId), data));
// c. Remove wal from source server.
mutations.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(queueId),
wal.cellTimestamp));
logQueue.add(wal.wal);
}
// d. Remove the queue of source server.
mutations
.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId)));
Object[] results = new Object[mutations.size()];
table.batch(mutations, results);
boolean allSuccess = Stream.of(results).allMatch(r -> r != null);
if (!allSuccess) {
throw new ReplicationException("Claim queue queueId=" + queueId + " from "
+ sourceServerName + " to " + destServerName + " failed: not all mutations succeeded.");
}
LOG.info(
"Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
return new Pair<>(newQueueId, logQueue);
} catch (IOException | InterruptedException e) {
throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName
+ " to " + destServerName + " failed", e);
}
}
@Override
public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
// TODO: make this a checkAndDelete, and provide a UT for it.
try (Table table = getReplicationMetaTable()) {
Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_WAL).readAllVersions();
Result r = table.get(get);
if (r == null || r.listCells() == null || r.listCells().size() == 0) {
Delete delete = new Delete(getServerNameRowKey(serverName));
table.delete(delete);
}
} catch (IOException e) {
throw new ReplicationException(
"Failed to remove replicator when queue is empty, serverName=" + serverName, e);
}
}
@Override
public List<ServerName> getListOfReplicators() throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Scan scan = new Scan().addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED).readVersions(1);
Set<ServerName> serverNames = new HashSet<>();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result r : scanner) {
if (r.listCells().size() > 0) {
Cell firstCell = r.listCells().get(0);
String serverName = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(),
firstCell.getRowLength());
serverNames.add(ServerName.parseServerName(serverName));
}
}
}
return new ArrayList<>(serverNames);
} catch (IOException e) {
throw new ReplicationException("Failed to get list of replicators", e);
}
}
@Override
public Set<String> getAllWALs() throws ReplicationException {
Set<String> walSet = new HashSet<>();
try (Table table = getReplicationMetaTable()) {
Scan scan = new Scan().addFamily(FAMILY_WAL).readAllVersions();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result r : scanner) {
result2WALCells(r).forEach(w -> walSet.add(w.wal));
}
}
return walSet;
} catch (IOException e) {
throw new ReplicationException("Failed to get all wals", e);
}
}
@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
// Nothing to do.
}
@Override
public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
// Nothing to do.
}
@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
List<Put> puts = new ArrayList<>();
for (Pair<Path, Path> p : pairs) {
Put put = new Put(Bytes.toBytes(peerId));
put.addColumn(FAMILY_HFILE_REFS, Bytes.toBytes(p.getSecond().getName()),
HConstants.EMPTY_BYTE_ARRAY);
puts.add(put);
}
table.put(puts);
} catch (IOException e) {
throw new ReplicationException("Failed to add hfile refs, peerId=" + peerId, e);
}
}
@Override
public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
List<Delete> deletes = new ArrayList<>();
for (String file : files) {
Delete delete = new Delete(Bytes.toBytes(peerId));
delete.addColumns(FAMILY_HFILE_REFS, Bytes.toBytes(file));
deletes.add(delete);
}
table.delete(deletes);
} catch (IOException e) {
throw new ReplicationException("Failed to remove hfile refs, peerId=" + peerId, e);
}
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Set<String> peers = new HashSet<>();
Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result r : scanner) {
if (r.listCells().size() > 0) {
Cell firstCell = r.listCells().get(0);
String peerId = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(),
firstCell.getRowLength());
peers.add(peerId);
}
}
}
return new ArrayList<>(peers);
} catch (IOException e) {
throw new ReplicationException("Faield to get all peers by reading hbase:replication meta",
e);
}
}
@Override
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Get get = new Get(Bytes.toBytes(peerId)).addFamily(FAMILY_HFILE_REFS);
Result r = table.get(get);
List<String> hfiles = new ArrayList<>();
if (r != null && r.listCells() != null) {
for (Cell c : r.listCells()) {
hfiles.add(
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()));
}
}
return hfiles;
} catch (IOException e) {
throw new ReplicationException("Failed to get replicable hfiles, peerId=" + peerId, e);
}
}
@Override
public Set<String> getAllHFileRefs() throws ReplicationException {
try (Table table = getReplicationMetaTable()) {
Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
try (ResultScanner scanner = table.getScanner(scan)) {
Set<String> hfileSet = new HashSet<>();
for (Result r : scanner) {
for (Cell c : r.listCells()) {
String hfile = Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(),
c.getQualifierLength());
hfileSet.add(hfile);
}
}
return hfileSet;
}
} catch (IOException e) {
throw new ReplicationException("Failed to get all hfile refs", e);
}
}
}
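
Reviewer note (not part of the diff): each WAL in a queue is stored as a separate cell version under wal:{queueId}, with the cell value packing the file name and replication position; that is why the wal family is created with all versions in the next file. The encoding spelled out, under the same assumption as makeByteArray above that WAL names are ASCII (character count equals byte count):

  String fileName = "host1%2C16020%2C1518600000000.1518601234567"; // a made-up WAL name
  long position = 1024L;
  byte[] value = new byte[Bytes.SIZEOF_INT + fileName.length() + Bytes.SIZEOF_LONG];
  int pos = Bytes.putInt(value, 0, fileName.length());             // 4-byte name length
  pos = Bytes.putBytes(value, pos, Bytes.toBytes(fileName), 0, fileName.length());
  Bytes.putLong(value, pos, position);                             // 8-byte wal position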

TableReplicationStorageBase.java

@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class TableReplicationStorageBase {
protected final ZKWatcher zookeeper;
protected final Configuration conf;
public static final TableName REPLICATION_TABLE =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
// Peer family; the row key is the peer id.
public static final byte[] FAMILY_PEER = Bytes.toBytes("peer");
public static final byte[] QUALIFIER_PEER_CONFIG = Bytes.toBytes("config");
public static final byte[] QUALIFIER_PEER_STATE = Bytes.toBytes("state");

// Region server state family; the row key is the region server name.
public static final byte[] FAMILY_RS_STATE = Bytes.toBytes("rs_state");
public static final byte[] QUALIFIER_STATE_ENABLED = Bytes.toBytes("enabled");

// Queue and WAL families; the row key is the region server name.
public static final byte[] FAMILY_QUEUE = Bytes.toBytes("queue");
public static final byte[] FAMILY_WAL = Bytes.toBytes("wal");

// HFile-refs family; the row key is the peer id.
public static final byte[] FAMILY_HFILE_REFS = Bytes.toBytes("hfile-refs");

// Regions family; the row key is the peer id.
public static final byte[] FAMILY_REGIONS = Bytes.toBytes("regions");
private Connection connection;
protected static byte[] getServerNameRowKey(ServerName serverName) {
return Bytes.toBytes(serverName.toString());
}
protected static byte[] getRegionQualifier(String encodedRegionName) {
return Bytes.toBytes(encodedRegionName);
}
@VisibleForTesting
public static TableDescriptorBuilder createReplicationTableDescBuilder(final Configuration conf)
throws IOException {
int metaMaxVersion =
conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS);
int metaBlockSize =
conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE);
return TableDescriptorBuilder
.newBuilder(REPLICATION_TABLE)
.addColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_PEER).setMaxVersions(metaMaxVersion)
.setInMemory(true).setBlocksize(metaBlockSize)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
.build())
.addColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_RS_STATE).setMaxVersions(metaMaxVersion)
.setInMemory(true).setBlocksize(metaBlockSize)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
.build())
.addColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_QUEUE).setMaxVersions(metaMaxVersion)
.setInMemory(true).setBlocksize(metaBlockSize)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
.build())
.addColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_WAL)
.setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
.setBlocksize(metaBlockSize).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
.setBloomFilterType(BloomType.NONE).build())
.addColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_REGIONS).setMaxVersions(metaMaxVersion)
.setInMemory(true).setBlocksize(metaBlockSize)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
.build())
.addColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_HFILE_REFS)
.setMaxVersions(metaMaxVersion).setInMemory(true).setBlocksize(metaBlockSize)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
.build());
}
protected TableReplicationStorageBase(ZKWatcher zookeeper, Configuration conf)
throws IOException {
this.zookeeper = zookeeper;
this.conf = conf;
this.connection = ConnectionFactory.createConnection(conf);
}
protected Table getReplicationMetaTable() throws IOException {
return this.connection.getTable(REPLICATION_TABLE);
}
}
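
Reviewer note (not part of the diff): summarizing the schema, peer-keyed rows carry the peer, regions and hfile-refs families, while region-server-keyed rows carry rs_state, queue and wal; only wal keeps all versions because the WAL cells of one queue share a single qualifier. Nothing creates hbase:replication automatically yet, so callers set it up themselves, as the new UT does:

  try (Admin admin = connection.getAdmin()) {
    admin.createTable(
      TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build());
  }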

ZKReplicationPeerStorage.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
+
 import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
@@ -31,8 +34,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-
 /**
  * ZK based replication peer storage.
  */
@@ -46,11 +47,6 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
   public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
   public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
 
-  public static final byte[] ENABLED_ZNODE_BYTES =
-    toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
-  public static final byte[] DISABLED_ZNODE_BYTES =
-    toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
-
   /**
    * The name of the znode that contains the replication status of a remote slave (i.e. peer)
    * cluster.
@@ -89,7 +85,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
         ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
           ReplicationPeerConfigUtil.toByteArray(peerConfig)),
         ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
-          enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
+          enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES)),
         false);
     } catch (KeeperException e) {
       throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
@@ -108,7 +104,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
   @Override
   public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
-    byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
+    byte[] stateBytes = enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES;
     try {
       ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
     } catch (KeeperException e) {
@@ -140,7 +136,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
   @Override
   public boolean isPeerEnabled(String peerId) throws ReplicationException {
     try {
-      return Arrays.equals(ENABLED_ZNODE_BYTES,
+      return Arrays.equals(PEER_STATE_ENABLED_BYTES,
         ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
     } catch (KeeperException | InterruptedException e) {
       throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);

ZKReplicationQueueStorage.java

@@ -79,7 +79,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
  * </pre>
  */
 @InterfaceAudience.Private
-class ZKReplicationQueueStorage extends ZKReplicationStorageBase
+public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     implements ReplicationQueueStorage {
 
   private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
@@ -199,7 +199,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     // Persist the max sequence id(s) of regions for serial replication atomically.
     if (lastSeqIds != null && lastSeqIds.size() > 0) {
       for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
-        String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+        String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
         String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
         /*
          * Make sure the existence of path
@@ -382,7 +382,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   // will be overridden in UTs
   @VisibleForTesting
-  protected int getQueuesZNodeCversion() throws KeeperException {
+  public int getQueuesZNodeCversion() throws KeeperException {
     Stat stat = new Stat();
     ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
     return stat.getCversion();

TestReplicationSourceManager.java

@@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -170,9 +170,9 @@ public abstract class TestReplicationSourceManager {
       + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
     ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
-      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
+      ReplicationUtils.PEER_STATE_ENABLED_BYTES);
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
-    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationUtils.PEER_STATE_ENABLED_BYTES);
     ZKClusterId.setClusterId(zkw, new ClusterId());
 
     FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());

TestReplicationStateBasic.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.replication;
+package org.apache.hadoop.hbase.replication.storage;
 
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.junit.Assert.assertEquals;
@@ -30,7 +30,13 @@ import java.util.List;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.zookeeper.KeeperException;
@@ -222,18 +228,19 @@ public abstract class TestReplicationStateBasic {
     try {
       rp.getPeerStorage().setPeerState("bogus", true);
-      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+      fail("Should have thrown a ReplicationException when passed a nonexistent peerId");
     } catch (ReplicationException e) {
     }
     try {
       rp.getPeerStorage().setPeerState("bogus", false);
-      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+      fail("Should have thrown a ReplicationException when passed a nonexistent peerId");
     } catch (ReplicationException e) {
     }
     try {
       assertFalse(rp.addPeer("bogus"));
-      fail("Should have thrown an ReplicationException when passed a bogus peerId");
+      fail("Should have thrown a ReplicationException when creating a bogus peer "
+        + "with null peer config");
     } catch (ReplicationException e) {
     }

TestReplicationStateTableImpl.java

@@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.storage;
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TableReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.TableReplicationStorageBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationStateTableImpl extends TestReplicationStateBasic {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationStateTableImpl.class);
private static Configuration conf;
private static HBaseTestingUtility utility = new HBaseTestingUtility();
private static ZKWatcher zkw;
private static Connection connection;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = utility.getConfiguration();
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
utility.startMiniCluster();
// Switch the storage implementation to the table based one only after the mini cluster has
// started. Setting it before startup would deadlock the master: the master lists peers while
// initializing, meta cannot come online until the master finishes initializing, and with
// table based replication listing peers cannot succeed until meta is online.
// These UTs only exercise the storage layer, so switching late is fine here.
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
TableReplicationPeerStorage.class.getName());
conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL,
TableReplicationQueueStorage.class.getName());
zkw = utility.getZooKeeperWatcher();
connection = ConnectionFactory.createConnection(conf);
KEY_ONE = initPeerClusterState("/hbase1");
KEY_TWO = initPeerClusterState("/hbase2");
}
private static String initPeerClusterState(String baseZKNode)
throws IOException, KeeperException {
// Add a dummy region server and set up the cluster id
Configuration testConf = new Configuration(conf);
testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
ZKUtil.createWithParents(zkw1, fakeRs);
ZKClusterId.setClusterId(zkw1, new ClusterId());
return ZKConfig.getZooKeeperClusterKey(testConf);
}
@Before
public void setUp() throws IOException {
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
rp = ReplicationFactory.getReplicationPeers(zkw, conf);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
// Create hbase:replication meta table.
try (Admin admin = connection.getAdmin()) {
TableDescriptor table =
TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build();
admin.createTable(table);
}
}
@After
public void tearDown() throws KeeperException, IOException {
// Drop the hbase:replication meta table.
utility.deleteTable(TableReplicationStorageBase.REPLICATION_TABLE);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (connection != null) {
IOUtils.closeQuietly(connection);
}
utility.shutdownMiniZKCluster();
}
}

TestReplicationStateZKImpl.java

@@ -15,14 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.replication;
+package org.apache.hadoop.hbase.replication.storage;
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;

TestZKReplicationPeerStorage.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.replication;
+package org.apache.hadoop.hbase.replication.storage;
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
@@ -33,9 +33,13 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Stream;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.AfterClass;

TestZKReplicationQueueStorage.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.replication;
+package org.apache.hadoop.hbase.replication.storage;
 
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.junit.Assert.assertEquals;
@@ -27,10 +27,13 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Pair;
@@ -218,7 +221,7 @@ public class TestZKReplicationQueueStorage {
       private int called = 0;
 
       @Override
-      protected int getQueuesZNodeCversion() throws KeeperException {
+      public int getQueuesZNodeCversion() throws KeeperException {
         if (called < 4) {
           called++;
         }