HBASE-27727 Implement filesystem based Replication peer storage (#5165)

Signed-off-by: Liangjun He <heliangjun@apache.org>
This commit is contained in:
Duo Zhang 2023-04-13 18:58:02 +08:00 committed by GitHub
parent e4b4cef80e
commit a71105997f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1346 additions and 304 deletions

View File

@ -17,10 +17,16 @@
*/
package org.apache.hadoop.hbase.replication;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@ -74,4 +80,28 @@ public enum SyncReplicationState {
return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
.parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
}
public static byte[] toByteArray(SyncReplicationState state, SyncReplicationState newState) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
out.write(ProtobufMagic.PB_MAGIC);
ReplicationPeerConfigUtil.toSyncReplicationState(state).writeDelimitedTo(out);
ReplicationPeerConfigUtil.toSyncReplicationState(newState).writeDelimitedTo(out);
} catch (IOException e) {
// should not happen, all in memory operations
throw new AssertionError(e);
}
return out.toByteArray();
}
public static Pair<SyncReplicationState, SyncReplicationState>
parseStateAndNewStateFrom(byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ByteStreams.skipFully(in, ProtobufMagic.lengthOfPBMagic());
SyncReplicationState state = ReplicationPeerConfigUtil
.toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in));
SyncReplicationState newState = ReplicationPeerConfigUtil
.toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in));
return Pair.newPair(state, newState);
}
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.zip.CRC32;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/**
* A file storage which supports atomic update through two files, i.e, rotating. The implementation
* does not require atomic rename.
*/
@InterfaceAudience.Private
public class RotateFile {
private static final Logger LOG = LoggerFactory.getLogger(RotateFile.class);
private final FileSystem fs;
private final long maxFileSize;
private final Path[] files = new Path[2];
// this is used to make sure that we do not go backwards
private long prevTimestamp = -1;
private int nextFile = -1;
/**
* Constructs a new RotateFile object with the given parameters.
* @param fs the file system to use.
* @param dir the directory where the files will be created.
* @param name the base name for the files.
* @param maxFileSize the maximum size of each file.
*/
public RotateFile(FileSystem fs, Path dir, String name, long maxFileSize) {
this.fs = fs;
this.maxFileSize = maxFileSize;
this.files[0] = new Path(dir, name + "-0");
this.files[1] = new Path(dir, name + "-1");
}
private HBaseProtos.RotateFileData read(Path path) throws IOException {
byte[] data;
int expectedChecksum;
try (FSDataInputStream in = fs.open(path)) {
int length = in.readInt();
if (length <= 0 || length > maxFileSize) {
throw new IOException("Invalid file length " + length
+ ", either less than 0 or greater then max allowed size " + maxFileSize);
}
data = new byte[length];
in.readFully(data);
expectedChecksum = in.readInt();
}
CRC32 crc32 = new CRC32();
crc32.update(data);
int calculatedChecksum = (int) crc32.getValue();
if (expectedChecksum != calculatedChecksum) {
throw new IOException(
"Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
}
return HBaseProtos.RotateFileData.parseFrom(data);
}
private int select(HBaseProtos.RotateFileData[] datas) {
if (datas[0] == null) {
return 1;
}
if (datas[1] == null) {
return 0;
}
return datas[0].getTimestamp() >= datas[1].getTimestamp() ? 0 : 1;
}
/**
* Reads the content of the rotate file by selecting the winner file based on the timestamp of the
* data inside the files. It reads the content of both files and selects the one with the latest
* timestamp as the winner. If a file is incomplete or does not exist, it logs the error and moves
* on to the next file. It returns the content of the winner file as a byte array. If none of the
* files have valid data, it returns null.
* @return a byte array containing the data from the winner file, or null if no valid data is
* found.
* @throws IOException if an error occurs while reading the files.
*/
public byte[] read() throws IOException {
HBaseProtos.RotateFileData[] datas = new HBaseProtos.RotateFileData[2];
for (int i = 0; i < 2; i++) {
try {
datas[i] = read(files[i]);
} catch (FileNotFoundException e) {
LOG.debug("file {} does not exist", files[i], e);
} catch (EOFException e) {
LOG.debug("file {} is incomplete", files[i], e);
}
}
int winnerIndex = select(datas);
nextFile = 1 - winnerIndex;
if (datas[winnerIndex] != null) {
prevTimestamp = datas[winnerIndex].getTimestamp();
return datas[winnerIndex].getData().toByteArray();
} else {
return null;
}
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/RotateFile.java|.*/src/test/.*")
static void write(FileSystem fs, Path file, long timestamp, byte[] data) throws IOException {
HBaseProtos.RotateFileData proto = HBaseProtos.RotateFileData.newBuilder()
.setTimestamp(timestamp).setData(ByteString.copyFrom(data)).build();
byte[] protoData = proto.toByteArray();
CRC32 crc32 = new CRC32();
crc32.update(protoData);
int checksum = (int) crc32.getValue();
// 4 bytes length, 8 bytes timestamp, 4 bytes checksum at the end
try (FSDataOutputStream out = fs.create(file, true)) {
out.writeInt(protoData.length);
out.write(protoData);
out.writeInt(checksum);
}
}
/**
* Writes the given data to the next file in the rotation, with a timestamp calculated based on
* the previous timestamp and the current time to make sure it is greater than the previous
* timestamp. The method also deletes the previous file, which is no longer needed.
* <p/>
* Notice that, for a newly created {@link RotateFile} instance, you need to call {@link #read()}
* first to initialize the nextFile index, before calling this method.
* @param data the data to be written to the file
* @throws IOException if an I/O error occurs while writing the data to the file
*/
public void write(byte[] data) throws IOException {
if (data.length > maxFileSize) {
throw new IOException(
"Data size " + data.length + " is greater than max allowed size " + maxFileSize);
}
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
write(fs, files[nextFile], timestamp, data);
prevTimestamp = timestamp;
nextFile = 1 - nextFile;
try {
fs.delete(files[nextFile], false);
} catch (IOException e) {
// we will create new file with overwrite = true, so not a big deal here, only for speed up
// loading as we do not need to read this file when loading
LOG.debug("Failed to delete old file {}, ignoring the exception", files[nextFile], e);
}
}
/**
* Deletes the two files used for rotating data. If any of the files cannot be deleted, an
* IOException is thrown.
* @throws IOException if there is an error deleting either file
*/
public void delete() throws IOException {
Path next = files[nextFile];
// delete next file first, and then the current file, so when failing to delete, we can still
// read the correct data
if (fs.exists(next) && !fs.delete(next, false)) {
throw new IOException("Can not delete " + next);
}
Path current = files[1 - nextFile];
if (fs.exists(current) && !fs.delete(current, false)) {
throw new IOException("Can not delete " + current);
}
}
}

View File

@ -17,13 +17,24 @@
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BuilderStyleTest;
@ -43,6 +54,8 @@ public class TestReplicationPeerConfig {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
private static final Configuration CONF = HBaseConfiguration.create();
private static final String NAMESPACE_REPLICATE = "replicate";
private static final String NAMESPACE_OTHER = "other";
private static final TableName TABLE_A = TableName.valueOf(NAMESPACE_REPLICATE, "testA");
@ -246,4 +259,124 @@ public class TestReplicationPeerConfig {
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
}
private static final Random RNG = new Random(); // Seed may be set with Random#setSeed
private Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}
private Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}
private ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
.setBandwidth(RNG.nextInt(1000)).build();
}
@Test
public void testBaseReplicationPeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
String customPeerConfigUpdatedValue = "testUpdated";
String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
String customPeerConfigSecondValue = "testSecond";
String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
Configuration conf = new Configuration(CONF);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";")
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
// validates base configs are present in replicationPeerConfig
assertEquals(customPeerConfigValue,
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
assertEquals(customPeerConfigSecondValue,
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));
// validates base configs get updated values even if config already present
conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
assertEquals(customPeerConfigUpdatedValue,
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
assertEquals(customPeerConfigSecondUpdatedValue,
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
}
@Test
public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
Configuration conf = new Configuration(CONF);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigValue));
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
// validates base configs are present in replicationPeerConfig
assertEquals(customPeerConfigValue,
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(""));
ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
}
@Test
public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
Configuration conf = new Configuration(CONF);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(""));
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
@Category({ MiscTests.class, SmallTests.class })
public class TestRotateFile {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRotateFile.class);
private static HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
private static FileSystem FS;
private Path dir;
private RotateFile rotateFile;
@Rule
public final TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws IOException {
FS = FileSystem.get(UTIL.getConfiguration());
}
@AfterClass
public static void tearDownAfterClass() {
UTIL.cleanupTestDir();
}
@Before
public void setUp() throws IOException {
dir = UTIL.getDataTestDir(name.getMethodName());
if (!FS.mkdirs(dir)) {
throw new IOException("Can not create dir " + dir);
}
rotateFile = new RotateFile(FS, dir, name.getMethodName(), 1024);
assertNull(rotateFile.read());
}
@Test
public void testSimpleReadWrite() throws IOException {
for (int i = 0; i < 10; i++) {
rotateFile.write(Bytes.toBytes(i));
assertEquals(i, Bytes.toInt(rotateFile.read()));
}
rotateFile.delete();
assertNull(rotateFile.read());
}
@Test
public void testCompareTimestamp() throws IOException {
long now = EnvironmentEdgeManager.currentTime();
rotateFile.write(Bytes.toBytes(10));
Path file = FS.listStatus(dir)[0].getPath();
rotateFile.write(Bytes.toBytes(100));
// put a fake file with a less timestamp there
RotateFile.write(FS, file, now - 1, Bytes.toBytes(10));
assertEquals(100, Bytes.toInt(rotateFile.read()));
// put a fake file with a greater timestamp there
RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime() + 100, Bytes.toBytes(10));
assertEquals(10, Bytes.toInt(rotateFile.read()));
}
@Test
public void testMaxFileSize() throws IOException {
assertThrows(IOException.class, () -> rotateFile.write(new byte[1025]));
// put a file greater than max file size
rotateFile.write(Bytes.toBytes(10));
Path file = FS.listStatus(dir)[0].getPath();
RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime(), new byte[1025]);
assertThrows(IOException.class, () -> rotateFile.read());
}
@Test
public void testNotEnoughData() throws IOException {
rotateFile.write(Bytes.toBytes(10));
assertEquals(10, Bytes.toInt(rotateFile.read()));
// remove the last byte
Path file = FS.listStatus(dir)[0].getPath();
byte[] data;
try (FSDataInputStream in = FS.open(file)) {
data = ByteStreams.toByteArray(in);
}
try (FSDataOutputStream out = FS.create(file, true)) {
out.write(data, 0, data.length - 1);
}
// should hit EOF so read nothing
assertNull(rotateFile.read());
}
@Test
public void testChecksumMismatch() throws IOException {
rotateFile.write(Bytes.toBytes(10));
assertEquals(10, Bytes.toInt(rotateFile.read()));
// mess up one byte
Path file = FS.listStatus(dir)[0].getPath();
byte[] data;
try (FSDataInputStream in = FS.open(file)) {
data = ByteStreams.toByteArray(in);
}
data[4]++;
try (FSDataOutputStream out = FS.create(file, true)) {
out.write(data, 0, data.length);
}
// should get checksum mismatch
IOException error = assertThrows(IOException.class, () -> rotateFile.read());
assertThat(error.getMessage(), startsWith("Checksum mismatch"));
}
}

View File

@ -347,7 +347,7 @@ public class VerifyReplication extends Configured implements Tool {
}
});
ReplicationPeerStorage storage =
ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
return Pair.newPair(peerConfig,
ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));

View File

@ -284,3 +284,8 @@ message LogEntry {
required string log_class_name = 1;
required bytes log_message = 2;
}
message RotateFileData {
required int64 timestamp = 1;
required bytes data = 2;
}

View File

@ -103,6 +103,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>

View File

@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RotateFile;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A filesystem based replication peer storage. The implementation does not require atomic rename so
* you can use it on cloud OSS.
* <p/>
* FileSystem layout:
*
* <pre>
* hbase
* |
* --peers
* |
* --&lt;peer_id&gt;
* |
* --peer_config
* |
* --disabled
* |
* --sync-rep-state
* </pre>
*
* Notice that, if the peer is enabled, we will not have a disabled file.
* <p/>
* And for other files, to avoid depending on atomic rename, we will use two files for storing the
* content. When loading, we will try to read both the files and load the newer one. And when
* writing, we will write to the older file.
*/
@InterfaceAudience.Private
public class FSReplicationPeerStorage implements ReplicationPeerStorage {
private static final Logger LOG = LoggerFactory.getLogger(FSReplicationPeerStorage.class);
public static final String PEERS_DIR = "hbase.replication.peers.directory";
public static final String PEERS_DIR_DEFAULT = "peers";
static final String PEER_CONFIG_FILE = "peer_config";
static final String DISABLED_FILE = "disabled";
static final String SYNC_REPLICATION_STATE_FILE = "sync-rep-state";
static final byte[] NONE_STATE_BYTES =
SyncReplicationState.toByteArray(SyncReplicationState.NONE);
private final FileSystem fs;
private final Path dir;
public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException {
this.fs = fs;
this.dir = new Path(CommonFSUtils.getRootDir(conf), conf.get(PEERS_DIR, PEERS_DIR_DEFAULT));
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*")
Path getPeerDir(String peerId) {
return new Path(dir, peerId);
}
@Override
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
SyncReplicationState syncReplicationState) throws ReplicationException {
Path peerDir = getPeerDir(peerId);
try {
if (fs.exists(peerDir)) {
// check whether this is a valid peer, if so we should fail the add peer operation
if (read(fs, peerDir, PEER_CONFIG_FILE) != null) {
throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>"
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")
+ ", syncReplicationState=" + syncReplicationState + ", peer already exists");
}
}
if (!enabled) {
fs.createNewFile(new Path(peerDir, DISABLED_FILE));
}
write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
SyncReplicationState.toByteArray(syncReplicationState, SyncReplicationState.NONE));
// write the peer config data at last, so when loading, if we can not load the peer_config, we
// know that this is not a valid peer
write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
} catch (IOException e) {
throw new ReplicationException(
"Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state="
+ (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
e);
}
}
@Override
public void removePeer(String peerId) throws ReplicationException {
// delete the peer config first, and then delete the directory
// we will consider this is not a valid peer by reading the peer config file
Path peerDir = getPeerDir(peerId);
try {
delete(fs, peerDir, PEER_CONFIG_FILE);
if (!fs.delete(peerDir, true)) {
throw new IOException("Can not delete " + peerDir);
}
} catch (IOException e) {
throw new ReplicationException("Could not remove peer with id=" + peerId, e);
}
}
@Override
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
try {
if (enabled) {
if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) {
throw new IOException("Can not delete " + disabledFile);
}
} else {
if (!fs.exists(disabledFile) && !fs.createNewFile(disabledFile)) {
throw new IOException("Can not touch " + disabledFile);
}
}
} catch (IOException e) {
throw new ReplicationException(
"Unable to change state of the peer with id=" + peerId + " to " + enabled, e);
}
}
@Override
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException {
Path peerDir = getPeerDir(peerId);
try {
write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
} catch (IOException e) {
throw new ReplicationException(
"There was a problem trying to save changes to the " + "replication peer " + peerId, e);
}
}
@Override
public List<String> listPeerIds() throws ReplicationException {
try {
FileStatus[] statuses = fs.listStatus(dir);
if (statuses == null || statuses.length == 0) {
return Collections.emptyList();
}
List<String> peerIds = new ArrayList<>();
for (FileStatus status : statuses) {
String peerId = status.getPath().getName();
Path peerDir = getPeerDir(peerId);
// confirm that this is a valid peer
byte[] peerConfigData = read(fs, peerDir, PEER_CONFIG_FILE);
if (peerConfigData != null) {
peerIds.add(peerId);
}
}
return Collections.unmodifiableList(peerIds);
} catch (FileNotFoundException e) {
LOG.debug("Peer directory does not exist yet", e);
return Collections.emptyList();
} catch (IOException e) {
throw new ReplicationException("Cannot get the list of peers", e);
}
}
@Override
public boolean isPeerEnabled(String peerId) throws ReplicationException {
Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
try {
return !fs.exists(disabledFile);
} catch (IOException e) {
throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
}
}
@Override
public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
Path peerDir = getPeerDir(peerId);
byte[] data;
try {
data = read(fs, peerDir, PEER_CONFIG_FILE);
} catch (IOException e) {
throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
}
if (data == null || data.length == 0) {
throw new ReplicationException(
"Replication peer config data shouldn't be empty, peerId=" + peerId);
}
try {
return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
throw new ReplicationException(
"Failed to parse replication peer config for peer with id=" + peerId, e);
}
}
private Pair<SyncReplicationState, SyncReplicationState> getStateAndNewState(String peerId)
throws IOException {
Path peerDir = getPeerDir(peerId);
if (!fs.exists(peerDir)) {
throw new IOException("peer does not exists");
}
byte[] data = read(fs, peerDir, SYNC_REPLICATION_STATE_FILE);
if (data == null) {
// should be a peer from previous version, set the sync replication state for it.
write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
SyncReplicationState.toByteArray(SyncReplicationState.NONE, SyncReplicationState.NONE));
return Pair.newPair(SyncReplicationState.NONE, SyncReplicationState.NONE);
} else {
return SyncReplicationState.parseStateAndNewStateFrom(data);
}
}
@Override
public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState newState)
throws ReplicationException {
Path peerDir = getPeerDir(peerId);
try {
Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
getStateAndNewState(peerId);
write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
SyncReplicationState.toByteArray(stateAndNewState.getFirst(), newState));
} catch (IOException e) {
throw new ReplicationException(
"Unable to set the new sync replication state for peer with id=" + peerId + ", newState="
+ newState,
e);
}
}
@Override
public void transitPeerSyncReplicationState(String peerId) throws ReplicationException {
Path peerDir = getPeerDir(peerId);
try {
Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
getStateAndNewState(peerId);
write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
SyncReplicationState.toByteArray(stateAndNewState.getSecond(), SyncReplicationState.NONE));
} catch (IOException e) {
throw new ReplicationException(
"Error transiting sync replication state for peer with id=" + peerId, e);
}
}
@Override
public SyncReplicationState getPeerSyncReplicationState(String peerId)
throws ReplicationException {
try {
return getStateAndNewState(peerId).getFirst();
} catch (IOException e) {
throw new ReplicationException(
"Error getting sync replication state for peer with id=" + peerId, e);
}
}
@Override
public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
throws ReplicationException {
try {
return getStateAndNewState(peerId).getSecond();
} catch (IOException e) {
throw new ReplicationException(
"Error getting new sync replication state for peer with id=" + peerId, e);
}
}
// 16 MB is big enough for our usage here
private static final long MAX_FILE_SIZE = 16 * 1024 * 1024;
private static byte[] read(FileSystem fs, Path dir, String name) throws IOException {
RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
return file.read();
}
private static void write(FileSystem fs, Path dir, String name, byte[] data) throws IOException {
RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
// to initialize the nextFile index
file.read();
file.write(data);
}
private static void delete(FileSystem fs, Path dir, String name) throws IOException {
RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
// to initialize the nextFile index
file.read();
file.delete();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -32,7 +33,8 @@ public final class ReplicationFactory {
private ReplicationFactory() {
}
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
return new ReplicationPeers(zk, conf);
public static ReplicationPeers getReplicationPeers(FileSystem fs, ZKWatcher zk,
Configuration conf) {
return new ReplicationPeers(fs, zk, conf);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Specify the implementations for {@link ReplicationPeerStorage}.
*/
@InterfaceAudience.Private
public enum ReplicationPeerStorageType {
FILESYSTEM(FSReplicationPeerStorage.class),
ZOOKEEPER(ZKReplicationPeerStorage.class);
private final Class<? extends ReplicationPeerStorage> clazz;
private ReplicationPeerStorageType(Class<? extends ReplicationPeerStorage> clazz) {
this.clazz = clazz;
}
public Class<? extends ReplicationPeerStorage> getClazz() {
return clazz;
}
}

View File

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -40,10 +41,10 @@ public class ReplicationPeers {
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
private final ReplicationPeerStorage peerStorage;
ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
this.conf = conf;
this.peerCache = new ConcurrentHashMap<>();
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
}
public Configuration getConf() {

View File

@ -17,26 +17,60 @@
*/
package org.apache.hadoop.hbase.replication;
import java.lang.reflect.Constructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to create replication storage(peer, queue) classes.
* <p>
* For now we only have zk based implementation.
*/
@InterfaceAudience.Private
public final class ReplicationStorageFactory {
public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl";
// must use zookeeper here, otherwise when user upgrading from an old version without changing the
// config file, they will loss all the replication peer data.
public static final ReplicationPeerStorageType DEFAULT_REPLICATION_PEER_STORAGE_IMPL =
ReplicationPeerStorageType.ZOOKEEPER;
private ReplicationStorageFactory() {
}
private static Class<? extends ReplicationPeerStorage>
getReplicationPeerStorageClass(Configuration conf) {
try {
ReplicationPeerStorageType type = ReplicationPeerStorageType.valueOf(
conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL.name())
.toUpperCase());
return type.getClazz();
} catch (IllegalArgumentException e) {
return conf.getClass(REPLICATION_PEER_STORAGE_IMPL,
DEFAULT_REPLICATION_PEER_STORAGE_IMPL.getClazz(), ReplicationPeerStorage.class);
}
}
/**
* Create a new {@link ReplicationPeerStorage}.
*/
public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) {
return new ZKReplicationPeerStorage(zk, conf);
public static ReplicationPeerStorage getReplicationPeerStorage(FileSystem fs, ZKWatcher zk,
Configuration conf) {
Class<? extends ReplicationPeerStorage> clazz = getReplicationPeerStorageClass(conf);
for (Constructor<?> c : clazz.getConstructors()) {
if (c.getParameterCount() != 2) {
continue;
}
if (c.getParameterTypes()[0].isAssignableFrom(FileSystem.class)) {
return ReflectionUtils.newInstance(clazz, fs, conf);
} else if (c.getParameterTypes()[0].isAssignableFrom(ZKWatcher.class)) {
return ReflectionUtils.newInstance(clazz, zk, conf);
}
}
throw new IllegalArgumentException(
"Can not create replication peer storage with type " + clazz);
}
/**

View File

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;
import org.junit.Test;
public abstract class ReplicationPeerStorageTestBase {
// Seed may be set with Random#setSeed
private static final Random RNG = new Random();
protected static ReplicationPeerStorage STORAGE;
private Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}
private Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}
private ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
.setBandwidth(RNG.nextInt(1000)).build();
}
private void assertSetEquals(Set<String> expected, Set<String> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach(s -> assertTrue(actual.contains(s)));
}
private void assertMapEquals(Map<TableName, List<String>> expected,
Map<TableName, List<String>> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach((expectedTn, expectedCFs) -> {
List<String> actualCFs = actual.get(expectedTn);
if (expectedCFs == null || expectedCFs.size() == 0) {
assertTrue(actual.containsKey(expectedTn));
assertTrue(actualCFs == null || actualCFs.size() == 0);
} else {
assertNotNull(actualCFs);
assertEquals(expectedCFs.size(), actualCFs.size());
for (Iterator<String> expectedIt = expectedCFs.iterator(),
actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
assertEquals(expectedIt.next(), actualIt.next());
}
}
});
}
private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
assertEquals(expected.getClusterKey(), actual.getClusterKey());
assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
}
@Test
public void test() throws ReplicationException {
int peerCount = 10;
for (int i = 0; i < peerCount; i++) {
STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
SyncReplicationState.valueOf(i % 4));
}
List<String> peerIds = STORAGE.listPeerIds();
assertEquals(peerCount, peerIds.size());
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
}
for (int i = 0; i < peerCount; i++) {
STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
}
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
}
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
for (int i = 0; i < peerCount; i++) {
STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
}
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
for (int i = 0; i < peerCount; i++) {
assertEquals(SyncReplicationState.valueOf(i % 4),
STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
}
String toRemove = Integer.toString(peerCount / 2);
STORAGE.removePeer(toRemove);
peerIds = STORAGE.listPeerIds();
assertEquals(peerCount - 1, peerIds.size());
assertFalse(peerIds.contains(toRemove));
try {
STORAGE.getPeerConfig(toRemove);
fail("Should throw a ReplicationException when getting peer config of a removed peer");
} catch (ReplicationException e) {
}
}
protected abstract void removePeerSyncRelicationState(String peerId) throws Exception;
protected abstract void assertPeerSyncReplicationStateCreate(String peerId) throws Exception;
@Test
public void testNoSyncReplicationState() throws Exception {
// This could happen for a peer created before we introduce sync replication.
String peerId = "testNoSyncReplicationState";
assertThrows("Should throw a ReplicationException when getting state of inexist peer",
ReplicationException.class, () -> STORAGE.getPeerSyncReplicationState(peerId));
assertThrows("Should throw a ReplicationException when getting state of inexist peer",
ReplicationException.class, () -> STORAGE.getPeerNewSyncReplicationState(peerId));
STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
// delete the sync replication state node to simulate
removePeerSyncRelicationState(peerId);
// should not throw exception as the peer exists
assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId));
assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId));
// make sure we create the node for the old format peer
assertPeerSyncReplicationStateCreate(peerId);
}
protected abstract void assertPeerNameControlException(ReplicationException e);
@Test
public void testPeerNameControl() throws Exception {
String clusterKey = "key";
STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
SyncReplicationState.NONE);
try {
ReplicationException e = assertThrows(ReplicationException.class,
() -> STORAGE.addPeer("6",
ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
SyncReplicationState.NONE));
assertPeerNameControlException(e);
} finally {
// clean up
STORAGE.removePeer("6");
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.endsWith;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RotateFile;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestFSReplicationPeerStorage extends ReplicationPeerStorageTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSReplicationPeerStorage.class);
private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
private static FileSystem FS;
private static Path DIR;
@BeforeClass
public static void setUp() throws Exception {
DIR = UTIL.getDataTestDir("test_fs_peer_storage");
CommonFSUtils.setRootDir(UTIL.getConfiguration(), DIR);
FS = FileSystem.get(UTIL.getConfiguration());
STORAGE = new FSReplicationPeerStorage(FS, UTIL.getConfiguration());
}
@AfterClass
public static void tearDown() throws IOException {
UTIL.cleanupTestDir();
}
@Override
protected void removePeerSyncRelicationState(String peerId) throws Exception {
FSReplicationPeerStorage storage = (FSReplicationPeerStorage) STORAGE;
Path peerDir = storage.getPeerDir(peerId);
RotateFile file =
new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024);
file.read();
file.delete();
}
@Override
protected void assertPeerSyncReplicationStateCreate(String peerId) throws Exception {
FSReplicationPeerStorage storage = (FSReplicationPeerStorage) STORAGE;
Path peerDir = storage.getPeerDir(peerId);
RotateFile file =
new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024);
assertNotNull(file.read());
}
@Override
protected void assertPeerNameControlException(ReplicationException e) {
assertThat(e.getMessage(), endsWith("peer already exists"));
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
@ -77,10 +78,11 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
}
@Before
public void setUp() {
public void setUp() throws IOException {
zkTimeoutCount = 0;
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
rp = ReplicationFactory.getReplicationPeers(zkw, conf);
rp =
ReplicationFactory.getReplicationPeers(FileSystem.get(utility.getConfiguration()), zkw, conf);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
}

View File

@ -17,52 +17,30 @@
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationPeerStorage {
public class TestZKReplicationPeerStorage extends ReplicationPeerStorageTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
private static final Random RNG = new Random(); // Seed may be set with Random#setSeed
private static ZKReplicationPeerStorage STORAGE;
@BeforeClass
public static void setUp() throws Exception {
@ -75,264 +53,24 @@ public class TestZKReplicationPeerStorage {
UTIL.shutdownMiniZKCluster();
}
@After
public void cleanCustomConfigurations() {
UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
@Override
protected void removePeerSyncRelicationState(String peerId) throws Exception {
ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE;
ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), storage.getSyncReplicationStateNode(peerId));
ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), storage.getNewSyncReplicationStateNode(peerId));
}
private Set<String> randNamespaces(Random rand) {
return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
.collect(toSet());
}
private Map<TableName, List<String>> randTableCFs(Random rand) {
int size = rand.nextInt(5);
Map<TableName, List<String>> map = new HashMap<>();
for (int i = 0; i < size; i++) {
TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
.limit(rand.nextInt(5)).collect(toList());
map.put(tn, cfs);
}
return map;
}
private ReplicationPeerConfig getConfig(int seed) {
RNG.setSeed(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
.setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG))
.setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG))
.setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
.setBandwidth(RNG.nextInt(1000)).build();
}
private void assertSetEquals(Set<String> expected, Set<String> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach(s -> assertTrue(actual.contains(s)));
}
private void assertMapEquals(Map<TableName, List<String>> expected,
Map<TableName, List<String>> actual) {
if (expected == null || expected.size() == 0) {
assertTrue(actual == null || actual.size() == 0);
return;
}
assertEquals(expected.size(), actual.size());
expected.forEach((expectedTn, expectedCFs) -> {
List<String> actualCFs = actual.get(expectedTn);
if (expectedCFs == null || expectedCFs.size() == 0) {
assertTrue(actual.containsKey(expectedTn));
assertTrue(actualCFs == null || actualCFs.size() == 0);
} else {
assertNotNull(actualCFs);
assertEquals(expectedCFs.size(), actualCFs.size());
for (Iterator<String> expectedIt = expectedCFs.iterator(),
actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
assertEquals(expectedIt.next(), actualIt.next());
}
}
});
}
private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
assertEquals(expected.getClusterKey(), actual.getClusterKey());
assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
assertEquals(expected.getBandwidth(), actual.getBandwidth());
}
@Test
public void test() throws ReplicationException {
int peerCount = 10;
for (int i = 0; i < peerCount; i++) {
STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
SyncReplicationState.valueOf(i % 4));
}
List<String> peerIds = STORAGE.listPeerIds();
assertEquals(peerCount, peerIds.size());
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
}
for (int i = 0; i < peerCount; i++) {
STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
}
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
}
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
for (int i = 0; i < peerCount; i++) {
STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
}
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
for (int i = 0; i < peerCount; i++) {
assertEquals(SyncReplicationState.valueOf(i % 4),
STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
}
String toRemove = Integer.toString(peerCount / 2);
STORAGE.removePeer(toRemove);
peerIds = STORAGE.listPeerIds();
assertEquals(peerCount - 1, peerIds.size());
assertFalse(peerIds.contains(toRemove));
try {
STORAGE.getPeerConfig(toRemove);
fail("Should throw a ReplicationException when getting peer config of a removed peer");
} catch (ReplicationException e) {
}
}
@Test
public void testNoSyncReplicationState()
throws ReplicationException, KeeperException, IOException {
// This could happen for a peer created before we introduce sync replication.
String peerId = "testNoSyncReplicationState";
try {
STORAGE.getPeerSyncReplicationState(peerId);
fail("Should throw a ReplicationException when getting state of inexist peer");
} catch (ReplicationException e) {
// expected
}
try {
STORAGE.getPeerNewSyncReplicationState(peerId);
fail("Should throw a ReplicationException when getting state of inexist peer");
} catch (ReplicationException e) {
// expected
}
STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
// delete the sync replication state node to simulate
ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId));
ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId));
// should not throw exception as the peer exists
assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId));
assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId));
// make sure we create the node for the old format peer
@Override
protected void assertPeerSyncReplicationStateCreate(String peerId) throws Exception {
ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE;
assertNotEquals(-1,
ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId)));
ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), storage.getSyncReplicationStateNode(peerId)));
assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
STORAGE.getNewSyncReplicationStateNode(peerId)));
storage.getNewSyncReplicationStateNode(peerId)));
}
@Test
public void testBaseReplicationPeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
String customPeerConfigUpdatedValue = "testUpdated";
String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
String customPeerConfigSecondValue = "testSecond";
String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
Configuration conf = UTIL.getConfiguration();
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";")
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
// validates base configs are present in replicationPeerConfig
assertEquals(customPeerConfigValue,
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
assertEquals(customPeerConfigSecondValue,
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));
// validates base configs get updated values even if config already present
conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
assertEquals(customPeerConfigUpdatedValue,
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
assertEquals(customPeerConfigSecondUpdatedValue,
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
}
@Test
public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
Configuration conf = UTIL.getConfiguration();
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigValue));
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
// validates base configs are present in replicationPeerConfig
assertEquals(customPeerConfigValue,
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(""));
ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
}
@Test
public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
// custom config not present
assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
Configuration conf = UTIL.getConfiguration();
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(""));
ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
}
@Test
public void testPeerNameControl() throws Exception {
String clusterKey = "key";
STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
SyncReplicationState.NONE);
try {
STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(),
true, SyncReplicationState.NONE);
fail();
} catch (ReplicationException e) {
assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class));
} finally {
// clean up
STORAGE.removePeer("6");
}
@Override
protected void assertPeerNameControlException(ReplicationException e) {
assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class));
}
}

View File

@ -783,7 +783,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
}
this.rsGroupInfoManager = RSGroupInfoManager.create(this);
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
this.replicationPeerManager =
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();

View File

@ -33,6 +33,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -559,10 +560,10 @@ public class ReplicationPeerManager {
return queueStorage;
}
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
throws ReplicationException {
public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configuration conf,
String clusterId) throws ReplicationException {
ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);

View File

@ -96,8 +96,8 @@ public class Replication implements ReplicationSourceService {
try {
this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getFileSystem(),
server.getZooKeeper(), this.conf);
this.replicationPeers.init();
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);

View File

@ -2557,7 +2557,7 @@ public class HBaseFsck extends Configured implements Closeable {
return hbi;
}
private void checkAndFixReplication() throws ReplicationException {
private void checkAndFixReplication() throws ReplicationException, IOException {
ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, errors);
checker.checkUnDeletedQueues();

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.util.hbck;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -24,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@ -53,8 +55,10 @@ public class ReplicationChecker {
private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueStorage queueStorage;
public ReplicationChecker(Configuration conf, ZKWatcher zkw, HbckErrorReporter errorReporter) {
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
public ReplicationChecker(Configuration conf, ZKWatcher zkw, HbckErrorReporter errorReporter)
throws IOException {
this.peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zkw, conf);
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
this.errorReporter = errorReporter;
}

View File

@ -91,7 +91,8 @@ public class TestReplicationHFileCleaner {
server = new DummyServer();
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
HMaster.decorateMasterConfiguration(conf);
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
rp =
ReplicationFactory.getReplicationPeers(server.getFileSystem(), server.getZooKeeper(), conf);
rp.init();
rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
fs = FileSystem.get(conf);
@ -236,7 +237,17 @@ public class TestReplicationHFileCleaner {
try {
return new ZKWatcher(getConfiguration(), "dummy server", this);
} catch (IOException e) {
e.printStackTrace();
LOG.error("Can not get ZKWatcher", e);
}
return null;
}
@Override
public FileSystem getFileSystem() {
try {
return TEST_UTIL.getTestFileSystem();
} catch (IOException e) {
LOG.error("Can not get FileSystem", e);
}
return null;
}

View File

@ -232,8 +232,8 @@ public class SyncReplicationTestBase {
protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility)
throws Exception {
ReplicationPeerStorage rps = ReplicationStorageFactory
.getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage(
utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration());
try {
rps.getPeerSyncReplicationState(peerId);
fail("Should throw exception when get the sync replication state of a removed peer.");

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationWithFSPeerStorage extends TestReplicationBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationWithFSPeerStorage.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// enable file system based peer storage
UTIL1.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase());
UTIL2.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase());
TestReplicationBase.setUpBeforeClass();
}
@Before
public void setUp() throws Exception {
cleanUp();
}
/**
* Add a row, check it's replicated, delete it, check's gone
*/
@Test
public void testSimplePutDelete() throws Exception {
runSimplePutDeleteTest();
}
/**
* Try a small batch upload using the write buffer, check it's replicated
*/
@Test
public void testSmallBatch() throws Exception {
runSmallBatchTest();
}
}

View File

@ -388,8 +388,8 @@ public abstract class TestReplicationSourceManager {
rq.addWAL(server.getServerName(), "1", file);
}
Server s1 = new DummyServer("dummyserver1.example.org");
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getFileSystem(),
s1.getZooKeeper(), s1.getConfiguration());
rp1.init();
manager.claimQueue(server.getServerName(), "1");
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
@ -857,6 +857,11 @@ public abstract class TestReplicationSourceManager {
return zkw;
}
@Override
public FileSystem getFileSystem() {
return fs;
}
@Override
public Connection getConnection() {
return null;

View File

@ -61,8 +61,8 @@ public class TestHBaseFsckReplication {
@Test
public void test() throws Exception {
ReplicationPeerStorage peerStorage = ReplicationStorageFactory
.getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
ReplicationPeerStorage peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(
UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
ReplicationQueueStorage queueStorage = ReplicationStorageFactory
.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());