HBASE-26151 Reimplement MasterAddressTracker to also cache backup master addresses (#3548)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
parent
90f23d4743
commit
b248730126
|
@ -33,6 +33,7 @@ import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
@ -51,10 +52,10 @@ import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
|
|||
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
|
||||
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
|
||||
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.zookeeper;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -33,6 +32,7 @@ import org.apache.zookeeper.KeeperException;
|
|||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
|
||||
|
@ -57,6 +57,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MasterAddressTracker extends ZKNodeTracker {
|
||||
|
||||
private volatile List<ServerName> backupMasters = Collections.emptyList();
|
||||
|
||||
/**
|
||||
* Construct a master address listener with the specified
|
||||
* <code>zookeeper</code> reference.
|
||||
|
@ -72,6 +75,26 @@ public class MasterAddressTracker extends ZKNodeTracker {
|
|||
super(watcher, watcher.getZNodePaths().masterAddressZNode, abortable);
|
||||
}
|
||||
|
||||
private void loadBackupMasters() {
|
||||
try {
|
||||
backupMasters = Collections.unmodifiableList(getBackupMastersAndRenewWatch(watcher));
|
||||
} catch (InterruptedIOException e) {
|
||||
abortable.abort("Unexpected exception handling nodeChildrenChanged event", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postStart() {
|
||||
loadBackupMasters();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeChildrenChanged(String path) {
|
||||
if (path.equals(watcher.getZNodePaths().backupMasterAddressesZNode)) {
|
||||
loadBackupMasters();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the address of the current master if one is available. Returns null
|
||||
* if no current master.
|
||||
|
@ -252,11 +275,12 @@ public class MasterAddressTracker extends ZKNodeTracker {
|
|||
}
|
||||
int prefixLen = ProtobufUtil.lengthOfPBMagic();
|
||||
try {
|
||||
return ZooKeeperProtos.Master.PARSER.parseFrom(data, prefixLen, data.length - prefixLen);
|
||||
return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* delete the master znode if its content is same as the parameter
|
||||
* @param zkw must not be null
|
||||
|
@ -284,7 +308,7 @@ public class MasterAddressTracker extends ZKNodeTracker {
|
|||
}
|
||||
|
||||
public List<ServerName> getBackupMasters() throws InterruptedIOException {
|
||||
return getBackupMastersAndRenewWatch(watcher);
|
||||
return backupMasters;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -42,7 +42,7 @@ public abstract class ZKNodeTracker extends ZKListener {
|
|||
protected final String node;
|
||||
|
||||
/** Data of the node being tracked */
|
||||
private byte [] data;
|
||||
private byte[] data;
|
||||
|
||||
/** Used to abort if a fatal error occurs */
|
||||
protected final Abortable abortable;
|
||||
|
@ -51,16 +51,14 @@ public abstract class ZKNodeTracker extends ZKListener {
|
|||
|
||||
/**
|
||||
* Constructs a new ZK node tracker.
|
||||
*
|
||||
* <p>After construction, use {@link #start} to kick off tracking.
|
||||
*
|
||||
* <p/>
|
||||
* After construction, use {@link #start} to kick off tracking.
|
||||
* @param watcher reference to the {@link ZKWatcher} which also contains configuration and
|
||||
* constants
|
||||
* constants
|
||||
* @param node path of the node being tracked
|
||||
* @param abortable used to abort if a fatal error occurs
|
||||
*/
|
||||
public ZKNodeTracker(ZKWatcher watcher, String node,
|
||||
Abortable abortable) {
|
||||
public ZKNodeTracker(ZKWatcher watcher, String node, Abortable abortable) {
|
||||
super(watcher);
|
||||
this.node = node;
|
||||
this.abortable = abortable;
|
||||
|
@ -69,9 +67,9 @@ public abstract class ZKNodeTracker extends ZKListener {
|
|||
|
||||
/**
|
||||
* Starts the tracking of the node in ZooKeeper.
|
||||
*
|
||||
* <p>Use {@link #blockUntilAvailable()} to block until the node is available
|
||||
* or {@link #getData(boolean)} to get the data of the node if it is available.
|
||||
* <p/>
|
||||
* Use {@link #blockUntilAvailable()} to block until the node is available or
|
||||
* {@link #getData(boolean)} to get the data of the node if it is available.
|
||||
*/
|
||||
public synchronized void start() {
|
||||
this.watcher.registerListener(this);
|
||||
|
@ -89,6 +87,13 @@ public abstract class ZKNodeTracker extends ZKListener {
|
|||
} catch (KeeperException e) {
|
||||
abortable.abort("Unexpected exception during initialization, aborting", e);
|
||||
}
|
||||
postStart();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after start is called. Sub classes could implement this method to load more data on zk.
|
||||
*/
|
||||
protected void postStart() {
|
||||
}
|
||||
|
||||
public synchronized void stop() {
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -25,16 +25,11 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ZKTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -46,18 +41,19 @@ import org.junit.rules.TestName;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({RegionServerTests.class, MediumTests.class})
|
||||
@Category({ ZKTests.class, MediumTests.class })
|
||||
public class TestMasterAddressTracker {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMasterAddressTracker.class);
|
||||
HBaseClassTestRule.forClass(TestMasterAddressTracker.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestMasterAddressTracker.class);
|
||||
|
||||
private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
||||
private final static HBaseZKTestingUtil TEST_UTIL = new HBaseZKTestingUtil();
|
||||
|
||||
// Cleaned up after each unit test.
|
||||
private static ZKWatcher zk;
|
||||
private ZKWatcher zk;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
@ -81,15 +77,15 @@ public class TestMasterAddressTracker {
|
|||
|
||||
@Test
|
||||
public void testDeleteIfEquals() throws Exception {
|
||||
final ServerName sn = ServerName.valueOf("localhost", 1234,
|
||||
EnvironmentEdgeManager.currentTime());
|
||||
final ServerName sn =
|
||||
ServerName.valueOf("localhost", 1234, EnvironmentEdgeManager.currentTime());
|
||||
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1772);
|
||||
try {
|
||||
assertFalse("shouldn't have deleted wrong master server.",
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), "some other string."));
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), "some other string."));
|
||||
} finally {
|
||||
assertTrue("Couldn't clean up master",
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,9 +95,8 @@ public class TestMasterAddressTracker {
|
|||
* @param infoPort if there is an active master, set its info port.
|
||||
*/
|
||||
private MasterAddressTracker setupMasterTracker(final ServerName sn, final int infoPort)
|
||||
throws Exception {
|
||||
zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
|
||||
name.getMethodName(), null);
|
||||
throws Exception {
|
||||
zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null);
|
||||
ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode);
|
||||
ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().backupMasterAddressesZNode);
|
||||
|
||||
|
@ -112,14 +107,14 @@ public class TestMasterAddressTracker {
|
|||
zk.registerListener(addressTracker);
|
||||
|
||||
// Use a listener to capture when the node is actually created
|
||||
NodeCreationListener listener = new NodeCreationListener(zk,
|
||||
zk.getZNodePaths().masterAddressZNode);
|
||||
NodeCreationListener listener =
|
||||
new NodeCreationListener(zk, zk.getZNodePaths().masterAddressZNode);
|
||||
zk.registerListener(listener);
|
||||
|
||||
if (sn != null) {
|
||||
LOG.info("Creating master node");
|
||||
MasterAddressTracker.setMasterAddress(zk, zk.getZNodePaths().masterAddressZNode,
|
||||
sn, infoPort);
|
||||
MasterAddressTracker.setMasterAddress(zk, zk.getZNodePaths().masterAddressZNode, sn,
|
||||
infoPort);
|
||||
|
||||
// Wait for the node to be created
|
||||
LOG.info("Waiting for master address manager to be notified");
|
||||
|
@ -130,16 +125,15 @@ public class TestMasterAddressTracker {
|
|||
}
|
||||
|
||||
/**
|
||||
* Unit tests that uses ZooKeeper but does not use the master-side methods
|
||||
* but rather acts directly on ZK.
|
||||
* @throws Exception
|
||||
* Unit tests that uses ZooKeeper but does not use the master-side methods but rather acts
|
||||
* directly on ZK.
|
||||
*/
|
||||
@Test
|
||||
public void testMasterAddressTrackerFromZK() throws Exception {
|
||||
// Create the master node with a dummy address
|
||||
final int infoPort = 1235;
|
||||
final ServerName sn = ServerName.valueOf("localhost", 1234,
|
||||
EnvironmentEdgeManager.currentTime());
|
||||
final ServerName sn =
|
||||
ServerName.valueOf("localhost", 1234, EnvironmentEdgeManager.currentTime());
|
||||
final MasterAddressTracker addressTracker = setupMasterTracker(sn, infoPort);
|
||||
try {
|
||||
assertTrue(addressTracker.hasMaster());
|
||||
|
@ -148,11 +142,10 @@ public class TestMasterAddressTracker {
|
|||
assertEquals(infoPort, addressTracker.getMasterInfoPort());
|
||||
} finally {
|
||||
assertTrue("Couldn't clean up master",
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testParsingNull() throws Exception {
|
||||
assertNull("parse on null data should return null.", MasterAddressTracker.parse(null));
|
||||
|
@ -160,17 +153,16 @@ public class TestMasterAddressTracker {
|
|||
|
||||
@Test
|
||||
public void testNoBackups() throws Exception {
|
||||
final ServerName sn = ServerName.valueOf("localhost", 1234,
|
||||
EnvironmentEdgeManager.currentTime());
|
||||
final ServerName sn =
|
||||
ServerName.valueOf("localhost", 1234, EnvironmentEdgeManager.currentTime());
|
||||
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1772);
|
||||
try {
|
||||
assertEquals("Should receive 0 for backup not found.", 0,
|
||||
addressTracker.getBackupMasterInfoPort(
|
||||
ServerName.valueOf("doesnotexist.example.com", 1234,
|
||||
EnvironmentEdgeManager.currentTime())));
|
||||
addressTracker.getBackupMasterInfoPort(ServerName.valueOf("doesnotexist.example.com", 1234,
|
||||
EnvironmentEdgeManager.currentTime())));
|
||||
} finally {
|
||||
assertTrue("Couldn't clean up master",
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
|
||||
MasterAddressTracker.deleteIfEquals(addressTracker.getWatcher(), sn.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -184,8 +176,8 @@ public class TestMasterAddressTracker {
|
|||
|
||||
@Test
|
||||
public void testBackupMasters() throws Exception {
|
||||
final ServerName sn = ServerName.valueOf("localhost", 5678,
|
||||
EnvironmentEdgeManager.currentTime());
|
||||
final ServerName sn =
|
||||
ServerName.valueOf("localhost", 5678, EnvironmentEdgeManager.currentTime());
|
||||
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1111);
|
||||
assertTrue(addressTracker.hasMaster());
|
||||
ServerName activeMaster = addressTracker.getMasterAddress();
|
||||
|
@ -195,13 +187,14 @@ public class TestMasterAddressTracker {
|
|||
assertEquals(0, backupMasters.size());
|
||||
ServerName backupMaster1 = ServerName.valueOf("localhost", 2222, -1);
|
||||
ServerName backupMaster2 = ServerName.valueOf("localhost", 3333, -1);
|
||||
String backupZNode1 = ZNodePaths.joinZNode(
|
||||
zk.getZNodePaths().backupMasterAddressesZNode, backupMaster1.toString());
|
||||
String backupZNode2 = ZNodePaths.joinZNode(
|
||||
zk.getZNodePaths().backupMasterAddressesZNode, backupMaster2.toString());
|
||||
// Add a backup master
|
||||
String backupZNode1 =
|
||||
ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupMaster1.toString());
|
||||
String backupZNode2 =
|
||||
ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupMaster2.toString());
|
||||
// Add backup masters
|
||||
MasterAddressTracker.setMasterAddress(zk, backupZNode1, backupMaster1, 2222);
|
||||
MasterAddressTracker.setMasterAddress(zk, backupZNode2, backupMaster2, 3333);
|
||||
TEST_UTIL.waitFor(30000, () -> addressTracker.getBackupMasters().size() == 2);
|
||||
backupMasters = addressTracker.getBackupMasters();
|
||||
assertEquals(2, backupMasters.size());
|
||||
assertTrue(backupMasters.contains(backupMaster1));
|
||||
|
@ -222,7 +215,7 @@ public class TestMasterAddressTracker {
|
|||
|
||||
@Override
|
||||
public void nodeCreated(String path) {
|
||||
if(path.equals(node)) {
|
||||
if (path.equals(node)) {
|
||||
LOG.debug("nodeCreated(" + path + ")");
|
||||
lock.release();
|
||||
}
|
||||
|
@ -232,6 +225,4 @@ public class TestMasterAddressTracker {
|
|||
lock.acquire();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue