HBASE-2741 HBaseExecutorService needs to be multi-cluster friendly
(Karthik Ranganathan via JD) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@956968 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
981573c9e1
commit
ada9be3cf2
|
@ -404,6 +404,8 @@ Release 0.21.0 - Unreleased
|
||||||
the first row in a table.
|
the first row in a table.
|
||||||
HBASE-1025 Reconstruction log playback has no bounds on memory used
|
HBASE-1025 Reconstruction log playback has no bounds on memory used
|
||||||
HBASE-2757 Fix flaky TestFromClientSide test by forcing region assignment
|
HBASE-2757 Fix flaky TestFromClientSide test by forcing region assignment
|
||||||
|
HBASE-2741 HBaseExecutorService needs to be multi-cluster friendly
|
||||||
|
(Karthik Ranganathan via JD)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1760 Cleanup TODOs in HTable
|
HBASE-1760 Cleanup TODOs in HTable
|
||||||
|
|
|
@ -60,17 +60,6 @@ public abstract class HBaseEventHandler implements Runnable
|
||||||
// listeners that are called before and after an event is processed
|
// listeners that are called before and after an event is processed
|
||||||
protected static List<HBaseEventHandlerListener> eventHandlerListeners =
|
protected static List<HBaseEventHandlerListener> eventHandlerListeners =
|
||||||
Collections.synchronizedList(new ArrayList<HBaseEventHandlerListener>());
|
Collections.synchronizedList(new ArrayList<HBaseEventHandlerListener>());
|
||||||
// static instances needed by the handlers
|
|
||||||
protected static ServerManager serverManager;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Note that this has to be called first BEFORE the subclass constructors.
|
|
||||||
*
|
|
||||||
* TODO: take out after refactor
|
|
||||||
*/
|
|
||||||
public static void init(ServerManager serverManager) {
|
|
||||||
HBaseEventHandler.serverManager = serverManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This interface provides hooks to listen to various events received by the
|
* This interface provides hooks to listen to various events received by the
|
||||||
|
@ -124,7 +113,7 @@ public abstract class HBaseEventHandler implements Runnable
|
||||||
|
|
||||||
case RS2ZK_REGION_OPENING:
|
case RS2ZK_REGION_OPENING:
|
||||||
case RS2ZK_REGION_OPENED:
|
case RS2ZK_REGION_OPENED:
|
||||||
executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION;
|
executorServiceType = HBaseExecutorServiceType.MASTER_OPENREGION;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case M2ZK_REGION_OFFLINE:
|
case M2ZK_REGION_OFFLINE:
|
||||||
|
@ -212,7 +201,11 @@ public abstract class HBaseEventHandler implements Runnable
|
||||||
}
|
}
|
||||||
|
|
||||||
// call the main process function
|
// call the main process function
|
||||||
|
try {
|
||||||
process();
|
process();
|
||||||
|
} catch(Throwable t) {
|
||||||
|
LOG.error("Caught throwable while processing event " + eventType, t);
|
||||||
|
}
|
||||||
|
|
||||||
// fire all afterProcess listeners
|
// fire all afterProcess listeners
|
||||||
for(HBaseEventHandlerListener listener : eventHandlerListeners) {
|
for(HBaseEventHandlerListener listener : eventHandlerListeners) {
|
||||||
|
|
|
@ -222,13 +222,11 @@ public class HMaster extends Thread implements HMasterInterface,
|
||||||
// Start the unassigned watcher - which will create the unassgined region
|
// Start the unassigned watcher - which will create the unassgined region
|
||||||
// in ZK. This is needed before RegionManager() constructor tries to assign
|
// in ZK. This is needed before RegionManager() constructor tries to assign
|
||||||
// the root region.
|
// the root region.
|
||||||
ZKUnassignedWatcher.start(this.conf);
|
ZKUnassignedWatcher.start(this.conf, serverManager, address.toString());
|
||||||
// init the various event handlers
|
|
||||||
HBaseEventHandler.init(serverManager);
|
|
||||||
// start the "close region" executor service
|
// start the "close region" executor service
|
||||||
HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(MASTER);
|
HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString());
|
||||||
// start the "open region" executor service
|
// start the "open region" executor service
|
||||||
HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(MASTER);
|
HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
|
||||||
|
|
||||||
|
|
||||||
// start the region manager
|
// start the region manager
|
||||||
|
|
|
@ -41,14 +41,20 @@ import org.apache.zookeeper.Watcher.Event.EventType;
|
||||||
public class ZKUnassignedWatcher implements Watcher {
|
public class ZKUnassignedWatcher implements Watcher {
|
||||||
private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class);
|
private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class);
|
||||||
|
|
||||||
private ZooKeeperWrapper zkWrapper = null;
|
private ZooKeeperWrapper zkWrapper;
|
||||||
|
String serverName;
|
||||||
|
ServerManager serverManager;
|
||||||
|
|
||||||
public static void start(Configuration conf) throws IOException {
|
public static void start(Configuration conf, ServerManager serverManager,
|
||||||
new ZKUnassignedWatcher(conf);
|
String serverName) throws IOException {
|
||||||
|
new ZKUnassignedWatcher(conf, serverManager, serverName);
|
||||||
LOG.debug("Started ZKUnassigned watcher");
|
LOG.debug("Started ZKUnassigned watcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
public ZKUnassignedWatcher(Configuration conf) throws IOException {
|
public ZKUnassignedWatcher(Configuration conf, ServerManager serverManager,
|
||||||
|
String serverName) throws IOException {
|
||||||
|
this.serverName = serverName;
|
||||||
|
this.serverManager = serverManager;
|
||||||
zkWrapper =
|
zkWrapper =
|
||||||
ZooKeeperWrapper.getInstance(conf, HMaster.class.getName());
|
ZooKeeperWrapper.getInstance(conf, HMaster.class.getName());
|
||||||
// If the UNASSIGNED ZNode does not exist, create it.
|
// If the UNASSIGNED ZNode does not exist, create it.
|
||||||
|
@ -80,7 +86,8 @@ public class ZKUnassignedWatcher implements Watcher {
|
||||||
|
|
||||||
// check if the path is for the UNASSIGNED directory we care about
|
// check if the path is for the UNASSIGNED directory we care about
|
||||||
if(event.getPath() == null ||
|
if(event.getPath() == null ||
|
||||||
!event.getPath().startsWith(zkWrapper.getZNodePathForHBase(zkWrapper.getRegionInTransitionZNode()))) {
|
!event.getPath().startsWith(zkWrapper.getZNodePathForHBase(
|
||||||
|
zkWrapper.getRegionInTransitionZNode()))) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,10 +114,12 @@ public class ZKUnassignedWatcher implements Watcher {
|
||||||
* If there were some nodes created then watch those nodes
|
* If there were some nodes created then watch those nodes
|
||||||
*/
|
*/
|
||||||
else if(type.equals(EventType.NodeChildrenChanged)) {
|
else if(type.equals(EventType.NodeChildrenChanged)) {
|
||||||
List<ZNodePathAndData> newZNodes = zkWrapper.watchAndGetNewChildren(event.getPath());
|
List<ZNodePathAndData> newZNodes =
|
||||||
|
zkWrapper.watchAndGetNewChildren(event.getPath());
|
||||||
for(ZNodePathAndData zNodePathAndData : newZNodes) {
|
for(ZNodePathAndData zNodePathAndData : newZNodes) {
|
||||||
LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath());
|
LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath());
|
||||||
handleRegionStateInZK(zNodePathAndData.getzNodePath(), zNodePathAndData.getData());
|
handleRegionStateInZK(zNodePathAndData.getzNodePath(),
|
||||||
|
zNodePathAndData.getData());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,17 +148,18 @@ public class ZKUnassignedWatcher implements Watcher {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode();
|
String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode();
|
||||||
String region = zNodePath.substring(zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1);
|
String region = zNodePath.substring(
|
||||||
|
zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1);
|
||||||
HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
|
HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
|
||||||
|
|
||||||
// if the node was CLOSED then handle it
|
// if the node was CLOSED then handle it
|
||||||
if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) {
|
if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) {
|
||||||
new MasterCloseRegionHandler(rsEvent, region, data).submit();
|
new MasterCloseRegionHandler(rsEvent, serverManager, serverName, region, data).submit();
|
||||||
}
|
}
|
||||||
// if the region was OPENED then handle that
|
// if the region was OPENED then handle that
|
||||||
else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED ||
|
else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED ||
|
||||||
rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) {
|
rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) {
|
||||||
new MasterOpenRegionHandler(rsEvent, region, data).submit();
|
new MasterOpenRegionHandler(rsEvent, serverManager, serverName, region, data).submit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
||||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -41,11 +42,17 @@ public class MasterCloseRegionHandler extends HBaseEventHandler
|
||||||
private String regionName;
|
private String regionName;
|
||||||
protected byte[] serializedData;
|
protected byte[] serializedData;
|
||||||
RegionTransitionEventData hbEventData;
|
RegionTransitionEventData hbEventData;
|
||||||
|
ServerManager serverManager;
|
||||||
|
|
||||||
public MasterCloseRegionHandler(HBaseEventType eventType, String regionName, byte[] serializedData) {
|
public MasterCloseRegionHandler(HBaseEventType eventType,
|
||||||
super(false, HMaster.MASTER, eventType);
|
ServerManager serverManager,
|
||||||
|
String serverName,
|
||||||
|
String regionName,
|
||||||
|
byte[] serializedData) {
|
||||||
|
super(false, serverName, eventType);
|
||||||
this.regionName = regionName;
|
this.regionName = regionName;
|
||||||
this.serializedData = serializedData;
|
this.serializedData = serializedData;
|
||||||
|
this.serverManager = serverManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HServerInfo;
|
||||||
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
||||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -46,11 +47,17 @@ public class MasterOpenRegionHandler extends HBaseEventHandler {
|
||||||
protected byte[] serializedData;
|
protected byte[] serializedData;
|
||||||
private String regionName;
|
private String regionName;
|
||||||
private RegionTransitionEventData hbEventData;
|
private RegionTransitionEventData hbEventData;
|
||||||
|
ServerManager serverManager;
|
||||||
|
|
||||||
public MasterOpenRegionHandler(HBaseEventType eventType, String regionName, byte[] serData) {
|
public MasterOpenRegionHandler(HBaseEventType eventType,
|
||||||
super(false, HMaster.MASTER, eventType);
|
ServerManager serverManager,
|
||||||
|
String serverName,
|
||||||
|
String regionName,
|
||||||
|
byte[] serData) {
|
||||||
|
super(false, serverName, eventType);
|
||||||
this.regionName = regionName;
|
this.regionName = regionName;
|
||||||
this.serializedData = serData;
|
this.serializedData = serData;
|
||||||
|
this.serverManager = serverManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class that tests the multi-cluster in 1 JVM case, useful
|
||||||
|
* only for "unit'ish tests".
|
||||||
|
*/
|
||||||
|
public class TestMultiClusters {
|
||||||
|
|
||||||
|
private static final byte[] TABLE_NAME = Bytes.toBytes("test");
|
||||||
|
private static final byte[] FAM_NAME = Bytes.toBytes("fam");
|
||||||
|
private static final byte[] ROW = Bytes.toBytes("row");
|
||||||
|
private static final byte[] QUAL_NAME = Bytes.toBytes("qual");
|
||||||
|
private static final byte[] VALUE = Bytes.toBytes("value");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Basic sanity test that spins up 2 HDFS and HBase clusters that share the
|
||||||
|
* same ZK ensemble. We then create the same table in both and make sure that
|
||||||
|
* what we insert in one place doesn't end up in the other.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test (timeout=100000)
|
||||||
|
public void twoClusters() throws Exception{
|
||||||
|
Configuration conf1 = HBaseConfiguration.create();
|
||||||
|
// Different path for different clusters
|
||||||
|
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
|
||||||
|
HBaseTestingUtility utility1 = new HBaseTestingUtility(conf1);
|
||||||
|
utility1.startMiniZKCluster();
|
||||||
|
|
||||||
|
Configuration conf2 = HBaseConfiguration.create();
|
||||||
|
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||||
|
HBaseTestingUtility utility2 = new HBaseTestingUtility(conf2);
|
||||||
|
// They share the same ensemble, but homed differently
|
||||||
|
utility2.setZkCluster(utility1.getZkCluster());
|
||||||
|
|
||||||
|
utility1.startMiniCluster();
|
||||||
|
utility2.startMiniCluster();
|
||||||
|
|
||||||
|
HTable table1 = utility1.createTable(TABLE_NAME, FAM_NAME);
|
||||||
|
HTable table2 = utility2.createTable(TABLE_NAME, FAM_NAME);
|
||||||
|
|
||||||
|
Put put = new Put(ROW);
|
||||||
|
put.add(FAM_NAME, QUAL_NAME, VALUE);
|
||||||
|
table1.put(put);
|
||||||
|
|
||||||
|
Get get = new Get(ROW);
|
||||||
|
get.addColumn(FAM_NAME, QUAL_NAME);
|
||||||
|
Result res = table1.get(get);
|
||||||
|
assertEquals(1, res.size());
|
||||||
|
|
||||||
|
res = table2.get(get);
|
||||||
|
assertEquals(0, res.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue