HBASE-25976 Implement a master based ReplicationTracker (#3390)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Duo Zhang 2021-06-17 18:24:49 +08:00 committed by GitHub
parent 5a19bcfa98
commit eb242be674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 695 additions and 302 deletions

View File

@ -127,6 +127,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link ReplicationTracker} implementation which polls the region servers list periodically from
 * master.
 */
@InterfaceAudience.Private
class MasterReplicationTracker extends ReplicationTrackerBase {

  private static final Logger LOG = LoggerFactory.getLogger(MasterReplicationTracker.class);

  /** How often (in seconds) to poll master for the current region server list. */
  static final String REFRESH_INTERVAL_SECONDS =
    "hbase.replication.tracker.master.refresh.interval.secs";

  // default to refresh every 5 seconds
  static final int REFRESH_INTERVAL_SECONDS_DEFAULT = 5;

  private final ChoreService choreService;

  private final ScheduledChore chore;

  private final Admin admin;

  // snapshot taken by the last successful refresh; volatile since the chore thread replaces it
  private volatile Set<ServerName> regionServers;

  MasterReplicationTracker(ReplicationTrackerParams params) {
    Admin masterAdmin;
    try {
      masterAdmin = params.connection().getAdmin();
    } catch (IOException e) {
      // should not happen
      throw new AssertionError(e);
    }
    this.admin = masterAdmin;
    this.choreService = params.choreService();
    this.chore = createRefreshChore(params);
  }

  // Builds the periodic chore that re-polls master and fires listener events for vanished servers.
  private ScheduledChore createRefreshChore(ReplicationTrackerParams params) {
    int intervalSecs =
      params.conf().getInt(REFRESH_INTERVAL_SECONDS, REFRESH_INTERVAL_SECONDS_DEFAULT);
    return new ScheduledChore(getClass().getSimpleName(), params.stopptable(), intervalSecs, 0,
      TimeUnit.SECONDS) {

      @Override
      protected void chore() {
        try {
          refresh();
        } catch (IOException e) {
          LOG.warn("failed to refresh region server list for replication", e);
        }
      }
    };
  }

  // Fetches the current live region server list from master as an immutable set.
  private Set<ServerName> reload() throws IOException {
    return Collections.unmodifiableSet(new HashSet<>(admin.getRegionServers()));
  }

  private void refresh() throws IOException {
    Set<ServerName> latest = reload();
    // any server present in the previous snapshot but absent now is considered gone
    for (ServerName server : regionServers) {
      if (!latest.contains(server)) {
        notifyListeners(server);
      }
    }
    this.regionServers = latest;
  }

  @Override
  protected Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
    throws IOException {
    Set<ServerName> initial = reload();
    // publish the snapshot before scheduling the chore so refresh() never sees a null field
    this.regionServers = initial;
    choreService.scheduleChore(chore);
    return initial;
  }
}

View File

@ -18,8 +18,7 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -28,6 +27,9 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public final class ReplicationFactory {
public static final String REPLICATION_TRACKER_IMPL = "hbase.replication.tracker.impl";
private ReplicationFactory() {
}
@ -35,8 +37,9 @@ public final class ReplicationFactory {
return new ReplicationPeers(zk, conf);
}
public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, Abortable abortable,
Stoppable stopper) {
return new ReplicationTrackerZKImpl(zookeeper, abortable, stopper);
public static ReplicationTracker getReplicationTracker(ReplicationTrackerParams params) {
Class<? extends ReplicationTracker> clazz = params.conf().getClass(REPLICATION_TRACKER_IMPL,
ZKReplicationTracker.class, ReplicationTracker.class);
return ReflectionUtils.newInstance(clazz, params);
}
}

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -33,5 +34,5 @@ public interface ReplicationListener {
* A region server has been removed from the local cluster
* @param regionServer the removed region server
*/
public void regionServerRemoved(String regionServer);
public void regionServerRemoved(ServerName regionServer);
}

View File

@ -18,18 +18,20 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This is the interface for a Replication Tracker. A replication tracker provides the facility to
* subscribe and track events that reflect a change in replication state. These events are used by
* the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues
* and queue failover. These events are defined in the ReplicationListener interface. If a class
* would like to listen to replication events it must implement the ReplicationListener interface
* and register itself with a Replication Tracker.
* This is the interface for a Replication Tracker.
* <p/>
* A replication tracker provides the facility to subscribe and track events that reflect a change
* in replication state. These events are used by the ReplicationSourceManager to coordinate
* replication tasks such as addition/deletion of queues and queue failover. These events are
* defined in the ReplicationListener interface. If a class would like to listen to replication
* events it must implement the ReplicationListener interface and register itself with a Replication
* Tracker.
*/
@InterfaceAudience.Private
public interface ReplicationTracker {
@ -40,11 +42,20 @@ public interface ReplicationTracker {
*/
void registerListener(ReplicationListener listener);
/**
* Remove a replication listener
* @param listener the listener to remove
*/
void removeListener(ReplicationListener listener);
/**
* Returns a list of other live region servers in the cluster.
* @return List of region servers.
* In this method, you need to load the newest list of region servers and return it, and all
* later changes to the region server list must be passed to the listeners.
* <p/>
* This is very important for us to not miss a region server crash.
* <p/>
* Notice that this method can only be called once.
* @return Set of region servers.
*/
List<ServerName> getListOfRegionServers();
Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws IOException;
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base implementation class for replication tracker.
 * <p/>
 * Holds the registered {@link ReplicationListener}s and enforces that the initial region server
 * load happens exactly once; subclasses implement the actual loading and change detection.
 */
@InterfaceAudience.Private
abstract class ReplicationTrackerBase implements ReplicationTracker {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerBase.class);

  // listeners to be notified; copy-on-write so notification never races with (de)registration
  private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();

  // flips to true on the first (and only permitted) call to
  // loadLiveRegionServersAndInitializeListeners
  private final AtomicBoolean initialized = new AtomicBoolean(false);

  @Override
  public void registerListener(ReplicationListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public void removeListener(ReplicationListener listener) {
    this.listeners.remove(listener);
  }

  /** Tells every registered listener that the given region server has gone away. */
  protected final void notifyListeners(ServerName regionServer) {
    LOG.info("{} is dead, triggering replicatorRemoved event", regionServer);
    listeners.forEach(listener -> listener.regionServerRemoved(regionServer));
  }

  @Override
  public final Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws IOException {
    boolean firstCall = initialized.compareAndSet(false, true);
    if (!firstCall) {
      throw new IllegalStateException(
        "loadLiveRegionServersAndInitializeListeners can only be called once");
    }
    return internalLoadLiveRegionServersAndInitializeListeners();
  }

  // Subclasses load the initial live server set and hook up change notification here.
  protected abstract Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
    throws IOException;
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
 * Parameters for constructing a {@link ReplicationTracker}.
 * <p/>
 * Built via {@link #create(Configuration, Stoppable)} followed by the fluent setters; which of
 * the optional parameters are actually required depends on the tracker implementation being
 * constructed (e.g. ZK-based trackers need {@link #zookeeper(ZKWatcher)}, the master-based one
 * needs {@link #connection(Connection)} and {@link #choreService(ChoreService)}).
 */
@InterfaceAudience.Private
public final class ReplicationTrackerParams {

  private final Configuration conf;

  private final Stoppable stoppable;

  private ZKWatcher zookeeper;

  private Abortable abortable;

  private Connection conn;

  private ChoreService choreService;

  private ReplicationTrackerParams(Configuration conf, Stoppable stoppable) {
    this.conf = conf;
    this.stoppable = stoppable;
  }

  public ReplicationTrackerParams zookeeper(ZKWatcher zookeeper) {
    this.zookeeper = zookeeper;
    return this;
  }

  public ReplicationTrackerParams abortable(Abortable abortable) {
    this.abortable = abortable;
    return this;
  }

  public ReplicationTrackerParams connection(Connection conn) {
    this.conn = conn;
    return this;
  }

  public ReplicationTrackerParams choreService(ChoreService choreService) {
    this.choreService = choreService;
    return this;
  }

  public Configuration conf() {
    return conf;
  }

  /**
   * Misspelled accessor kept for compatibility with existing callers; prefer
   * {@link #stoppable()}.
   */
  public Stoppable stopptable() {
    return stoppable;
  }

  /** Correctly-spelled accessor for the stoppable passed to {@link #create}. */
  public Stoppable stoppable() {
    return stoppable;
  }

  public ZKWatcher zookeeper() {
    return zookeeper;
  }

  public Abortable abortable() {
    return abortable;
  }

  public Connection connection() {
    return conn;
  }

  public ChoreService choreService() {
    return choreService;
  }

  public static ReplicationTrackerParams create(Configuration conf, Stoppable stoppable) {
    return new ReplicationTrackerParams(conf, stoppable);
  }
}

View File

@ -17,11 +17,11 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@ -30,8 +30,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
/**
* This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
@ -39,9 +39,7 @@ import org.slf4j.LoggerFactory;
* interface.
*/
@InterfaceAudience.Private
public class ReplicationTrackerZKImpl implements ReplicationTracker {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
class ZKReplicationTracker extends ReplicationTrackerBase {
// Zookeeper
private final ZKWatcher zookeeper;
@ -49,42 +47,14 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
private final Abortable abortable;
// All about stopping
private final Stoppable stopper;
// listeners to be notified
private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
// List of all the other region servers in this cluster
private final List<ServerName> otherRegionServers = new ArrayList<>();
private final Set<ServerName> regionServers = new HashSet<>();
public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) {
this.zookeeper = zookeeper;
this.abortable = abortable;
this.stopper = stopper;
ZKReplicationTracker(ReplicationTrackerParams params) {
this.zookeeper = params.zookeeper();
this.abortable = params.abortable();
this.stopper = params.stopptable();
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
// watch the changes
refreshOtherRegionServersList(true);
}
@Override
public void registerListener(ReplicationListener listener) {
listeners.add(listener);
}
@Override
public void removeListener(ReplicationListener listener) {
listeners.remove(listener);
}
/**
* Return a snapshot of the current region servers.
*/
@Override
public List<ServerName> getListOfRegionServers() {
refreshOtherRegionServersList(false);
List<ServerName> list = null;
synchronized (otherRegionServers) {
list = new ArrayList<>(otherRegionServers);
}
return list;
}
/**
@ -106,6 +76,9 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
*/
@Override
public void nodeCreated(String path) {
if (stopper.isStopped()) {
return;
}
refreshListIfRightPath(path);
}
@ -118,14 +91,10 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
if (stopper.isStopped()) {
return;
}
boolean cont = refreshListIfRightPath(path);
if (!cont) {
if (!refreshListIfRightPath(path)) {
return;
}
LOG.info(path + " znode expired, triggering replicatorRemoved event");
for (ReplicationListener rl : listeners) {
rl.regionServerRemoved(getZNodeName(path));
}
notifyListeners(ServerName.valueOf(getZNodeName(path)));
}
/**
@ -144,7 +113,7 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) {
return false;
}
return refreshOtherRegionServersList(true);
return refreshRegionServerList();
}
}
@ -154,8 +123,8 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
* @return the id or an empty string if path is invalid
*/
private String getZNodeName(String fullPath) {
String[] parts = fullPath.split("/");
return parts.length > 0 ? parts[parts.length - 1] : "";
List<String> parts = Splitter.on('/').splitToList(fullPath);
return parts.size() > 0 ? parts.get(parts.size() - 1) : "";
}
/**
@ -164,14 +133,14 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
* @return true if the local list of the other region servers was updated with the ZK data (even
* if it was empty), false if the data was missing in ZK
*/
private boolean refreshOtherRegionServersList(boolean watch) {
List<ServerName> newRsList = getRegisteredRegionServers(watch);
private boolean refreshRegionServerList() {
Set<ServerName> newRsList = getRegisteredRegionServers();
if (newRsList == null) {
return false;
} else {
synchronized (otherRegionServers) {
otherRegionServers.clear();
otherRegionServers.addAll(newRsList);
synchronized (regionServers) {
regionServers.clear();
regionServers.addAll(newRsList);
}
}
return true;
@ -181,19 +150,26 @@ public class ReplicationTrackerZKImpl implements ReplicationTracker {
* Get a list of all the other region servers in this cluster and set a watch
* @return a list of server names
*/
private List<ServerName> getRegisteredRegionServers(boolean watch) {
private Set<ServerName> getRegisteredRegionServers() {
List<String> result = null;
try {
if (watch) {
result = ZKUtil.listChildrenAndWatchThem(this.zookeeper,
this.zookeeper.getZNodePaths().rsZNode);
} else {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode);
}
result =
ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
return result == null ? null :
result.stream().map(ServerName::parseServerName).collect(Collectors.toList());
result.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
}
@Override
protected Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
throws IOException {
if (!refreshRegionServerList()) {
throw new IOException("failed to refresh region server list");
}
synchronized (regionServers) {
return new HashSet<>(regionServers);
}
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for testing {@link ReplicationTracker} and {@link ReplicationListener}.
 * <p/>
 * Subclasses supply the tracker parameters and a way to add/remove region servers in whatever
 * backing store (ZooKeeper znodes, a mocked master, ...) the tracker implementation under test
 * polls or watches.
 */
public abstract class ReplicationTrackerTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerTestBase.class);

  // tracker instance under test, rebuilt for every test method in setUp
  private ReplicationTracker rt;

  // number of regionServerRemoved events received so far
  private AtomicInteger rsRemovedCount;

  // the server name carried by the most recent regionServerRemoved event
  private volatile ServerName rsRemovedData;

  @Before
  public void setUp() {
    ReplicationTrackerParams params = createParams();
    rt = ReplicationFactory.getReplicationTracker(params);
    rsRemovedCount = new AtomicInteger(0);
    rsRemovedData = null;
  }

  /** Builds the parameters used to construct the tracker implementation under test. */
  protected abstract ReplicationTrackerParams createParams();

  /** Registers {@code sn} as a live region server in the backing store. */
  protected abstract void addServer(ServerName sn) throws Exception;

  /** Removes {@code sn} from the backing store, simulating a region server going away. */
  protected abstract void removeServer(ServerName sn) throws Exception;

  @Test
  public void testWatchRegionServers() throws Exception {
    ServerName sn =
      ServerName.valueOf("hostname2.example.org,1234," + EnvironmentEdgeManager.currentTime());
    addServer(sn);
    rt.registerListener(new DummyReplicationListener());
    // the one-shot initial load must observe the server added above
    assertEquals(1, rt.loadLiveRegionServersAndInitializeListeners().size());
    // delete one
    removeServer(sn);
    // wait for event
    Waiter.waitFor(HBaseConfiguration.create(), 15000, () -> rsRemovedCount.get() >= 1);
    assertEquals(sn, rsRemovedData);
  }

  // Records every removal event so the test can assert on count and payload.
  private class DummyReplicationListener implements ReplicationListener {

    @Override
    public void regionServerRemoved(ServerName regionServer) {
      rsRemovedData = regionServer;
      rsRemovedCount.getAndIncrement();
      LOG.debug("Received regionServerRemoved event: " + regionServer);
    }
  }

  // Stoppable that only logs: the tests never want the tracker actually stopped.
  protected static class WarnOnlyStoppable implements Stoppable {

    @Override
    public void stop(String why) {
      LOG.warn("TestReplicationTracker received stop, ignoring. Reason: " + why);
    }

    @Override
    public boolean isStopped() {
      return false;
    }
  }

  // Abortable that only logs: an abort during the test should not kill the JVM.
  protected static class WarnOnlyAbortable implements Abortable {

    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("TestReplicationTracker received abort, ignoring. Reason: " + why);
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
// Exercises MasterReplicationTracker against a fully mocked master connection: the mocked Admin
// serves a mutable list, so "adding/removing a server" is just mutating that list.
@Category({ ReplicationTests.class, MediumTests.class })
public class TestMasterReplicationTracker extends ReplicationTrackerTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMasterReplicationTracker.class);

  private static Configuration CONF;

  private static Connection CONN;

  private static ChoreService CHORE_SERVICE;

  // backing "cluster state": the mocked Admin returns this live list, so mutating it simulates
  // region servers joining and leaving
  private static List<ServerName> SERVERS = new CopyOnWriteArrayList<>();

  @BeforeClass
  public static void setUpBeforeClass() throws IOException {
    CONF = HBaseConfiguration.create();
    // force the factory to hand out the master-based tracker implementation
    CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, MasterReplicationTracker.class,
      ReplicationTracker.class);
    // the tracker polls Admin.getRegionServers; stub it to read our mutable SERVERS list
    Admin admin = mock(Admin.class);
    when(admin.getRegionServers()).thenReturn(SERVERS);
    CONN = mock(Connection.class);
    when(CONN.getAdmin()).thenReturn(admin);
    CHORE_SERVICE = new ChoreService("TestMasterReplicationTracker");
  }

  @AfterClass
  public static void tearDownAfterClass() {
    CHORE_SERVICE.shutdown();
  }

  @Override
  protected ReplicationTrackerParams createParams() {
    return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
      .abortable(new WarnOnlyAbortable()).connection(CONN).choreService(CHORE_SERVICE);
  }

  @Override
  protected void addServer(ServerName sn) throws Exception {
    SERVERS.add(sn);
  }

  @Override
  protected void removeServer(ServerName sn) throws Exception {
    SERVERS.remove(sn);
  }
}

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@ -26,6 +28,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@ -224,7 +227,8 @@ public class TestZKReplicationPeerStorage {
STORAGE.getNewSyncReplicationStateNode(peerId)));
}
@Test public void testBaseReplicationPeerConfig() throws ReplicationException{
@Test
public void testBaseReplicationPeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
String customPeerConfigUpdatedValue = "testUpdated";
@ -267,7 +271,8 @@ public class TestZKReplicationPeerStorage {
getConfiguration().get(customPeerConfigSecondKey));
}
@Test public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
@Test
public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
@ -296,7 +301,8 @@ public class TestZKReplicationPeerStorage {
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
}
@Test public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
@Test
public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
@ -311,4 +317,22 @@ public class TestZKReplicationPeerStorage {
updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
}
@Test
public void testPeerNameControl() throws Exception {
String clusterKey = "key";
STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
SyncReplicationState.NONE);
try {
STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(),
true, SyncReplicationState.NONE);
fail();
} catch (ReplicationException e) {
assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class));
} finally {
// clean up
STORAGE.removePeer("6");
}
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
// Exercises ZKReplicationTracker against a mini ZooKeeper cluster: region servers are simulated
// by creating/deleting znodes under the rs znode that the tracker watches.
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationTracker extends ReplicationTrackerTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZKReplicationTracker.class);

  private static Configuration CONF;

  private static HBaseZKTestingUtility UTIL;

  // single shared watcher, used both for fixture manipulation and by the tracker under test
  private static ZKWatcher ZKW;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL = new HBaseZKTestingUtility();
    UTIL.startMiniZKCluster();
    CONF = UTIL.getConfiguration();
    // force the factory to hand out the ZK-based tracker implementation
    CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, ZKReplicationTracker.class,
      ReplicationTracker.class);
    // Reuse one watcher for everything: the previous version created a second, throwaway
    // ZKWatcher just to create the rs znode and never closed it, leaking a ZooKeeper
    // connection for the whole test run.
    ZKW = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
    ZKUtil.createWithParents(ZKW, ZKW.getZNodePaths().rsZNode);
  }

  @Override
  protected ReplicationTrackerParams createParams() {
    return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
      .abortable(new WarnOnlyAbortable()).zookeeper(ZKW);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    Closeables.close(ZKW, true);
    UTIL.shutdownMiniZKCluster();
  }

  @Override
  protected void addServer(ServerName sn) throws Exception {
    // registering the rs znode is what the tracker's watcher reacts to
    ZKUtil.createAndWatch(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()),
      HConstants.EMPTY_BYTE_ARRAY);
  }

  @Override
  protected void removeServer(ServerName sn) throws Exception {
    ZKUtil.deleteNode(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()));
  }
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -308,9 +309,11 @@ public class DumpReplicationQueues extends Configured implements Tool {
StringBuilder sb = new StringBuilder();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(),
new WarnOnlyStoppable());
Set<ServerName> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
replicationTracker = ReplicationFactory
.getReplicationTracker(ReplicationTrackerParams.create(getConf(), new WarnOnlyStoppable())
.abortable(new WarnOnlyAbortable()).zookeeper(zkw));
Set<ServerName> liveRegionServers =
new HashSet<>(replicationTracker.loadLiveRegionServersAndInitializeListeners());
// Loops each peer on each RS and dumps the queues
List<ServerName> regionservers = queueStorage.getListOfReplicators();

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
@ -101,8 +102,10 @@ public class Replication implements ReplicationSourceService {
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
this.replicationPeers.init();
this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
this.replicationTracker = ReplicationFactory
.getReplicationTracker(ReplicationTrackerParams.create(server.getConfiguration(), server)
.abortable(server).zookeeper(server.getZooKeeper()).choreService(server.getChoreService())
.connection(server.getConnection()));
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);
}

View File

@ -76,6 +76,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -278,16 +279,24 @@ public class ReplicationSourceManager implements ReplicationListener {
server.abort("Failed to get all replicators", e);
return;
}
Set<ServerName> liveRegionServers;
try {
// must call this method to load the first snapshot of live region servers and initialize
// listeners
liveRegionServers = replicationTracker.loadLiveRegionServersAndInitializeListeners();
} catch (IOException e) {
server.abort("Failed load live region server list for replication", e);
return;
}
LOG.info("Current list of replicators: {}, live RSes: {}", currentReplicators,
liveRegionServers);
if (currentReplicators == null || currentReplicators.isEmpty()) {
return;
}
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers();
LOG.info(
"Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
// Look if there's anything to process after a restart
for (ServerName rs : currentReplicators) {
if (!otherRegionServers.contains(rs)) {
if (!liveRegionServers.contains(rs)) {
transferQueues(rs);
}
}
@ -830,8 +839,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
@Override
public void regionServerRemoved(String regionserver) {
transferQueues(ServerName.valueOf(regionserver));
public void regionServerRemoved(ServerName regionserver) {
transferQueues(regionserver);
}
/**

View File

@ -1,208 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
* MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
* of the rsZNode. All other znode creation/initialization is handled by the replication state
* interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
* MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode).
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationTrackerZKImpl {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class);
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class);
  // Shared across the whole class; initialized once in setUpBeforeClass().
  private static Configuration conf;
  private static HBaseTestingUtility utility;
  // Each one of the below variables are reinitialized before every test case
  private ZKWatcher zkw;
  private ReplicationPeers rp;
  private ReplicationTracker rt;
  // Incremented by DummyReplicationListener each time a regionServerRemoved event fires;
  // tests spin-wait on it.
  private AtomicInteger rsRemovedCount;
  // The server name carried by the most recent regionServerRemoved event.
  private String rsRemovedData;
  /**
   * Starts a mini ZK cluster once for the whole class and creates the rsZNode that the
   * replication tracker watches.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    utility = new HBaseTestingUtility();
    utility.startMiniZKCluster();
    conf = utility.getConfiguration();
    ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
  }
  /**
   * Re-creates the ZK watcher, replication peers and tracker before every test case and resets
   * the listener-event bookkeeping.
   */
  @Before
  public void setUp() throws Exception {
    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
    String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
      "hostname1.example.org:1234");
    try {
      ZKClusterId.setClusterId(zkw, new ClusterId());
      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
      rp.init();
      rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1),
        new DummyServer(fakeRs1));
    } catch (Exception e) {
      // Any setup failure should fail the test right away rather than surface later.
      fail("Exception during test setup: " + e);
    }
    rsRemovedCount = new AtomicInteger(0);
    rsRemovedData = "";
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    utility.shutdownMiniZKCluster();
  }
  /**
   * Verifies that the tracker's region server list follows znode creation/deletion under
   * rsZNode: grows to 1 then 2 as znodes are created, and shrinks back to 0 as they are deleted.
   */
  @Test
  public void testGetListOfRegionServers() throws Exception {
    // 0 region servers
    assertEquals(0, rt.getListOfRegionServers().size());
    // 1 region server
    ZKUtil.createWithParents(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
      "hostname1.example.org,1234,1611218678009"));
    List<ServerName> rss = rt.getListOfRegionServers();
    assertEquals(rss.toString(), 1, rss.size());
    // 2 region servers
    ZKUtil.createWithParents(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
      "hostname2.example.org,1234,1611218678009"));
    rss = rt.getListOfRegionServers();
    assertEquals(rss.toString(), 2, rss.size());
    // 1 region server
    ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
      "hostname2.example.org,1234,1611218678009"));
    rss = rt.getListOfRegionServers();
    assertEquals(1, rss.size());
    // 0 region server
    ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
      "hostname1.example.org,1234,1611218678009"));
    rss = rt.getListOfRegionServers();
    assertEquals(rss.toString(), 0, rss.size());
  }
  /**
   * Verifies that deleting a watched region server znode fires a regionServerRemoved event on a
   * registered listener, carrying that server's name.
   */
  @Test
  public void testRegionServerRemovedEvent() throws Exception {
    ZKUtil.createAndWatch(zkw,
      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"),
      HConstants.EMPTY_BYTE_ARRAY);
    rt.registerListener(new DummyReplicationListener());
    // delete one
    ZKUtil.deleteNode(zkw,
      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
    // wait for event
    while (rsRemovedCount.get() < 1) {
      Thread.sleep(5);
    }
    assertEquals("hostname2.example.org:1234", rsRemovedData);
  }
  /**
   * Verifies that adding a peer whose id already exists fails with a
   * KeeperException.NodeExistsException as the cause, exactly once.
   */
  @Test
  public void testPeerNameControl() throws Exception {
    int exists = 0;
    rp.getPeerStorage().addPeer("6",
      ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true,
      SyncReplicationState.NONE);
    try {
      rp.getPeerStorage().addPeer("6",
        ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true,
        SyncReplicationState.NONE);
    } catch (ReplicationException e) {
      // Only the duplicate-znode cause counts as the expected failure mode.
      if (e.getCause() instanceof KeeperException.NodeExistsException) {
        exists++;
      }
    }
    assertEquals(1, exists);
    // clean up
    rp.getPeerStorage().removePeer("6");
  }
  /**
   * Listener that records the name of the last removed region server and counts removal events
   * so tests can spin-wait on them.
   */
  private class DummyReplicationListener implements ReplicationListener {
    @Override
    public void regionServerRemoved(String regionServer) {
      rsRemovedData = regionServer;
      rsRemovedCount.getAndIncrement();
      LOG.debug("Received regionServerRemoved event: " + regionServer);
    }
  }
  /**
   * Minimal {@link MockServer} exposing only the configuration, ZK watcher and server name that
   * the tracker under test reads.
   */
  private class DummyServer extends MockServer {
    private String serverName;
    public DummyServer(String serverName) {
      this.serverName = serverName;
    }
    @Override
    public Configuration getConfiguration() {
      return conf;
    }
    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }
    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(this.serverName);
    }
  }
}

View File

@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -845,6 +847,16 @@ public abstract class TestReplicationSourceManager {
return zkw;
}
@Override
public Connection getConnection() {
  // Mock-server stub: returns null; callers in these tests must not dereference it.
  return null;
}
@Override
public ChoreService getChoreService() {
  // Mock-server stub: returns null; callers in these tests must not dereference it.
  return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(hostname, 1234, 1L);