diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java index 39bbb20433b..9b7e2126765 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java @@ -17,10 +17,16 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.protobuf.ProtobufMagic; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -74,4 +80,28 @@ public enum SyncReplicationState { return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length))); } + + public static byte[] toByteArray(SyncReplicationState state, SyncReplicationState newState) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + out.write(ProtobufMagic.PB_MAGIC); + ReplicationPeerConfigUtil.toSyncReplicationState(state).writeDelimitedTo(out); + ReplicationPeerConfigUtil.toSyncReplicationState(newState).writeDelimitedTo(out); + } catch (IOException e) { + // should not happen, all in memory operations + throw new AssertionError(e); + } + return out.toByteArray(); + } + + public static Pair + parseStateAndNewStateFrom(byte[] bytes) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(bytes); + ByteStreams.skipFully(in, ProtobufMagic.lengthOfPBMagic()); + SyncReplicationState state = ReplicationPeerConfigUtil + .toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in)); + SyncReplicationState newState = ReplicationPeerConfigUtil + .toSyncReplicationState(ReplicationProtos.SyncReplicationState.parseDelimitedFrom(in)); + return Pair.newPair(state, newState); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java new file mode 100644 index 00000000000..7256700e9cc --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RotateFile.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import com.google.errorprone.annotations.RestrictedApi; +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.zip.CRC32; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +/** + * A file storage which supports atomic update through two files, i.e, rotating. The implementation + * does not require atomic rename. + */ +@InterfaceAudience.Private +public class RotateFile { + + private static final Logger LOG = LoggerFactory.getLogger(RotateFile.class); + + private final FileSystem fs; + + private final long maxFileSize; + + private final Path[] files = new Path[2]; + + // this is used to make sure that we do not go backwards + private long prevTimestamp = -1; + + private int nextFile = -1; + + /** + * Constructs a new RotateFile object with the given parameters. + * @param fs the file system to use. + * @param dir the directory where the files will be created. + * @param name the base name for the files. + * @param maxFileSize the maximum size of each file. + */ + public RotateFile(FileSystem fs, Path dir, String name, long maxFileSize) { + this.fs = fs; + this.maxFileSize = maxFileSize; + this.files[0] = new Path(dir, name + "-0"); + this.files[1] = new Path(dir, name + "-1"); + } + + private HBaseProtos.RotateFileData read(Path path) throws IOException { + byte[] data; + int expectedChecksum; + try (FSDataInputStream in = fs.open(path)) { + int length = in.readInt(); + if (length <= 0 || length > maxFileSize) { + throw new IOException("Invalid file length " + length + + ", either less than 0 or greater then max allowed size " + maxFileSize); + } + data = new byte[length]; + in.readFully(data); + expectedChecksum = in.readInt(); + } + CRC32 crc32 = new CRC32(); + crc32.update(data); + int calculatedChecksum = (int) crc32.getValue(); + if (expectedChecksum != calculatedChecksum) { + throw new IOException( + "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum); + } + return HBaseProtos.RotateFileData.parseFrom(data); + } + + private int select(HBaseProtos.RotateFileData[] datas) { + if (datas[0] == null) { + return 1; + } + if (datas[1] == null) { + return 0; + } + return datas[0].getTimestamp() >= datas[1].getTimestamp() ? 0 : 1; + } + + /** + * Reads the content of the rotate file by selecting the winner file based on the timestamp of the + * data inside the files. It reads the content of both files and selects the one with the latest + * timestamp as the winner. If a file is incomplete or does not exist, it logs the error and moves + * on to the next file. It returns the content of the winner file as a byte array. If none of the + * files have valid data, it returns null. + * @return a byte array containing the data from the winner file, or null if no valid data is + * found. + * @throws IOException if an error occurs while reading the files. + */ + public byte[] read() throws IOException { + HBaseProtos.RotateFileData[] datas = new HBaseProtos.RotateFileData[2]; + for (int i = 0; i < 2; i++) { + try { + datas[i] = read(files[i]); + } catch (FileNotFoundException e) { + LOG.debug("file {} does not exist", files[i], e); + } catch (EOFException e) { + LOG.debug("file {} is incomplete", files[i], e); + } + } + int winnerIndex = select(datas); + nextFile = 1 - winnerIndex; + if (datas[winnerIndex] != null) { + prevTimestamp = datas[winnerIndex].getTimestamp(); + return datas[winnerIndex].getData().toByteArray(); + } else { + return null; + } + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/RotateFile.java|.*/src/test/.*") + static void write(FileSystem fs, Path file, long timestamp, byte[] data) throws IOException { + HBaseProtos.RotateFileData proto = HBaseProtos.RotateFileData.newBuilder() + .setTimestamp(timestamp).setData(ByteString.copyFrom(data)).build(); + byte[] protoData = proto.toByteArray(); + CRC32 crc32 = new CRC32(); + crc32.update(protoData); + int checksum = (int) crc32.getValue(); + // 4 bytes length, 8 bytes timestamp, 4 bytes checksum at the end + try (FSDataOutputStream out = fs.create(file, true)) { + out.writeInt(protoData.length); + out.write(protoData); + out.writeInt(checksum); + } + } + + /** + * Writes the given data to the next file in the rotation, with a timestamp calculated based on + * the previous timestamp and the current time to make sure it is greater than the previous + * timestamp. The method also deletes the previous file, which is no longer needed. + *

+ * Notice that, for a newly created {@link RotateFile} instance, you need to call {@link #read()} + * first to initialize the nextFile index, before calling this method. + * @param data the data to be written to the file + * @throws IOException if an I/O error occurs while writing the data to the file + */ + public void write(byte[] data) throws IOException { + if (data.length > maxFileSize) { + throw new IOException( + "Data size " + data.length + " is greater than max allowed size " + maxFileSize); + } + long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime()); + write(fs, files[nextFile], timestamp, data); + prevTimestamp = timestamp; + nextFile = 1 - nextFile; + try { + fs.delete(files[nextFile], false); + } catch (IOException e) { + // we will create new file with overwrite = true, so not a big deal here, only for speed up + // loading as we do not need to read this file when loading + LOG.debug("Failed to delete old file {}, ignoring the exception", files[nextFile], e); + } + } + + /** + * Deletes the two files used for rotating data. If any of the files cannot be deleted, an + * IOException is thrown. + * @throws IOException if there is an error deleting either file + */ + public void delete() throws IOException { + Path next = files[nextFile]; + // delete next file first, and then the current file, so when failing to delete, we can still + // read the correct data + if (fs.exists(next) && !fs.delete(next, false)) { + throw new IOException("Can not delete " + next); + } + Path current = files[1 - nextFile]; + if (fs.exists(current) && !fs.delete(current, false)) { + throw new IOException("Can not delete " + current); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java index fb74df39473..be516cc317c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java @@ -17,13 +17,24 @@ */ package org.apache.hadoop.hbase.replication; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.BuilderStyleTest; @@ -43,6 +54,8 @@ public class TestReplicationPeerConfig { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationPeerConfig.class); + private static final Configuration CONF = HBaseConfiguration.create(); + private static final String NAMESPACE_REPLICATE = "replicate"; private static final String NAMESPACE_OTHER = "other"; private static final TableName TABLE_A = TableName.valueOf(NAMESPACE_REPLICATE, "testA"); @@ -246,4 +259,124 @@ public class TestReplicationPeerConfig { assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1)); assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2)); } + + private static final Random RNG = new Random(); // Seed may be set with Random#setSeed + + private Set randNamespaces(Random rand) { + return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) + .collect(toSet()); + } + + private Map> randTableCFs(Random rand) { + int size = rand.nextInt(5); + Map> map = new HashMap<>(); + for (int i = 0; i < size; i++) { + TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); + List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) + .limit(rand.nextInt(5)).collect(toList()); + map.put(tn, cfs); + } + return map; + } + + private ReplicationPeerConfig getConfig(int seed) { + RNG.setSeed(seed); + return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) + .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) + .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG)) + .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG)) + .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) + .setBandwidth(RNG.nextInt(1000)).build(); + } + + @Test + public void testBaseReplicationPeerConfig() throws ReplicationException { + String customPeerConfigKey = "hbase.xxx.custom_config"; + String customPeerConfigValue = "test"; + String customPeerConfigUpdatedValue = "testUpdated"; + + String customPeerConfigSecondKey = "hbase.xxx.custom_second_config"; + String customPeerConfigSecondValue = "testSecond"; + String customPeerConfigSecondUpdatedValue = "testSecondUpdated"; + + ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); + + // custom config not present + assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); + + Configuration conf = new Configuration(CONF); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";") + .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue)); + + ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil + .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); + + // validates base configs are present in replicationPeerConfig + assertEquals(customPeerConfigValue, + updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); + assertEquals(customPeerConfigSecondValue, + updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey)); + + // validates base configs get updated values even if config already present + conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";") + .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue)); + + ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil + .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); + + assertEquals(customPeerConfigUpdatedValue, + replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey)); + assertEquals(customPeerConfigSecondUpdatedValue, + replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey)); + } + + @Test + public void testBaseReplicationRemovePeerConfig() throws ReplicationException { + String customPeerConfigKey = "hbase.xxx.custom_config"; + String customPeerConfigValue = "test"; + ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); + + // custom config not present + assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); + + Configuration conf = new Configuration(CONF); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + customPeerConfigKey.concat("=").concat(customPeerConfigValue)); + + ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil + .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); + + // validates base configs are present in replicationPeerConfig + assertEquals(customPeerConfigValue, + updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); + + conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + customPeerConfigKey.concat("=").concat("")); + + ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil + .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); + + assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey)); + } + + @Test + public void testBaseReplicationRemovePeerConfigWithNoExistingConfig() + throws ReplicationException { + String customPeerConfigKey = "hbase.xxx.custom_config"; + ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); + + // custom config not present + assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); + Configuration conf = new Configuration(CONF); + conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, + customPeerConfigKey.concat("=").concat("")); + + ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil + .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); + assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java new file mode 100644 index 00000000000..c229290e8e5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRotateFile.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtil; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; + +@Category({ MiscTests.class, SmallTests.class }) +public class TestRotateFile { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRotateFile.class); + + private static HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); + + private static FileSystem FS; + + private Path dir; + + private RotateFile rotateFile; + + @Rule + public final TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws IOException { + FS = FileSystem.get(UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() { + UTIL.cleanupTestDir(); + } + + @Before + public void setUp() throws IOException { + dir = UTIL.getDataTestDir(name.getMethodName()); + if (!FS.mkdirs(dir)) { + throw new IOException("Can not create dir " + dir); + } + rotateFile = new RotateFile(FS, dir, name.getMethodName(), 1024); + assertNull(rotateFile.read()); + } + + @Test + public void testSimpleReadWrite() throws IOException { + for (int i = 0; i < 10; i++) { + rotateFile.write(Bytes.toBytes(i)); + assertEquals(i, Bytes.toInt(rotateFile.read())); + } + rotateFile.delete(); + assertNull(rotateFile.read()); + } + + @Test + public void testCompareTimestamp() throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + rotateFile.write(Bytes.toBytes(10)); + Path file = FS.listStatus(dir)[0].getPath(); + rotateFile.write(Bytes.toBytes(100)); + + // put a fake file with a less timestamp there + RotateFile.write(FS, file, now - 1, Bytes.toBytes(10)); + assertEquals(100, Bytes.toInt(rotateFile.read())); + + // put a fake file with a greater timestamp there + RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime() + 100, Bytes.toBytes(10)); + assertEquals(10, Bytes.toInt(rotateFile.read())); + } + + @Test + public void testMaxFileSize() throws IOException { + assertThrows(IOException.class, () -> rotateFile.write(new byte[1025])); + // put a file greater than max file size + rotateFile.write(Bytes.toBytes(10)); + Path file = FS.listStatus(dir)[0].getPath(); + RotateFile.write(FS, file, EnvironmentEdgeManager.currentTime(), new byte[1025]); + assertThrows(IOException.class, () -> rotateFile.read()); + } + + @Test + public void testNotEnoughData() throws IOException { + rotateFile.write(Bytes.toBytes(10)); + assertEquals(10, Bytes.toInt(rotateFile.read())); + // remove the last byte + Path file = FS.listStatus(dir)[0].getPath(); + byte[] data; + try (FSDataInputStream in = FS.open(file)) { + data = ByteStreams.toByteArray(in); + } + try (FSDataOutputStream out = FS.create(file, true)) { + out.write(data, 0, data.length - 1); + } + // should hit EOF so read nothing + assertNull(rotateFile.read()); + } + + @Test + public void testChecksumMismatch() throws IOException { + rotateFile.write(Bytes.toBytes(10)); + assertEquals(10, Bytes.toInt(rotateFile.read())); + // mess up one byte + Path file = FS.listStatus(dir)[0].getPath(); + byte[] data; + try (FSDataInputStream in = FS.open(file)) { + data = ByteStreams.toByteArray(in); + } + data[4]++; + try (FSDataOutputStream out = FS.create(file, true)) { + out.write(data, 0, data.length); + } + // should get checksum mismatch + IOException error = assertThrows(IOException.class, () -> rotateFile.read()); + assertThat(error.getMessage(), startsWith("Checksum mismatch")); + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 6a854b97239..1e268c1858b 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -347,7 +347,7 @@ public class VerifyReplication extends Configured implements Tool { } }); ReplicationPeerStorage storage = - ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf); + ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf); ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId); return Pair.newPair(peerConfig, ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf)); diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto index e4dde33a96f..0fd3d667d4d 100644 --- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto +++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto @@ -284,3 +284,8 @@ message LogEntry { required string log_class_name = 1; required bytes log_message = 2; } + +message RotateFileData { + required int64 timestamp = 1; + required bytes data = 2; +} diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml index dad93578609..2527434e97b 100644 --- a/hbase-replication/pom.xml +++ b/hbase-replication/pom.xml @@ -103,6 +103,11 @@ mockito-core test + + org.hamcrest + hamcrest-library + test + org.slf4j jcl-over-slf4j diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java new file mode 100644 index 00000000000..8bbe21c4a46 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/FSReplicationPeerStorage.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import com.google.errorprone.annotations.RestrictedApi; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.RotateFile; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A filesystem based replication peer storage. The implementation does not require atomic rename so + * you can use it on cloud OSS. + *

+ * FileSystem layout: + * + *

+ * hbase
+ *   |
+ *   --peers
+ *       |
+ *       --<peer_id>
+ *           |
+ *           --peer_config
+ *           |
+ *           --disabled
+ *           |
+ *           --sync-rep-state
+ * 
+ * + * Notice that, if the peer is enabled, we will not have a disabled file. + *

+ * And for other files, to avoid depending on atomic rename, we will use two files for storing the + * content. When loading, we will try to read both the files and load the newer one. And when + * writing, we will write to the older file. + */ +@InterfaceAudience.Private +public class FSReplicationPeerStorage implements ReplicationPeerStorage { + + private static final Logger LOG = LoggerFactory.getLogger(FSReplicationPeerStorage.class); + + public static final String PEERS_DIR = "hbase.replication.peers.directory"; + + public static final String PEERS_DIR_DEFAULT = "peers"; + + static final String PEER_CONFIG_FILE = "peer_config"; + + static final String DISABLED_FILE = "disabled"; + + static final String SYNC_REPLICATION_STATE_FILE = "sync-rep-state"; + + static final byte[] NONE_STATE_BYTES = + SyncReplicationState.toByteArray(SyncReplicationState.NONE); + + private final FileSystem fs; + + private final Path dir; + + public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException { + this.fs = fs; + this.dir = new Path(CommonFSUtils.getRootDir(conf), conf.get(PEERS_DIR, PEERS_DIR_DEFAULT)); + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*") + Path getPeerDir(String peerId) { + return new Path(dir, peerId); + } + + @Override + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled, + SyncReplicationState syncReplicationState) throws ReplicationException { + Path peerDir = getPeerDir(peerId); + try { + if (fs.exists(peerDir)) { + // check whether this is a valid peer, if so we should fail the add peer operation + if (read(fs, peerDir, PEER_CONFIG_FILE) != null) { + throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED") + + ", syncReplicationState=" + syncReplicationState + ", peer already exists"); + } + } + if (!enabled) { + fs.createNewFile(new Path(peerDir, DISABLED_FILE)); + } + write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, + SyncReplicationState.toByteArray(syncReplicationState, SyncReplicationState.NONE)); + // write the peer config data at last, so when loading, if we can not load the peer_config, we + // know that this is not a valid peer + write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig)); + } catch (IOException e) { + throw new ReplicationException( + "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" + + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState, + e); + } + } + + @Override + public void removePeer(String peerId) throws ReplicationException { + // delete the peer config first, and then delete the directory + // we will consider this is not a valid peer by reading the peer config file + Path peerDir = getPeerDir(peerId); + try { + delete(fs, peerDir, PEER_CONFIG_FILE); + if (!fs.delete(peerDir, true)) { + throw new IOException("Can not delete " + peerDir); + } + } catch (IOException e) { + throw new ReplicationException("Could not remove peer with id=" + peerId, e); + } + } + + @Override + public void setPeerState(String peerId, boolean enabled) throws ReplicationException { + Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE); + try { + if (enabled) { + if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) { + throw new IOException("Can not delete " + disabledFile); + } + } else { + if (!fs.exists(disabledFile) && !fs.createNewFile(disabledFile)) { + throw new IOException("Can not touch " + disabledFile); + } + } + } catch (IOException e) { + throw new ReplicationException( + "Unable to change state of the peer with id=" + peerId + " to " + enabled, e); + } + } + + @Override + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + Path peerDir = getPeerDir(peerId); + try { + write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig)); + } catch (IOException e) { + throw new ReplicationException( + "There was a problem trying to save changes to the " + "replication peer " + peerId, e); + } + } + + @Override + public List listPeerIds() throws ReplicationException { + try { + FileStatus[] statuses = fs.listStatus(dir); + if (statuses == null || statuses.length == 0) { + return Collections.emptyList(); + } + List peerIds = new ArrayList<>(); + for (FileStatus status : statuses) { + String peerId = status.getPath().getName(); + Path peerDir = getPeerDir(peerId); + // confirm that this is a valid peer + byte[] peerConfigData = read(fs, peerDir, PEER_CONFIG_FILE); + if (peerConfigData != null) { + peerIds.add(peerId); + } + } + return Collections.unmodifiableList(peerIds); + } catch (FileNotFoundException e) { + LOG.debug("Peer directory does not exist yet", e); + return Collections.emptyList(); + } catch (IOException e) { + throw new ReplicationException("Cannot get the list of peers", e); + } + } + + @Override + public boolean isPeerEnabled(String peerId) throws ReplicationException { + Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE); + try { + return !fs.exists(disabledFile); + } catch (IOException e) { + throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); + } + } + + @Override + public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException { + Path peerDir = getPeerDir(peerId); + byte[] data; + try { + data = read(fs, peerDir, PEER_CONFIG_FILE); + } catch (IOException e) { + throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); + } + if (data == null || data.length == 0) { + throw new ReplicationException( + "Replication peer config data shouldn't be empty, peerId=" + peerId); + } + try { + return ReplicationPeerConfigUtil.parsePeerFrom(data); + } catch (DeserializationException e) { + throw new ReplicationException( + "Failed to parse replication peer config for peer with id=" + peerId, e); + } + } + + private Pair getStateAndNewState(String peerId) + throws IOException { + Path peerDir = getPeerDir(peerId); + if (!fs.exists(peerDir)) { + throw new IOException("peer does not exists"); + } + byte[] data = read(fs, peerDir, SYNC_REPLICATION_STATE_FILE); + if (data == null) { + // should be a peer from previous version, set the sync replication state for it. + write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, + SyncReplicationState.toByteArray(SyncReplicationState.NONE, SyncReplicationState.NONE)); + return Pair.newPair(SyncReplicationState.NONE, SyncReplicationState.NONE); + } else { + return SyncReplicationState.parseStateAndNewStateFrom(data); + } + } + + @Override + public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState newState) + throws ReplicationException { + Path peerDir = getPeerDir(peerId); + try { + Pair stateAndNewState = + getStateAndNewState(peerId); + write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, + SyncReplicationState.toByteArray(stateAndNewState.getFirst(), newState)); + } catch (IOException e) { + throw new ReplicationException( + "Unable to set the new sync replication state for peer with id=" + peerId + ", newState=" + + newState, + e); + } + } + + @Override + public void transitPeerSyncReplicationState(String peerId) throws ReplicationException { + Path peerDir = getPeerDir(peerId); + try { + Pair stateAndNewState = + getStateAndNewState(peerId); + write(fs, peerDir, SYNC_REPLICATION_STATE_FILE, + SyncReplicationState.toByteArray(stateAndNewState.getSecond(), SyncReplicationState.NONE)); + } catch (IOException e) { + throw new ReplicationException( + "Error transiting sync replication state for peer with id=" + peerId, e); + } + } + + @Override + public SyncReplicationState getPeerSyncReplicationState(String peerId) + throws ReplicationException { + try { + return getStateAndNewState(peerId).getFirst(); + } catch (IOException e) { + throw new ReplicationException( + "Error getting sync replication state for peer with id=" + peerId, e); + } + } + + @Override + public SyncReplicationState getPeerNewSyncReplicationState(String peerId) + throws ReplicationException { + try { + return getStateAndNewState(peerId).getSecond(); + } catch (IOException e) { + throw new ReplicationException( + "Error getting new sync replication state for peer with id=" + peerId, e); + } + } + + // 16 MB is big enough for our usage here + private static final long MAX_FILE_SIZE = 16 * 1024 * 1024; + + private static byte[] read(FileSystem fs, Path dir, String name) throws IOException { + RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); + return file.read(); + } + + private static void write(FileSystem fs, Path dir, String name, byte[] data) throws IOException { + RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); + // to initialize the nextFile index + file.read(); + file.write(data); + } + + private static void delete(FileSystem fs, Path dir, String name) throws IOException { + RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE); + // to initialize the nextFile index + file.read(); + file.delete(); + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 6dba30a34c0..9047bbf29ca 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -32,7 +33,8 @@ public final class ReplicationFactory { private ReplicationFactory() { } - public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) { - return new ReplicationPeers(zk, conf); + public static ReplicationPeers getReplicationPeers(FileSystem fs, ZKWatcher zk, + Configuration conf) { + return new ReplicationPeers(fs, zk, conf); } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java new file mode 100644 index 00000000000..1b110c3a6a5 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageType.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Specify the implementations for {@link ReplicationPeerStorage}. + */ +@InterfaceAudience.Private +public enum ReplicationPeerStorageType { + + FILESYSTEM(FSReplicationPeerStorage.class), + ZOOKEEPER(ZKReplicationPeerStorage.class); + + private final Class clazz; + + private ReplicationPeerStorageType(Class clazz) { + this.clazz = clazz; + } + + public Class getClazz() { + return clazz; + } +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 70344c07bdc..a8f4a5efa54 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -40,10 +41,10 @@ public class ReplicationPeers { private final ConcurrentMap peerCache; private final ReplicationPeerStorage peerStorage; - ReplicationPeers(ZKWatcher zookeeper, Configuration conf) { + ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) { this.conf = conf; this.peerCache = new ConcurrentHashMap<>(); - this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf); } public Configuration getConf() { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java index 1080b2125c7..0124dbdd113 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -17,26 +17,60 @@ */ package org.apache.hadoop.hbase.replication; +import java.lang.reflect.Constructor; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; /** * Used to create replication storage(peer, queue) classes. - *

- * For now we only have zk based implementation. */ @InterfaceAudience.Private public final class ReplicationStorageFactory { + public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl"; + + // must use zookeeper here, otherwise when user upgrading from an old version without changing the + // config file, they will loss all the replication peer data. + public static final ReplicationPeerStorageType DEFAULT_REPLICATION_PEER_STORAGE_IMPL = + ReplicationPeerStorageType.ZOOKEEPER; + private ReplicationStorageFactory() { } + private static Class + getReplicationPeerStorageClass(Configuration conf) { + try { + ReplicationPeerStorageType type = ReplicationPeerStorageType.valueOf( + conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL.name()) + .toUpperCase()); + return type.getClazz(); + } catch (IllegalArgumentException e) { + return conf.getClass(REPLICATION_PEER_STORAGE_IMPL, + DEFAULT_REPLICATION_PEER_STORAGE_IMPL.getClazz(), ReplicationPeerStorage.class); + } + } + /** * Create a new {@link ReplicationPeerStorage}. */ - public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) { - return new ZKReplicationPeerStorage(zk, conf); + public static ReplicationPeerStorage getReplicationPeerStorage(FileSystem fs, ZKWatcher zk, + Configuration conf) { + Class clazz = getReplicationPeerStorageClass(conf); + for (Constructor c : clazz.getConstructors()) { + if (c.getParameterCount() != 2) { + continue; + } + if (c.getParameterTypes()[0].isAssignableFrom(FileSystem.class)) { + return ReflectionUtils.newInstance(clazz, fs, conf); + } else if (c.getParameterTypes()[0].isAssignableFrom(ZKWatcher.class)) { + return ReflectionUtils.newInstance(clazz, zk, conf); + } + } + throw new IllegalArgumentException( + "Can not create replication peer storage with type " + clazz); } /** diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java new file mode 100644 index 00000000000..018883290c1 --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorageTestBase.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.hadoop.hbase.TableName; +import org.junit.Test; + +public abstract class ReplicationPeerStorageTestBase { + + // Seed may be set with Random#setSeed + private static final Random RNG = new Random(); + + protected static ReplicationPeerStorage STORAGE; + + private Set randNamespaces(Random rand) { + return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) + .collect(toSet()); + } + + private Map> randTableCFs(Random rand) { + int size = rand.nextInt(5); + Map> map = new HashMap<>(); + for (int i = 0; i < size; i++) { + TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); + List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) + .limit(rand.nextInt(5)).collect(toList()); + map.put(tn, cfs); + } + return map; + } + + private ReplicationPeerConfig getConfig(int seed) { + RNG.setSeed(seed); + return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) + .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) + .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG)) + .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG)) + .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) + .setBandwidth(RNG.nextInt(1000)).build(); + } + + private void assertSetEquals(Set expected, Set actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach(s -> assertTrue(actual.contains(s))); + } + + private void assertMapEquals(Map> expected, + Map> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach((expectedTn, expectedCFs) -> { + List actualCFs = actual.get(expectedTn); + if (expectedCFs == null || expectedCFs.size() == 0) { + assertTrue(actual.containsKey(expectedTn)); + assertTrue(actualCFs == null || actualCFs.size() == 0); + } else { + assertNotNull(actualCFs); + assertEquals(expectedCFs.size(), actualCFs.size()); + for (Iterator expectedIt = expectedCFs.iterator(), + actualIt = actualCFs.iterator(); expectedIt.hasNext();) { + assertEquals(expectedIt.next(), actualIt.next()); + } + } + }); + } + + private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { + assertEquals(expected.getClusterKey(), actual.getClusterKey()); + assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); + assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); + assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); + assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); + assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); + assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); + assertEquals(expected.getBandwidth(), actual.getBandwidth()); + } + + @Test + public void test() throws ReplicationException { + int peerCount = 10; + for (int i = 0; i < peerCount; i++) { + STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0, + SyncReplicationState.valueOf(i % 4)); + } + List peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount, peerIds.size()); + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); + } + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(SyncReplicationState.valueOf(i % 4), + STORAGE.getPeerSyncReplicationState(Integer.toString(i))); + } + String toRemove = Integer.toString(peerCount / 2); + STORAGE.removePeer(toRemove); + peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount - 1, peerIds.size()); + assertFalse(peerIds.contains(toRemove)); + + try { + STORAGE.getPeerConfig(toRemove); + fail("Should throw a ReplicationException when getting peer config of a removed peer"); + } catch (ReplicationException e) { + } + } + + protected abstract void removePeerSyncRelicationState(String peerId) throws Exception; + + protected abstract void assertPeerSyncReplicationStateCreate(String peerId) throws Exception; + + @Test + public void testNoSyncReplicationState() throws Exception { + // This could happen for a peer created before we introduce sync replication. + String peerId = "testNoSyncReplicationState"; + assertThrows("Should throw a ReplicationException when getting state of inexist peer", + ReplicationException.class, () -> STORAGE.getPeerSyncReplicationState(peerId)); + assertThrows("Should throw a ReplicationException when getting state of inexist peer", + ReplicationException.class, () -> STORAGE.getPeerNewSyncReplicationState(peerId)); + + STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE); + // delete the sync replication state node to simulate + removePeerSyncRelicationState(peerId); + // should not throw exception as the peer exists + assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId)); + assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId)); + // make sure we create the node for the old format peer + assertPeerSyncReplicationStateCreate(peerId); + } + + protected abstract void assertPeerNameControlException(ReplicationException e); + + @Test + public void testPeerNameControl() throws Exception { + String clusterKey = "key"; + STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true, + SyncReplicationState.NONE); + + try { + ReplicationException e = assertThrows(ReplicationException.class, + () -> STORAGE.addPeer("6", + ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true, + SyncReplicationState.NONE)); + assertPeerNameControlException(e); + } finally { + // clean up + STORAGE.removePeer("6"); + } + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java new file mode 100644 index 00000000000..f99f529c9f0 --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestFSReplicationPeerStorage.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.endsWith; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtil; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.RotateFile; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestFSReplicationPeerStorage extends ReplicationPeerStorageTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFSReplicationPeerStorage.class); + + private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); + + private static FileSystem FS; + + private static Path DIR; + + @BeforeClass + public static void setUp() throws Exception { + DIR = UTIL.getDataTestDir("test_fs_peer_storage"); + CommonFSUtils.setRootDir(UTIL.getConfiguration(), DIR); + FS = FileSystem.get(UTIL.getConfiguration()); + STORAGE = new FSReplicationPeerStorage(FS, UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.cleanupTestDir(); + } + + @Override + protected void removePeerSyncRelicationState(String peerId) throws Exception { + FSReplicationPeerStorage storage = (FSReplicationPeerStorage) STORAGE; + Path peerDir = storage.getPeerDir(peerId); + RotateFile file = + new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024); + file.read(); + file.delete(); + } + + @Override + protected void assertPeerSyncReplicationStateCreate(String peerId) throws Exception { + FSReplicationPeerStorage storage = (FSReplicationPeerStorage) STORAGE; + Path peerDir = storage.getPeerDir(peerId); + RotateFile file = + new RotateFile(FS, peerDir, FSReplicationPeerStorage.SYNC_REPLICATION_STATE_FILE, 1024); + assertNotNull(file.read()); + } + + @Override + protected void assertPeerNameControlException(ReplicationException e) { + assertThat(e.getMessage(), endsWith("peer already exists")); + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 311e7e337f9..d2540987906 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtil; @@ -77,10 +78,11 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { } @Before - public void setUp() { + public void setUp() throws IOException { zkTimeoutCount = 0; rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - rp = ReplicationFactory.getReplicationPeers(zkw, conf); + rp = + ReplicationFactory.getReplicationPeers(FileSystem.get(utility.getConfiguration()), zkw, conf); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); } diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index 7bc47918992..12262461f74 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -17,52 +17,30 @@ */ package org.apache.hadoop.hbase.replication; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.stream.Stream; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtil; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.zookeeper.KeeperException; -import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ ReplicationTests.class, MediumTests.class }) -public class TestZKReplicationPeerStorage { +public class TestZKReplicationPeerStorage extends ReplicationPeerStorageTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil(); - private static final Random RNG = new Random(); // Seed may be set with Random#setSeed - private static ZKReplicationPeerStorage STORAGE; @BeforeClass public static void setUp() throws Exception { @@ -75,264 +53,24 @@ public class TestZKReplicationPeerStorage { UTIL.shutdownMiniZKCluster(); } - @After - public void cleanCustomConfigurations() { - UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); + @Override + protected void removePeerSyncRelicationState(String peerId) throws Exception { + ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE; + ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), storage.getSyncReplicationStateNode(peerId)); + ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), storage.getNewSyncReplicationStateNode(peerId)); } - private Set randNamespaces(Random rand) { - return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) - .collect(toSet()); - } - - private Map> randTableCFs(Random rand) { - int size = rand.nextInt(5); - Map> map = new HashMap<>(); - for (int i = 0; i < size; i++) { - TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); - List cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) - .limit(rand.nextInt(5)).collect(toList()); - map.put(tn, cfs); - } - return map; - } - - private ReplicationPeerConfig getConfig(int seed) { - RNG.setSeed(seed); - return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) - .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) - .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG)) - .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG)) - .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) - .setBandwidth(RNG.nextInt(1000)).build(); - } - - private void assertSetEquals(Set expected, Set actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach(s -> assertTrue(actual.contains(s))); - } - - private void assertMapEquals(Map> expected, - Map> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach((expectedTn, expectedCFs) -> { - List actualCFs = actual.get(expectedTn); - if (expectedCFs == null || expectedCFs.size() == 0) { - assertTrue(actual.containsKey(expectedTn)); - assertTrue(actualCFs == null || actualCFs.size() == 0); - } else { - assertNotNull(actualCFs); - assertEquals(expectedCFs.size(), actualCFs.size()); - for (Iterator expectedIt = expectedCFs.iterator(), - actualIt = actualCFs.iterator(); expectedIt.hasNext();) { - assertEquals(expectedIt.next(), actualIt.next()); - } - } - }); - } - - private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { - assertEquals(expected.getClusterKey(), actual.getClusterKey()); - assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); - assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); - assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); - assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); - assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); - assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); - assertEquals(expected.getBandwidth(), actual.getBandwidth()); - } - - @Test - public void test() throws ReplicationException { - int peerCount = 10; - for (int i = 0; i < peerCount; i++) { - STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0, - SyncReplicationState.valueOf(i % 4)); - } - List peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount, peerIds.size()); - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); - } - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(SyncReplicationState.valueOf(i % 4), - STORAGE.getPeerSyncReplicationState(Integer.toString(i))); - } - String toRemove = Integer.toString(peerCount / 2); - STORAGE.removePeer(toRemove); - peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount - 1, peerIds.size()); - assertFalse(peerIds.contains(toRemove)); - - try { - STORAGE.getPeerConfig(toRemove); - fail("Should throw a ReplicationException when getting peer config of a removed peer"); - } catch (ReplicationException e) { - } - } - - @Test - public void testNoSyncReplicationState() - throws ReplicationException, KeeperException, IOException { - // This could happen for a peer created before we introduce sync replication. - String peerId = "testNoSyncReplicationState"; - try { - STORAGE.getPeerSyncReplicationState(peerId); - fail("Should throw a ReplicationException when getting state of inexist peer"); - } catch (ReplicationException e) { - // expected - } - try { - STORAGE.getPeerNewSyncReplicationState(peerId); - fail("Should throw a ReplicationException when getting state of inexist peer"); - } catch (ReplicationException e) { - // expected - } - STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE); - // delete the sync replication state node to simulate - ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId)); - ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId)); - // should not throw exception as the peer exists - assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId)); - assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId)); - // make sure we create the node for the old format peer + @Override + protected void assertPeerSyncReplicationStateCreate(String peerId) throws Exception { + ZKReplicationPeerStorage storage = (ZKReplicationPeerStorage) STORAGE; assertNotEquals(-1, - ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId))); + ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), storage.getSyncReplicationStateNode(peerId))); assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), - STORAGE.getNewSyncReplicationStateNode(peerId))); + storage.getNewSyncReplicationStateNode(peerId))); } - @Test - public void testBaseReplicationPeerConfig() throws ReplicationException { - String customPeerConfigKey = "hbase.xxx.custom_config"; - String customPeerConfigValue = "test"; - String customPeerConfigUpdatedValue = "testUpdated"; - - String customPeerConfigSecondKey = "hbase.xxx.custom_second_config"; - String customPeerConfigSecondValue = "testSecond"; - String customPeerConfigSecondUpdatedValue = "testSecondUpdated"; - - ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); - - // custom config not present - assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); - - Configuration conf = UTIL.getConfiguration(); - conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, - customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";") - .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue)); - - ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil - .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); - - // validates base configs are present in replicationPeerConfig - assertEquals(customPeerConfigValue, - updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); - assertEquals(customPeerConfigSecondValue, - updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey)); - - // validates base configs get updated values even if config already present - conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); - conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, - customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";") - .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue)); - - ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil - .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); - - assertEquals(customPeerConfigUpdatedValue, - replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey)); - assertEquals(customPeerConfigSecondUpdatedValue, - replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey)); - } - - @Test - public void testBaseReplicationRemovePeerConfig() throws ReplicationException { - String customPeerConfigKey = "hbase.xxx.custom_config"; - String customPeerConfigValue = "test"; - ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); - - // custom config not present - assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); - - Configuration conf = UTIL.getConfiguration(); - conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, - customPeerConfigKey.concat("=").concat(customPeerConfigValue)); - - ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil - .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); - - // validates base configs are present in replicationPeerConfig - assertEquals(customPeerConfigValue, - updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); - - conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); - conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, - customPeerConfigKey.concat("=").concat("")); - - ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil - .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); - - assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey)); - } - - @Test - public void testBaseReplicationRemovePeerConfigWithNoExistingConfig() - throws ReplicationException { - String customPeerConfigKey = "hbase.xxx.custom_config"; - ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); - - // custom config not present - assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); - Configuration conf = UTIL.getConfiguration(); - conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, - customPeerConfigKey.concat("=").concat("")); - - ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil - .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); - assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); - } - - @Test - public void testPeerNameControl() throws Exception { - String clusterKey = "key"; - STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true, - SyncReplicationState.NONE); - - try { - STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), - true, SyncReplicationState.NONE); - fail(); - } catch (ReplicationException e) { - assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class)); - } finally { - // clean up - STORAGE.removePeer("6"); - } + @Override + protected void assertPeerNameControlException(ReplicationException e) { + assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 58a5d7cae47..d0fd34848a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -783,7 +783,8 @@ public class HMaster extends HBaseServerBase implements Maste } this.rsGroupInfoManager = RSGroupInfoManager.create(this); - this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId); + this.replicationPeerManager = + ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId); this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker.start(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 06cf559d492..30164f29671 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -33,6 +33,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -559,10 +560,10 @@ public class ReplicationPeerManager { return queueStorage; } - public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId) - throws ReplicationException { + public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configuration conf, + String clusterId) throws ReplicationException { ReplicationPeerStorage peerStorage = - ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); + ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); ConcurrentMap peers = new ConcurrentHashMap<>(); for (String peerId : peerStorage.listPeerIds()) { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index ea28a20c56b..84b98ed3c93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -96,8 +96,8 @@ public class Replication implements ReplicationSourceService { try { this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); - this.replicationPeers = - ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf); + this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getFileSystem(), + server.getZooKeeper(), this.conf); this.replicationPeers.init(); } catch (Exception e) { throw new IOException("Failed replication handler create", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index ab8d9fcde6a..7e10fd786a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -2557,7 +2557,7 @@ public class HBaseFsck extends Configured implements Closeable { return hbi; } - private void checkAndFixReplication() throws ReplicationException { + private void checkAndFixReplication() throws ReplicationException, IOException { ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, errors); checker.checkUnDeletedQueues(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 234daef85b5..7e7a46573b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.util.hbck; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -24,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; @@ -53,8 +55,10 @@ public class ReplicationChecker { private final ReplicationPeerStorage peerStorage; private final ReplicationQueueStorage queueStorage; - public ReplicationChecker(Configuration conf, ZKWatcher zkw, HbckErrorReporter errorReporter) { - this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf); + public ReplicationChecker(Configuration conf, ZKWatcher zkw, HbckErrorReporter errorReporter) + throws IOException { + this.peerStorage = + ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zkw, conf); this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); this.errorReporter = errorReporter; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 899d3eb4722..87d21e583dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -91,7 +91,8 @@ public class TestReplicationHFileCleaner { server = new DummyServer(); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); HMaster.decorateMasterConfiguration(conf); - rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf); + rp = + ReplicationFactory.getReplicationPeers(server.getFileSystem(), server.getZooKeeper(), conf); rp.init(); rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); fs = FileSystem.get(conf); @@ -236,7 +237,17 @@ public class TestReplicationHFileCleaner { try { return new ZKWatcher(getConfiguration(), "dummy server", this); } catch (IOException e) { - e.printStackTrace(); + LOG.error("Can not get ZKWatcher", e); + } + return null; + } + + @Override + public FileSystem getFileSystem() { + try { + return TEST_UTIL.getTestFileSystem(); + } catch (IOException e) { + LOG.error("Can not get FileSystem", e); } return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index e82d69826d8..7f5df02ecfc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -232,8 +232,8 @@ public class SyncReplicationTestBase { protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility) throws Exception { - ReplicationPeerStorage rps = ReplicationStorageFactory - .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration()); + ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage( + utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration()); try { rps.getPeerSyncReplicationState(peerId); fail("Should throw exception when get the sync replication state of a removed peer."); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java new file mode 100644 index 00000000000..6f5c6c20d8d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithFSPeerStorage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationWithFSPeerStorage extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationWithFSPeerStorage.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // enable file system based peer storage + UTIL1.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase()); + UTIL2.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + ReplicationPeerStorageType.FILESYSTEM.name().toLowerCase()); + TestReplicationBase.setUpBeforeClass(); + } + + @Before + public void setUp() throws Exception { + cleanUp(); + } + + /** + * Add a row, check it's replicated, delete it, check's gone + */ + @Test + public void testSimplePutDelete() throws Exception { + runSimplePutDeleteTest(); + } + + /** + * Try a small batch upload using the write buffer, check it's replicated + */ + @Test + public void testSmallBatch() throws Exception { + runSmallBatchTest(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index dd989293ff5..c48dbc39a03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -388,8 +388,8 @@ public abstract class TestReplicationSourceManager { rq.addWAL(server.getServerName(), "1", file); } Server s1 = new DummyServer("dummyserver1.example.org"); - ReplicationPeers rp1 = - ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration()); + ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getFileSystem(), + s1.getZooKeeper(), s1.getConfiguration()); rp1.init(); manager.claimQueue(server.getServerName(), "1"); assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); @@ -857,6 +857,11 @@ public abstract class TestReplicationSourceManager { return zkw; } + @Override + public FileSystem getFileSystem() { + return fs; + } + @Override public Connection getConnection() { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java index dc8fb849633..e44e00d2d37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java @@ -61,8 +61,8 @@ public class TestHBaseFsckReplication { @Test public void test() throws Exception { - ReplicationPeerStorage peerStorage = ReplicationStorageFactory - .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + ReplicationPeerStorage peerStorage = ReplicationStorageFactory.getReplicationPeerStorage( + UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); ReplicationQueueStorage queueStorage = ReplicationStorageFactory .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());