Revert "HBASE-19665 Add table based replication peers/queues storage back"
This reverts commit 31978c31bb
.
Conflicts:
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
This commit is contained in:
parent
104f58701e
commit
00095a2ef9
|
@ -42,8 +42,7 @@ public interface ReplicationPeerStorage {
|
|||
|
||||
/**
|
||||
* Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}.
|
||||
* @throws ReplicationException if there are errors accessing the storage service or peer does not
|
||||
* exist.
|
||||
* @throws ReplicationException if there are errors accessing the storage service.
|
||||
*/
|
||||
void setPeerState(String peerId, boolean enabled) throws ReplicationException;
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -30,15 +29,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
public final class ReplicationStorageFactory {
|
||||
|
||||
public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl";
|
||||
public static final String DEFAULT_REPLICATION_PEER_STORAGE_IMPL =
|
||||
ZKReplicationPeerStorage.class.getName();
|
||||
|
||||
public static final String REPLICATION_QUEUE_STORAGE_IMPL =
|
||||
"hbase.replication.queue.storage.impl";
|
||||
public static final String DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL =
|
||||
ZKReplicationQueueStorage.class.getName();
|
||||
|
||||
private ReplicationStorageFactory() {
|
||||
}
|
||||
|
||||
|
@ -46,10 +36,7 @@ public final class ReplicationStorageFactory {
|
|||
* Create a new {@link ReplicationPeerStorage}.
|
||||
*/
|
||||
public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) {
|
||||
String peerStorageClass =
|
||||
conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL);
|
||||
return ReflectionUtils.instantiateWithCustomCtor(peerStorageClass,
|
||||
new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf });
|
||||
return new ZKReplicationPeerStorage(zk, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -57,9 +44,6 @@ public final class ReplicationStorageFactory {
|
|||
*/
|
||||
public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk,
|
||||
Configuration conf) {
|
||||
String queueStorageClass =
|
||||
conf.get(REPLICATION_QUEUE_STORAGE_IMPL, DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL);
|
||||
return ReflectionUtils.instantiateWithCustomCtor(queueStorageClass,
|
||||
new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf });
|
||||
return new ZKReplicationQueueStorage(zk, conf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.toByteArray;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -32,19 +30,12 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
|
||||
/**
|
||||
* Helper class for replication.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class ReplicationUtils {
|
||||
|
||||
public static final byte[] PEER_STATE_ENABLED_BYTES =
|
||||
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
|
||||
public static final byte[] PEER_STATE_DISABLED_BYTES =
|
||||
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
|
||||
|
||||
private ReplicationUtils() {
|
||||
}
|
||||
|
||||
|
@ -182,8 +173,4 @@ public final class ReplicationUtils {
|
|||
return tableCFs != null && tableCFs.containsKey(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
public static String parsePeerIdFromQueueId(String queueId) {
|
||||
return new ReplicationQueueInfo(queueId).getPeerId();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,171 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Table based replication peer storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TableReplicationPeerStorage extends TableReplicationStorageBase
|
||||
implements ReplicationPeerStorage {
|
||||
|
||||
public TableReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) throws IOException {
|
||||
super(zookeeper, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Put put = new Put(Bytes.toBytes(peerId));
|
||||
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
|
||||
ReplicationPeerConfigUtil.toByteArray(peerConfig));
|
||||
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
|
||||
enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
|
||||
table.put(put);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to add peer " + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removePeer(String peerId) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Delete delete = new Delete(Bytes.toBytes(peerId));
|
||||
table.delete(delete);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to remove peer " + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO make it to be a checkExistAndMutate operation.
|
||||
private boolean peerExist(String peerId, Table table) throws IOException {
|
||||
Get get = new Get(Bytes.toBytes(peerId));
|
||||
get.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
|
||||
return table.exists(get);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
if (!peerExist(peerId, table)) {
|
||||
throw new ReplicationException("Peer " + peerId + " does not exist.");
|
||||
}
|
||||
Put put = new Put(Bytes.toBytes(peerId));
|
||||
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
|
||||
enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
|
||||
table.put(put);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to set peer state, peerId=" + peerId + ", state=" + enabled, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
if (!peerExist(peerId, table)) {
|
||||
throw new ReplicationException("Peer " + peerId + " does not exist.");
|
||||
}
|
||||
Put put = new Put(Bytes.toBytes(peerId));
|
||||
put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
|
||||
ReplicationPeerConfigUtil.toByteArray(peerConfig));
|
||||
table.put(put);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to update peer configuration, peerId=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listPeerIds() throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Scan scan = new Scan().addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
List<String> peerIds = new ArrayList<>();
|
||||
for (Result r : scanner) {
|
||||
peerIds.add(Bytes.toString(r.getRow()));
|
||||
}
|
||||
return peerIds;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to list peers", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPeerEnabled(String peerId) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
|
||||
Result r = table.get(get);
|
||||
if (r == null) {
|
||||
throw new ReplicationException("Peer " + peerId + " does not found");
|
||||
}
|
||||
return Arrays.equals(PEER_STATE_ENABLED_BYTES, r.getValue(FAMILY_PEER, QUALIFIER_PEER_STATE));
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to read the peer state, peerId=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG);
|
||||
Result r = table.get(get);
|
||||
if (r == null) {
|
||||
throw new ReplicationException("Peer " + peerId + " does not found");
|
||||
}
|
||||
byte[] data = r.getValue(FAMILY_PEER, QUALIFIER_PEER_CONFIG);
|
||||
if (data == null || data.length == 0) {
|
||||
throw new ReplicationException(
|
||||
"Replication peer config data shouldn't be empty, peerId=" + peerId);
|
||||
}
|
||||
try {
|
||||
return ReplicationPeerConfigUtil.parsePeerFrom(data);
|
||||
} catch (DeserializationException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to parse replication peer config for peer with id=" + peerId, e);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to read the peer configuration in hbase:replication, peerId=" + peerId, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,522 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Table based replication queue storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class TableReplicationQueueStorage extends TableReplicationStorageBase
|
||||
implements ReplicationQueueStorage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TableReplicationQueueStorage.class);
|
||||
|
||||
public TableReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) throws IOException {
|
||||
super(zookeeper, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize the {fileName, position} pair into a byte array.
|
||||
*/
|
||||
private static byte[] makeByteArray(String fileName, long position) {
|
||||
byte[] data = new byte[Bytes.SIZEOF_INT + fileName.length() + Bytes.SIZEOF_LONG];
|
||||
int pos = 0;
|
||||
pos = Bytes.putInt(data, pos, fileName.length());
|
||||
pos = Bytes.putBytes(data, pos, Bytes.toBytes(fileName), 0, fileName.length());
|
||||
pos = Bytes.putLong(data, pos, position);
|
||||
assert pos == data.length;
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize the byte array into a {filename, position} pair.
|
||||
*/
|
||||
private static Pair<String, Long> parseFileNameAndPosition(byte[] data, int offset)
|
||||
throws IOException {
|
||||
if (data == null) {
|
||||
throw new IOException("The byte array shouldn't be null");
|
||||
}
|
||||
int pos = offset;
|
||||
int len = Bytes.toInt(data, pos, Bytes.SIZEOF_INT);
|
||||
pos += Bytes.SIZEOF_INT;
|
||||
if (pos + len > data.length) {
|
||||
throw new IllegalArgumentException("offset (" + pos + ") + length (" + len + ") exceed the"
|
||||
+ " capacity of the array: " + data.length);
|
||||
}
|
||||
String fileName = Bytes.toString(Bytes.copy(data, pos, len));
|
||||
pos += len;
|
||||
long position = Bytes.toLong(data, pos, Bytes.SIZEOF_LONG);
|
||||
return new Pair<>(fileName, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Delete delete = new Delete(getServerNameRowKey(serverName));
|
||||
delete.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId));
|
||||
// Delete all <fileName, position> pairs.
|
||||
delete.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP);
|
||||
table.delete(delete);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to remove wal from queue, serverName=" + serverName + ", queueId=" + queueId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addWAL(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Put put = new Put(getServerNameRowKey(serverName));
|
||||
put.addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, HConstants.EMPTY_BYTE_ARRAY);
|
||||
put.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId), HConstants.EMPTY_BYTE_ARRAY);
|
||||
put.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), makeByteArray(fileName, 0L));
|
||||
table.put(put);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to add wal to queue, serverName=" + serverName
|
||||
+ ", queueId=" + queueId + ", fileName=" + fileName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeWAL(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
|
||||
.filter(w -> w.fileNameMatch(fileName)).findFirst();
|
||||
if (walCell.isPresent()) {
|
||||
Delete delete = new Delete(getServerNameRowKey(walCell.get().serverName))
|
||||
.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), walCell.get().cellTimestamp);
|
||||
table.delete(delete);
|
||||
} else {
|
||||
LOG.warn(fileName + " has already been deleted when removing log");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to remove wal from queue, serverName=" + serverName
|
||||
+ ", queueId=" + queueId + ", fileName=" + fileName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
|
||||
Map<String, Long> lastSeqIds) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
|
||||
.filter(w -> w.fileNameMatch(fileName)).findFirst();
|
||||
if (walCell.isPresent()) {
|
||||
List<Put> puts = new ArrayList<>();
|
||||
Put put = new Put(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL,
|
||||
Bytes.toBytes(walCell.get().queueId), walCell.get().cellTimestamp,
|
||||
makeByteArray(fileName, position));
|
||||
puts.add(put);
|
||||
// Update the last pushed sequence id for each region in a batch.
|
||||
String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
|
||||
if (lastSeqIds != null && lastSeqIds.size() > 0) {
|
||||
for (Map.Entry<String, Long> e : lastSeqIds.entrySet()) {
|
||||
Put regionPut = new Put(Bytes.toBytes(peerId)).addColumn(FAMILY_REGIONS,
|
||||
getRegionQualifier(e.getKey()), Bytes.toBytes(e.getValue()));
|
||||
puts.add(regionPut);
|
||||
}
|
||||
}
|
||||
table.put(puts);
|
||||
} else {
|
||||
throw new ReplicationException("WAL file " + fileName + " does not found under queue "
|
||||
+ queueId + " for server " + serverName);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to set wal position and last sequence ids, serverName=" + serverName
|
||||
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastSequenceId(String encodedRegionName, String peerId)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Get get = new Get(Bytes.toBytes(peerId));
|
||||
get.addColumn(FAMILY_REGIONS, getRegionQualifier(encodedRegionName));
|
||||
Result r = table.get(get);
|
||||
if (r == null || r.listCells() == null) {
|
||||
return HConstants.NO_SEQNUM;
|
||||
}
|
||||
return Bytes.toLong(r.getValue(FAMILY_REGIONS, getRegionQualifier(encodedRegionName)));
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to get last sequence id, region=" + encodedRegionName + ", peerId=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWALPosition(ServerName serverName, String queueId, String fileName)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
|
||||
.filter(w -> w.fileNameMatch(fileName)).findFirst();
|
||||
if (walCell.isPresent()) {
|
||||
return walCell.get().position;
|
||||
} else {
|
||||
LOG.warn("WAL " + fileName + " does not found under queue " + queueId + " for server "
|
||||
+ serverName);
|
||||
return 0;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to get wal position. serverName=" + serverName
|
||||
+ ", queueId=" + queueId + ", fileName=" + fileName, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Each cell in column wal:{queueId} will be parsed to a WALCell. The WALCell will be more
|
||||
* friendly to upper layer.
|
||||
*/
|
||||
private static final class WALCell {
|
||||
ServerName serverName;
|
||||
String queueId;
|
||||
String wal;
|
||||
long position;
|
||||
long cellTimestamp;// Timestamp of the cell
|
||||
|
||||
private WALCell(ServerName serverName, String queueId, String wal, long position,
|
||||
long cellTimestamp) {
|
||||
this.serverName = serverName;
|
||||
this.queueId = queueId;
|
||||
this.wal = wal;
|
||||
this.position = position;
|
||||
this.cellTimestamp = cellTimestamp;
|
||||
}
|
||||
|
||||
public static WALCell create(Cell cell) throws IOException {
|
||||
ServerName serverName = ServerName.parseServerName(
|
||||
Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
|
||||
String queueId = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
|
||||
cell.getQualifierLength());
|
||||
Pair<String, Long> fileAndPos =
|
||||
parseFileNameAndPosition(cell.getValueArray(), cell.getValueOffset());
|
||||
return new WALCell(serverName, queueId, fileAndPos.getFirst(), fileAndPos.getSecond(),
|
||||
cell.getTimestamp());
|
||||
}
|
||||
|
||||
public boolean fileNameMatch(String fileName) {
|
||||
return StringUtils.equals(wal, fileName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the WALCell list from a HBase result.
|
||||
*/
|
||||
private List<WALCell> result2WALCells(Result r) throws IOException {
|
||||
List<WALCell> wals = new ArrayList<>();
|
||||
if (r != null && r.listCells() != null && r.listCells().size() > 0) {
|
||||
for (Cell cell : r.listCells()) {
|
||||
wals.add(WALCell.create(cell));
|
||||
}
|
||||
}
|
||||
return wals;
|
||||
}
|
||||
|
||||
/**
|
||||
* List all WALs for the specific region server and queueId.
|
||||
*/
|
||||
private List<WALCell> getWALsInQueue0(Table table, ServerName serverName, String queueId)
|
||||
throws IOException {
|
||||
Get get = new Get(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, Bytes.toBytes(queueId))
|
||||
.readAllVersions();
|
||||
return result2WALCells(table.get(get));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getWALsInQueue(ServerName serverName, String queueId)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
return getWALsInQueue0(table, serverName, queueId).stream().map(p -> p.wal)
|
||||
.collect(Collectors.toList());
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to get wals in queue. serverName=" + serverName + ", queueId=" + queueId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
List<String> queues = new ArrayList<>();
|
||||
Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_QUEUE);
|
||||
Result r = table.get(get);
|
||||
if (r != null && r.listCells() != null && r.listCells().size() > 0) {
|
||||
for (Cell c : r.listCells()) {
|
||||
String queue =
|
||||
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
|
||||
queues.add(queue);
|
||||
}
|
||||
}
|
||||
return queues;
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to get all queues. serverName=" + serverName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
|
||||
ServerName destServerName) throws ReplicationException {
|
||||
LOG.info(
|
||||
"Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
// Create an enabled region server for destination.
|
||||
byte[] destServerNameRowKey = getServerNameRowKey(destServerName);
|
||||
byte[] srcServerNameRowKey = getServerNameRowKey(sourceServerName);
|
||||
Put put = new Put(destServerNameRowKey).addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED,
|
||||
HConstants.EMPTY_BYTE_ARRAY);
|
||||
table.put(put);
|
||||
List<WALCell> wals = getWALsInQueue0(table, sourceServerName, queueId);
|
||||
String newQueueId = queueId + "-" + sourceServerName;
|
||||
// Remove the queue in source region server if wal set of the queue is empty.
|
||||
if (CollectionUtils.isEmpty(wals)) {
|
||||
Delete delete =
|
||||
new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId))
|
||||
.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP);
|
||||
table.delete(delete);
|
||||
LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty");
|
||||
return new Pair<>(newQueueId, Collections.emptySortedSet());
|
||||
}
|
||||
// Transfer all wals from source region server to destination region server in a batch.
|
||||
List<Mutation> mutations = new ArrayList<>();
|
||||
// a. Create queue for destination server.
|
||||
mutations.add(new Put(destServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(newQueueId),
|
||||
HConstants.EMPTY_BYTE_ARRAY));
|
||||
SortedSet<String> logQueue = new TreeSet<>();
|
||||
for (WALCell wal : wals) {
|
||||
byte[] data = makeByteArray(wal.wal, wal.cellTimestamp);
|
||||
// b. Add wal to destination server.
|
||||
mutations.add(
|
||||
new Put(destServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(newQueueId), data));
|
||||
// c. Remove wal from source server.
|
||||
mutations.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(queueId),
|
||||
wal.cellTimestamp));
|
||||
logQueue.add(wal.wal);
|
||||
}
|
||||
// d. Remove the queue of source server.
|
||||
mutations
|
||||
.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId)));
|
||||
Object[] results = new Object[mutations.size()];
|
||||
table.batch(mutations, results);
|
||||
boolean allSuccess = Stream.of(results).allMatch(r -> r != null);
|
||||
if (!allSuccess) {
|
||||
throw new ReplicationException("Claim queue queueId=" + queueId + " from "
|
||||
+ sourceServerName + " to " + destServerName + " failed, not all mutations success.");
|
||||
}
|
||||
LOG.info(
|
||||
"Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
|
||||
return new Pair<>(newQueueId, logQueue);
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName
|
||||
+ " to " + destServerName + " failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
|
||||
// TODO Make this to be a checkAndDelete, and provide a UT for it.
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_WAL).readAllVersions();
|
||||
Result r = table.get(get);
|
||||
if (r == null || r.listCells() == null || r.listCells().size() == 0) {
|
||||
Delete delete = new Delete(getServerNameRowKey(serverName));
|
||||
table.delete(delete);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException(
|
||||
"Failed to remove replicator when queue is empty, serverName=" + serverName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getListOfReplicators() throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Scan scan = new Scan().addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED).readVersions(1);
|
||||
Set<ServerName> serverNames = new HashSet<>();
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (Result r : scanner) {
|
||||
if (r.listCells().size() > 0) {
|
||||
Cell firstCell = r.listCells().get(0);
|
||||
String serverName = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(),
|
||||
firstCell.getRowLength());
|
||||
serverNames.add(ServerName.parseServerName(serverName));
|
||||
}
|
||||
}
|
||||
}
|
||||
return new ArrayList<>(serverNames);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to get list of replicators", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getAllWALs() throws ReplicationException {
|
||||
Set<String> walSet = new HashSet<>();
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Scan scan = new Scan().addFamily(FAMILY_WAL).readAllVersions();
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (Result r : scanner) {
|
||||
result2WALCells(r).forEach(w -> walSet.add(w.wal));
|
||||
}
|
||||
}
|
||||
return walSet;
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to get all wals", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
|
||||
// Need to do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
|
||||
// Need to do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
|
||||
throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (Pair<Path, Path> p : pairs) {
|
||||
Put put = new Put(Bytes.toBytes(peerId));
|
||||
put.addColumn(FAMILY_HFILE_REFS, Bytes.toBytes(p.getSecond().getName()),
|
||||
HConstants.EMPTY_BYTE_ARRAY);
|
||||
puts.add(put);
|
||||
}
|
||||
table.put(puts);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to add hfile refs, peerId=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
List<Delete> deletes = new ArrayList<>();
|
||||
for (String file : files) {
|
||||
Delete delete = new Delete(Bytes.toBytes(peerId));
|
||||
delete.addColumns(FAMILY_HFILE_REFS, Bytes.toBytes(file));
|
||||
deletes.add(delete);
|
||||
}
|
||||
table.delete(deletes);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to remove hfile refs, peerId=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Set<String> peers = new HashSet<>();
|
||||
Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (Result r : scanner) {
|
||||
if (r.listCells().size() > 0) {
|
||||
Cell firstCell = r.listCells().get(0);
|
||||
String peerId = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(),
|
||||
firstCell.getRowLength());
|
||||
peers.add(peerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return new ArrayList<>(peers);
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Faield to get all peers by reading hbase:replication meta",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Get get = new Get(Bytes.toBytes(peerId)).addFamily(FAMILY_HFILE_REFS);
|
||||
Result r = table.get(get);
|
||||
List<String> hfiles = new ArrayList<>();
|
||||
if (r != null && r.listCells() != null) {
|
||||
for (Cell c : r.listCells()) {
|
||||
hfiles.add(
|
||||
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()));
|
||||
}
|
||||
}
|
||||
return hfiles;
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to get replicable hfiles, peerId=" + peerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getAllHFileRefs() throws ReplicationException {
|
||||
try (Table table = getReplicationMetaTable()) {
|
||||
Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
Set<String> hfileSet = new HashSet<>();
|
||||
for (Result r : scanner) {
|
||||
for (Cell c : r.listCells()) {
|
||||
String hfile = Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(),
|
||||
c.getQualifierLength());
|
||||
hfileSet.add(hfile);
|
||||
}
|
||||
}
|
||||
return hfileSet;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ReplicationException("Failed to get all hfile refs", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,127 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class TableReplicationStorageBase {
|
||||
protected final ZKWatcher zookeeper;
|
||||
protected final Configuration conf;
|
||||
|
||||
public static final TableName REPLICATION_TABLE =
|
||||
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
|
||||
|
||||
// Peer family, the row key would be peer id.
|
||||
public static final byte[] FAMILY_PEER = Bytes.toBytes("peer");
|
||||
public static final byte[] QUALIFIER_PEER_CONFIG = Bytes.toBytes("config");
|
||||
public static final byte[] QUALIFIER_PEER_STATE = Bytes.toBytes("state");
|
||||
|
||||
// Region server state family, the row key would be name of region server.
|
||||
public static final byte[] FAMILY_RS_STATE = Bytes.toBytes("rs_state");
|
||||
public static final byte[] QUALIFIER_STATE_ENABLED = Bytes.toBytes("enabled");
|
||||
|
||||
// Queue and wal family, the row key would be name of region server.
|
||||
public static final byte[] FAMILY_QUEUE = Bytes.toBytes("queue");
|
||||
public static final byte[] FAMILY_WAL = Bytes.toBytes("wal");
|
||||
|
||||
// HFile-Refs family, the row key would be peer id.
|
||||
public static final byte[] FAMILY_HFILE_REFS = Bytes.toBytes("hfile-refs");
|
||||
|
||||
// Region family, the row key would be peer id.
|
||||
public static final byte[] FAMILY_REGIONS = Bytes.toBytes("regions");
|
||||
|
||||
private Connection connection;
|
||||
|
||||
protected static byte[] getServerNameRowKey(ServerName serverName) {
|
||||
return Bytes.toBytes(serverName.toString());
|
||||
}
|
||||
|
||||
protected static byte[] getRegionQualifier(String encodedRegionName) {
|
||||
return Bytes.toBytes(encodedRegionName);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static TableDescriptorBuilder createReplicationTableDescBuilder(final Configuration conf)
|
||||
throws IOException {
|
||||
int metaMaxVersion =
|
||||
conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS);
|
||||
int metaBlockSize =
|
||||
conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE);
|
||||
return TableDescriptorBuilder
|
||||
.newBuilder(REPLICATION_TABLE)
|
||||
.setColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_PEER).setMaxVersions(metaMaxVersion)
|
||||
.setInMemory(true).setBlocksize(metaBlockSize)
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.setColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_RS_STATE).setMaxVersions(metaMaxVersion)
|
||||
.setInMemory(true).setBlocksize(metaBlockSize)
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.setColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_QUEUE).setMaxVersions(metaMaxVersion)
|
||||
.setInMemory(true).setBlocksize(metaBlockSize)
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.setColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_WAL)
|
||||
.setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
|
||||
.setBlocksize(metaBlockSize).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
|
||||
.setBloomFilterType(BloomType.NONE).build())
|
||||
.setColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_REGIONS).setMaxVersions(metaMaxVersion)
|
||||
.setInMemory(true).setBlocksize(metaBlockSize)
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.setColumnFamily(
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_HFILE_REFS)
|
||||
.setMaxVersions(metaMaxVersion).setInMemory(true).setBlocksize(metaBlockSize)
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
|
||||
.build());
|
||||
}
|
||||
|
||||
protected TableReplicationStorageBase(ZKWatcher zookeeper, Configuration conf)
|
||||
throws IOException {
|
||||
this.zookeeper = zookeeper;
|
||||
this.conf = conf;
|
||||
this.connection = ConnectionFactory.createConnection(conf);
|
||||
}
|
||||
|
||||
protected Table getReplicationMetaTable() throws IOException {
|
||||
return this.connection.getTable(REPLICATION_TABLE);
|
||||
}
|
||||
}
|
|
@ -17,9 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -34,6 +31,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
|
||||
/**
|
||||
* ZK based replication peer storage.
|
||||
*/
|
||||
|
@ -47,6 +46,11 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
|
|||
public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
|
||||
public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
|
||||
|
||||
public static final byte[] ENABLED_ZNODE_BYTES =
|
||||
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
|
||||
public static final byte[] DISABLED_ZNODE_BYTES =
|
||||
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
|
||||
|
||||
/**
|
||||
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
|
||||
* cluster.
|
||||
|
@ -85,7 +89,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
|
|||
ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
|
||||
ReplicationPeerConfigUtil.toByteArray(peerConfig)),
|
||||
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
|
||||
enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES)),
|
||||
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
|
||||
false);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
|
||||
|
@ -104,7 +108,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
|
|||
|
||||
@Override
|
||||
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
|
||||
byte[] stateBytes = enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES;
|
||||
byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
|
||||
try {
|
||||
ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
|
||||
} catch (KeeperException e) {
|
||||
|
@ -136,7 +140,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
|
|||
@Override
|
||||
public boolean isPeerEnabled(String peerId) throws ReplicationException {
|
||||
try {
|
||||
return Arrays.equals(PEER_STATE_ENABLED_BYTES,
|
||||
return Arrays.equals(ENABLED_ZNODE_BYTES,
|
||||
ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
|
||||
|
|
|
@ -79,7 +79,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
* </pre>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||
class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||
implements ReplicationQueueStorage {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
|
||||
|
@ -199,7 +199,7 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
// Persist the max sequence id(s) of regions for serial replication atomically.
|
||||
if (lastSeqIds != null && lastSeqIds.size() > 0) {
|
||||
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
|
||||
String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
|
||||
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
|
||||
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
|
||||
/*
|
||||
* Make sure the existence of path
|
||||
|
@ -375,7 +375,7 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
|||
|
||||
// will be overridden in UTs
|
||||
@VisibleForTesting
|
||||
public int getQueuesZNodeCversion() throws KeeperException {
|
||||
protected int getQueuesZNodeCversion() throws KeeperException {
|
||||
Stat stat = new Stat();
|
||||
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
|
||||
return stat.getCversion();
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.storage;
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.hasItems;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -30,13 +30,7 @@ import java.util.List;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -228,19 +222,18 @@ public abstract class TestReplicationStateBasic {
|
|||
|
||||
try {
|
||||
rp.getPeerStorage().setPeerState("bogus", true);
|
||||
fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId");
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
try {
|
||||
rp.getPeerStorage().setPeerState("bogus", false);
|
||||
fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId");
|
||||
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
assertFalse(rp.addPeer("bogus"));
|
||||
fail("Should have thrown an ReplicationException when creating a bogus peerId "
|
||||
+ "with null peer config");
|
||||
fail("Should have thrown an ReplicationException when passed a bogus peerId");
|
||||
} catch (ReplicationException e) {
|
||||
}
|
||||
|
|
@ -15,17 +15,14 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.storage;
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterId;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.storage;
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
|
@ -33,13 +33,9 @@ import java.util.Map;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.junit.AfterClass;
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.storage;
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.hasItems;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -27,13 +27,10 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -221,7 +218,7 @@ public class TestZKReplicationQueueStorage {
|
|||
private int called = 0;
|
||||
|
||||
@Override
|
||||
public int getQueuesZNodeCversion() throws KeeperException {
|
||||
protected int getQueuesZNodeCversion() throws KeeperException {
|
||||
if (called < 4) {
|
||||
called++;
|
||||
}
|
|
@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
|
@ -170,9 +170,9 @@ public abstract class TestReplicationSourceManager {
|
|||
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
|
||||
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
|
||||
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
|
||||
ReplicationUtils.PEER_STATE_ENABLED_BYTES);
|
||||
ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
|
||||
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
|
||||
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationUtils.PEER_STATE_ENABLED_BYTES);
|
||||
ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
|
||||
|
||||
ZKClusterId.setClusterId(zkw, new ClusterId());
|
||||
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
|
||||
|
|
|
@ -1,129 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.replication.storage;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterId;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.TableReplicationPeerStorage;
|
||||
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.TableReplicationStorageBase;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestReplicationStateTableImpl extends TestReplicationStateBasic {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationStateTableImpl.class);
|
||||
|
||||
private static Configuration conf;
|
||||
private static HBaseTestingUtility utility = new HBaseTestingUtility();
|
||||
private static ZKWatcher zkw;
|
||||
private static Connection connection;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf = utility.getConfiguration();
|
||||
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
|
||||
conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
|
||||
utility.startMiniCluster();
|
||||
|
||||
// After the HBase Mini cluster startup, we set the storage implementation to table based
|
||||
// implementation. Otherwise, we cannot setup the HBase Mini Cluster because the master will
|
||||
// list peers before finish its initialization, and if master cannot finish initialization, the
|
||||
// meta cannot be online, in other hand, if meta cannot be online, the list peers never success
|
||||
// when using table based replication. a dead loop happen.
|
||||
// Our UTs are written for testing storage layer, so no problem here.
|
||||
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
|
||||
TableReplicationPeerStorage.class.getName());
|
||||
conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL,
|
||||
TableReplicationQueueStorage.class.getName());
|
||||
|
||||
zkw = utility.getZooKeeperWatcher();
|
||||
connection = ConnectionFactory.createConnection(conf);
|
||||
|
||||
KEY_ONE = initPeerClusterState("/hbase1");
|
||||
KEY_TWO = initPeerClusterState("/hbase2");
|
||||
}
|
||||
|
||||
private static String initPeerClusterState(String baseZKNode)
|
||||
throws IOException, KeeperException {
|
||||
// Add a dummy region server and set up the cluster id
|
||||
Configuration testConf = new Configuration(conf);
|
||||
testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
|
||||
ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
|
||||
String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
|
||||
ZKUtil.createWithParents(zkw1, fakeRs);
|
||||
ZKClusterId.setClusterId(zkw1, new ClusterId());
|
||||
return ZKConfig.getZooKeeperClusterKey(testConf);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
|
||||
rp = ReplicationFactory.getReplicationPeers(zkw, conf);
|
||||
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
|
||||
|
||||
// Create hbase:replication meta table.
|
||||
try (Admin admin = connection.getAdmin()) {
|
||||
TableDescriptor table =
|
||||
TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build();
|
||||
admin.createTable(table);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws KeeperException, IOException {
|
||||
// Drop the hbase:replication meta table.
|
||||
utility.deleteTable(TableReplicationStorageBase.REPLICATION_TABLE);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
if (connection != null) {
|
||||
IOUtils.closeQuietly(connection);
|
||||
}
|
||||
utility.shutdownMiniZKCluster();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue