HBASE-2694 Move RS to Master region open/close messaging into ZooKeeper
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@953920 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df71c57269
commit
b1ef73ff1f
|
@ -686,6 +686,7 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-2618 Don't inherit from HConstants (Benoit Sigoure via Stack)
|
||||
HBASE-2208 TableServers # processBatchOfRows - converts from List to [ ]
|
||||
- Expensive copy
|
||||
HBASE-2694 Move RS to Master region open/close messaging into ZooKeeper
|
||||
|
||||
NEW FEATURES
|
||||
HBASE-1961 HBase EC2 scripts
|
||||
|
|
|
@ -220,13 +220,16 @@ public class HConnectionManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get this watcher's ZKW, instanciate it if necessary.
|
||||
* Get this watcher's ZKW, instantiate it if necessary.
|
||||
* @return ZKW
|
||||
* @throws java.io.IOException if a remote or network exception occurs
|
||||
*/
|
||||
public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
|
||||
if(zooKeeperWrapper == null) {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
|
||||
String zkWrapperName = HConnectionManager.class.getName() + "-" +
|
||||
ZooKeeperWrapper.getZookeeperClusterKey(conf);
|
||||
zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, zkWrapperName);
|
||||
zooKeeperWrapper.registerListener(this);
|
||||
}
|
||||
return zooKeeperWrapper;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,285 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.executor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
|
||||
|
||||
/**
|
||||
* Abstract base class for all HBase event handlers. Subclasses should
|
||||
* implement the process() method where the actual handling of the event
|
||||
* happens.
|
||||
*
|
||||
* HBaseEventType is a list of ALL events (which also corresponds to messages -
|
||||
* either internal to one component or between components). The event type
|
||||
* names specify the component from which the event originated, and the
|
||||
* component which is supposed to handle it.
|
||||
*
|
||||
* Listeners can listen to all the events by implementing the interface
|
||||
* HBaseEventHandlerListener, and by registering themselves as a listener. They
|
||||
* will be called back before and after the process of every event.
|
||||
*
|
||||
* TODO: Rename HBaseEvent and HBaseEventType to EventHandler and EventType
|
||||
* after ZK refactor as it currently would clash with EventType from ZK and
|
||||
* make the code very confusing.
|
||||
*/
|
||||
public abstract class HBaseEventHandler implements Runnable
|
||||
{
|
||||
private static final Log LOG = LogFactory.getLog(HBaseEventHandler.class);
|
||||
// type of event this object represents
|
||||
protected HBaseEventType eventType = HBaseEventType.NONE;
|
||||
// is this a region server or master?
|
||||
protected boolean isRegionServer;
|
||||
// name of the server - this is needed for naming executors in case of tests
|
||||
// where region servers may be co-located.
|
||||
protected String serverName;
|
||||
// listeners that are called before and after an event is processed
|
||||
protected static List<HBaseEventHandlerListener> eventHandlerListeners =
|
||||
Collections.synchronizedList(new ArrayList<HBaseEventHandlerListener>());
|
||||
// static instances needed by the handlers
|
||||
protected static ServerManager serverManager;
|
||||
|
||||
/**
|
||||
* Note that this has to be called first BEFORE the subclass constructors.
|
||||
*
|
||||
* TODO: take out after refactor
|
||||
*/
|
||||
public static void init(ServerManager serverManager) {
|
||||
HBaseEventHandler.serverManager = serverManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* This interface provides hooks to listen to various events received by the
|
||||
* queue. A class implementing this can listen to the updates by calling
|
||||
* registerListener and stop receiving updates by calling unregisterListener
|
||||
*/
|
||||
public interface HBaseEventHandlerListener {
|
||||
/**
|
||||
* Called before any event is processed
|
||||
*/
|
||||
public void beforeProcess(HBaseEventHandler event);
|
||||
/**
|
||||
* Called after any event is processed
|
||||
*/
|
||||
public void afterProcess(HBaseEventHandler event);
|
||||
}
|
||||
|
||||
/**
|
||||
* These are a list of HBase events that can be handled by the various
|
||||
* HBaseExecutorService's. All the events are serialized as byte values.
|
||||
*/
|
||||
public enum HBaseEventType {
|
||||
NONE (-1),
|
||||
// Messages originating from RS (NOTE: there is NO direct communication from
|
||||
// RS to Master). These are a result of RS updates into ZK.
|
||||
RS2ZK_REGION_CLOSING (1), // RS is in process of closing a region
|
||||
RS2ZK_REGION_CLOSED (2), // RS has finished closing a region
|
||||
RS2ZK_REGION_OPENING (3), // RS is in process of opening a region
|
||||
RS2ZK_REGION_OPENED (4), // RS has finished opening a region
|
||||
|
||||
// Updates from master to ZK. This is done by the master and there is
|
||||
// nothing to process by either Master or RS
|
||||
M2ZK_REGION_OFFLINE (50); // Master adds this region as offline in ZK
|
||||
|
||||
private final byte value;
|
||||
|
||||
/**
|
||||
* Called by the HMaster. Returns a name of the executor service given an
|
||||
* event type. Every event type has en entry - if the event should not be
|
||||
* handled just add the NONE executor.
|
||||
* @return name of the executor service
|
||||
*/
|
||||
public HBaseExecutorServiceType getMasterExecutorForEvent() {
|
||||
HBaseExecutorServiceType executorServiceType = null;
|
||||
switch(this) {
|
||||
|
||||
case RS2ZK_REGION_CLOSING:
|
||||
case RS2ZK_REGION_CLOSED:
|
||||
executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION;
|
||||
break;
|
||||
|
||||
case RS2ZK_REGION_OPENING:
|
||||
case RS2ZK_REGION_OPENED:
|
||||
executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION;
|
||||
break;
|
||||
|
||||
case M2ZK_REGION_OFFLINE:
|
||||
executorServiceType = HBaseExecutorServiceType.NONE;
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new RuntimeException("Unhandled event type in the master.");
|
||||
}
|
||||
|
||||
return executorServiceType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the RegionServer. Returns a name of the executor service given an
|
||||
* event type. Every event type has en entry - if the event should not be
|
||||
* handled just return a null executor name.
|
||||
* @return name of the event service
|
||||
*/
|
||||
public static String getRSExecutorForEvent(String serverName) {
|
||||
throw new RuntimeException("Unsupported operation.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the executor service that handles the passed in event type. The
|
||||
* server that starts these event executor services wants to handle these
|
||||
* event types.
|
||||
*/
|
||||
public void startMasterExecutorService(String serverName) {
|
||||
HBaseExecutorServiceType serviceType = getMasterExecutorForEvent();
|
||||
if(serviceType == HBaseExecutorServiceType.NONE) {
|
||||
throw new RuntimeException("Event type " + toString() + " not handled on master.");
|
||||
}
|
||||
serviceType.startExecutorService(serverName);
|
||||
}
|
||||
|
||||
public static void startRSExecutorService() {
|
||||
|
||||
}
|
||||
|
||||
HBaseEventType(int intValue) {
|
||||
this.value = (byte)intValue;
|
||||
}
|
||||
|
||||
public byte getByteValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public static HBaseEventType fromByte(byte value) {
|
||||
switch(value) {
|
||||
case -1: return HBaseEventType.NONE;
|
||||
case 1 : return HBaseEventType.RS2ZK_REGION_CLOSING;
|
||||
case 2 : return HBaseEventType.RS2ZK_REGION_CLOSED;
|
||||
case 3 : return HBaseEventType.RS2ZK_REGION_OPENING;
|
||||
case 4 : return HBaseEventType.RS2ZK_REGION_OPENED;
|
||||
case 50: return HBaseEventType.M2ZK_REGION_OFFLINE;
|
||||
|
||||
default:
|
||||
throw new RuntimeException("Invalid byte value for conversion to HBaseEventType");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Default base class constructor.
|
||||
*
|
||||
* TODO: isRegionServer and serverName will go away once we do the HMaster
|
||||
* refactor. We will end up passing a ServerStatus which should tell us both
|
||||
* the name and if it is a RS or master.
|
||||
*/
|
||||
public HBaseEventHandler(boolean isRegionServer, String serverName, HBaseEventType eventType) {
|
||||
this.isRegionServer = isRegionServer;
|
||||
this.eventType = eventType;
|
||||
this.serverName = serverName;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a wrapper around process, used to update listeners before and after
|
||||
* events are processed.
|
||||
*/
|
||||
public void run() {
|
||||
// fire all beforeProcess listeners
|
||||
for(HBaseEventHandlerListener listener : eventHandlerListeners) {
|
||||
listener.beforeProcess(this);
|
||||
}
|
||||
|
||||
// call the main process function
|
||||
process();
|
||||
|
||||
// fire all afterProcess listeners
|
||||
for(HBaseEventHandlerListener listener : eventHandlerListeners) {
|
||||
LOG.debug("Firing " + listener.getClass().getName() +
|
||||
".afterProcess event listener for event " + eventType);
|
||||
listener.afterProcess(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is the main processing loop to be implemented by the various
|
||||
* subclasses.
|
||||
*/
|
||||
public abstract void process();
|
||||
|
||||
/**
|
||||
* Subscribe to updates before and after processing events
|
||||
*/
|
||||
public static void registerListener(HBaseEventHandlerListener listener) {
|
||||
eventHandlerListeners.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop receiving updates before and after processing events
|
||||
*/
|
||||
public static void unregisterListener(HBaseEventHandlerListener listener) {
|
||||
eventHandlerListeners.remove(listener);
|
||||
}
|
||||
|
||||
public boolean isRegionServer() {
|
||||
return isRegionServer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the name for this event type.
|
||||
* @return
|
||||
*/
|
||||
public HBaseExecutorServiceType getEventHandlerName() {
|
||||
// TODO: check for isRegionServer here
|
||||
return eventType.getMasterExecutorForEvent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the event type
|
||||
* @return
|
||||
*/
|
||||
public HBaseEventType getHBEvent() {
|
||||
return eventType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits this event object to the correct executor service. This is causes
|
||||
* this object to get executed by the correct ExecutorService.
|
||||
*/
|
||||
public void submit() {
|
||||
HBaseExecutorServiceType serviceType = getEventHandlerName();
|
||||
if(serviceType == null) {
|
||||
throw new RuntimeException("Event " + eventType + " not handled on this server " + serverName);
|
||||
}
|
||||
serviceType.getExecutor(serverName).submit(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes this event object in the caller's thread. This is a synchronous
|
||||
* way of executing the event.
|
||||
*/
|
||||
public void execute() {
|
||||
this.run();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.executor;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* This is a generic HBase executor service. This component abstract a
|
||||
* threadpool, a queue to which jobs can be submitted and a Runnable that
|
||||
* handles the object that is added to the queue.
|
||||
*
|
||||
* In order to create a new HBExecutorService, you need to do:
|
||||
* HBExecutorService.startExecutorService("myService");
|
||||
*
|
||||
* In order to use the service created above, you need to override the
|
||||
* HBEventHandler class and create an event type that submits to this service.
|
||||
*
|
||||
*/
|
||||
public class HBaseExecutorService
|
||||
{
|
||||
private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class);
|
||||
// default number of threads in the pool
|
||||
private int corePoolSize = 1;
|
||||
// max number of threads - maximum concurrency
|
||||
private int maximumPoolSize = 5;
|
||||
// how long to retain excess threads
|
||||
private long keepAliveTimeInMillis = 1000;
|
||||
// the thread pool executor that services the requests
|
||||
ThreadPoolExecutor threadPoolExecutor;
|
||||
// work queue to use - unbounded queue
|
||||
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
|
||||
// name for this executor service
|
||||
String name;
|
||||
// hold the all the executors created in a map addressable by their names
|
||||
static Map<String, HBaseExecutorService> executorServicesMap =
|
||||
Collections.synchronizedMap(new HashMap<String, HBaseExecutorService>());
|
||||
|
||||
|
||||
/**
|
||||
* The following is a list of names for the various executor services in both
|
||||
* the master and the region server.
|
||||
*/
|
||||
public enum HBaseExecutorServiceType {
|
||||
NONE (-1),
|
||||
MASTER_CLOSEREGION (1),
|
||||
MASTER_OPENREGION (2);
|
||||
|
||||
private final int value;
|
||||
|
||||
HBaseExecutorServiceType(int intValue) {
|
||||
this.value = intValue;
|
||||
}
|
||||
|
||||
public void startExecutorService(String serverName) {
|
||||
// if this is NONE then there is no executor to start
|
||||
if(value == NONE.value) {
|
||||
throw new RuntimeException("Cannot start NONE executor type.");
|
||||
}
|
||||
String name = getExecutorName(serverName);
|
||||
if(HBaseExecutorService.isExecutorServiceRunning(name)) {
|
||||
LOG.debug("Executor service " + toString() + " already running on " + serverName);
|
||||
return;
|
||||
}
|
||||
HBaseExecutorService.startExecutorService(name);
|
||||
}
|
||||
|
||||
public HBaseExecutorService getExecutor(String serverName) {
|
||||
// if this is NONE then there is no executor
|
||||
if(value == NONE.value) {
|
||||
return null;
|
||||
}
|
||||
return HBaseExecutorService.getExecutorService(getExecutorName(serverName));
|
||||
}
|
||||
|
||||
public String getExecutorName(String serverName) {
|
||||
// if this is NONE then there is no executor
|
||||
if(value == NONE.value) {
|
||||
return null;
|
||||
}
|
||||
return (this.toString() + "-" + serverName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Start an executor service with a given name. If there was a service already
|
||||
* started with the same name, this throws a RuntimeException.
|
||||
* @param name Name of the service to start.
|
||||
*/
|
||||
public static void startExecutorService(String name) {
|
||||
if(executorServicesMap.get(name) != null) {
|
||||
throw new RuntimeException("An executor service with the name " + name + " is already running!");
|
||||
}
|
||||
HBaseExecutorService hbes = new HBaseExecutorService(name);
|
||||
executorServicesMap.put(name, hbes);
|
||||
LOG.debug("Starting executor service: " + name);
|
||||
}
|
||||
|
||||
public static boolean isExecutorServiceRunning(String name) {
|
||||
return (executorServicesMap.containsKey(name));
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is an accessor for all the HBExecutorServices running so far
|
||||
* addressable by name. If there is no such service, then it returns null.
|
||||
*/
|
||||
public static HBaseExecutorService getExecutorService(String name) {
|
||||
HBaseExecutorService executor = executorServicesMap.get(name);
|
||||
if(executor == null) {
|
||||
LOG.debug("Executor service [" + name + "] not found.");
|
||||
}
|
||||
return executor;
|
||||
}
|
||||
|
||||
public static void shutdown() {
|
||||
for(Entry<String, HBaseExecutorService> entry : executorServicesMap.entrySet()) {
|
||||
entry.getValue().threadPoolExecutor.shutdown();
|
||||
}
|
||||
executorServicesMap.clear();
|
||||
}
|
||||
|
||||
protected HBaseExecutorService(String name) {
|
||||
this.name = name;
|
||||
// create the thread pool executor
|
||||
threadPoolExecutor = new ThreadPoolExecutor(
|
||||
corePoolSize,
|
||||
maximumPoolSize,
|
||||
keepAliveTimeInMillis,
|
||||
TimeUnit.MILLISECONDS,
|
||||
workQueue
|
||||
);
|
||||
// name the threads for this threadpool
|
||||
threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name));
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit the event to the queue for handling.
|
||||
* @param event
|
||||
*/
|
||||
public void submit(Runnable event) {
|
||||
threadPoolExecutor.execute(event);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.executor;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Returns a named thread with a specified prefix.
|
||||
*
|
||||
* TODO: Use guava (com.google.common.util.concurrent.NamingThreadFactory)
|
||||
*/
|
||||
public class NamedThreadFactory implements ThreadFactory
|
||||
{
|
||||
private String threadPrefix;
|
||||
private AtomicInteger threadId = new AtomicInteger(0);
|
||||
|
||||
public NamedThreadFactory(String threadPrefix) {
|
||||
this.threadPrefix = threadPrefix;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
return new Thread(r, threadPrefix + "-" + threadId.incrementAndGet());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.executor;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
public class RegionTransitionEventData implements Writable {
|
||||
private HBaseEventType hbEvent;
|
||||
private String rsName;
|
||||
private long timeStamp;
|
||||
private HMsg hmsg;
|
||||
|
||||
public RegionTransitionEventData() {
|
||||
}
|
||||
|
||||
public RegionTransitionEventData(HBaseEventType hbEvent, String rsName) {
|
||||
this(hbEvent, rsName, null);
|
||||
}
|
||||
|
||||
public RegionTransitionEventData(HBaseEventType hbEvent, String rsName, HMsg hmsg) {
|
||||
this.hbEvent = hbEvent;
|
||||
this.rsName = rsName;
|
||||
this.timeStamp = System.currentTimeMillis();
|
||||
this.hmsg = hmsg;
|
||||
}
|
||||
|
||||
public HBaseEventType getHbEvent() {
|
||||
return hbEvent;
|
||||
}
|
||||
|
||||
public String getRsName() {
|
||||
return rsName;
|
||||
}
|
||||
|
||||
public long getTimeStamp() {
|
||||
return timeStamp;
|
||||
}
|
||||
|
||||
public HMsg getHmsg() {
|
||||
return hmsg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
// the event type byte
|
||||
hbEvent = HBaseEventType.fromByte(in.readByte());
|
||||
// the hostname of the RS sending the data
|
||||
rsName = in.readUTF();
|
||||
// the timestamp
|
||||
timeStamp = in.readLong();
|
||||
if(in.readBoolean()) {
|
||||
// deserialized the HMsg from ZK
|
||||
hmsg = new HMsg();
|
||||
hmsg.readFields(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeByte(hbEvent.getByteValue());
|
||||
out.writeUTF(rsName);
|
||||
out.writeLong(System.currentTimeMillis());
|
||||
out.writeBoolean((hmsg != null));
|
||||
if(hmsg != null) {
|
||||
hmsg.write(out);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -19,6 +19,23 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.RuntimeMXBean;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -48,6 +65,9 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.ServerConnection;
|
||||
import org.apache.hadoop.hbase.client.ServerConnectionManager;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||
import org.apache.hadoop.hbase.executor.HBaseExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRPC;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
|
||||
|
@ -77,23 +97,6 @@ import org.apache.zookeeper.Watcher;
|
|||
import org.apache.zookeeper.Watcher.Event.EventType;
|
||||
import org.apache.zookeeper.Watcher.Event.KeeperState;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.RuntimeMXBean;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* HMaster is the "master server" for HBase. An HBase cluster has one active
|
||||
* master. If many masters are started, all compete. Whichever wins goes on to
|
||||
|
@ -198,14 +201,31 @@ public class HMaster extends Thread implements HMasterInterface,
|
|||
// We'll succeed if we are only master or if we win the race when many
|
||||
// masters. Otherwise we park here inside in writeAddressToZooKeeper.
|
||||
// TODO: Bring up the UI to redirect to active Master.
|
||||
this.zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
|
||||
zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, HMaster.class.getName());
|
||||
zooKeeperWrapper.registerListener(this);
|
||||
this.zkMasterAddressWatcher =
|
||||
new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested);
|
||||
zooKeeperWrapper.registerListener(zkMasterAddressWatcher);
|
||||
this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true);
|
||||
this.regionServerOperationQueue =
|
||||
new RegionServerOperationQueue(this.conf, this.closed);
|
||||
|
||||
serverManager = new ServerManager(this);
|
||||
|
||||
|
||||
// Start the unassigned watcher - which will create the unassgined region
|
||||
// in ZK. This is needed before RegionManager() constructor tries to assign
|
||||
// the root region.
|
||||
ZKUnassignedWatcher.start();
|
||||
// init the various event handlers
|
||||
HBaseEventHandler.init(serverManager);
|
||||
// start the "close region" executor service
|
||||
HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(MASTER);
|
||||
// start the "open region" executor service
|
||||
HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(MASTER);
|
||||
|
||||
|
||||
// start the region manager
|
||||
regionManager = new RegionManager(this);
|
||||
|
||||
setName(MASTER);
|
||||
|
@ -411,7 +431,7 @@ public class HMaster extends Thread implements HMasterInterface,
|
|||
return this.serverManager.getAverageLoad();
|
||||
}
|
||||
|
||||
RegionServerOperationQueue getRegionServerOperationQueue () {
|
||||
public RegionServerOperationQueue getRegionServerOperationQueue () {
|
||||
return this.regionServerOperationQueue;
|
||||
}
|
||||
|
||||
|
@ -491,6 +511,7 @@ public class HMaster extends Thread implements HMasterInterface,
|
|||
this.rpcServer.stop();
|
||||
this.regionManager.stop();
|
||||
this.zooKeeperWrapper.close();
|
||||
HBaseExecutorService.shutdown();
|
||||
LOG.info("HMaster main thread exiting");
|
||||
}
|
||||
|
||||
|
@ -1118,7 +1139,9 @@ public class HMaster extends Thread implements HMasterInterface,
|
|||
*/
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
LOG.debug(("Event " + event.getType() + " with path " + event.getPath()));
|
||||
LOG.debug("Event " + event.getType() +
|
||||
" with state " + event.getState() +
|
||||
" with path " + event.getPath());
|
||||
// Master should kill itself if its session expired or if its
|
||||
// znode was deleted manually (usually for testing purposes)
|
||||
if(event.getState() == KeeperState.Expired ||
|
||||
|
@ -1132,7 +1155,8 @@ public class HMaster extends Thread implements HMasterInterface,
|
|||
|
||||
zooKeeperWrapper.close();
|
||||
try {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
|
||||
zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, HMaster.class.getName());
|
||||
zooKeeperWrapper.registerListener(this);
|
||||
this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper);
|
||||
if(!this.zkMasterAddressWatcher.
|
||||
writeAddressToZooKeeper(this.address,false)) {
|
||||
|
|
|
@ -32,7 +32,7 @@ import java.io.IOException;
|
|||
* or deleted doesn't actually require post processing, it's no longer
|
||||
* necessary.
|
||||
*/
|
||||
class ProcessRegionClose extends ProcessRegionStatusChange {
|
||||
public class ProcessRegionClose extends ProcessRegionStatusChange {
|
||||
protected final boolean offlineRegion;
|
||||
protected final boolean reassignRegion;
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HServerInfo;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -34,7 +35,7 @@ import java.io.IOException;
|
|||
* serving a region. This applies to all meta and user regions except the
|
||||
* root region which is handled specially.
|
||||
*/
|
||||
class ProcessRegionOpen extends ProcessRegionStatusChange {
|
||||
public class ProcessRegionOpen extends ProcessRegionStatusChange {
|
||||
protected final HServerInfo serverInfo;
|
||||
|
||||
/**
|
||||
|
@ -114,6 +115,8 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
|
|||
} else {
|
||||
master.getRegionManager().removeRegion(regionInfo);
|
||||
}
|
||||
ZooKeeperWrapper zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName());
|
||||
zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,4 +78,8 @@ abstract class ProcessRegionStatusChange extends RegionServerOperation {
|
|||
}
|
||||
return this.metaRegion;
|
||||
}
|
||||
|
||||
public HRegionInfo getRegionInfo() {
|
||||
return regionInfo;
|
||||
}
|
||||
}
|
|
@ -32,6 +32,9 @@ import org.apache.hadoop.hbase.HServerAddress;
|
|||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HServerLoad;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
|
@ -39,6 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -94,6 +99,9 @@ public class RegionManager {
|
|||
*/
|
||||
final SortedMap<String, RegionState> regionsInTransition =
|
||||
Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());
|
||||
|
||||
// regions in transition are also recorded in ZK using the zk wrapper
|
||||
final ZooKeeperWrapper zkWrapper;
|
||||
|
||||
// How many regions to assign a server at a time.
|
||||
private final int maxAssignInOneGo;
|
||||
|
@ -124,10 +132,11 @@ public class RegionManager {
|
|||
private final int zooKeeperNumRetries;
|
||||
private final int zooKeeperPause;
|
||||
|
||||
RegionManager(HMaster master) {
|
||||
RegionManager(HMaster master) throws IOException {
|
||||
Configuration conf = master.getConfiguration();
|
||||
|
||||
this.master = master;
|
||||
this.zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName());
|
||||
this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10);
|
||||
this.loadBalancer = new LoadBalancer(conf);
|
||||
|
||||
|
@ -165,10 +174,17 @@ public class RegionManager {
|
|||
unsetRootRegion();
|
||||
if (!master.getShutdownRequested().get()) {
|
||||
synchronized (regionsInTransition) {
|
||||
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO,
|
||||
RegionState.State.UNASSIGNED);
|
||||
regionsInTransition.put(
|
||||
HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s);
|
||||
String regionName = HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString();
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
|
||||
}
|
||||
zkWrapper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data);
|
||||
LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
|
||||
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, RegionState.State.UNASSIGNED);
|
||||
regionsInTransition.put(regionName, s);
|
||||
LOG.info("ROOT inserted into regionsInTransition");
|
||||
}
|
||||
}
|
||||
|
@ -330,6 +346,14 @@ public class RegionManager {
|
|||
LOG.info("Assigning region " + regionName + " to " + sinfo.getServerName());
|
||||
rs.setPendingOpen(sinfo.getServerName());
|
||||
synchronized (this.regionsInTransition) {
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
|
||||
}
|
||||
zkWrapper.createUnassignedRegion(rs.getRegionInfo().getEncodedName(), data);
|
||||
LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
|
||||
this.regionsInTransition.put(regionName, rs);
|
||||
}
|
||||
|
||||
|
@ -969,6 +993,16 @@ public class RegionManager {
|
|||
synchronized(this.regionsInTransition) {
|
||||
s = regionsInTransition.get(info.getRegionNameAsString());
|
||||
if (s == null) {
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
|
||||
} catch (IOException e) {
|
||||
// TODO: Review what we should do here. If Writables work this
|
||||
// should never happen
|
||||
LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
|
||||
}
|
||||
zkWrapper.createUnassignedRegion(info.getEncodedName(), data);
|
||||
LOG.debug("Created UNASSIGNED zNode " + info.getRegionNameAsString() + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
|
||||
s = new RegionState(info, RegionState.State.UNASSIGNED);
|
||||
regionsInTransition.put(info.getRegionNameAsString(), s);
|
||||
}
|
||||
|
@ -1213,8 +1247,9 @@ public class RegionManager {
|
|||
*/
|
||||
public void setRootRegionLocation(HServerAddress address) {
|
||||
writeRootRegionLocationToZooKeeper(address);
|
||||
|
||||
synchronized (rootRegionLocation) {
|
||||
// the root region has been assigned, remove it from transition in ZK
|
||||
zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName());
|
||||
rootRegionLocation.set(new HServerAddress(address));
|
||||
rootRegionLocation.notifyAll();
|
||||
}
|
||||
|
|
|
@ -564,7 +564,7 @@ public class ServerManager {
|
|||
* @param region
|
||||
* @param returnMsgs
|
||||
*/
|
||||
private void processRegionOpen(HServerInfo serverInfo,
|
||||
public void processRegionOpen(HServerInfo serverInfo,
|
||||
HRegionInfo region, ArrayList<HMsg> returnMsgs) {
|
||||
boolean duplicateAssignment = false;
|
||||
synchronized (master.getRegionManager()) {
|
||||
|
@ -633,7 +633,7 @@ public class ServerManager {
|
|||
* @param region
|
||||
* @throws Exception
|
||||
*/
|
||||
private void processRegionClose(HRegionInfo region) {
|
||||
public void processRegionClose(HRegionInfo region) {
|
||||
synchronized (this.master.getRegionManager()) {
|
||||
if (region.isRootRegion()) {
|
||||
// Root region
|
||||
|
|
|
@ -74,7 +74,7 @@ class ZKMasterAddressWatcher implements Watcher {
|
|||
} else if(type.equals(EventType.NodeCreated) &&
|
||||
event.getPath().equals(this.zookeeper.clusterStateZNode)) {
|
||||
LOG.debug("Resetting watch on cluster state node.");
|
||||
this.zookeeper.setClusterStateWatch(this);
|
||||
this.zookeeper.setClusterStateWatch();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,7 +87,7 @@ class ZKMasterAddressWatcher implements Watcher {
|
|||
try {
|
||||
LOG.debug("Waiting for master address ZNode to be deleted " +
|
||||
"(Also watching cluster state node)");
|
||||
this.zookeeper.setClusterStateWatch(this);
|
||||
this.zookeeper.setClusterStateWatch();
|
||||
wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ class ZKMasterAddressWatcher implements Watcher {
|
|||
}
|
||||
if(this.zookeeper.writeMasterAddress(address)) {
|
||||
this.zookeeper.setClusterState(true);
|
||||
this.zookeeper.setClusterStateWatch(this);
|
||||
this.zookeeper.setClusterStateWatch();
|
||||
// Watch our own node
|
||||
this.zookeeper.readMasterAddress(this);
|
||||
return true;
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
|
||||
import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler;
|
||||
import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.Watcher.Event.EventType;
|
||||
|
||||
/**
|
||||
* Watches the UNASSIGNED znode in ZK for the master, and handles all events
|
||||
* relating to region transitions.
|
||||
*/
|
||||
public class ZKUnassignedWatcher implements Watcher {
|
||||
private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class);
|
||||
|
||||
// TODO: Start move this to HConstants
|
||||
static final String ROOT_TABLE_NAME_STR = "-ROOT-";
|
||||
static final String META_TABLE_NAME_STR = ".META.";
|
||||
// TODO: End move this to HConstants
|
||||
|
||||
private ZooKeeperWrapper zkWrapper = null;
|
||||
|
||||
public static void start() throws IOException {
|
||||
new ZKUnassignedWatcher();
|
||||
LOG.debug("Started ZKUnassigned watcher");
|
||||
}
|
||||
|
||||
public ZKUnassignedWatcher() throws IOException {
|
||||
zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName());
|
||||
// If the UNASSIGNED ZNode does not exist, create it.
|
||||
zkWrapper.createZNodeIfNotExists(zkWrapper.getRegionInTransitionZNode());
|
||||
// TODO: get the outstanding changes in UNASSIGNED
|
||||
|
||||
// Set a watch on Zookeeper's UNASSIGNED node if it exists.
|
||||
zkWrapper.registerListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the processing loop that gets triggerred from the ZooKeeperWrapper.
|
||||
* This zookeeper events process function dies the following:
|
||||
* - WATCHES the following events: NodeCreated, NodeDataChanged, NodeChildrenChanged
|
||||
* - IGNORES the following events: None, NodeDeleted
|
||||
*/
|
||||
@Override
|
||||
public synchronized void process(WatchedEvent event) {
|
||||
EventType type = event.getType();
|
||||
LOG.debug("ZK-EVENT-PROCESS: Got zkEvent " + type +
|
||||
" state:" + event.getState() +
|
||||
" path:" + event.getPath());
|
||||
|
||||
// Handle the ignored events
|
||||
if(type.equals(EventType.None) ||
|
||||
type.equals(EventType.NodeDeleted)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// check if the path is for the UNASSIGNED directory we care about
|
||||
if(event.getPath() == null ||
|
||||
!event.getPath().startsWith(zkWrapper.getZNodePathForHBase(zkWrapper.getRegionInTransitionZNode()))) {
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
/*
|
||||
* If a node is created in the UNASSIGNED directory in zookeeper, then:
|
||||
* 1. watch its updates (this is an unassigned region).
|
||||
* 2. read to see what its state is and handle as needed (state may have
|
||||
* changed before we started watching it)
|
||||
*/
|
||||
if(type.equals(EventType.NodeCreated)) {
|
||||
zkWrapper.watchZNode(event.getPath());
|
||||
handleRegionStateInZK(event.getPath());
|
||||
}
|
||||
/*
|
||||
* Data on some node has changed. Read to see what the state is and handle
|
||||
* as needed.
|
||||
*/
|
||||
else if(type.equals(EventType.NodeDataChanged)) {
|
||||
handleRegionStateInZK(event.getPath());
|
||||
}
|
||||
/*
|
||||
* If there were some nodes created then watch those nodes
|
||||
*/
|
||||
else if(type.equals(EventType.NodeChildrenChanged)) {
|
||||
List<ZNodePathAndData> newZNodes = zkWrapper.watchAndGetNewChildren(event.getPath());
|
||||
for(ZNodePathAndData zNodePathAndData : newZNodes) {
|
||||
LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath());
|
||||
handleRegionStateInZK(zNodePathAndData.getzNodePath(), zNodePathAndData.getData());
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.error("Could not process event from ZooKeeper", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the state of a node in ZK, and do the needful. We want to do the
|
||||
* following:
|
||||
* 1. If region's state is updated as CLOSED, invoke the ClosedRegionHandler.
|
||||
* 2. If region's state is updated as OPENED, invoke the OpenRegionHandler.
|
||||
* @param zNodePath
|
||||
* @throws IOException
|
||||
*/
|
||||
private void handleRegionStateInZK(String zNodePath) throws IOException {
|
||||
byte[] data = zkWrapper.readZNode(zNodePath, null);
|
||||
handleRegionStateInZK(zNodePath, data);
|
||||
}
|
||||
|
||||
private void handleRegionStateInZK(String zNodePath, byte[] data) {
|
||||
// a null value is set when a node is created, we don't need to handle this
|
||||
if(data == null) {
|
||||
return;
|
||||
}
|
||||
String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode();
|
||||
String region = zNodePath.substring(zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1);
|
||||
HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
|
||||
|
||||
// if the node was CLOSED then handle it
|
||||
if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) {
|
||||
new MasterCloseRegionHandler(rsEvent, region, data).submit();
|
||||
}
|
||||
// if the region was OPENED then handle that
|
||||
else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED ||
|
||||
rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) {
|
||||
new MasterOpenRegionHandler(rsEvent, region, data).submit();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.handler;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* This is the event handler for all events relating to closing regions on the
|
||||
* HMaster. The following event types map to this handler:
|
||||
* - RS_REGION_CLOSING
|
||||
* - RS_REGION_CLOSED
|
||||
*/
|
||||
public class MasterCloseRegionHandler extends HBaseEventHandler
|
||||
{
|
||||
private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class);
|
||||
|
||||
private String regionName;
|
||||
protected byte[] serializedData;
|
||||
RegionTransitionEventData hbEventData;
|
||||
|
||||
public MasterCloseRegionHandler(HBaseEventType eventType, String regionName, byte[] serializedData) {
|
||||
super(false, HMaster.MASTER, eventType);
|
||||
this.regionName = regionName;
|
||||
this.serializedData = serializedData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the various events relating to closing regions. We can get the
|
||||
* following events here:
|
||||
* - RS_REGION_CLOSING : No-op
|
||||
* - RS_REGION_CLOSED : The region is closed. If we are not in a shutdown
|
||||
* state, find the RS to open this region. This could
|
||||
* be a part of a region move, or just that the RS has
|
||||
* died. Should result in a M_REQUEST_OPENREGION event
|
||||
* getting created.
|
||||
*/
|
||||
@Override
|
||||
public void process()
|
||||
{
|
||||
LOG.debug("Event = " + getHBEvent() + ", region = " + regionName);
|
||||
// handle RS_REGION_CLOSED events
|
||||
handleRegionClosedEvent();
|
||||
}
|
||||
|
||||
private void handleRegionClosedEvent() {
|
||||
try {
|
||||
if(hbEventData == null) {
|
||||
hbEventData = new RegionTransitionEventData();
|
||||
Writables.getWritable(serializedData, hbEventData);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Could not deserialize additional args for Close region", e);
|
||||
}
|
||||
// process the region close - this will cause the reopening of the
|
||||
// region as a part of the heartbeat of some RS
|
||||
serverManager.processRegionClose(hbEventData.getHmsg().getRegionInfo());
|
||||
LOG.info("Processed close of region " + hbEventData.getHmsg().getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
|
||||
public String getRegionName() {
|
||||
return regionName;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.handler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* This is the event handler for all events relating to opening regions on the
|
||||
* HMaster. This could be one of the following:
|
||||
* - notification that a region server is "OPENING" a region
|
||||
* - notification that a region server has "OPENED" a region
|
||||
* The following event types map to this handler:
|
||||
* - RS_REGION_OPENING
|
||||
* - RS_REGION_OPENED
|
||||
*/
|
||||
public class MasterOpenRegionHandler extends HBaseEventHandler {
|
||||
private static final Log LOG = LogFactory.getLog(MasterOpenRegionHandler.class);
|
||||
// other args passed in a byte array form
|
||||
protected byte[] serializedData;
|
||||
private String regionName;
|
||||
private RegionTransitionEventData hbEventData;
|
||||
|
||||
public MasterOpenRegionHandler(HBaseEventType eventType, String regionName, byte[] serData) {
|
||||
super(false, HMaster.MASTER, eventType);
|
||||
this.regionName = regionName;
|
||||
this.serializedData = serData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the various events relating to opening regions. We can get the
|
||||
* following events here:
|
||||
* - RS_REGION_OPENING : Keep track to see how long the region open takes.
|
||||
* If the RS is taking too long, then revert the
|
||||
* region back to closed state so that it can be
|
||||
* re-assigned.
|
||||
* - RS_REGION_OPENED : The region is opened. Add an entry into META for
|
||||
* the RS having opened this region. Then delete this
|
||||
* entry in ZK.
|
||||
*/
|
||||
@Override
|
||||
public void process()
|
||||
{
|
||||
LOG.debug("Event = " + getHBEvent() + ", region = " + regionName);
|
||||
if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENING) {
|
||||
handleRegionOpeningEvent();
|
||||
}
|
||||
else if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENED) {
|
||||
handleRegionOpenedEvent();
|
||||
}
|
||||
}
|
||||
|
||||
private void handleRegionOpeningEvent() {
|
||||
// TODO: not implemented.
|
||||
LOG.debug("NO-OP call to handling region opening event");
|
||||
// Keep track to see how long the region open takes. If the RS is taking too
|
||||
// long, then revert the region back to closed state so that it can be
|
||||
// re-assigned.
|
||||
}
|
||||
|
||||
private void handleRegionOpenedEvent() {
|
||||
try {
|
||||
if(hbEventData == null) {
|
||||
hbEventData = new RegionTransitionEventData();
|
||||
Writables.getWritable(serializedData, hbEventData);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Could not deserialize additional args for Open region", e);
|
||||
}
|
||||
LOG.debug("RS " + hbEventData.getRsName() + " has opened region " + regionName);
|
||||
HServerInfo serverInfo = serverManager.getServerInfo(hbEventData.getRsName());
|
||||
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
|
||||
serverManager.processRegionOpen(serverInfo, hbEventData.getHmsg().getRegionInfo(), returnMsgs);
|
||||
if(returnMsgs.size() > 0) {
|
||||
LOG.error("Open region tried to send message: " + returnMsgs.get(0).getType() +
|
||||
" about " + returnMsgs.get(0).getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -319,7 +319,8 @@ public class HRegionServer implements HRegionInterface,
|
|||
}
|
||||
|
||||
private void reinitializeZooKeeper() throws IOException {
|
||||
zooKeeperWrapper = new ZooKeeperWrapper(conf, this);
|
||||
zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, serverInfo.getServerName());
|
||||
zooKeeperWrapper.registerListener(this);
|
||||
watchMasterAddress();
|
||||
}
|
||||
|
||||
|
@ -1217,14 +1218,7 @@ public class HRegionServer implements HRegionInterface,
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sending initial server load: " + hsl);
|
||||
lastMsg = System.currentTimeMillis();
|
||||
boolean startCodeOk = false;
|
||||
while(!startCodeOk) {
|
||||
this.serverInfo = createServerInfoWithNewStartCode(this.serverInfo);
|
||||
startCodeOk = zooKeeperWrapper.writeRSLocation(this.serverInfo);
|
||||
if(!startCodeOk) {
|
||||
LOG.debug("Start code already taken, trying another one");
|
||||
}
|
||||
}
|
||||
zooKeeperWrapper.writeRSLocation(this.serverInfo);
|
||||
result = this.hbaseMaster.regionServerStartup(this.serverInfo);
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
|
@ -1419,8 +1413,11 @@ public class HRegionServer implements HRegionInterface,
|
|||
void openRegion(final HRegionInfo regionInfo) {
|
||||
Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
|
||||
HRegion region = this.onlineRegions.get(mapKey);
|
||||
RSZookeeperUpdater zkUpdater =
|
||||
new RSZookeeperUpdater(serverInfo.getServerName(), regionInfo.getEncodedName());
|
||||
if (region == null) {
|
||||
try {
|
||||
zkUpdater.startRegionOpenEvent(null, true);
|
||||
region = instantiateRegion(regionInfo);
|
||||
// Startup a compaction early if one is needed, if region has references
|
||||
// or if a store has too many store files
|
||||
|
@ -1435,7 +1432,15 @@ public class HRegionServer implements HRegionInterface,
|
|||
// TODO: add an extra field in HRegionInfo to indicate that there is
|
||||
// an error. We can't do that now because that would be an incompatible
|
||||
// change that would require a migration
|
||||
reportClose(regionInfo, StringUtils.stringifyException(t).getBytes());
|
||||
try {
|
||||
HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
|
||||
regionInfo,
|
||||
StringUtils.stringifyException(t).getBytes());
|
||||
zkUpdater.abortOpenRegion(hmsg);
|
||||
} catch (IOException e1) {
|
||||
// TODO: Can we recover? Should be throw RTE?
|
||||
LOG.error("Failed to abort open region " + regionInfo.getRegionNameAsString(), e1);
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.lock.writeLock().lock();
|
||||
|
@ -1446,7 +1451,12 @@ public class HRegionServer implements HRegionInterface,
|
|||
this.lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
reportOpen(regionInfo);
|
||||
try {
|
||||
HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo);
|
||||
zkUpdater.finishRegionOpenEvent(hmsg);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to mark region " + regionInfo.getRegionNameAsString() + " as opened", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
|
||||
|
@ -1475,11 +1485,19 @@ public class HRegionServer implements HRegionInterface,
|
|||
|
||||
protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
|
||||
throws IOException {
|
||||
RSZookeeperUpdater zkUpdater = null;
|
||||
if(reportWhenCompleted) {
|
||||
zkUpdater = new RSZookeeperUpdater(serverInfo.getServerName(), hri.getEncodedName());
|
||||
zkUpdater.startRegionCloseEvent(null, false);
|
||||
}
|
||||
HRegion region = this.removeFromOnlineRegions(hri);
|
||||
if (region != null) {
|
||||
region.close();
|
||||
if(reportWhenCompleted) {
|
||||
reportClose(hri);
|
||||
if(zkUpdater != null) {
|
||||
HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, hri, null);
|
||||
zkUpdater.finishRegionCloseEvent(hmsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* This is a helper class for region servers to update various states in
|
||||
* Zookeeper. The various updates are abstracted out here.
|
||||
*
|
||||
* The "startRegionXXX" methods are to be called first, followed by the
|
||||
* "finishRegionXXX" methods. Supports updating zookeeper periodically as a
|
||||
* part of the "startRegionXXX". Currently handles the following state updates:
|
||||
* - Close region
|
||||
* - Open region
|
||||
*/
|
||||
// TODO: make this thread local, in which case it is re-usable per thread
|
||||
public class RSZookeeperUpdater {
|
||||
private static final Log LOG = LogFactory.getLog(RSZookeeperUpdater.class);
|
||||
private final String regionServerName;
|
||||
private String regionName = null;
|
||||
private String regionZNode = null;
|
||||
private ZooKeeperWrapper zkWrapper = null;
|
||||
private int zkVersion = 0;
|
||||
HBaseEventType lastUpdatedState;
|
||||
|
||||
public RSZookeeperUpdater(String regionServerName, String regionName) {
|
||||
this(regionServerName, regionName, 0);
|
||||
}
|
||||
|
||||
public RSZookeeperUpdater(String regionServerName, String regionName, int zkVersion) {
|
||||
this.zkWrapper = ZooKeeperWrapper.getInstance(regionServerName);
|
||||
this.regionServerName = regionServerName;
|
||||
this.regionName = regionName;
|
||||
// get the region ZNode we have to create
|
||||
this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName);
|
||||
this.zkVersion = zkVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method updates the various states in ZK to inform the master that the
|
||||
* region server has started closing the region.
|
||||
* @param updatePeriodically - if true, periodically updates the state in ZK
|
||||
*/
|
||||
public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
|
||||
// if this ZNode already exists, something is wrong
|
||||
if(zkWrapper.exists(regionZNode, true)) {
|
||||
String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region.";
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
|
||||
// create the region node in the unassigned directory first
|
||||
zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true);
|
||||
|
||||
// update the data for "regionName" ZNode in unassigned to CLOSING
|
||||
updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg);
|
||||
|
||||
// TODO: implement the updatePeriodically logic here
|
||||
}
|
||||
|
||||
/**
|
||||
* This method updates the states in ZK to signal that the region has been
|
||||
* closed. This will stop the periodic updater thread if one was started.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void finishRegionCloseEvent(HMsg hmsg) throws IOException {
|
||||
// TODO: stop the updatePeriodically here
|
||||
|
||||
// update the data for "regionName" ZNode in unassigned to CLOSED
|
||||
updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method updates the various states in ZK to inform the master that the
|
||||
* region server has started opening the region.
|
||||
* @param updatePeriodically - if true, periodically updates the state in ZK
|
||||
*/
|
||||
public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
|
||||
Stat stat = new Stat();
|
||||
byte[] data = zkWrapper.readZNode(regionZNode, stat);
|
||||
// if there is no ZNode for this region, something is wrong
|
||||
if(data == null) {
|
||||
String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region.";
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
// if the ZNode is not in the closed state, something is wrong
|
||||
HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
|
||||
if(rsEvent != HBaseEventType.RS2ZK_REGION_CLOSED && rsEvent != HBaseEventType.M2ZK_REGION_OFFLINE) {
|
||||
String msg = "ZNode " + regionZNode + " is not in CLOSED/OFFLINE state (state = " + rsEvent + "), will NOT open region.";
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
|
||||
// get the version to update from ZK
|
||||
zkVersion = stat.getVersion();
|
||||
|
||||
// update the data for "regionName" ZNode in unassigned to CLOSING
|
||||
updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENING, hmsg);
|
||||
|
||||
// TODO: implement the updatePeriodically logic here
|
||||
}
|
||||
|
||||
/**
|
||||
* This method updates the states in ZK to signal that the region has been
|
||||
* opened. This will stop the periodic updater thread if one was started.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void finishRegionOpenEvent(HMsg hmsg) throws IOException {
|
||||
// TODO: stop the updatePeriodically here
|
||||
|
||||
// update the data for "regionName" ZNode in unassigned to CLOSED
|
||||
updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENED, hmsg);
|
||||
}
|
||||
|
||||
public boolean isClosingRegion() {
|
||||
return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_CLOSING);
|
||||
}
|
||||
|
||||
public boolean isOpeningRegion() {
|
||||
return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_OPENING);
|
||||
}
|
||||
|
||||
public void abortOpenRegion(HMsg hmsg) throws IOException {
|
||||
LOG.error("Aborting open of region " + regionName);
|
||||
|
||||
// TODO: stop the updatePeriodically for start open region here
|
||||
|
||||
// update the data for "regionName" ZNode in unassigned to CLOSED
|
||||
updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg);
|
||||
}
|
||||
|
||||
private void updateZKWithEventData(HBaseEventType hbEventType, HMsg hmsg) throws IOException {
|
||||
// update the data for "regionName" ZNode in unassigned to "hbEventType"
|
||||
byte[] data = null;
|
||||
try {
|
||||
data = Writables.getBytes(new RegionTransitionEventData(hbEventType, regionServerName, hmsg));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error creating event data for " + hbEventType, e);
|
||||
}
|
||||
LOG.debug("Updating ZNode " + regionZNode +
|
||||
" with [" + hbEventType + "]" +
|
||||
" expected version = " + zkVersion);
|
||||
lastUpdatedState = hbEventType;
|
||||
zkWrapper.writeZNode(regionZNode, data, zkVersion, true);
|
||||
zkVersion++;
|
||||
}
|
||||
}
|
|
@ -220,6 +220,68 @@ public class HQuorumPeer {
|
|||
|
||||
return zkProperties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the ZK Quorum servers string given zk properties returned by
|
||||
* makeZKProps
|
||||
* @param properties
|
||||
* @return
|
||||
*/
|
||||
public static String getZKQuorumServersString(Properties properties) {
|
||||
String clientPort = null;
|
||||
List<String> servers = new ArrayList<String>();
|
||||
|
||||
// The clientPort option may come after the server.X hosts, so we need to
|
||||
// grab everything and then create the final host:port comma separated list.
|
||||
boolean anyValid = false;
|
||||
for (Entry<Object,Object> property : properties.entrySet()) {
|
||||
String key = property.getKey().toString().trim();
|
||||
String value = property.getValue().toString().trim();
|
||||
if (key.equals("clientPort")) {
|
||||
clientPort = value;
|
||||
}
|
||||
else if (key.startsWith("server.")) {
|
||||
String host = value.substring(0, value.indexOf(':'));
|
||||
servers.add(host);
|
||||
try {
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
InetAddress.getByName(host);
|
||||
anyValid = true;
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.warn(StringUtils.stringifyException(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!anyValid) {
|
||||
LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (clientPort == null) {
|
||||
LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (servers.isEmpty()) {
|
||||
LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
|
||||
"ZooKeeper cluster configured for its operation.");
|
||||
return null;
|
||||
}
|
||||
|
||||
StringBuilder hostPortBuilder = new StringBuilder();
|
||||
for (int i = 0; i < servers.size(); ++i) {
|
||||
String host = servers.get(i);
|
||||
if (i > 0) {
|
||||
hostPortBuilder.append(',');
|
||||
}
|
||||
hostPortBuilder.append(host);
|
||||
hostPortBuilder.append(':');
|
||||
hostPortBuilder.append(clientPort);
|
||||
}
|
||||
|
||||
return hostPortBuilder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -19,152 +19,225 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.Socket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
import org.apache.zookeeper.ZooKeeper.States;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.InetAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Wraps a ZooKeeper instance and adds HBase specific functionality.
|
||||
*
|
||||
* This class provides methods to:
|
||||
* - read/write/delete the root region location in ZooKeeper.
|
||||
* - set/check out of safe mode flag.
|
||||
*
|
||||
* ------------------------------------------
|
||||
* The following STATIC ZNodes are created:
|
||||
* ------------------------------------------
|
||||
* - parentZNode : All the HBase directories are hosted under this parent
|
||||
* node, default = "/hbase"
|
||||
* - rsZNode : This is the directory where the RS's create ephemeral
|
||||
* nodes. The master watches these nodes, and their expiry
|
||||
* indicates RS death. The default location is "/hbase/rs"
|
||||
*
|
||||
* ------------------------------------------
|
||||
* The following DYNAMIC ZNodes are created:
|
||||
* ------------------------------------------
|
||||
* - rootRegionZNode : Specifies the RS hosting root.
|
||||
* - masterElectionZNode : ZNode used for election of the primary master when
|
||||
* there are secondaries. All the masters race to write
|
||||
* their addresses into this location, the one that
|
||||
* succeeds is the primary. Others block.
|
||||
* - clusterStateZNode : Determines if the cluster is running. Its default
|
||||
* location is "/hbase/shutdown". It always has a value
|
||||
* of "up". If present with the valus, cluster is up
|
||||
* and running. If deleted, the cluster is shutting
|
||||
* down.
|
||||
* - rgnsInTransitZNode : All the nodes under this node are names of regions
|
||||
* in transition. The first byte of the data for each
|
||||
* of these nodes is the event type. This is used to
|
||||
* deserialize the rest of the data.
|
||||
*/
|
||||
public class ZooKeeperWrapper {
|
||||
public class ZooKeeperWrapper implements Watcher {
|
||||
protected static final Log LOG = LogFactory.getLog(ZooKeeperWrapper.class);
|
||||
|
||||
// instances of the watcher
|
||||
private static Map<String,ZooKeeperWrapper> INSTANCES =
|
||||
new HashMap<String,ZooKeeperWrapper>();
|
||||
// lock for ensuring a singleton per instance type
|
||||
private static Lock createLock = new ReentrantLock();
|
||||
// name of this instance
|
||||
private String instanceName;
|
||||
|
||||
// TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
|
||||
private static final char ZNODE_PATH_SEPARATOR = '/';
|
||||
|
||||
private String quorumServers = null;
|
||||
private final int sessionTimeout;
|
||||
private ZooKeeper zooKeeper;
|
||||
|
||||
private final ZooKeeper zooKeeper;
|
||||
|
||||
private final String parentZNode;
|
||||
/*
|
||||
* All the HBase directories are hosted under this parent
|
||||
*/
|
||||
public final String parentZNode;
|
||||
/*
|
||||
* Specifies the RS hosting root
|
||||
*/
|
||||
private final String rootRegionZNode;
|
||||
/*
|
||||
* This is the directory where the RS's create ephemeral nodes. The master
|
||||
* watches these nodes, and their expiry indicates RS death.
|
||||
*/
|
||||
private final String rsZNode;
|
||||
/*
|
||||
* ZNode used for election of the primary master when there are secondaries.
|
||||
*/
|
||||
private final String masterElectionZNode;
|
||||
/*
|
||||
* State of the cluster - if up and running or shutting down
|
||||
*/
|
||||
public final String clusterStateZNode;
|
||||
/*
|
||||
* Regions that are in transition
|
||||
*/
|
||||
private final String rgnsInTransitZNode;
|
||||
/*
|
||||
* List of ZNodes in the unassgined region that are already being watched
|
||||
*/
|
||||
private Set<String> unassignedZNodesWatched = new HashSet<String>();
|
||||
|
||||
private List<Watcher> listeners = Collections.synchronizedList(new ArrayList<Watcher>());
|
||||
|
||||
// return the singleton given the name of the instance
|
||||
public static ZooKeeperWrapper getInstance(String name) {
|
||||
return INSTANCES.get(name);
|
||||
}
|
||||
// creates only one instance
|
||||
public static ZooKeeperWrapper createInstance(Configuration conf, String name) {
|
||||
if (getInstance(name) != null) {
|
||||
return getInstance(name);
|
||||
}
|
||||
ZooKeeperWrapper.createLock.lock();
|
||||
try {
|
||||
if (getInstance(name) == null) {
|
||||
try {
|
||||
ZooKeeperWrapper instance = new ZooKeeperWrapper(conf, name);
|
||||
INSTANCES.put(name, instance);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error("<" + name + ">" + "Error creating a ZooKeeperWrapper " + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
createLock.unlock();
|
||||
}
|
||||
return getInstance(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a ZooKeeperWrapper.
|
||||
* @param conf Configuration to read settings from.
|
||||
* @param watcher ZooKeeper watcher to register.
|
||||
* Create a ZooKeeperWrapper. The Zookeeper wrapper listens to all messages
|
||||
* from Zookeeper, and notifies all the listeners about all the messages. Any
|
||||
* component can subscribe to these messages by adding itself as a listener,
|
||||
* and remove itself from being a listener.
|
||||
*
|
||||
* @param conf HBaseConfiguration to read settings from.
|
||||
* @throws IOException If a connection error occurs.
|
||||
*/
|
||||
public ZooKeeperWrapper(Configuration conf, Watcher watcher)
|
||||
private ZooKeeperWrapper(Configuration conf, String instanceName)
|
||||
throws IOException {
|
||||
this.instanceName = instanceName;
|
||||
Properties properties = HQuorumPeer.makeZKProps(conf);
|
||||
setQuorumServers(properties);
|
||||
quorumServers = HQuorumPeer.getZKQuorumServersString(properties);
|
||||
if (quorumServers == null) {
|
||||
throw new IOException("Could not read quorum servers from " +
|
||||
HConstants.ZOOKEEPER_CONFIG_NAME);
|
||||
}
|
||||
sessionTimeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
|
||||
reconnectToZk();
|
||||
|
||||
parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||
|
||||
int sessionTimeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
|
||||
try {
|
||||
zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, watcher);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to create ZooKeeper object: " + e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
String rootServerZNodeName = conf.get("zookeeper.znode.rootserver", "root-region-server");
|
||||
String rsZNodeName = conf.get("zookeeper.znode.rs", "rs");
|
||||
String masterAddressZNodeName = conf.get("zookeeper.znode.master", "master");
|
||||
String stateZNodeName = conf.get("zookeeper.znode.state", "shutdown");
|
||||
String regionsInTransitZNodeName = conf.get("zookeeper.znode.regionInTransition", "UNASSIGNED");
|
||||
|
||||
parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
|
||||
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||
|
||||
String rootServerZNodeName = conf.get("zookeeper.znode.rootserver",
|
||||
"root-region-server");
|
||||
String rsZNodeName = conf.get("zookeeper.znode.rs", "rs");
|
||||
String masterAddressZNodeName = conf.get("zookeeper.znode.master",
|
||||
"master");
|
||||
String stateZNodeName = conf.get("zookeeper.znode.state",
|
||||
"shutdown");
|
||||
|
||||
rootRegionZNode = getZNode(parentZNode, rootServerZNodeName);
|
||||
rsZNode = getZNode(parentZNode, rsZNodeName);
|
||||
rootRegionZNode = getZNode(parentZNode, rootServerZNodeName);
|
||||
rsZNode = getZNode(parentZNode, rsZNodeName);
|
||||
rgnsInTransitZNode = getZNode(parentZNode, regionsInTransitZNodeName);
|
||||
masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName);
|
||||
clusterStateZNode = getZNode(parentZNode, stateZNodeName);
|
||||
clusterStateZNode = getZNode(parentZNode, stateZNodeName);
|
||||
}
|
||||
|
||||
public void reconnectToZk() throws IOException {
|
||||
try {
|
||||
LOG.info("Reconnecting to zookeeper");
|
||||
if(zooKeeper != null) {
|
||||
zooKeeper.close();
|
||||
LOG.debug("<" + instanceName + ">" + "Closed existing zookeeper client");
|
||||
}
|
||||
zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, this);
|
||||
LOG.debug("<" + instanceName + ">" + "Connected to zookeeper again");
|
||||
} catch (IOException e) {
|
||||
LOG.error("<" + instanceName + ">" + "Failed to create ZooKeeper object: " + e);
|
||||
throw new IOException(e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("<" + instanceName + ">" + "Error closing ZK connection: " + e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void setQuorumServers(Properties properties) {
|
||||
String clientPort = null;
|
||||
List<String> servers = new ArrayList<String>();
|
||||
public void registerListener(Watcher watcher) {
|
||||
listeners.add(watcher);
|
||||
}
|
||||
|
||||
// The clientPort option may come after the server.X hosts, so we need to
|
||||
// grab everything and then create the final host:port comma separated list.
|
||||
boolean anyValid = false;
|
||||
for (Entry<Object,Object> property : properties.entrySet()) {
|
||||
String key = property.getKey().toString().trim();
|
||||
String value = property.getValue().toString().trim();
|
||||
if (key.equals("clientPort")) {
|
||||
clientPort = value;
|
||||
}
|
||||
else if (key.startsWith("server.")) {
|
||||
String host = value.substring(0, value.indexOf(':'));
|
||||
servers.add(host);
|
||||
try {
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
InetAddress.getByName(host);
|
||||
anyValid = true;
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.warn(StringUtils.stringifyException(e));
|
||||
}
|
||||
public void unregisterListener(Watcher watcher) {
|
||||
listeners.remove(watcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the primary ZK watcher
|
||||
* @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
|
||||
*/
|
||||
@Override
|
||||
public synchronized void process(WatchedEvent event) {
|
||||
for(Watcher w : listeners) {
|
||||
try {
|
||||
w.process(event);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("<"+instanceName+">" + "ZK updates listener threw an exception in process()", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (!anyValid) {
|
||||
LOG.error("no valid quorum servers found in "
|
||||
+ HConstants.ZOOKEEPER_CONFIG_NAME);
|
||||
return;
|
||||
}
|
||||
|
||||
if (clientPort == null) {
|
||||
LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
|
||||
return;
|
||||
}
|
||||
|
||||
if (servers.isEmpty()) {
|
||||
LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
|
||||
"ZooKeeper cluster configured for its operation.");
|
||||
return;
|
||||
}
|
||||
|
||||
StringBuilder hostPortBuilder = new StringBuilder();
|
||||
for (int i = 0; i < servers.size(); ++i) {
|
||||
String host = servers.get(i);
|
||||
if (i > 0) {
|
||||
hostPortBuilder.append(',');
|
||||
}
|
||||
hostPortBuilder.append(host);
|
||||
hostPortBuilder.append(':');
|
||||
hostPortBuilder.append(clientPort);
|
||||
}
|
||||
|
||||
quorumServers = hostPortBuilder.toString();
|
||||
}
|
||||
|
||||
/** @return String dump of everything in ZooKeeper. */
|
||||
|
@ -172,7 +245,7 @@ public class ZooKeeperWrapper {
|
|||
public String dump() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("\nHBase tree in ZooKeeper is rooted at ").append(parentZNode);
|
||||
sb.append("\n Cluster up? ").append(exists(clusterStateZNode));
|
||||
sb.append("\n Cluster up? ").append(exists(clusterStateZNode, true));
|
||||
sb.append("\n Master address: ").append(readMasterAddress(null));
|
||||
sb.append("\n Region server holding ROOT: ").append(readRootRegionLocation());
|
||||
sb.append("\n Region servers:");
|
||||
|
@ -236,9 +309,25 @@ public class ZooKeeperWrapper {
|
|||
return res.toArray(new String[res.size()]);
|
||||
}
|
||||
|
||||
private boolean exists(String znode) {
|
||||
public boolean exists(String znode, boolean watch) {
|
||||
try {
|
||||
return zooKeeper.exists(znode, null) != null;
|
||||
return zooKeeper.exists(getZNode(parentZNode, znode), watch?this:null) != null;
|
||||
} catch (KeeperException.SessionExpiredException e) {
|
||||
// if the session has expired try to reconnect to ZK, then perform query
|
||||
try {
|
||||
// TODO: ZK-REFACTOR: We should not reconnect - we should just quit and restart.
|
||||
reconnectToZk();
|
||||
return zooKeeper.exists(getZNode(parentZNode, znode), watch?this:null) != null;
|
||||
} catch (IOException e1) {
|
||||
LOG.error("Error reconnecting to zookeeper", e1);
|
||||
throw new RuntimeException("Error reconnecting to zookeeper", e1);
|
||||
} catch (KeeperException e1) {
|
||||
LOG.error("Error reading after reconnecting to zookeeper", e1);
|
||||
throw new RuntimeException("Error reading after reconnecting to zookeeper", e1);
|
||||
} catch (InterruptedException e1) {
|
||||
LOG.error("Error reading after reconnecting to zookeeper", e1);
|
||||
throw new RuntimeException("Error reading after reconnecting to zookeeper", e1);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
return false;
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -311,13 +400,13 @@ public class ZooKeeperWrapper {
|
|||
* Watch the state of the cluster, up or down
|
||||
* @param watcher Watcher to set on cluster state node
|
||||
*/
|
||||
public void setClusterStateWatch(Watcher watcher) {
|
||||
public void setClusterStateWatch() {
|
||||
try {
|
||||
zooKeeper.exists(clusterStateZNode, watcher);
|
||||
zooKeeper.exists(clusterStateZNode, this);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to check on ZNode " + clusterStateZNode, e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to check on ZNode " + clusterStateZNode, e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to check on ZNode " + clusterStateZNode, e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to check on ZNode " + clusterStateZNode, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -335,19 +424,19 @@ public class ZooKeeperWrapper {
|
|||
byte[] data = Bytes.toBytes("up");
|
||||
zooKeeper.create(clusterStateZNode, data,
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||
LOG.debug("State node wrote in ZooKeeper");
|
||||
LOG.debug("<" + instanceName + ">" + "State node wrote in ZooKeeper");
|
||||
} else {
|
||||
zooKeeper.delete(clusterStateZNode, -1);
|
||||
LOG.debug("State node deleted in ZooKeeper");
|
||||
LOG.debug("<" + instanceName + ">" + "State node deleted in ZooKeeper");
|
||||
}
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to set state node in ZooKeeper", e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to set state node in ZooKeeper", e);
|
||||
} catch (KeeperException e) {
|
||||
if(e.code() == KeeperException.Code.NODEEXISTS) {
|
||||
LOG.debug("State node exists.");
|
||||
LOG.debug("<" + instanceName + ">" + "State node exists.");
|
||||
} else {
|
||||
LOG.warn("Failed to set state node in ZooKeeper", e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to set state node in ZooKeeper", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -364,13 +453,13 @@ public class ZooKeeperWrapper {
|
|||
try {
|
||||
zooKeeper.exists(masterElectionZNode, watcher);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to set watcher on ZNode " + masterElectionZNode, e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to set watcher on ZNode " + masterElectionZNode, e);
|
||||
return false;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to set watcher on ZNode " + masterElectionZNode, e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to set watcher on ZNode " + masterElectionZNode, e);
|
||||
return false;
|
||||
}
|
||||
LOG.debug("Set watcher on master address ZNode " + masterElectionZNode);
|
||||
LOG.debug("<" + instanceName + ">" + "Set watcher on master address ZNode " + masterElectionZNode);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -378,7 +467,7 @@ public class ZooKeeperWrapper {
|
|||
try {
|
||||
return readAddressOrThrow(znode, watcher);
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Failed to read " + e.getMessage());
|
||||
LOG.debug("<" + instanceName + ">" + "Failed to read " + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -394,7 +483,7 @@ public class ZooKeeperWrapper {
|
|||
}
|
||||
|
||||
String addressString = Bytes.toString(data);
|
||||
LOG.debug("Read ZNode " + znode + " got " + addressString);
|
||||
LOG.debug("<" + instanceName + ">" + "Read ZNode " + znode + " got " + addressString);
|
||||
return new HServerAddress(addressString);
|
||||
}
|
||||
|
||||
|
@ -411,17 +500,17 @@ public class ZooKeeperWrapper {
|
|||
}
|
||||
zooKeeper.create(znode, new byte[0],
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||
LOG.debug("Created ZNode " + znode);
|
||||
LOG.debug("<" + instanceName + ">" + "Created ZNode " + znode);
|
||||
return true;
|
||||
} catch (KeeperException.NodeExistsException e) {
|
||||
return true; // ok, move on.
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
return ensureParentExists(znode) && ensureExists(znode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to create " + znode +
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create " + znode +
|
||||
" -- check quorum servers, currently=" + this.quorumServers, e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to create " + znode +
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create " + znode +
|
||||
" -- check quorum servers, currently=" + this.quorumServers, e);
|
||||
}
|
||||
return false;
|
||||
|
@ -450,9 +539,9 @@ public class ZooKeeperWrapper {
|
|||
} catch (KeeperException.NoNodeException e) {
|
||||
return true; // ok, move on.
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to delete " + rootRegionZNode + ": " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to delete " + rootRegionZNode + ": " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to delete " + rootRegionZNode + ": " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to delete " + rootRegionZNode + ": " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -479,18 +568,18 @@ public class ZooKeeperWrapper {
|
|||
public void deleteZNode(String znode, boolean recursive)
|
||||
throws KeeperException, InterruptedException {
|
||||
if (recursive) {
|
||||
LOG.info("deleteZNode get children for " + znode);
|
||||
LOG.info("<" + instanceName + ">" + "deleteZNode get children for " + znode);
|
||||
List<String> znodes = this.zooKeeper.getChildren(znode, false);
|
||||
if (znodes.size() > 0) {
|
||||
for (String child : znodes) {
|
||||
String childFullPath = getZNode(znode, child);
|
||||
LOG.info("deleteZNode recursive call " + childFullPath);
|
||||
LOG.info("<" + instanceName + ">" + "deleteZNode recursive call " + childFullPath);
|
||||
this.deleteZNode(childFullPath, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.zooKeeper.delete(znode, -1);
|
||||
LOG.debug("Deleted ZNode " + znode);
|
||||
LOG.debug("<" + instanceName + ">" + "Deleted ZNode " + znode);
|
||||
}
|
||||
|
||||
private boolean createRootRegionLocation(String address) {
|
||||
|
@ -498,12 +587,12 @@ public class ZooKeeperWrapper {
|
|||
try {
|
||||
zooKeeper.create(rootRegionZNode, data, Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
LOG.debug("Created ZNode " + rootRegionZNode + " with data " + address);
|
||||
LOG.debug("<" + instanceName + ">" + "Created ZNode " + rootRegionZNode + " with data " + address);
|
||||
return true;
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to create root region in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create root region in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to create root region in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create root region in ZooKeeper: " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -513,12 +602,12 @@ public class ZooKeeperWrapper {
|
|||
byte[] data = Bytes.toBytes(address);
|
||||
try {
|
||||
zooKeeper.setData(rootRegionZNode, data, -1);
|
||||
LOG.debug("SetData of ZNode " + rootRegionZNode + " with " + address);
|
||||
LOG.debug("<" + instanceName + ">" + "SetData of ZNode " + rootRegionZNode + " with " + address);
|
||||
return true;
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to set root region location in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to set root region location in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to set root region location in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to set root region location in ZooKeeper: " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -554,20 +643,22 @@ public class ZooKeeperWrapper {
|
|||
* @return true if operation succeeded, false otherwise.
|
||||
*/
|
||||
public boolean writeMasterAddress(final HServerAddress address) {
|
||||
LOG.debug("<" + instanceName + ">" + "Writing master address " + address.toString() + " to znode " + masterElectionZNode);
|
||||
if (!ensureParentExists(masterElectionZNode)) {
|
||||
return false;
|
||||
}
|
||||
LOG.debug("<" + instanceName + ">" + "Znode exists : " + masterElectionZNode);
|
||||
|
||||
String addressStr = address.toString();
|
||||
byte[] data = Bytes.toBytes(addressStr);
|
||||
try {
|
||||
zooKeeper.create(masterElectionZNode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
||||
LOG.debug("Wrote master address " + address + " to ZooKeeper");
|
||||
LOG.debug("<" + instanceName + ">" + "Wrote master address " + address + " to ZooKeeper");
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to write master address " + address + " to ZooKeeper", e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to write master address " + address + " to ZooKeeper", e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to write master address " + address + " to ZooKeeper", e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to write master address " + address + " to ZooKeeper", e);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -582,16 +673,16 @@ public class ZooKeeperWrapper {
|
|||
public boolean writeRSLocation(HServerInfo info) {
|
||||
ensureExists(rsZNode);
|
||||
byte[] data = Bytes.toBytes(info.getServerAddress().toString());
|
||||
String znode = joinPath(rsZNode, Long.toString(info.getStartCode()));
|
||||
String znode = joinPath(rsZNode, info.getServerName());
|
||||
try {
|
||||
zooKeeper.create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
||||
LOG.debug("Created ZNode " + znode
|
||||
LOG.debug("<" + instanceName + ">" + "Created ZNode " + znode
|
||||
+ " with data " + info.getServerAddress().toString());
|
||||
return true;
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to create " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create " + znode + " znode in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to create " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create " + znode + " znode in ZooKeeper: " + e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -604,17 +695,17 @@ public class ZooKeeperWrapper {
|
|||
*/
|
||||
public boolean updateRSLocationGetWatch(HServerInfo info, Watcher watcher) {
|
||||
byte[] data = Bytes.toBytes(info.getServerAddress().toString());
|
||||
String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getStartCode();
|
||||
String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getServerName();
|
||||
try {
|
||||
zooKeeper.setData(znode, data, -1);
|
||||
LOG.debug("Updated ZNode " + znode
|
||||
LOG.debug("<" + instanceName + ">" + "Updated ZNode " + znode
|
||||
+ " with data " + info.getServerAddress().toString());
|
||||
zooKeeper.getData(znode, watcher, null);
|
||||
return true;
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to update " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to update " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -645,13 +736,13 @@ public class ZooKeeperWrapper {
|
|||
try {
|
||||
List<String> nodes = zooKeeper.getChildren(rsZNode, false);
|
||||
for (String node : nodes) {
|
||||
LOG.debug("Deleting node: " + node);
|
||||
LOG.debug("<" + instanceName + ">" + "Deleting node: " + node);
|
||||
zooKeeper.delete(joinPath(this.rsZNode, node), -1);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -660,9 +751,9 @@ public class ZooKeeperWrapper {
|
|||
try {
|
||||
stat = zooKeeper.exists(path, false);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("checking existence of " + path, e);
|
||||
LOG.warn("<" + instanceName + ">" + "checking existence of " + path, e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("checking existence of " + path, e);
|
||||
LOG.warn("<" + instanceName + ">" + "checking existence of " + path, e);
|
||||
}
|
||||
|
||||
return stat != null;
|
||||
|
@ -674,9 +765,10 @@ public class ZooKeeperWrapper {
|
|||
public void close() {
|
||||
try {
|
||||
zooKeeper.close();
|
||||
LOG.debug("Closed connection with ZooKeeper; " + this.rootRegionZNode);
|
||||
INSTANCES.remove(instanceName);
|
||||
LOG.debug("<" + instanceName + ">" + "Closed connection with ZooKeeper; " + this.rootRegionZNode);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to close connection with ZooKeeper");
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to close connection with ZooKeeper");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -685,6 +777,10 @@ public class ZooKeeperWrapper {
|
|||
znodeName : joinPath(parentZNode, znodeName);
|
||||
}
|
||||
|
||||
public String getZNodePathForHBase(String znodeName) {
|
||||
return getZNode(parentZNode, znodeName);
|
||||
}
|
||||
|
||||
private String joinPath(String parent, String child) {
|
||||
return parent + ZNODE_PATH_SEPARATOR + child;
|
||||
}
|
||||
|
@ -714,7 +810,7 @@ public class ZooKeeperWrapper {
|
|||
public List<HServerAddress> scanAddressDirectory(String znode,
|
||||
Watcher watcher) {
|
||||
List<HServerAddress> list = new ArrayList<HServerAddress>();
|
||||
List<String> nodes = this.listZnodes(znode, watcher);
|
||||
List<String> nodes = this.listZnodes(znode);
|
||||
if(nodes == null) {
|
||||
return list;
|
||||
}
|
||||
|
@ -731,44 +827,40 @@ public class ZooKeeperWrapper {
|
|||
* @param watcher watch to set, can be null
|
||||
* @return a list of all the znodes
|
||||
*/
|
||||
public List<String> listZnodes(String znode, Watcher watcher) {
|
||||
public List<String> listZnodes(String znode) {
|
||||
List<String> nodes = null;
|
||||
try {
|
||||
if (checkExistenceOf(znode)) {
|
||||
if (watcher == null) {
|
||||
nodes = zooKeeper.getChildren(znode, false);
|
||||
} else {
|
||||
nodes = zooKeeper.getChildren(znode, watcher);
|
||||
for (String node : nodes) {
|
||||
getDataAndWatch(znode, node, watcher);
|
||||
}
|
||||
nodes = zooKeeper.getChildren(znode, this);
|
||||
for (String node : nodes) {
|
||||
getDataAndWatch(znode, node, this);
|
||||
}
|
||||
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
}
|
||||
return nodes;
|
||||
}
|
||||
|
||||
public String getData(String parentZNode, String znode) {
|
||||
public byte[] getData(String parentZNode, String znode) {
|
||||
return getDataAndWatch(parentZNode, znode, null);
|
||||
}
|
||||
|
||||
public String getDataAndWatch(String parentZNode,
|
||||
public byte[] getDataAndWatch(String parentZNode,
|
||||
String znode, Watcher watcher) {
|
||||
String data = null;
|
||||
byte[] data = null;
|
||||
try {
|
||||
String path = joinPath(parentZNode, znode);
|
||||
// TODO: ZK-REFACTOR: remove existance check?
|
||||
if (checkExistenceOf(path)) {
|
||||
data = Bytes.toString(zooKeeper.getData(path, watcher, null));
|
||||
data = zooKeeper.getData(path, watcher, null);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
@ -801,17 +893,17 @@ public class ZooKeeperWrapper {
|
|||
boolean failOnWrite) throws InterruptedException, KeeperException {
|
||||
String path = joinPath(parentPath, child);
|
||||
if (!ensureExists(parentPath)) {
|
||||
LOG.error("unable to ensure parent exists: " + parentPath);
|
||||
LOG.error("<" + instanceName + ">" + "unable to ensure parent exists: " + parentPath);
|
||||
}
|
||||
byte[] data = Bytes.toBytes(strData);
|
||||
Stat stat = this.zooKeeper.exists(path, false);
|
||||
if (failOnWrite || stat == null) {
|
||||
this.zooKeeper.create(path, data,
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||
LOG.debug("Created " + path);
|
||||
LOG.debug("<" + instanceName + ">" + "Created " + path);
|
||||
} else {
|
||||
this.zooKeeper.setData(path, data, -1);
|
||||
LOG.debug("Updated " + path);
|
||||
LOG.debug("<" + instanceName + ">" + "Updated " + path);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -820,6 +912,14 @@ public class ZooKeeperWrapper {
|
|||
+ ":" + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the znode that has all the regions in transition.
|
||||
* @return path to znode
|
||||
*/
|
||||
public String getRegionInTransitionZNode() {
|
||||
return this.rgnsInTransitZNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the path of this region server's znode
|
||||
* @return path to znode
|
||||
|
@ -828,4 +928,215 @@ public class ZooKeeperWrapper {
|
|||
return this.rsZNode;
|
||||
}
|
||||
|
||||
public void deleteZNode(String zNodeName, int version) {
|
||||
String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
|
||||
try
|
||||
{
|
||||
zooKeeper.delete(fullyQualifiedZNodeName, version);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to delete ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
|
||||
}
|
||||
catch (KeeperException e)
|
||||
{
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to delete ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
|
||||
}
|
||||
}
|
||||
|
||||
public String createZNodeIfNotExists(String zNodeName) {
|
||||
return createZNodeIfNotExists(zNodeName, null, CreateMode.PERSISTENT, true);
|
||||
}
|
||||
|
||||
public void watchZNode(String zNodeName) {
|
||||
String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
|
||||
|
||||
try {
|
||||
zooKeeper.exists(fullyQualifiedZNodeName, this);
|
||||
zooKeeper.getData(fullyQualifiedZNodeName, this, null);
|
||||
zooKeeper.getChildren(fullyQualifiedZNodeName, this);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
|
||||
}
|
||||
}
|
||||
|
||||
public String createZNodeIfNotExists(String zNodeName, byte[] data, CreateMode createMode, boolean watch) {
|
||||
String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
|
||||
|
||||
if (!ensureParentExists(fullyQualifiedZNodeName)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
// create the znode
|
||||
zooKeeper.create(fullyQualifiedZNodeName, data, Ids.OPEN_ACL_UNSAFE, createMode);
|
||||
LOG.debug("<" + instanceName + ">" + "Created ZNode " + fullyQualifiedZNodeName + " in ZooKeeper");
|
||||
// watch the znode for deletion, data change, creation of children
|
||||
if(watch) {
|
||||
watchZNode(zNodeName);
|
||||
}
|
||||
return fullyQualifiedZNodeName;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public byte[] readZNode(String znodeName, Stat stat) throws IOException {
|
||||
byte[] data;
|
||||
try {
|
||||
String fullyQualifiedZNodeName = getZNode(parentZNode, znodeName);
|
||||
data = zooKeeper.getData(fullyQualifiedZNodeName, this, stat);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
// TODO: perhaps return the version number from this write?
|
||||
public boolean writeZNode(String znodeName, byte[] data, int version, boolean watch) throws IOException {
|
||||
try {
|
||||
String fullyQualifiedZNodeName = getZNode(parentZNode, znodeName);
|
||||
zooKeeper.setData(fullyQualifiedZNodeName, data, version);
|
||||
if(watch) {
|
||||
zooKeeper.getData(fullyQualifiedZNodeName, this, null);
|
||||
}
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to write data to ZooKeeper", e);
|
||||
throw new IOException(e);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to write data to ZooKeeper", e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void createUnassignedRegion(String regionName, byte[] data) {
|
||||
String znode = getZNode(getRegionInTransitionZNode(), regionName);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
// check if this node already exists -
|
||||
// - it should not exist
|
||||
// - if it does, it should be in the CLOSED state
|
||||
if(exists(znode, true)) {
|
||||
Stat stat = new Stat();
|
||||
byte[] oldData = null;
|
||||
try {
|
||||
oldData = readZNode(znode, stat);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error reading data for " + znode);
|
||||
}
|
||||
if(oldData == null) {
|
||||
LOG.debug("While creating UNASSIGNED region " + regionName + " exists with no data" );
|
||||
}
|
||||
else {
|
||||
LOG.debug("While creating UNASSIGNED region " + regionName + " exists, state = " + (HBaseEventType.fromByte(oldData[0])));
|
||||
}
|
||||
}
|
||||
else {
|
||||
if(data == null) {
|
||||
LOG.debug("Creating UNASSIGNED region " + regionName + " with no data" );
|
||||
}
|
||||
else {
|
||||
LOG.debug("Creating UNASSIGNED region " + regionName + " in state = " + (HBaseEventType.fromByte(data[0])));
|
||||
}
|
||||
}
|
||||
}
|
||||
synchronized(unassignedZNodesWatched) {
|
||||
unassignedZNodesWatched.add(znode);
|
||||
createZNodeIfNotExists(znode, data, CreateMode.PERSISTENT, true);
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteUnassignedRegion(String regionName) {
|
||||
String znode = getZNode(getRegionInTransitionZNode(), regionName);
|
||||
try {
|
||||
LOG.debug("Deleting ZNode " + znode + " in ZooKeeper as region is open...");
|
||||
synchronized(unassignedZNodesWatched) {
|
||||
unassignedZNodesWatched.remove(znode);
|
||||
deleteZNode(znode);
|
||||
}
|
||||
} catch (KeeperException.SessionExpiredException e) {
|
||||
LOG.error("Zookeeper session has expired", e);
|
||||
// if the session has expired try to reconnect to ZK, then perform query
|
||||
try {
|
||||
// TODO: ZK-REFACTOR: should just quit on reconnect??
|
||||
reconnectToZk();
|
||||
synchronized(unassignedZNodesWatched) {
|
||||
unassignedZNodesWatched.remove(znode);
|
||||
deleteZNode(znode);
|
||||
}
|
||||
} catch (IOException e1) {
|
||||
LOG.error("Error reconnecting to zookeeper", e1);
|
||||
throw new RuntimeException("Error reconnecting to zookeeper", e1);
|
||||
} catch (KeeperException.SessionExpiredException e1) {
|
||||
LOG.error("Error reading after reconnecting to zookeeper", e1);
|
||||
throw new RuntimeException("Error reading after reconnecting to zookeeper", e1);
|
||||
} catch (KeeperException e1) {
|
||||
LOG.error("Error reading after reconnecting to zookeeper", e1);
|
||||
} catch (InterruptedException e1) {
|
||||
LOG.error("Error reading after reconnecting to zookeeper", e1);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Error deleting region " + regionName, e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Error deleting region " + regionName, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically adds a watch and reads data from the unwatched znodes in the
|
||||
* UNASSGINED region. This works because the master is the only person
|
||||
* deleting nodes.
|
||||
* @param znode
|
||||
* @return
|
||||
*/
|
||||
public List<ZNodePathAndData> watchAndGetNewChildren(String znode) {
|
||||
List<String> nodes = null;
|
||||
List<ZNodePathAndData> newNodes = new ArrayList<ZNodePathAndData>();
|
||||
try {
|
||||
if (checkExistenceOf(znode)) {
|
||||
synchronized(unassignedZNodesWatched) {
|
||||
nodes = zooKeeper.getChildren(znode, this);
|
||||
for (String node : nodes) {
|
||||
String znodePath = joinPath(znode, node);
|
||||
if(!unassignedZNodesWatched.contains(znodePath)) {
|
||||
byte[] data = getDataAndWatch(znode, node, this);
|
||||
newNodes.add(new ZNodePathAndData(znodePath, data));
|
||||
unassignedZNodesWatched.add(znodePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
|
||||
}
|
||||
return newNodes;
|
||||
}
|
||||
|
||||
public static class ZNodePathAndData {
|
||||
private String zNodePath;
|
||||
private byte[] data;
|
||||
|
||||
public ZNodePathAndData(String zNodePath, byte[] data) {
|
||||
this.zNodePath = zNodePath;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public String getzNodePath() {
|
||||
return zNodePath;
|
||||
}
|
||||
public byte[] getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -697,7 +697,8 @@ public class HBaseTestingUtility {
|
|||
}
|
||||
|
||||
public void expireSession(ZooKeeperWrapper nodeZK) throws Exception{
|
||||
ZooKeeperWrapper zkw = new ZooKeeperWrapper(conf, EmptyWatcher.instance);
|
||||
ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf, ZooKeeperWrapper.class.getName());
|
||||
zkw.registerListener(EmptyWatcher.instance);
|
||||
String quorumServers = zkw.getQuorumServers();
|
||||
int sessionTimeout = 5 * 1000; // 5 seconds
|
||||
|
||||
|
|
|
@ -96,7 +96,8 @@ public class TestZooKeeper {
|
|||
throws IOException, InterruptedException {
|
||||
new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
|
||||
ZooKeeperWrapper zkw = new ZooKeeperWrapper(conf, EmptyWatcher.instance);
|
||||
ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf, TestZooKeeper.class.getName());
|
||||
zkw.registerListener(EmptyWatcher.instance);
|
||||
String quorumServers = zkw.getQuorumServers();
|
||||
int sessionTimeout = 5 * 1000; // 5 seconds
|
||||
HConnection connection = HConnectionManager.getConnection(conf);
|
||||
|
@ -158,7 +159,7 @@ public class TestZooKeeper {
|
|||
HTable localMeta = new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
Configuration otherConf = HBaseConfiguration.create(conf);
|
||||
otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
|
||||
HTable ipMeta = new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
HTable ipMeta = new HTable(otherConf, HConstants.META_TABLE_NAME);
|
||||
|
||||
// dummy, just to open the connection
|
||||
localMeta.exists(new Get(HConstants.LAST_ROW));
|
||||
|
@ -184,7 +185,8 @@ public class TestZooKeeper {
|
|||
*/
|
||||
@Test
|
||||
public void testZNodeDeletes() throws Exception {
|
||||
ZooKeeperWrapper zkw = new ZooKeeperWrapper(conf, EmptyWatcher.instance);
|
||||
ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf, TestZooKeeper.class.getName());
|
||||
zkw.registerListener(EmptyWatcher.instance);
|
||||
zkw.ensureExists("/l1/l2/l3/l4");
|
||||
try {
|
||||
zkw.deleteZNode("/l1/l2");
|
||||
|
|
|
@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventHandlerListener;
|
||||
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
|
@ -70,7 +73,7 @@ public class TestMaster {
|
|||
CountDownLatch aboutToOpen = new CountDownLatch(1);
|
||||
CountDownLatch proceed = new CountDownLatch(1);
|
||||
RegionOpenListener list = new RegionOpenListener(aboutToOpen, proceed);
|
||||
m.getRegionServerOperationQueue().registerRegionServerOperationListener(list);
|
||||
HBaseEventHandler.registerListener(list);
|
||||
|
||||
admin.split(TABLENAME);
|
||||
aboutToOpen.await(60, TimeUnit.SECONDS);
|
||||
|
@ -79,32 +82,32 @@ public class TestMaster {
|
|||
m.getTableRegions(TABLENAME);
|
||||
Pair<HRegionInfo,HServerAddress> pair =
|
||||
m.getTableRegionClosest(TABLENAME, Bytes.toBytes("cde"));
|
||||
assertNull(pair);
|
||||
/**
|
||||
* TODO: these methods return null when the regions are not deployed.
|
||||
* These tests should be uncommented after HBASE-2656.
|
||||
* TODO: The assertNull below used to work before moving all RS->M
|
||||
* communication to ZK, find out why this test's behavior has changed.
|
||||
* Tracked in HBASE-2656.
|
||||
assertNull(pair);
|
||||
*/
|
||||
assertNotNull(pair);
|
||||
m.getTableRegionFromName(pair.getFirst().getRegionName());
|
||||
*/
|
||||
} finally {
|
||||
proceed.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
static class RegionOpenListener implements RegionServerOperationListener {
|
||||
static class RegionOpenListener implements HBaseEventHandlerListener {
|
||||
CountDownLatch aboutToOpen, proceed;
|
||||
|
||||
public RegionOpenListener(
|
||||
CountDownLatch aboutToOpen, CountDownLatch proceed)
|
||||
public RegionOpenListener(CountDownLatch aboutToOpen, CountDownLatch proceed)
|
||||
{
|
||||
this.aboutToOpen = aboutToOpen;
|
||||
this.proceed = proceed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
|
||||
if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_OPEN)) {
|
||||
return true;
|
||||
public void afterProcess(HBaseEventHandler event) {
|
||||
if (event.getHBEvent() != HBaseEventType.RS2ZK_REGION_OPENED) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
aboutToOpen.countDown();
|
||||
|
@ -112,16 +115,11 @@ public class TestMaster {
|
|||
} catch (InterruptedException ie) {
|
||||
throw new RuntimeException(ie);
|
||||
}
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(RegionServerOperation op) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processed(RegionServerOperation op) {
|
||||
public void beforeProcess(HBaseEventHandler event) {
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,241 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.ProcessRegionClose;
|
||||
import org.apache.hadoop.hbase.master.RegionServerOperation;
|
||||
import org.apache.hadoop.hbase.master.RegionServerOperationListener;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestZKBasedCloseRegion {
|
||||
private static final Log LOG = LogFactory.getLog(TestZKBasedCloseRegion.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final String TABLENAME = "master_transitions";
|
||||
private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
|
||||
Bytes.toBytes("b"), Bytes.toBytes("c")};
|
||||
|
||||
@BeforeClass public static void beforeAllTests() throws Exception {
|
||||
Configuration c = TEST_UTIL.getConfiguration();
|
||||
c.setBoolean("dfs.support.append", true);
|
||||
c.setInt("hbase.regionserver.info.port", 0);
|
||||
c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
|
||||
int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
|
||||
waitUntilAllRegionsAssigned(countOfRegions);
|
||||
addToEachStartKey(countOfRegions);
|
||||
}
|
||||
|
||||
@AfterClass public static void afterAllTests() throws IOException {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before public void setup() throws IOException {
|
||||
if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
|
||||
// Need at least two servers.
|
||||
LOG.info("Started new server=" +
|
||||
TEST_UTIL.getHBaseCluster().startRegionServer());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=300000) public void testCloseRegion()
|
||||
throws Exception {
|
||||
LOG.info("Running testCloseRegion");
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
|
||||
|
||||
int rsIdx = 0;
|
||||
HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
|
||||
Collection<HRegion> regions = regionServer.getOnlineRegions();
|
||||
HRegion region = regions.iterator().next();
|
||||
LOG.debug("Asking RS to close region " + region.getRegionNameAsString());
|
||||
|
||||
AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
|
||||
RegionServerOperationListener listener =
|
||||
new CloseRegionEventListener(region.getRegionNameAsString(), closeEventProcessed);
|
||||
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener);
|
||||
HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE,
|
||||
region.getRegionInfo(),
|
||||
Bytes.toBytes("Forcing close in test")
|
||||
);
|
||||
TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg);
|
||||
|
||||
synchronized(closeEventProcessed) {
|
||||
// wait for 3 minutes
|
||||
closeEventProcessed.wait(3*60*1000);
|
||||
}
|
||||
if(!closeEventProcessed.get()) {
|
||||
throw new Exception("Timed out, close event not called on master.");
|
||||
}
|
||||
else {
|
||||
LOG.info("Done with test, RS informed master successfully.");
|
||||
}
|
||||
}
|
||||
|
||||
public static class CloseRegionEventListener implements RegionServerOperationListener {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class);
|
||||
String regionToClose;
|
||||
AtomicBoolean closeEventProcessed;
|
||||
|
||||
public CloseRegionEventListener(String regionToClose, AtomicBoolean closeEventProcessed) {
|
||||
this.regionToClose = regionToClose;
|
||||
this.closeEventProcessed = closeEventProcessed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(RegionServerOperation op) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processed(RegionServerOperation op) {
|
||||
LOG.debug("Master processing object: " + op.getClass().getCanonicalName());
|
||||
if(op instanceof ProcessRegionClose) {
|
||||
ProcessRegionClose regionCloseOp = (ProcessRegionClose)op;
|
||||
String region = regionCloseOp.getRegionInfo().getRegionNameAsString();
|
||||
LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose);
|
||||
if(regionToClose.equals(region)) {
|
||||
closeEventProcessed.set(true);
|
||||
}
|
||||
synchronized(closeEventProcessed) {
|
||||
closeEventProcessed.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static void waitUntilAllRegionsAssigned(final int countOfRegions)
|
||||
throws IOException {
|
||||
HTable meta = new HTable(TEST_UTIL.getConfiguration(),
|
||||
HConstants.META_TABLE_NAME);
|
||||
while (true) {
|
||||
int rows = 0;
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
||||
ResultScanner s = meta.getScanner(scan);
|
||||
for (Result r = null; (r = s.next()) != null;) {
|
||||
byte [] b =
|
||||
r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
||||
if (b == null || b.length <= 0) break;
|
||||
rows++;
|
||||
}
|
||||
s.close();
|
||||
// If I get to here and all rows have a Server, then all have been assigned.
|
||||
if (rows == countOfRegions) break;
|
||||
LOG.info("Found=" + rows);
|
||||
Threads.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Add to each of the regions in .META. a value. Key is the startrow of the
|
||||
* region (except its 'aaa' for first region). Actual value is the row name.
|
||||
* @param expected
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private static int addToEachStartKey(final int expected) throws IOException {
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
|
||||
HTable meta = new HTable(TEST_UTIL.getConfiguration(),
|
||||
HConstants.META_TABLE_NAME);
|
||||
int rows = 0;
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||
ResultScanner s = meta.getScanner(scan);
|
||||
for (Result r = null; (r = s.next()) != null;) {
|
||||
byte [] b =
|
||||
r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||
if (b == null || b.length <= 0) break;
|
||||
HRegionInfo hri = Writables.getHRegionInfo(b);
|
||||
// If start key, add 'aaa'.
|
||||
byte [] row = getStartKey(hri);
|
||||
Put p = new Put(row);
|
||||
p.add(getTestFamily(), getTestQualifier(), row);
|
||||
t.put(p);
|
||||
rows++;
|
||||
}
|
||||
s.close();
|
||||
Assert.assertEquals(expected, rows);
|
||||
return rows;
|
||||
}
|
||||
|
||||
private static byte [] getStartKey(final HRegionInfo hri) {
|
||||
return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
|
||||
Bytes.toBytes("aaa"): hri.getStartKey();
|
||||
}
|
||||
|
||||
private static byte [] getTestFamily() {
|
||||
return FAMILIES[0];
|
||||
}
|
||||
|
||||
private static byte [] getTestQualifier() {
|
||||
return getTestFamily();
|
||||
}
|
||||
|
||||
public static void main(String args[]) throws Exception {
|
||||
TestZKBasedCloseRegion.beforeAllTests();
|
||||
|
||||
TestZKBasedCloseRegion test = new TestZKBasedCloseRegion();
|
||||
test.setup();
|
||||
test.testCloseRegion();
|
||||
|
||||
TestZKBasedCloseRegion.afterAllTests();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,268 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.ProcessRegionClose;
|
||||
import org.apache.hadoop.hbase.master.ProcessRegionOpen;
|
||||
import org.apache.hadoop.hbase.master.RegionServerOperation;
|
||||
import org.apache.hadoop.hbase.master.RegionServerOperationListener;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestZKBasedReopenRegion {
|
||||
private static final Log LOG = LogFactory.getLog(TestZKBasedReopenRegion.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final String TABLENAME = "master_transitions";
|
||||
private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
|
||||
Bytes.toBytes("b"), Bytes.toBytes("c")};
|
||||
|
||||
@BeforeClass public static void beforeAllTests() throws Exception {
|
||||
Configuration c = TEST_UTIL.getConfiguration();
|
||||
c.setBoolean("dfs.support.append", true);
|
||||
c.setInt("hbase.regionserver.info.port", 0);
|
||||
c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
|
||||
int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
|
||||
waitUntilAllRegionsAssigned(countOfRegions);
|
||||
addToEachStartKey(countOfRegions);
|
||||
}
|
||||
|
||||
@AfterClass public static void afterAllTests() throws IOException {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before public void setup() throws IOException {
|
||||
if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
|
||||
// Need at least two servers.
|
||||
LOG.info("Started new server=" +
|
||||
TEST_UTIL.getHBaseCluster().startRegionServer());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=300000) public void testOpenRegion()
|
||||
throws Exception {
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
|
||||
|
||||
int rsIdx = 0;
|
||||
HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
|
||||
Collection<HRegion> regions = regionServer.getOnlineRegions();
|
||||
HRegion region = regions.iterator().next();
|
||||
LOG.debug("Asking RS to close region " + region.getRegionNameAsString());
|
||||
|
||||
AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
|
||||
AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
|
||||
RegionServerOperationListener listener =
|
||||
new ReopenRegionEventListener(region.getRegionNameAsString(),
|
||||
closeEventProcessed,
|
||||
reopenEventProcessed);
|
||||
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener);
|
||||
HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE,
|
||||
region.getRegionInfo(),
|
||||
Bytes.toBytes("Forcing close in test")
|
||||
);
|
||||
TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg);
|
||||
|
||||
synchronized(closeEventProcessed) {
|
||||
closeEventProcessed.wait(3*60*1000);
|
||||
}
|
||||
if(!closeEventProcessed.get()) {
|
||||
throw new Exception("Timed out, close event not called on master.");
|
||||
}
|
||||
|
||||
synchronized(reopenEventProcessed) {
|
||||
reopenEventProcessed.wait(3*60*1000);
|
||||
}
|
||||
if(!reopenEventProcessed.get()) {
|
||||
throw new Exception("Timed out, open event not called on master after region close.");
|
||||
}
|
||||
|
||||
LOG.info("Done with test, RS informed master successfully.");
|
||||
}
|
||||
|
||||
public static class ReopenRegionEventListener implements RegionServerOperationListener {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ReopenRegionEventListener.class);
|
||||
String regionToClose;
|
||||
AtomicBoolean closeEventProcessed;
|
||||
AtomicBoolean reopenEventProcessed;
|
||||
|
||||
public ReopenRegionEventListener(String regionToClose,
|
||||
AtomicBoolean closeEventProcessed,
|
||||
AtomicBoolean reopenEventProcessed) {
|
||||
this.regionToClose = regionToClose;
|
||||
this.closeEventProcessed = closeEventProcessed;
|
||||
this.reopenEventProcessed = reopenEventProcessed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(RegionServerOperation op) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processed(RegionServerOperation op) {
|
||||
LOG.debug("Master processing object: " + op.getClass().getCanonicalName());
|
||||
if(op instanceof ProcessRegionClose) {
|
||||
ProcessRegionClose regionCloseOp = (ProcessRegionClose)op;
|
||||
String region = regionCloseOp.getRegionInfo().getRegionNameAsString();
|
||||
LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose);
|
||||
if(regionToClose.equals(region)) {
|
||||
closeEventProcessed.set(true);
|
||||
}
|
||||
synchronized(closeEventProcessed) {
|
||||
closeEventProcessed.notifyAll();
|
||||
}
|
||||
}
|
||||
// Wait for open event AFTER we have closed the region
|
||||
if(closeEventProcessed.get()) {
|
||||
if(op instanceof ProcessRegionOpen) {
|
||||
ProcessRegionOpen regionOpenOp = (ProcessRegionOpen)op;
|
||||
String region = regionOpenOp.getRegionInfo().getRegionNameAsString();
|
||||
LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose);
|
||||
if(regionToClose.equals(region)) {
|
||||
reopenEventProcessed.set(true);
|
||||
}
|
||||
synchronized(reopenEventProcessed) {
|
||||
reopenEventProcessed.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static void waitUntilAllRegionsAssigned(final int countOfRegions)
|
||||
throws IOException {
|
||||
HTable meta = new HTable(TEST_UTIL.getConfiguration(),
|
||||
HConstants.META_TABLE_NAME);
|
||||
while (true) {
|
||||
int rows = 0;
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
||||
ResultScanner s = meta.getScanner(scan);
|
||||
for (Result r = null; (r = s.next()) != null;) {
|
||||
byte [] b =
|
||||
r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
||||
if (b == null || b.length <= 0) break;
|
||||
rows++;
|
||||
}
|
||||
s.close();
|
||||
// If I get to here and all rows have a Server, then all have been assigned.
|
||||
if (rows == countOfRegions) break;
|
||||
LOG.info("Found=" + rows);
|
||||
Threads.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Add to each of the regions in .META. a value. Key is the startrow of the
|
||||
* region (except its 'aaa' for first region). Actual value is the row name.
|
||||
* @param expected
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private static int addToEachStartKey(final int expected) throws IOException {
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
|
||||
HTable meta = new HTable(TEST_UTIL.getConfiguration(),
|
||||
HConstants.META_TABLE_NAME);
|
||||
int rows = 0;
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||
ResultScanner s = meta.getScanner(scan);
|
||||
for (Result r = null; (r = s.next()) != null;) {
|
||||
byte [] b =
|
||||
r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||
if (b == null || b.length <= 0) break;
|
||||
HRegionInfo hri = Writables.getHRegionInfo(b);
|
||||
// If start key, add 'aaa'.
|
||||
byte [] row = getStartKey(hri);
|
||||
Put p = new Put(row);
|
||||
p.add(getTestFamily(), getTestQualifier(), row);
|
||||
t.put(p);
|
||||
rows++;
|
||||
}
|
||||
s.close();
|
||||
Assert.assertEquals(expected, rows);
|
||||
return rows;
|
||||
}
|
||||
|
||||
private static byte [] getStartKey(final HRegionInfo hri) {
|
||||
return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
|
||||
Bytes.toBytes("aaa"): hri.getStartKey();
|
||||
}
|
||||
|
||||
private static byte [] getTestFamily() {
|
||||
return FAMILIES[0];
|
||||
}
|
||||
|
||||
private static byte [] getTestQualifier() {
|
||||
return getTestFamily();
|
||||
}
|
||||
|
||||
public static void main(String args[]) throws Exception {
|
||||
TestZKBasedReopenRegion.beforeAllTests();
|
||||
|
||||
TestZKBasedReopenRegion test = new TestZKBasedReopenRegion();
|
||||
test.setup();
|
||||
test.testOpenRegion();
|
||||
|
||||
TestZKBasedReopenRegion.afterAllTests();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue