diff --git a/CHANGES.txt b/CHANGES.txt index 55fa6fe3b73..42986809f4f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -686,6 +686,7 @@ Release 0.21.0 - Unreleased HBASE-2618 Don't inherit from HConstants (Benoit Sigoure via Stack) HBASE-2208 TableServers # processBatchOfRows - converts from List to [ ] - Expensive copy + HBASE-2694 Move RS to Master region open/close messaging into ZooKeeper NEW FEATURES HBASE-1961 HBase EC2 scripts diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 53121586231..03cbf8d47f3 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -220,13 +220,16 @@ public class HConnectionManager { } /** - * Get this watcher's ZKW, instanciate it if necessary. + * Get this watcher's ZKW, instantiate it if necessary. * @return ZKW * @throws java.io.IOException if a remote or network exception occurs */ public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException { if(zooKeeperWrapper == null) { - zooKeeperWrapper = new ZooKeeperWrapper(conf, this); + String zkWrapperName = HConnectionManager.class.getName() + "-" + + ZooKeeperWrapper.getZookeeperClusterKey(conf); + zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, zkWrapperName); + zooKeeperWrapper.registerListener(this); } return zooKeeperWrapper; } diff --git a/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java b/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java new file mode 100644 index 00000000000..cecd3ecef42 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java @@ -0,0 +1,285 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.executor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType; +import org.apache.hadoop.hbase.master.ServerManager; + + +/** + * Abstract base class for all HBase event handlers. Subclasses should + * implement the process() method where the actual handling of the event + * happens. + * + * HBaseEventType is a list of ALL events (which also corresponds to messages - + * either internal to one component or between components). The event type + * names specify the component from which the event originated, and the + * component which is supposed to handle it. + * + * Listeners can listen to all the events by implementing the interface + * HBaseEventHandlerListener, and by registering themselves as a listener. They + * will be called back before and after the process of every event. + * + * TODO: Rename HBaseEvent and HBaseEventType to EventHandler and EventType + * after ZK refactor as it currently would clash with EventType from ZK and + * make the code very confusing. 
+ */ +public abstract class HBaseEventHandler implements Runnable +{ + private static final Log LOG = LogFactory.getLog(HBaseEventHandler.class); + // type of event this object represents + protected HBaseEventType eventType = HBaseEventType.NONE; + // is this a region server or master? + protected boolean isRegionServer; + // name of the server - this is needed for naming executors in case of tests + // where region servers may be co-located. + protected String serverName; + // listeners that are called before and after an event is processed + protected static List<HBaseEventHandlerListener> eventHandlerListeners = + Collections.synchronizedList(new ArrayList<HBaseEventHandlerListener>()); + // static instances needed by the handlers + protected static ServerManager serverManager; + + /** + * Note that this has to be called first BEFORE the subclass constructors. + * + * TODO: take out after refactor + */ + public static void init(ServerManager serverManager) { + HBaseEventHandler.serverManager = serverManager; + } + + /** + * This interface provides hooks to listen to various events received by the + * queue. A class implementing this can listen to the updates by calling + * registerListener and stop receiving updates by calling unregisterListener + */ + public interface HBaseEventHandlerListener { + /** + * Called before any event is processed + */ + public void beforeProcess(HBaseEventHandler event); + /** + * Called after any event is processed + */ + public void afterProcess(HBaseEventHandler event); + } + + /** + * These are a list of HBase events that can be handled by the various + * HBaseExecutorService's. All the events are serialized as byte values. + */ + public enum HBaseEventType { + NONE (-1), + // Messages originating from RS (NOTE: there is NO direct communication from + // RS to Master). These are a result of RS updates into ZK.
+ RS2ZK_REGION_CLOSING (1), // RS is in process of closing a region + RS2ZK_REGION_CLOSED (2), // RS has finished closing a region + RS2ZK_REGION_OPENING (3), // RS is in process of opening a region + RS2ZK_REGION_OPENED (4), // RS has finished opening a region + + // Updates from master to ZK. This is done by the master and there is + // nothing to process by either Master or RS + M2ZK_REGION_OFFLINE (50); // Master adds this region as offline in ZK + + private final byte value; + + /** + * Called by the HMaster. Returns a name of the executor service given an + * event type. Every event type has an entry - if the event should not be + * handled just add the NONE executor. + * @return name of the executor service + */ + public HBaseExecutorServiceType getMasterExecutorForEvent() { + HBaseExecutorServiceType executorServiceType = null; + switch(this) { + + case RS2ZK_REGION_CLOSING: + case RS2ZK_REGION_CLOSED: + executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION; + break; + + case RS2ZK_REGION_OPENING: + case RS2ZK_REGION_OPENED: + executorServiceType = HBaseExecutorServiceType.MASTER_OPENREGION; + break; + + case M2ZK_REGION_OFFLINE: + executorServiceType = HBaseExecutorServiceType.NONE; + break; + + default: + throw new RuntimeException("Unhandled event type in the master."); + } + + return executorServiceType; + } + + /** + * Called by the RegionServer. Returns a name of the executor service given an + * event type. Every event type has an entry - if the event should not be + * handled just return a null executor name. + * @return name of the event service + */ + public static String getRSExecutorForEvent(String serverName) { + throw new RuntimeException("Unsupported operation."); + } + + /** + * Start the executor service that handles the passed in event type. The + * server that starts these event executor services wants to handle these + * event types.
+ */ + public void startMasterExecutorService(String serverName) { + HBaseExecutorServiceType serviceType = getMasterExecutorForEvent(); + if(serviceType == HBaseExecutorServiceType.NONE) { + throw new RuntimeException("Event type " + toString() + " not handled on master."); + } + serviceType.startExecutorService(serverName); + } + + public static void startRSExecutorService() { + + } + + HBaseEventType(int intValue) { + this.value = (byte)intValue; + } + + public byte getByteValue() { + return value; + } + + public static HBaseEventType fromByte(byte value) { + switch(value) { + case -1: return HBaseEventType.NONE; + case 1 : return HBaseEventType.RS2ZK_REGION_CLOSING; + case 2 : return HBaseEventType.RS2ZK_REGION_CLOSED; + case 3 : return HBaseEventType.RS2ZK_REGION_OPENING; + case 4 : return HBaseEventType.RS2ZK_REGION_OPENED; + case 50: return HBaseEventType.M2ZK_REGION_OFFLINE; + + default: + throw new RuntimeException("Invalid byte value for conversion to HBaseEventType"); + } + } + } + + /** + * Default base class constructor. + * + * TODO: isRegionServer and serverName will go away once we do the HMaster + * refactor. We will end up passing a ServerStatus which should tell us both + * the name and if it is a RS or master. + */ + public HBaseEventHandler(boolean isRegionServer, String serverName, HBaseEventType eventType) { + this.isRegionServer = isRegionServer; + this.eventType = eventType; + this.serverName = serverName; + } + + /** + * This is a wrapper around process, used to update listeners before and after + * events are processed. 
+ */ + public void run() { + // fire all beforeProcess listeners (iterate over a snapshot so listeners may register/unregister concurrently) + for(HBaseEventHandlerListener listener : eventHandlerListeners.toArray(new HBaseEventHandlerListener[0])) { + listener.beforeProcess(this); + } + + // call the main process function + process(); + + // fire all afterProcess listeners (again over a snapshot of the listener list) + for(HBaseEventHandlerListener listener : eventHandlerListeners.toArray(new HBaseEventHandlerListener[0])) { + LOG.debug("Firing " + listener.getClass().getName() + + ".afterProcess event listener for event " + eventType); + listener.afterProcess(this); + } + } + + /** + * This method is the main processing loop to be implemented by the various + * subclasses. + */ + public abstract void process(); + + /** + * Subscribe to updates before and after processing events + */ + public static void registerListener(HBaseEventHandlerListener listener) { + eventHandlerListeners.add(listener); + } + + /** + * Stop receiving updates before and after processing events + */ + public static void unregisterListener(HBaseEventHandlerListener listener) { + eventHandlerListeners.remove(listener); + } + + public boolean isRegionServer() { + return isRegionServer; + } + + /** + * Return the name for this event type. + * @return the executor service type that handles this event on the master + */ + public HBaseExecutorServiceType getEventHandlerName() { + // TODO: check for isRegionServer here + return eventType.getMasterExecutorForEvent(); + } + + /** + * Return the event type + * @return the type of this event + */ + public HBaseEventType getHBEvent() { + return eventType; + } + + /** + * Submits this event object to the correct executor service. This causes + * this object to get executed by the correct ExecutorService. Throws a RuntimeException if the event type maps to no executor (null or NONE). + */ + public void submit() { + HBaseExecutorServiceType serviceType = getEventHandlerName(); + if(serviceType == null || serviceType == HBaseExecutorServiceType.NONE) { + throw new RuntimeException("Event " + eventType + " not handled on this server " + serverName); + } + serviceType.getExecutor(serverName).submit(this); + } + + /** + * Executes this event object in the caller's thread. This is a synchronous + * way of executing the event.
+ */ + public void execute() { + this.run(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java b/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java new file mode 100644 index 00000000000..b5f8987bb47 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java @@ -0,0 +1,171 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.executor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This is a generic HBase executor service. This component abstract a + * threadpool, a queue to which jobs can be submitted and a Runnable that + * handles the object that is added to the queue. 
+ * + * In order to create a new HBaseExecutorService, you need to do: + * HBaseExecutorService.startExecutorService("myService"); + * + * In order to use the service created above, you need to override the + * HBaseEventHandler class and create an event type that submits to this service. + * + */ +public class HBaseExecutorService +{ + private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class); + // default number of threads in the pool + private int corePoolSize = 1; + // max number of threads - NOTE: never reached while workQueue is unbounded; ThreadPoolExecutor only grows past corePoolSize when the queue is full + private int maximumPoolSize = 5; + // how long to retain excess threads + private long keepAliveTimeInMillis = 1000; + // the thread pool executor that services the requests + ThreadPoolExecutor threadPoolExecutor; + // work queue to use - unbounded queue + BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); + // name for this executor service + String name; + // holds all the executors created in a map addressable by their names + static Map<String, HBaseExecutorService> executorServicesMap = + Collections.synchronizedMap(new HashMap<String, HBaseExecutorService>()); + + + /** + * The following is a list of names for the various executor services in both + * the master and the region server.
+ */ + public enum HBaseExecutorServiceType { + NONE (-1), + MASTER_CLOSEREGION (1), + MASTER_OPENREGION (2); + + private final int value; + + HBaseExecutorServiceType(int intValue) { + this.value = intValue; + } + + public void startExecutorService(String serverName) { + // if this is NONE then there is no executor to start + if(value == NONE.value) { + throw new RuntimeException("Cannot start NONE executor type."); + } + String name = getExecutorName(serverName); + if(HBaseExecutorService.isExecutorServiceRunning(name)) { + LOG.debug("Executor service " + toString() + " already running on " + serverName); + return; + } + HBaseExecutorService.startExecutorService(name); + } + + public HBaseExecutorService getExecutor(String serverName) { + // if this is NONE then there is no executor + if(value == NONE.value) { + return null; + } + return HBaseExecutorService.getExecutorService(getExecutorName(serverName)); + } + + public String getExecutorName(String serverName) { + // if this is NONE then there is no executor + if(value == NONE.value) { + return null; + } + return (this.toString() + "-" + serverName); + } + } + + + + /** + * Start an executor service with a given name. If there was a service already + * started with the same name, this throws a RuntimeException. + * @param name Name of the service to start. + */ + public static void startExecutorService(String name) { + if(executorServicesMap.get(name) != null) { + throw new RuntimeException("An executor service with the name " + name + " is already running!"); + } + HBaseExecutorService hbes = new HBaseExecutorService(name); + executorServicesMap.put(name, hbes); + LOG.debug("Starting executor service: " + name); + } + + public static boolean isExecutorServiceRunning(String name) { + return (executorServicesMap.containsKey(name)); + } + + /** + * This method is an accessor for all the HBExecutorServices running so far + * addressable by name. If there is no such service, then it returns null. 
+ */ + public static HBaseExecutorService getExecutorService(String name) { + HBaseExecutorService executor = executorServicesMap.get(name); + if(executor == null) { + LOG.debug("Executor service [" + name + "] not found."); + } + return executor; + } + + public static void shutdown() { + for(HBaseExecutorService service : executorServicesMap.values().toArray(new HBaseExecutorService[0])) { + service.threadPoolExecutor.shutdown(); + } + executorServicesMap.clear(); + } + + protected HBaseExecutorService(String name) { + this.name = name; + // create the thread pool executor + threadPoolExecutor = new ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTimeInMillis, + TimeUnit.MILLISECONDS, + workQueue + ); + // name the threads for this threadpool + threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name)); + } + + /** + * Submit the event to the queue for handling. + * @param event the task to run on this service's thread pool + */ + public void submit(Runnable event) { + threadPoolExecutor.execute(event); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java b/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java new file mode 100644 index 00000000000..87ea97d1a82 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/executor/NamedThreadFactory.java @@ -0,0 +1,43 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.executor; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Returns a named thread with a specified prefix. + * + * TODO: Use guava (com.google.common.util.concurrent.NamingThreadFactory) + */ +public class NamedThreadFactory implements ThreadFactory +{ + private String threadPrefix; + private AtomicInteger threadId = new AtomicInteger(0); + + public NamedThreadFactory(String threadPrefix) { + this.threadPrefix = threadPrefix; + } + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, threadPrefix + "-" + threadId.incrementAndGet()); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java b/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java new file mode 100644 index 00000000000..1b39de9ccb9 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java @@ -0,0 +1,92 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.executor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; +import org.apache.hadoop.io.Writable; + +public class RegionTransitionEventData implements Writable { + private HBaseEventType hbEvent; + private String rsName; + private long timeStamp; + private HMsg hmsg; + + public RegionTransitionEventData() { + } + + public RegionTransitionEventData(HBaseEventType hbEvent, String rsName) { + this(hbEvent, rsName, null); + } + + public RegionTransitionEventData(HBaseEventType hbEvent, String rsName, HMsg hmsg) { + this.hbEvent = hbEvent; + this.rsName = rsName; + this.timeStamp = System.currentTimeMillis(); + this.hmsg = hmsg; + } + + public HBaseEventType getHbEvent() { + return hbEvent; + } + + public String getRsName() { + return rsName; + } + + public long getTimeStamp() { + return timeStamp; + } + + public HMsg getHmsg() { + return hmsg; + } + + @Override + public void readFields(DataInput in) throws IOException { + // the event type byte + hbEvent = HBaseEventType.fromByte(in.readByte()); + // the hostname of the RS sending the data + rsName = in.readUTF(); + // the timestamp + timeStamp = in.readLong(); + if(in.readBoolean()) { + // deserialized the HMsg from ZK + hmsg = new HMsg(); + hmsg.readFields(in); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeByte(hbEvent.getByteValue()); + out.writeUTF(rsName); + 
+ out.writeLong(timeStamp); + out.writeBoolean((hmsg != null)); + if(hmsg != null) { + hmsg.write(out); + } + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0cefdb13ee5..804cd982865 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -19,6 +19,23 @@ */ package org.apache.hadoop.hbase.master; +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.lang.reflect.Constructor; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,6 +65,9 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ServerConnection; import org.apache.hadoop.hbase.client.ServerConnectionManager; +import org.apache.hadoop.hbase.executor.HBaseEventHandler; +import org.apache.hadoop.hbase.executor.HBaseExecutorService; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; @@ -77,23 +97,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; -import java.io.File; -import java.io.IOException; -import
java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; -import java.lang.reflect.Constructor; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.SortedMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - /** * HMaster is the "master server" for HBase. An HBase cluster has one active * master. If many masters are started, all compete. Whichever wins goes on to @@ -198,14 +201,31 @@ public class HMaster extends Thread implements HMasterInterface, // We'll succeed if we are only master or if we win the race when many // masters. Otherwise we park here inside in writeAddressToZooKeeper. // TODO: Bring up the UI to redirect to active Master. - this.zooKeeperWrapper = new ZooKeeperWrapper(conf, this); + zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, HMaster.class.getName()); + zooKeeperWrapper.registerListener(this); this.zkMasterAddressWatcher = new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested); + zooKeeperWrapper.registerListener(zkMasterAddressWatcher); this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true); this.regionServerOperationQueue = new RegionServerOperationQueue(this.conf, this.closed); serverManager = new ServerManager(this); + + + // Start the unassigned watcher - which will create the unassigned region + // in ZK. This is needed before RegionManager() constructor tries to assign + // the root region.
+ ZKUnassignedWatcher.start(); + // init the various event handlers + HBaseEventHandler.init(serverManager); + // start the "close region" executor service + HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(MASTER); + // start the "open region" executor service + HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(MASTER); + + + // start the region manager regionManager = new RegionManager(this); setName(MASTER); @@ -411,7 +431,7 @@ public class HMaster extends Thread implements HMasterInterface, return this.serverManager.getAverageLoad(); } - RegionServerOperationQueue getRegionServerOperationQueue () { + public RegionServerOperationQueue getRegionServerOperationQueue () { return this.regionServerOperationQueue; } @@ -491,6 +511,7 @@ public class HMaster extends Thread implements HMasterInterface, this.rpcServer.stop(); this.regionManager.stop(); this.zooKeeperWrapper.close(); + HBaseExecutorService.shutdown(); LOG.info("HMaster main thread exiting"); } @@ -1118,7 +1139,9 @@ public class HMaster extends Thread implements HMasterInterface, */ @Override public void process(WatchedEvent event) { - LOG.debug(("Event " + event.getType() + " with path " + event.getPath())); + LOG.debug("Event " + event.getType() + + " with state " + event.getState() + + " with path " + event.getPath()); // Master should kill itself if its session expired or if its // znode was deleted manually (usually for testing purposes) if(event.getState() == KeeperState.Expired || @@ -1132,7 +1155,8 @@ public class HMaster extends Thread implements HMasterInterface, zooKeeperWrapper.close(); try { - zooKeeperWrapper = new ZooKeeperWrapper(conf, this); + zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, HMaster.class.getName()); + zooKeeperWrapper.registerListener(this); this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper); if(!this.zkMasterAddressWatcher. 
writeAddressToZooKeeper(this.address,false)) { diff --git a/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java b/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java index 56b819baf46..b94e8871b70 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java @@ -32,7 +32,7 @@ import java.io.IOException; * or deleted doesn't actually require post processing, it's no longer * necessary. */ -class ProcessRegionClose extends ProcessRegionStatusChange { +public class ProcessRegionClose extends ProcessRegionStatusChange { protected final boolean offlineRegion; protected final boolean reassignRegion; diff --git a/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java b/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java index fe86d20141a..400da1d7dc6 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import java.io.IOException; @@ -34,7 +35,7 @@ import java.io.IOException; * serving a region. This applies to all meta and user regions except the * root region which is handled specially. 
*/ -class ProcessRegionOpen extends ProcessRegionStatusChange { +public class ProcessRegionOpen extends ProcessRegionStatusChange { protected final HServerInfo serverInfo; /** @@ -114,6 +115,8 @@ class ProcessRegionOpen extends ProcessRegionStatusChange { } else { master.getRegionManager().removeRegion(regionInfo); } + ZooKeeperWrapper zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName()); + zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName()); return true; } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java b/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java index 0073c7e993e..b55c4f5c65b 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java @@ -78,4 +78,8 @@ abstract class ProcessRegionStatusChange extends RegionServerOperation { } return this.metaRegion; } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java b/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java index 89fc17b399c..dd8815d6ad6 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java @@ -32,6 +32,9 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.executor.RegionTransitionEventData; +import org.apache.hadoop.hbase.executor.HBaseEventHandler; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -39,6 +42,8 @@ import 
org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.io.WritableUtils; import java.io.IOException; import java.util.ArrayList; @@ -94,6 +99,9 @@ public class RegionManager { */ final SortedMap regionsInTransition = Collections.synchronizedSortedMap(new TreeMap()); + + // regions in transition are also recorded in ZK using the zk wrapper + final ZooKeeperWrapper zkWrapper; // How many regions to assign a server at a time. private final int maxAssignInOneGo; @@ -124,10 +132,11 @@ public class RegionManager { private final int zooKeeperNumRetries; private final int zooKeeperPause; - RegionManager(HMaster master) { + RegionManager(HMaster master) throws IOException { Configuration conf = master.getConfiguration(); this.master = master; + this.zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName()); this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10); this.loadBalancer = new LoadBalancer(conf); @@ -165,10 +174,17 @@ public class RegionManager { unsetRootRegion(); if (!master.getShutdownRequested().get()) { synchronized (regionsInTransition) { - RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, - RegionState.State.UNASSIGNED); - regionsInTransition.put( - HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s); + String regionName = HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(); + byte[] data = null; + try { + data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); + } catch (IOException e) { + LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); + } + zkWrapper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data); + LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); + 
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, RegionState.State.UNASSIGNED); + regionsInTransition.put(regionName, s); LOG.info("ROOT inserted into regionsInTransition"); } } @@ -330,6 +346,14 @@ public class RegionManager { LOG.info("Assigning region " + regionName + " to " + sinfo.getServerName()); rs.setPendingOpen(sinfo.getServerName()); synchronized (this.regionsInTransition) { + byte[] data = null; + try { + data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); + } catch (IOException e) { + LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); + } + zkWrapper.createUnassignedRegion(rs.getRegionInfo().getEncodedName(), data); + LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); this.regionsInTransition.put(regionName, rs); } @@ -969,6 +993,16 @@ public class RegionManager { synchronized(this.regionsInTransition) { s = regionsInTransition.get(info.getRegionNameAsString()); if (s == null) { + byte[] data = null; + try { + data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); + } catch (IOException e) { + // TODO: Review what we should do here. 
If Writables work this + // should never happen + LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); + } + zkWrapper.createUnassignedRegion(info.getEncodedName(), data); + LOG.debug("Created UNASSIGNED zNode " + info.getRegionNameAsString() + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); s = new RegionState(info, RegionState.State.UNASSIGNED); regionsInTransition.put(info.getRegionNameAsString(), s); } @@ -1213,8 +1247,9 @@ public class RegionManager { */ public void setRootRegionLocation(HServerAddress address) { writeRootRegionLocationToZooKeeper(address); - synchronized (rootRegionLocation) { + // the root region has been assigned, remove it from transition in ZK + zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName()); rootRegionLocation.set(new HServerAddress(address)); rootRegionLocation.notifyAll(); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 819c9a7b49e..1d952585769 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -564,7 +564,7 @@ public class ServerManager { * @param region * @param returnMsgs */ - private void processRegionOpen(HServerInfo serverInfo, + public void processRegionOpen(HServerInfo serverInfo, HRegionInfo region, ArrayList returnMsgs) { boolean duplicateAssignment = false; synchronized (master.getRegionManager()) { @@ -633,7 +633,7 @@ public class ServerManager { * @param region * @throws Exception */ - private void processRegionClose(HRegionInfo region) { + public void processRegionClose(HRegionInfo region) { synchronized (this.master.getRegionManager()) { if (region.isRootRegion()) { // Root region diff --git a/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java b/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java index 
da78fe27e7a..7b008193f94 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java @@ -74,7 +74,7 @@ class ZKMasterAddressWatcher implements Watcher { } else if(type.equals(EventType.NodeCreated) && event.getPath().equals(this.zookeeper.clusterStateZNode)) { LOG.debug("Resetting watch on cluster state node."); - this.zookeeper.setClusterStateWatch(this); + this.zookeeper.setClusterStateWatch(); } } @@ -87,7 +87,7 @@ class ZKMasterAddressWatcher implements Watcher { try { LOG.debug("Waiting for master address ZNode to be deleted " + "(Also watching cluster state node)"); - this.zookeeper.setClusterStateWatch(this); + this.zookeeper.setClusterStateWatch(); wait(); } catch (InterruptedException e) { } @@ -110,7 +110,7 @@ class ZKMasterAddressWatcher implements Watcher { } if(this.zookeeper.writeMasterAddress(address)) { this.zookeeper.setClusterState(true); - this.zookeeper.setClusterStateWatch(this); + this.zookeeper.setClusterStateWatch(); // Watch our own node this.zookeeper.readMasterAddress(this); return true; diff --git a/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java b/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java new file mode 100644 index 00000000000..194ade5e460 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java @@ -0,0 +1,159 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; +import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler; +import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; + +/** + * Watches the UNASSIGNED znode in ZK for the master, and handles all events + * relating to region transitions. + */ +public class ZKUnassignedWatcher implements Watcher { + private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class); + + // TODO: Start move this to HConstants + static final String ROOT_TABLE_NAME_STR = "-ROOT-"; + static final String META_TABLE_NAME_STR = ".META."; + // TODO: End move this to HConstants + + private ZooKeeperWrapper zkWrapper = null; + + public static void start() throws IOException { + new ZKUnassignedWatcher(); + LOG.debug("Started ZKUnassigned watcher"); + } + + public ZKUnassignedWatcher() throws IOException { + zkWrapper = ZooKeeperWrapper.getInstance(HMaster.class.getName()); + // If the UNASSIGNED ZNode does not exist, create it. 
+ zkWrapper.createZNodeIfNotExists(zkWrapper.getRegionInTransitionZNode()); + // TODO: get the outstanding changes in UNASSIGNED + + // Set a watch on Zookeeper's UNASSIGNED node if it exists. + zkWrapper.registerListener(this); + } + + /** + * This is the processing loop that gets triggered from the ZooKeeperWrapper. + * This zookeeper events process function does the following: + * - WATCHES the following events: NodeCreated, NodeDataChanged, NodeChildrenChanged + * - IGNORES the following events: None, NodeDeleted + */ + @Override + public synchronized void process(WatchedEvent event) { + EventType type = event.getType(); + LOG.debug("ZK-EVENT-PROCESS: Got zkEvent " + type + + " state:" + event.getState() + + " path:" + event.getPath()); + + // Handle the ignored events + if(type.equals(EventType.None) || + type.equals(EventType.NodeDeleted)) { + return; + } + + // check if the path is for the UNASSIGNED directory we care about + if(event.getPath() == null || + !event.getPath().startsWith(zkWrapper.getZNodePathForHBase(zkWrapper.getRegionInTransitionZNode()))) { + return; + } + + try + { + /* + * If a node is created in the UNASSIGNED directory in zookeeper, then: + * 1. watch its updates (this is an unassigned region). + * 2. read to see what its state is and handle as needed (state may have + * changed before we started watching it) + */ + if(type.equals(EventType.NodeCreated)) { + zkWrapper.watchZNode(event.getPath()); + handleRegionStateInZK(event.getPath()); + } + /* + * Data on some node has changed. Read to see what the state is and handle + * as needed. 
+ */ + else if(type.equals(EventType.NodeDataChanged)) { + handleRegionStateInZK(event.getPath()); + } + /* + * If there were some nodes created then watch those nodes + */ + else if(type.equals(EventType.NodeChildrenChanged)) { + List newZNodes = zkWrapper.watchAndGetNewChildren(event.getPath()); + for(ZNodePathAndData zNodePathAndData : newZNodes) { + LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath()); + handleRegionStateInZK(zNodePathAndData.getzNodePath(), zNodePathAndData.getData()); + } + } + } + catch (IOException e) + { + LOG.error("Could not process event from ZooKeeper", e); + } + } + + /** + * Read the state of a node in ZK, and do the needful. We want to do the + * following: + * 1. If region's state is updated as CLOSED, invoke the ClosedRegionHandler. + * 2. If region's state is updated as OPENED, invoke the OpenRegionHandler. + * @param zNodePath + * @throws IOException + */ + private void handleRegionStateInZK(String zNodePath) throws IOException { + byte[] data = zkWrapper.readZNode(zNodePath, null); + handleRegionStateInZK(zNodePath, data); + } + + private void handleRegionStateInZK(String zNodePath, byte[] data) { + // a null value is set when a node is created, we don't need to handle this + if(data == null) { + return; + } + String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode(); + String region = zNodePath.substring(zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1); + HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]); + + // if the node was CLOSED then handle it + if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) { + new MasterCloseRegionHandler(rsEvent, region, data).submit(); + } + // if the region was OPENED then handle that + else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED || + rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) { + new MasterOpenRegionHandler(rsEvent, region, data).submit(); + } + } +} + diff --git 
a/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java new file mode 100644 index 00000000000..2e95d793414 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java @@ -0,0 +1,87 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.handler; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.executor.RegionTransitionEventData; +import org.apache.hadoop.hbase.executor.HBaseEventHandler; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.Writables; + +/** + * This is the event handler for all events relating to closing regions on the + * HMaster. 
The following event types map to this handler: + * - RS_REGION_CLOSING + * - RS_REGION_CLOSED + */ +public class MasterCloseRegionHandler extends HBaseEventHandler +{ + private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class); + + private String regionName; + protected byte[] serializedData; + RegionTransitionEventData hbEventData; + + public MasterCloseRegionHandler(HBaseEventType eventType, String regionName, byte[] serializedData) { + super(false, HMaster.MASTER, eventType); + this.regionName = regionName; + this.serializedData = serializedData; + } + + /** + * Handle the various events relating to closing regions. We can get the + * following events here: + * - RS_REGION_CLOSING : No-op + * - RS_REGION_CLOSED : The region is closed. If we are not in a shutdown + * state, find the RS to open this region. This could + * be a part of a region move, or just that the RS has + * died. Should result in a M_REQUEST_OPENREGION event + * getting created. + */ + @Override + public void process() + { + LOG.debug("Event = " + getHBEvent() + ", region = " + regionName); + // handle RS_REGION_CLOSED events + handleRegionClosedEvent(); + } + + private void handleRegionClosedEvent() { + try { + if(hbEventData == null) { + hbEventData = new RegionTransitionEventData(); + Writables.getWritable(serializedData, hbEventData); + } + } catch (IOException e) { + LOG.error("Could not deserialize additional args for Close region", e); + } + // process the region close - this will cause the reopening of the + // region as a part of the heartbeat of some RS + serverManager.processRegionClose(hbEventData.getHmsg().getRegionInfo()); + LOG.info("Processed close of region " + hbEventData.getHmsg().getRegionInfo().getRegionNameAsString()); + } + + public String getRegionName() { + return regionName; + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java 
b/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java new file mode 100644 index 00000000000..7a548016470 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java @@ -0,0 +1,105 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.handler; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.executor.RegionTransitionEventData; +import org.apache.hadoop.hbase.executor.HBaseEventHandler; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.Writables; + +/** + * This is the event handler for all events relating to opening regions on the + * HMaster. 
This could be one of the following: + * - notification that a region server is "OPENING" a region + * - notification that a region server has "OPENED" a region + * The following event types map to this handler: + * - RS_REGION_OPENING + * - RS_REGION_OPENED + */ +public class MasterOpenRegionHandler extends HBaseEventHandler { + private static final Log LOG = LogFactory.getLog(MasterOpenRegionHandler.class); + // other args passed in a byte array form + protected byte[] serializedData; + private String regionName; + private RegionTransitionEventData hbEventData; + + public MasterOpenRegionHandler(HBaseEventType eventType, String regionName, byte[] serData) { + super(false, HMaster.MASTER, eventType); + this.regionName = regionName; + this.serializedData = serData; + } + + /** + * Handle the various events relating to opening regions. We can get the + * following events here: + * - RS_REGION_OPENING : Keep track to see how long the region open takes. + * If the RS is taking too long, then revert the + * region back to closed state so that it can be + * re-assigned. + * - RS_REGION_OPENED : The region is opened. Add an entry into META for + * the RS having opened this region. Then delete this + * entry in ZK. + */ + @Override + public void process() + { + LOG.debug("Event = " + getHBEvent() + ", region = " + regionName); + if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENING) { + handleRegionOpeningEvent(); + } + else if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENED) { + handleRegionOpenedEvent(); + } + } + + private void handleRegionOpeningEvent() { + // TODO: not implemented. + LOG.debug("NO-OP call to handling region opening event"); + // Keep track to see how long the region open takes. If the RS is taking too + // long, then revert the region back to closed state so that it can be + // re-assigned. 
+ } + + private void handleRegionOpenedEvent() { + try { + if(hbEventData == null) { + hbEventData = new RegionTransitionEventData(); + Writables.getWritable(serializedData, hbEventData); + } + } catch (IOException e) { + LOG.error("Could not deserialize additional args for Open region", e); + } + LOG.debug("RS " + hbEventData.getRsName() + " has opened region " + regionName); + HServerInfo serverInfo = serverManager.getServerInfo(hbEventData.getRsName()); + ArrayList returnMsgs = new ArrayList(); + serverManager.processRegionOpen(serverInfo, hbEventData.getHmsg().getRegionInfo(), returnMsgs); + if(returnMsgs.size() > 0) { + LOG.error("Open region tried to send message: " + returnMsgs.get(0).getType() + + " about " + returnMsgs.get(0).getRegionInfo().getRegionNameAsString()); + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 541ec9be2c9..efdc0d70622 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -319,7 +319,8 @@ public class HRegionServer implements HRegionInterface, } private void reinitializeZooKeeper() throws IOException { - zooKeeperWrapper = new ZooKeeperWrapper(conf, this); + zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, serverInfo.getServerName()); + zooKeeperWrapper.registerListener(this); watchMasterAddress(); } @@ -1217,14 +1218,7 @@ public class HRegionServer implements HRegionInterface, if (LOG.isDebugEnabled()) LOG.debug("sending initial server load: " + hsl); lastMsg = System.currentTimeMillis(); - boolean startCodeOk = false; - while(!startCodeOk) { - this.serverInfo = createServerInfoWithNewStartCode(this.serverInfo); - startCodeOk = zooKeeperWrapper.writeRSLocation(this.serverInfo); - if(!startCodeOk) { - LOG.debug("Start code already taken, trying another one"); - } - } + 
+ zooKeeperWrapper.writeRSLocation(this.serverInfo); result = this.hbaseMaster.regionServerStartup(this.serverInfo); break; } catch (IOException e) { @@ -1419,8 +1413,11 @@ public class HRegionServer implements HRegionInterface, void openRegion(final HRegionInfo regionInfo) { Integer mapKey = Bytes.mapKey(regionInfo.getRegionName()); HRegion region = this.onlineRegions.get(mapKey); + RSZookeeperUpdater zkUpdater = + new RSZookeeperUpdater(serverInfo.getServerName(), regionInfo.getEncodedName()); if (region == null) { try { + zkUpdater.startRegionOpenEvent(null, true); region = instantiateRegion(regionInfo); // Startup a compaction early if one is needed, if region has references // or if a store has too many store files @@ -1435,7 +1432,15 @@ public class HRegionServer implements HRegionInterface, // TODO: add an extra field in HRegionInfo to indicate that there is // an error. We can't do that now because that would be an incompatible // change that would require a migration - reportClose(regionInfo, StringUtils.stringifyException(t).getBytes()); + try { + HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, + regionInfo, + StringUtils.stringifyException(t).getBytes()); + zkUpdater.abortOpenRegion(hmsg); + } catch (IOException e1) { + // TODO: Can we recover? Should we throw RTE? 
+ LOG.error("Failed to abort open region " + regionInfo.getRegionNameAsString(), e1); + } return; } this.lock.writeLock().lock(); @@ -1446,7 +1451,12 @@ public class HRegionServer implements HRegionInterface, this.lock.writeLock().unlock(); } } - reportOpen(regionInfo); + try { + HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo); + zkUpdater.finishRegionOpenEvent(hmsg); + } catch (IOException e) { + LOG.error("Failed to mark region " + regionInfo.getRegionNameAsString() + " as opened", e); + } } protected HRegion instantiateRegion(final HRegionInfo regionInfo) @@ -1475,11 +1485,19 @@ public class HRegionServer implements HRegionInterface, protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) throws IOException { + RSZookeeperUpdater zkUpdater = null; + if(reportWhenCompleted) { + zkUpdater = new RSZookeeperUpdater(serverInfo.getServerName(), hri.getEncodedName()); + zkUpdater.startRegionCloseEvent(null, false); + } HRegion region = this.removeFromOnlineRegions(hri); if (region != null) { region.close(); if(reportWhenCompleted) { - reportClose(hri); + if(zkUpdater != null) { + HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, hri, null); + zkUpdater.finishRegionCloseEvent(hmsg); + } } } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java new file mode 100644 index 00000000000..7f428e88e15 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java @@ -0,0 +1,160 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.executor.RegionTransitionEventData; +import org.apache.hadoop.hbase.executor.HBaseEventHandler; +import 
org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; + +/** + * This is a helper class for region servers to update various states in + * Zookeeper. The various updates are abstracted out here. + * + * The "startRegionXXX" methods are to be called first, followed by the + * "finishRegionXXX" methods. Supports updating zookeeper periodically as a + * part of the "startRegionXXX". Currently handles the following state updates: + * - Close region + * - Open region + */ +// TODO: make this thread local, in which case it is re-usable per thread +public class RSZookeeperUpdater { + private static final Log LOG = LogFactory.getLog(RSZookeeperUpdater.class); + private final String regionServerName; + private String regionName = null; + private String regionZNode = null; + private ZooKeeperWrapper zkWrapper = null; + private int zkVersion = 0; + HBaseEventType lastUpdatedState; + + public RSZookeeperUpdater(String regionServerName, String regionName) { + this(regionServerName, regionName, 0); + } + + public RSZookeeperUpdater(String regionServerName, String regionName, int zkVersion) { + this.zkWrapper = ZooKeeperWrapper.getInstance(regionServerName); + this.regionServerName = regionServerName; + this.regionName = regionName; + // get the region ZNode we have to create + this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName); + this.zkVersion = zkVersion; + } + + /** + * This method updates the various states in ZK to inform the master that the + * region server has started closing the region. 
+ * @param updatePeriodically - if true, periodically updates the state in ZK + */ + public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException { + // if this ZNode already exists, something is wrong + if(zkWrapper.exists(regionZNode, true)) { + String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region."; + LOG.error(msg); + throw new IOException(msg); + } + + // create the region node in the unassigned directory first + zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true); + + // update the data for "regionName" ZNode in unassigned to CLOSING + updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg); + + // TODO: implement the updatePeriodically logic here + } + + /** + * This method updates the states in ZK to signal that the region has been + * closed. This will stop the periodic updater thread if one was started. + * @throws IOException + */ + public void finishRegionCloseEvent(HMsg hmsg) throws IOException { + // TODO: stop the updatePeriodically here + + // update the data for "regionName" ZNode in unassigned to CLOSED + updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg); + } + + /** + * This method updates the various states in ZK to inform the master that the + * region server has started opening the region. 
+ * @param updatePeriodically - if true, periodically updates the state in ZK + */ + public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException { + Stat stat = new Stat(); + byte[] data = zkWrapper.readZNode(regionZNode, stat); + // if there is no ZNode for this region, something is wrong + if(data == null) { + String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region."; + LOG.error(msg); + throw new IOException(msg); + } + // if the ZNode is not in the closed or offline state, something is wrong + HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]); + if(rsEvent != HBaseEventType.RS2ZK_REGION_CLOSED && rsEvent != HBaseEventType.M2ZK_REGION_OFFLINE) { + String msg = "ZNode " + regionZNode + " is not in CLOSED/OFFLINE state (state = " + rsEvent + "), will NOT open region."; + LOG.error(msg); + throw new IOException(msg); + } + + // get the version to update from ZK + zkVersion = stat.getVersion(); + + // update the data for "regionName" ZNode in unassigned to OPENING + updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENING, hmsg); + + // TODO: implement the updatePeriodically logic here + } + + /** + * This method updates the states in ZK to signal that the region has been + * opened. This will stop the periodic updater thread if one was started. 
+ * @throws IOException + */ + public void finishRegionOpenEvent(HMsg hmsg) throws IOException { + // TODO: stop the updatePeriodically here + + // update the data for "regionName" ZNode in unassigned to OPENED + updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENED, hmsg); + } + + public boolean isClosingRegion() { + return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_CLOSING); + } + + public boolean isOpeningRegion() { + return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_OPENING); + } + + public void abortOpenRegion(HMsg hmsg) throws IOException { + LOG.error("Aborting open of region " + regionName); + + // TODO: stop the updatePeriodically for start open region here + + // update the data for "regionName" ZNode in unassigned to CLOSED + updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg); + } + + private void updateZKWithEventData(HBaseEventType hbEventType, HMsg hmsg) throws IOException { + // update the data for "regionName" ZNode in unassigned to "hbEventType" + byte[] data = null; + try { + data = Writables.getBytes(new RegionTransitionEventData(hbEventType, regionServerName, hmsg)); + } catch (IOException e) { + LOG.error("Error creating event data for " + hbEventType, e); + } + LOG.debug("Updating ZNode " + regionZNode + + " with [" + hbEventType + "]" + + " expected version = " + zkVersion); + lastUpdatedState = hbEventType; + zkWrapper.writeZNode(regionZNode, data, zkVersion, true); + zkVersion++; + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java index 834dc108be6..71f1ab583b8 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java @@ -220,6 +220,68 @@ public class HQuorumPeer { return zkProperties; } + + /** + * Return the ZK Quorum servers string given zk properties returned by + * makeZKProps + * @param properties + * @return + */ + 
public static String getZKQuorumServersString(Properties properties) {
+    String clientPort = null;
+    List<String> servers = new ArrayList<String>();
+
+    // The clientPort option may come after the server.X hosts, so we need to
+    // grab everything and then create the final host:port comma separated list.
+    boolean anyValid = false;
+    for (Entry<Object,Object> property : properties.entrySet()) {
+      String key = property.getKey().toString().trim();
+      String value = property.getValue().toString().trim();
+      if (key.equals("clientPort")) {
+        clientPort = value;
+      }
+      else if (key.startsWith("server.")) {
+        // the host is everything before the first ':'; accept a value with
+        // no ':' as a bare host instead of failing in substring(0, -1)
+        int colon = value.indexOf(':');
+        String host = colon < 0 ? value : value.substring(0, colon);
+        servers.add(host);
+        try {
+          //noinspection ResultOfMethodCallIgnored
+          InetAddress.getByName(host);
+          anyValid = true;
+        } catch (UnknownHostException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+    }
+
+    // Checked first: with no server.X lines anyValid is always false, so
+    // testing !anyValid first would make this branch unreachable.
+    if (servers.isEmpty()) {
+      LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
+                "ZooKeeper cluster configured for its operation.");
+      return null;
+    }
+
+    if (!anyValid) {
+      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (clientPort == null) {
+      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    StringBuilder hostPortBuilder = new StringBuilder();
+    for (int i = 0; i < servers.size(); ++i) {
+      String host = servers.get(i);
+      if (i > 0) {
+        hostPortBuilder.append(',');
+      }
+      hostPortBuilder.append(host);
+      hostPortBuilder.append(':');
+      hostPortBuilder.append(clientPort);
+    }
+
+    return hostPortBuilder.toString();
+  }
 
   /**
    * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java index 15673fd734b..f46b55c396c 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java @@ -1,5 +1,5 @@ /** - * Copyright 2010 The Apache Software Foundation + * Copyright 2009 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,152 +19,225 @@ */ package org.apache.hadoop.hbase.zookeeper; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; -import java.io.BufferedReader; -import java.io.IOException; -import 
java.io.InputStreamReader; -import java.io.PrintWriter; -import java.net.InetAddress; -import java.net.Socket; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.Properties; - /** * Wraps a ZooKeeper instance and adds HBase specific functionality. * * This class provides methods to: * - read/write/delete the root region location in ZooKeeper. * - set/check out of safe mode flag. + * + * ------------------------------------------ + * The following STATIC ZNodes are created: + * ------------------------------------------ + * - parentZNode : All the HBase directories are hosted under this parent + * node, default = "/hbase" + * - rsZNode : This is the directory where the RS's create ephemeral + * nodes. The master watches these nodes, and their expiry + * indicates RS death. The default location is "/hbase/rs" + * + * ------------------------------------------ + * The following DYNAMIC ZNodes are created: + * ------------------------------------------ + * - rootRegionZNode : Specifies the RS hosting root. + * - masterElectionZNode : ZNode used for election of the primary master when + * there are secondaries. All the masters race to write + * their addresses into this location, the one that + * succeeds is the primary. Others block. + * - clusterStateZNode : Determines if the cluster is running. Its default + * location is "/hbase/shutdown". It always has a value + * of "up". If present with the value, cluster is up + * and running. If deleted, the cluster is shutting + * down. + * - rgnsInTransitZNode : All the nodes under this node are names of regions + * in transition. The first byte of the data for each + * of these nodes is the event type. This is used to + * deserialize the rest of the data.
*/ -public class ZooKeeperWrapper { +public class ZooKeeperWrapper implements Watcher { protected static final Log LOG = LogFactory.getLog(ZooKeeperWrapper.class); + // instances of the watcher + private static Map INSTANCES = + new HashMap(); + // lock for ensuring a singleton per instance type + private static Lock createLock = new ReentrantLock(); + // name of this instance + private String instanceName; + // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved. private static final char ZNODE_PATH_SEPARATOR = '/'; private String quorumServers = null; + private final int sessionTimeout; + private ZooKeeper zooKeeper; - private final ZooKeeper zooKeeper; - - private final String parentZNode; + /* + * All the HBase directories are hosted under this parent + */ + public final String parentZNode; + /* + * Specifies the RS hosting root + */ private final String rootRegionZNode; + /* + * This is the directory where the RS's create ephemeral nodes. The master + * watches these nodes, and their expiry indicates RS death. + */ private final String rsZNode; + /* + * ZNode used for election of the primary master when there are secondaries. 
+   */
   private final String masterElectionZNode;
+  /*
+   * State of the cluster - if up and running or shutting down
+   */
   public final String clusterStateZNode;
+  /*
+   * Regions that are in transition
+   */
+  private final String rgnsInTransitZNode;
+  /*
+   * List of ZNodes in the unassigned region that are already being watched
+   */
+  private Set<String> unassignedZNodesWatched = new HashSet<String>();
+
+  private List<Watcher> listeners = Collections.synchronizedList(new ArrayList<Watcher>());
+
+  // return the singleton given the name of the instance, or null if none
+  public static ZooKeeperWrapper getInstance(String name) {
+    // INSTANCES is a plain HashMap: read it under the lock that guards the
+    // put in createInstance so a lookup never races with an insert
+    createLock.lock();
+    try {
+      return INSTANCES.get(name);
+    }
+    finally {
+      createLock.unlock();
+    }
+  }
+  // creates only one instance per name; returns null if construction fails
+  public static ZooKeeperWrapper createInstance(Configuration conf, String name) {
+    // lookup and creation happen under one lock acquisition; the lock is
+    // reentrant, so the nested getInstance() call is safe
+    createLock.lock();
+    try {
+      ZooKeeperWrapper instance = getInstance(name);
+      if (instance == null) {
+        try {
+          instance = new ZooKeeperWrapper(conf, name);
+          INSTANCES.put(name, instance);
+        }
+        catch (Exception e) {
+          // best effort as before (caller gets null), but keep the stack trace
+          LOG.error("<" + name + ">" + "Error creating a ZooKeeperWrapper " + e, e);
+        }
+      }
+      return instance;
+    }
+    finally {
+      createLock.unlock();
+    }
+  }
 
   /**
-   * Create a ZooKeeperWrapper.
-   * @param conf Configuration to read settings from.
-   * @param watcher ZooKeeper watcher to register.
+   * Create a ZooKeeperWrapper. The Zookeeper wrapper listens to all messages
+   * from Zookeeper, and notifies all the listeners about all the messages. Any
+   * component can subscribe to these messages by adding itself as a listener,
+   * and remove itself from being a listener.
+   *
+   * @param conf HBaseConfiguration to read settings from.
    * @throws IOException If a connection error occurs.
*/ - public ZooKeeperWrapper(Configuration conf, Watcher watcher) + private ZooKeeperWrapper(Configuration conf, String instanceName) throws IOException { + this.instanceName = instanceName; Properties properties = HQuorumPeer.makeZKProps(conf); - setQuorumServers(properties); + quorumServers = HQuorumPeer.getZKQuorumServersString(properties); if (quorumServers == null) { throw new IOException("Could not read quorum servers from " + HConstants.ZOOKEEPER_CONFIG_NAME); } + sessionTimeout = conf.getInt("zookeeper.session.timeout", 60 * 1000); + reconnectToZk(); + + parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); - int sessionTimeout = conf.getInt("zookeeper.session.timeout", 60 * 1000); - try { - zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, watcher); - } catch (IOException e) { - LOG.error("Failed to create ZooKeeper object: " + e); - throw new IOException(e); - } + String rootServerZNodeName = conf.get("zookeeper.znode.rootserver", "root-region-server"); + String rsZNodeName = conf.get("zookeeper.znode.rs", "rs"); + String masterAddressZNodeName = conf.get("zookeeper.znode.master", "master"); + String stateZNodeName = conf.get("zookeeper.znode.state", "shutdown"); + String regionsInTransitZNodeName = conf.get("zookeeper.znode.regionInTransition", "UNASSIGNED"); - parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); - - String rootServerZNodeName = conf.get("zookeeper.znode.rootserver", - "root-region-server"); - String rsZNodeName = conf.get("zookeeper.znode.rs", "rs"); - String masterAddressZNodeName = conf.get("zookeeper.znode.master", - "master"); - String stateZNodeName = conf.get("zookeeper.znode.state", - "shutdown"); - - rootRegionZNode = getZNode(parentZNode, rootServerZNodeName); - rsZNode = getZNode(parentZNode, rsZNodeName); + rootRegionZNode = getZNode(parentZNode, rootServerZNodeName); + rsZNode = getZNode(parentZNode, rsZNodeName); + 
rgnsInTransitZNode = getZNode(parentZNode, regionsInTransitZNodeName); masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName); - clusterStateZNode = getZNode(parentZNode, stateZNodeName); + clusterStateZNode = getZNode(parentZNode, stateZNodeName); + } + + public void reconnectToZk() throws IOException { + try { + LOG.info("Reconnecting to zookeeper"); + if(zooKeeper != null) { + zooKeeper.close(); + LOG.debug("<" + instanceName + ">" + "Closed existing zookeeper client"); + } + zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, this); + LOG.debug("<" + instanceName + ">" + "Connected to zookeeper again"); + } catch (IOException e) { + LOG.error("<" + instanceName + ">" + "Failed to create ZooKeeper object: " + e); + throw new IOException(e); + } catch (InterruptedException e) { + LOG.error("<" + instanceName + ">" + "Error closing ZK connection: " + e); + throw new IOException(e); + } } - private void setQuorumServers(Properties properties) { - String clientPort = null; - List servers = new ArrayList(); + public void registerListener(Watcher watcher) { + listeners.add(watcher); + } - // The clientPort option may come after the server.X hosts, so we need to - // grab everything and then create the final host:port comma separated list. 
-    boolean anyValid = false;
-    for (Entry property : properties.entrySet()) {
-      String key = property.getKey().toString().trim();
-      String value = property.getValue().toString().trim();
-      if (key.equals("clientPort")) {
-        clientPort = value;
-      }
-      else if (key.startsWith("server.")) {
-        String host = value.substring(0, value.indexOf(':'));
-        servers.add(host);
-        try {
-          //noinspection ResultOfMethodCallIgnored
-          InetAddress.getByName(host);
-          anyValid = true;
-        } catch (UnknownHostException e) {
-          LOG.warn(StringUtils.stringifyException(e));
-        }
+  public void unregisterListener(Watcher watcher) {
+    listeners.remove(watcher);
+  }
+
+  /**
+   * This is the primary ZK watcher. Forwards every event to each registered
+   * listener; an exception thrown by one listener is logged and does not
+   * stop delivery to the others.
+   * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
+   */
+  @Override
+  public synchronized void process(WatchedEvent event) {
+    // Iterate over a snapshot: a synchronizedList must be locked by the
+    // caller while it is traversed, and copying also lets a listener
+    // register/unregister from inside its callback without breaking the loop.
+    List<Watcher> snapshot;
+    synchronized (listeners) {
+      snapshot = new ArrayList<Watcher>(listeners);
+    }
+    for(Watcher w : snapshot) {
+      try {
+        w.process(event);
+      } catch (Throwable t) {
+        LOG.error("<"+instanceName+">" + "ZK updates listener threw an exception in process()", t);
       }
     }
-
-    if (!anyValid) {
-      LOG.error("no valid quorum servers found in "
-          + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return;
-    }
-
-    if (clientPort == null) {
-      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return;
-    }
-
-    if (servers.isEmpty()) {
-      LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
-                "ZooKeeper cluster configured for its operation.");
-      return;
-    }
-
-    StringBuilder hostPortBuilder = new StringBuilder();
-    for (int i = 0; i < servers.size(); ++i) {
-      String host = servers.get(i);
-      if (i > 0) {
-        hostPortBuilder.append(',');
-      }
-      hostPortBuilder.append(host);
-      hostPortBuilder.append(':');
-      hostPortBuilder.append(clientPort);
-    }
-
-    quorumServers = hostPortBuilder.toString();
   }
 
   /** @return String dump of everything in ZooKeeper.
*/ @@ -172,7 +245,7 @@ public class ZooKeeperWrapper { public String dump() { StringBuilder sb = new StringBuilder(); sb.append("\nHBase tree in ZooKeeper is rooted at ").append(parentZNode); - sb.append("\n Cluster up? ").append(exists(clusterStateZNode)); + sb.append("\n Cluster up? ").append(exists(clusterStateZNode, true)); sb.append("\n Master address: ").append(readMasterAddress(null)); sb.append("\n Region server holding ROOT: ").append(readRootRegionLocation()); sb.append("\n Region servers:"); @@ -236,9 +309,25 @@ public class ZooKeeperWrapper { return res.toArray(new String[res.size()]); } - private boolean exists(String znode) { + public boolean exists(String znode, boolean watch) { try { - return zooKeeper.exists(znode, null) != null; + return zooKeeper.exists(getZNode(parentZNode, znode), watch?this:null) != null; + } catch (KeeperException.SessionExpiredException e) { + // if the session has expired try to reconnect to ZK, then perform query + try { + // TODO: ZK-REFACTOR: We should not reconnect - we should just quit and restart. 
+ reconnectToZk(); + return zooKeeper.exists(getZNode(parentZNode, znode), watch?this:null) != null; + } catch (IOException e1) { + LOG.error("Error reconnecting to zookeeper", e1); + throw new RuntimeException("Error reconnecting to zookeeper", e1); + } catch (KeeperException e1) { + LOG.error("Error reading after reconnecting to zookeeper", e1); + throw new RuntimeException("Error reading after reconnecting to zookeeper", e1); + } catch (InterruptedException e1) { + LOG.error("Error reading after reconnecting to zookeeper", e1); + throw new RuntimeException("Error reading after reconnecting to zookeeper", e1); + } } catch (KeeperException e) { return false; } catch (InterruptedException e) { @@ -311,13 +400,13 @@ public class ZooKeeperWrapper { * Watch the state of the cluster, up or down * @param watcher Watcher to set on cluster state node */ - public void setClusterStateWatch(Watcher watcher) { + public void setClusterStateWatch() { try { - zooKeeper.exists(clusterStateZNode, watcher); + zooKeeper.exists(clusterStateZNode, this); } catch (InterruptedException e) { - LOG.warn("Failed to check on ZNode " + clusterStateZNode, e); + LOG.warn("<" + instanceName + ">" + "Failed to check on ZNode " + clusterStateZNode, e); } catch (KeeperException e) { - LOG.warn("Failed to check on ZNode " + clusterStateZNode, e); + LOG.warn("<" + instanceName + ">" + "Failed to check on ZNode " + clusterStateZNode, e); } } @@ -335,19 +424,19 @@ public class ZooKeeperWrapper { byte[] data = Bytes.toBytes("up"); zooKeeper.create(clusterStateZNode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - LOG.debug("State node wrote in ZooKeeper"); + LOG.debug("<" + instanceName + ">" + "State node wrote in ZooKeeper"); } else { zooKeeper.delete(clusterStateZNode, -1); - LOG.debug("State node deleted in ZooKeeper"); + LOG.debug("<" + instanceName + ">" + "State node deleted in ZooKeeper"); } return true; } catch (InterruptedException e) { - LOG.warn("Failed to set state node in ZooKeeper", 
e); + LOG.warn("<" + instanceName + ">" + "Failed to set state node in ZooKeeper", e); } catch (KeeperException e) { if(e.code() == KeeperException.Code.NODEEXISTS) { - LOG.debug("State node exists."); + LOG.debug("<" + instanceName + ">" + "State node exists."); } else { - LOG.warn("Failed to set state node in ZooKeeper", e); + LOG.warn("<" + instanceName + ">" + "Failed to set state node in ZooKeeper", e); } } @@ -364,13 +453,13 @@ public class ZooKeeperWrapper { try { zooKeeper.exists(masterElectionZNode, watcher); } catch (KeeperException e) { - LOG.warn("Failed to set watcher on ZNode " + masterElectionZNode, e); + LOG.warn("<" + instanceName + ">" + "Failed to set watcher on ZNode " + masterElectionZNode, e); return false; } catch (InterruptedException e) { - LOG.warn("Failed to set watcher on ZNode " + masterElectionZNode, e); + LOG.warn("<" + instanceName + ">" + "Failed to set watcher on ZNode " + masterElectionZNode, e); return false; } - LOG.debug("Set watcher on master address ZNode " + masterElectionZNode); + LOG.debug("<" + instanceName + ">" + "Set watcher on master address ZNode " + masterElectionZNode); return true; } @@ -378,7 +467,7 @@ public class ZooKeeperWrapper { try { return readAddressOrThrow(znode, watcher); } catch (IOException e) { - LOG.debug("Failed to read " + e.getMessage()); + LOG.debug("<" + instanceName + ">" + "Failed to read " + e.getMessage()); return null; } } @@ -394,7 +483,7 @@ public class ZooKeeperWrapper { } String addressString = Bytes.toString(data); - LOG.debug("Read ZNode " + znode + " got " + addressString); + LOG.debug("<" + instanceName + ">" + "Read ZNode " + znode + " got " + addressString); return new HServerAddress(addressString); } @@ -411,17 +500,17 @@ public class ZooKeeperWrapper { } zooKeeper.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - LOG.debug("Created ZNode " + znode); + LOG.debug("<" + instanceName + ">" + "Created ZNode " + znode); return true; } catch 
(KeeperException.NodeExistsException e) { return true; // ok, move on. } catch (KeeperException.NoNodeException e) { return ensureParentExists(znode) && ensureExists(znode); } catch (KeeperException e) { - LOG.warn("Failed to create " + znode + + LOG.warn("<" + instanceName + ">" + "Failed to create " + znode + " -- check quorum servers, currently=" + this.quorumServers, e); } catch (InterruptedException e) { - LOG.warn("Failed to create " + znode + + LOG.warn("<" + instanceName + ">" + "Failed to create " + znode + " -- check quorum servers, currently=" + this.quorumServers, e); } return false; @@ -450,9 +539,9 @@ public class ZooKeeperWrapper { } catch (KeeperException.NoNodeException e) { return true; // ok, move on. } catch (KeeperException e) { - LOG.warn("Failed to delete " + rootRegionZNode + ": " + e); + LOG.warn("<" + instanceName + ">" + "Failed to delete " + rootRegionZNode + ": " + e); } catch (InterruptedException e) { - LOG.warn("Failed to delete " + rootRegionZNode + ": " + e); + LOG.warn("<" + instanceName + ">" + "Failed to delete " + rootRegionZNode + ": " + e); } return false; @@ -479,18 +568,18 @@ public class ZooKeeperWrapper { public void deleteZNode(String znode, boolean recursive) throws KeeperException, InterruptedException { if (recursive) { - LOG.info("deleteZNode get children for " + znode); + LOG.info("<" + instanceName + ">" + "deleteZNode get children for " + znode); List znodes = this.zooKeeper.getChildren(znode, false); if (znodes.size() > 0) { for (String child : znodes) { String childFullPath = getZNode(znode, child); - LOG.info("deleteZNode recursive call " + childFullPath); + LOG.info("<" + instanceName + ">" + "deleteZNode recursive call " + childFullPath); this.deleteZNode(childFullPath, true); } } } this.zooKeeper.delete(znode, -1); - LOG.debug("Deleted ZNode " + znode); + LOG.debug("<" + instanceName + ">" + "Deleted ZNode " + znode); } private boolean createRootRegionLocation(String address) { @@ -498,12 +587,12 @@ public 
class ZooKeeperWrapper { try { zooKeeper.create(rootRegionZNode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - LOG.debug("Created ZNode " + rootRegionZNode + " with data " + address); + LOG.debug("<" + instanceName + ">" + "Created ZNode " + rootRegionZNode + " with data " + address); return true; } catch (KeeperException e) { - LOG.warn("Failed to create root region in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to create root region in ZooKeeper: " + e); } catch (InterruptedException e) { - LOG.warn("Failed to create root region in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to create root region in ZooKeeper: " + e); } return false; @@ -513,12 +602,12 @@ public class ZooKeeperWrapper { byte[] data = Bytes.toBytes(address); try { zooKeeper.setData(rootRegionZNode, data, -1); - LOG.debug("SetData of ZNode " + rootRegionZNode + " with " + address); + LOG.debug("<" + instanceName + ">" + "SetData of ZNode " + rootRegionZNode + " with " + address); return true; } catch (KeeperException e) { - LOG.warn("Failed to set root region location in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to set root region location in ZooKeeper: " + e); } catch (InterruptedException e) { - LOG.warn("Failed to set root region location in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to set root region location in ZooKeeper: " + e); } return false; @@ -554,20 +643,22 @@ public class ZooKeeperWrapper { * @return true if operation succeeded, false otherwise. 
*/ public boolean writeMasterAddress(final HServerAddress address) { + LOG.debug("<" + instanceName + ">" + "Writing master address " + address.toString() + " to znode " + masterElectionZNode); if (!ensureParentExists(masterElectionZNode)) { return false; } + LOG.debug("<" + instanceName + ">" + "Znode exists : " + masterElectionZNode); String addressStr = address.toString(); byte[] data = Bytes.toBytes(addressStr); try { zooKeeper.create(masterElectionZNode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - LOG.debug("Wrote master address " + address + " to ZooKeeper"); + LOG.debug("<" + instanceName + ">" + "Wrote master address " + address + " to ZooKeeper"); return true; } catch (InterruptedException e) { - LOG.warn("Failed to write master address " + address + " to ZooKeeper", e); + LOG.warn("<" + instanceName + ">" + "Failed to write master address " + address + " to ZooKeeper", e); } catch (KeeperException e) { - LOG.warn("Failed to write master address " + address + " to ZooKeeper", e); + LOG.warn("<" + instanceName + ">" + "Failed to write master address " + address + " to ZooKeeper", e); } return false; @@ -582,16 +673,16 @@ public class ZooKeeperWrapper { public boolean writeRSLocation(HServerInfo info) { ensureExists(rsZNode); byte[] data = Bytes.toBytes(info.getServerAddress().toString()); - String znode = joinPath(rsZNode, Long.toString(info.getStartCode())); + String znode = joinPath(rsZNode, info.getServerName()); try { zooKeeper.create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - LOG.debug("Created ZNode " + znode + LOG.debug("<" + instanceName + ">" + "Created ZNode " + znode + " with data " + info.getServerAddress().toString()); return true; } catch (KeeperException e) { - LOG.warn("Failed to create " + znode + " znode in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to create " + znode + " znode in ZooKeeper: " + e); } catch (InterruptedException e) { - LOG.warn("Failed to create " + znode + " znode in 
ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to create " + znode + " znode in ZooKeeper: " + e); } return false; } @@ -604,17 +695,17 @@ public class ZooKeeperWrapper { */ public boolean updateRSLocationGetWatch(HServerInfo info, Watcher watcher) { byte[] data = Bytes.toBytes(info.getServerAddress().toString()); - String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getStartCode(); + String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getServerName(); try { zooKeeper.setData(znode, data, -1); - LOG.debug("Updated ZNode " + znode + LOG.debug("<" + instanceName + ">" + "Updated ZNode " + znode + " with data " + info.getServerAddress().toString()); zooKeeper.getData(znode, watcher, null); return true; } catch (KeeperException e) { - LOG.warn("Failed to update " + znode + " znode in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e); } catch (InterruptedException e) { - LOG.warn("Failed to update " + znode + " znode in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e); } return false; @@ -645,13 +736,13 @@ public class ZooKeeperWrapper { try { List nodes = zooKeeper.getChildren(rsZNode, false); for (String node : nodes) { - LOG.debug("Deleting node: " + node); + LOG.debug("<" + instanceName + ">" + "Deleting node: " + node); zooKeeper.delete(joinPath(this.rsZNode, node), -1); } } catch (KeeperException e) { - LOG.warn("Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e); } catch (InterruptedException e) { - LOG.warn("Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e); } } @@ -660,9 +751,9 @@ public class ZooKeeperWrapper { try { stat = zooKeeper.exists(path, false); } catch (KeeperException e) { - 
LOG.warn("checking existence of " + path, e); + LOG.warn("<" + instanceName + ">" + "checking existence of " + path, e); } catch (InterruptedException e) { - LOG.warn("checking existence of " + path, e); + LOG.warn("<" + instanceName + ">" + "checking existence of " + path, e); } return stat != null; @@ -674,9 +765,10 @@ public class ZooKeeperWrapper { public void close() { try { zooKeeper.close(); - LOG.debug("Closed connection with ZooKeeper; " + this.rootRegionZNode); + INSTANCES.remove(instanceName); + LOG.debug("<" + instanceName + ">" + "Closed connection with ZooKeeper; " + this.rootRegionZNode); } catch (InterruptedException e) { - LOG.warn("Failed to close connection with ZooKeeper"); + LOG.warn("<" + instanceName + ">" + "Failed to close connection with ZooKeeper"); } } @@ -685,6 +777,10 @@ public class ZooKeeperWrapper { znodeName : joinPath(parentZNode, znodeName); } + public String getZNodePathForHBase(String znodeName) { + return getZNode(parentZNode, znodeName); + } + private String joinPath(String parent, String child) { return parent + ZNODE_PATH_SEPARATOR + child; } @@ -714,7 +810,7 @@ public class ZooKeeperWrapper { public List scanAddressDirectory(String znode, Watcher watcher) { List list = new ArrayList(); - List nodes = this.listZnodes(znode, watcher); + List nodes = this.listZnodes(znode); if(nodes == null) { return list; } @@ -731,44 +827,40 @@ public class ZooKeeperWrapper { * @param watcher watch to set, can be null * @return a list of all the znodes */ - public List listZnodes(String znode, Watcher watcher) { + public List listZnodes(String znode) { List nodes = null; try { if (checkExistenceOf(znode)) { - if (watcher == null) { - nodes = zooKeeper.getChildren(znode, false); - } else { - nodes = zooKeeper.getChildren(znode, watcher); - for (String node : nodes) { - getDataAndWatch(znode, node, watcher); - } + nodes = zooKeeper.getChildren(znode, this); + for (String node : nodes) { + getDataAndWatch(znode, node, this); } - } } catch 
(KeeperException e) { - LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); } catch (InterruptedException e) { - LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); } return nodes; } - public String getData(String parentZNode, String znode) { + public byte[] getData(String parentZNode, String znode) { return getDataAndWatch(parentZNode, znode, null); } - public String getDataAndWatch(String parentZNode, + public byte[] getDataAndWatch(String parentZNode, String znode, Watcher watcher) { - String data = null; + byte[] data = null; try { String path = joinPath(parentZNode, znode); + // TODO: ZK-REFACTOR: remove existance check? if (checkExistenceOf(path)) { - data = Bytes.toString(zooKeeper.getData(path, watcher, null)); + data = zooKeeper.getData(path, watcher, null); } } catch (KeeperException e) { - LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); } catch (InterruptedException e) { - LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e); + LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); } return data; } @@ -801,17 +893,17 @@ public class ZooKeeperWrapper { boolean failOnWrite) throws InterruptedException, KeeperException { String path = joinPath(parentPath, child); if (!ensureExists(parentPath)) { - LOG.error("unable to ensure parent exists: " + parentPath); + LOG.error("<" + instanceName + ">" + "unable to ensure parent exists: " + parentPath); } byte[] data = Bytes.toBytes(strData); Stat stat = this.zooKeeper.exists(path, false); if (failOnWrite || stat == null) { this.zooKeeper.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - LOG.debug("Created " + path); + 
LOG.debug("<" + instanceName + ">" + "Created " + path);
     } else {
       this.zooKeeper.setData(path, data, -1);
-      LOG.debug("Updated " + path);
+      LOG.debug("<" + instanceName + ">" + "Updated " + path);
     }
   }
 
@@ -820,6 +912,14 @@ public class ZooKeeperWrapper {
       + ":" + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
   }
 
+  /**
+   * Get the znode that has all the regions in transition.
+   * @return path to znode
+   */
+  public String getRegionInTransitionZNode() {
+    return this.rgnsInTransitZNode;
+  }
+
   /**
    * Get the path of this region server's znode
    * @return path to znode
@@ -828,4 +928,215 @@ public class ZooKeeperWrapper {
     return this.rsZNode;
   }
 
+  /**
+   * Delete a znode at the given version. Failures are logged, not thrown.
+   * @param zNodeName znode name, resolved through getZNode against parentZNode
+   * @param version version passed to ZooKeeper's delete
+   */
+  public void deleteZNode(String zNodeName, int version) {
+    String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
+    try
+    {
+      zooKeeper.delete(fullyQualifiedZNodeName, version);
+    }
+    catch (InterruptedException e)
+    {
+      LOG.warn("<" + instanceName + ">" + "Failed to delete ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
+    }
+    catch (KeeperException e)
+    {
+      LOG.warn("<" + instanceName + ">" + "Failed to delete ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
+    }
+  }
+
+  /**
+   * Create a persistent, watched znode with no data.
+   * @return the full znode path, or null on failure
+   */
+  public String createZNodeIfNotExists(String zNodeName) {
+    return createZNodeIfNotExists(zNodeName, null, CreateMode.PERSISTENT, true);
+  }
+
+  /**
+   * Register this wrapper as the watcher for existence, data and children
+   * of the given znode. Failures are logged, not thrown.
+   */
+  public void watchZNode(String zNodeName) {
+    String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
+
+    try {
+      zooKeeper.exists(fullyQualifiedZNodeName, this);
+      zooKeeper.getData(fullyQualifiedZNodeName, this, null);
+      zooKeeper.getChildren(fullyQualifiedZNodeName, this);
+    } catch (InterruptedException e) {
+      // nothing is created here; report what actually failed
+      LOG.warn("<" + instanceName + ">" + "Failed to set watches on ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
+    } catch (KeeperException e) {
+      LOG.warn("<" + instanceName + ">" + "Failed to set watches on ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
+    }
+  }
+
+  /**
+   * Create a znode, optionally watching it afterwards.
+   * @param zNodeName znode name, resolved through getZNode against parentZNode
+   * @param data initial payload, may be null
+   * @param createMode ZooKeeper create mode
+   * @param watch if true, watchZNode is called after a successful create
+   * @return the full znode path, or null if the parent could not be ensured
+   *         or the create failed
+   */
+  public String createZNodeIfNotExists(String zNodeName, byte[] data, CreateMode createMode, boolean watch) {
+    String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
+
+    if (!ensureParentExists(fullyQualifiedZNodeName)) {
+      return null;
+    }
+
+    try {
+      // create the znode
+      zooKeeper.create(fullyQualifiedZNodeName, data, Ids.OPEN_ACL_UNSAFE, createMode);
+      LOG.debug("<" + instanceName + ">" + "Created ZNode " + fullyQualifiedZNodeName + " in ZooKeeper");
+      // watch the znode for deletion, data change, creation of children
+      if(watch) {
+        watchZNode(zNodeName);
+      }
+      return fullyQualifiedZNodeName;
+    } catch (InterruptedException e) {
+      LOG.warn("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
+    } catch (KeeperException e) {
+      // NOTE(review): an already-existing node presumably lands here too, so
+      // it yields null and no watch despite the method name - confirm callers
+      LOG.warn("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
+    }
+
+    return null;
+  }
+
+  /**
+   * Read a znode's data, leaving this wrapper as a watcher on it.
+   * @param znodeName znode name, resolved through getZNode against parentZNode
+   * @param stat filled in by ZooKeeper with the znode's stat
+   * @return the znode's data
+   * @throws IOException wrapping any ZooKeeper or interruption failure
+   */
+  public byte[] readZNode(String znodeName, Stat stat) throws IOException {
+    byte[] data;
+    try {
+      String fullyQualifiedZNodeName = getZNode(parentZNode, znodeName);
+      data = zooKeeper.getData(fullyQualifiedZNodeName, this, stat);
+    } catch (InterruptedException e) {
+      // the interruption is converted to an IOException; keep the thread's
+      // interrupt status so callers further up can still observe it
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+    return data;
+  }
+
+  // TODO: perhaps return the version number from this write?
+ public boolean writeZNode(String znodeName, byte[] data, int version, boolean watch) throws IOException { + try { + String fullyQualifiedZNodeName = getZNode(parentZNode, znodeName); + zooKeeper.setData(fullyQualifiedZNodeName, data, version); + if(watch) { + zooKeeper.getData(fullyQualifiedZNodeName, this, null); + } + return true; + } catch (InterruptedException e) { + LOG.warn("<" + instanceName + ">" + "Failed to write data to ZooKeeper", e); + throw new IOException(e); + } catch (KeeperException e) { + LOG.warn("<" + instanceName + ">" + "Failed to write data to ZooKeeper", e); + throw new IOException(e); + } + } + + public void createUnassignedRegion(String regionName, byte[] data) { + String znode = getZNode(getRegionInTransitionZNode(), regionName); + if(LOG.isDebugEnabled()) { + // check if this node already exists - + // - it should not exist + // - if it does, it should be in the CLOSED state + if(exists(znode, true)) { + Stat stat = new Stat(); + byte[] oldData = null; + try { + oldData = readZNode(znode, stat); + } catch (IOException e) { + LOG.error("Error reading data for " + znode); + } + if(oldData == null) { + LOG.debug("While creating UNASSIGNED region " + regionName + " exists with no data" ); + } + else { + LOG.debug("While creating UNASSIGNED region " + regionName + " exists, state = " + (HBaseEventType.fromByte(oldData[0]))); + } + } + else { + if(data == null) { + LOG.debug("Creating UNASSIGNED region " + regionName + " with no data" ); + } + else { + LOG.debug("Creating UNASSIGNED region " + regionName + " in state = " + (HBaseEventType.fromByte(data[0]))); + } + } + } + synchronized(unassignedZNodesWatched) { + unassignedZNodesWatched.add(znode); + createZNodeIfNotExists(znode, data, CreateMode.PERSISTENT, true); + } + } + + public void deleteUnassignedRegion(String regionName) { + String znode = getZNode(getRegionInTransitionZNode(), regionName); + try { + LOG.debug("Deleting ZNode " + znode + " in ZooKeeper as region is open..."); + 
synchronized(unassignedZNodesWatched) { + unassignedZNodesWatched.remove(znode); + deleteZNode(znode); + } + } catch (KeeperException.SessionExpiredException e) { + LOG.error("Zookeeper session has expired", e); + // if the session has expired try to reconnect to ZK, then retry the delete + try { + // TODO: ZK-REFACTOR: should just quit on reconnect?? + reconnectToZk(); + synchronized(unassignedZNodesWatched) { + unassignedZNodesWatched.remove(znode); + deleteZNode(znode); + } + } catch (IOException e1) { + LOG.error("Error reconnecting to zookeeper", e1); + throw new RuntimeException("Error reconnecting to zookeeper", e1); + } catch (KeeperException.SessionExpiredException e1) { + LOG.error("Error reading after reconnecting to zookeeper", e1); + throw new RuntimeException("Error reading after reconnecting to zookeeper", e1); + } catch (KeeperException e1) { + LOG.error("Error reading after reconnecting to zookeeper", e1); + } catch (InterruptedException e1) { + LOG.error("Error reading after reconnecting to zookeeper", e1); + } + } catch (KeeperException e) { + LOG.error("Error deleting region " + regionName, e); + } catch (InterruptedException e) { + LOG.error("Error deleting region " + regionName, e); + } + } + + /** + * Atomically adds a watch and reads data from the unwatched znodes in the + * UNASSIGNED region. This works because the master is the only one + * deleting nodes. 
+ * @param znode + * @return + */ + public List watchAndGetNewChildren(String znode) { + List nodes = null; + List newNodes = new ArrayList(); + try { + if (checkExistenceOf(znode)) { + synchronized(unassignedZNodesWatched) { + nodes = zooKeeper.getChildren(znode, this); + for (String node : nodes) { + String znodePath = joinPath(znode, node); + if(!unassignedZNodesWatched.contains(znodePath)) { + byte[] data = getDataAndWatch(znode, node, this); + newNodes.add(new ZNodePathAndData(znodePath, data)); + unassignedZNodesWatched.add(znodePath); + } + } + } + } + } catch (KeeperException e) { + LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); + } catch (InterruptedException e) { + LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); + } + return newNodes; + } + + public static class ZNodePathAndData { + private String zNodePath; + private byte[] data; + + public ZNodePathAndData(String zNodePath, byte[] data) { + this.zNodePath = zNodePath; + this.data = data; + } + + public String getzNodePath() { + return zNodePath; + } + public byte[] getData() { + return data; + } + + } } diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 2f2f3067c99..479c661a29b 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -697,7 +697,8 @@ public class HBaseTestingUtility { } public void expireSession(ZooKeeperWrapper nodeZK) throws Exception{ - ZooKeeperWrapper zkw = new ZooKeeperWrapper(conf, EmptyWatcher.instance); + ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf, ZooKeeperWrapper.class.getName()); + zkw.registerListener(EmptyWatcher.instance); String quorumServers = zkw.getQuorumServers(); int sessionTimeout = 5 * 1000; // 5 seconds diff --git a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java 
b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index 3e42a33e3be..bdb33fd81f1 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -96,7 +96,8 @@ public class TestZooKeeper { throws IOException, InterruptedException { new HTable(conf, HConstants.META_TABLE_NAME); - ZooKeeperWrapper zkw = new ZooKeeperWrapper(conf, EmptyWatcher.instance); + ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf, TestZooKeeper.class.getName()); + zkw.registerListener(EmptyWatcher.instance); String quorumServers = zkw.getQuorumServers(); int sessionTimeout = 5 * 1000; // 5 seconds HConnection connection = HConnectionManager.getConnection(conf); @@ -158,7 +159,7 @@ public class TestZooKeeper { HTable localMeta = new HTable(conf, HConstants.META_TABLE_NAME); Configuration otherConf = HBaseConfiguration.create(conf); otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); - HTable ipMeta = new HTable(conf, HConstants.META_TABLE_NAME); + HTable ipMeta = new HTable(otherConf, HConstants.META_TABLE_NAME); // dummy, just to open the connection localMeta.exists(new Get(HConstants.LAST_ROW)); @@ -184,7 +185,8 @@ public class TestZooKeeper { */ @Test public void testZNodeDeletes() throws Exception { - ZooKeeperWrapper zkw = new ZooKeeperWrapper(conf, EmptyWatcher.instance); + ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf, TestZooKeeper.class.getName()); + zkw.registerListener(EmptyWatcher.instance); zkw.ensureExists("/l1/l2/l3/l4"); try { zkw.deleteZNode("/l1/l2"); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index 790afa5a5d4..29ec30ef53c 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import 
org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.executor.HBaseEventHandler; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventHandlerListener; +import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -70,7 +73,7 @@ public class TestMaster { CountDownLatch aboutToOpen = new CountDownLatch(1); CountDownLatch proceed = new CountDownLatch(1); RegionOpenListener list = new RegionOpenListener(aboutToOpen, proceed); - m.getRegionServerOperationQueue().registerRegionServerOperationListener(list); + HBaseEventHandler.registerListener(list); admin.split(TABLENAME); aboutToOpen.await(60, TimeUnit.SECONDS); @@ -79,32 +82,32 @@ public class TestMaster { m.getTableRegions(TABLENAME); Pair pair = m.getTableRegionClosest(TABLENAME, Bytes.toBytes("cde")); - assertNull(pair); /** - * TODO: these methods return null when the regions are not deployed. - * These tests should be uncommented after HBASE-2656. + * TODO: The assertNull below used to work before moving all RS->M + * communication to ZK, find out why this test's behavior has changed. + * Tracked in HBASE-2656. 
+ assertNull(pair); + */ assertNotNull(pair); m.getTableRegionFromName(pair.getFirst().getRegionName()); - */ } finally { proceed.countDown(); } } - static class RegionOpenListener implements RegionServerOperationListener { + static class RegionOpenListener implements HBaseEventHandlerListener { CountDownLatch aboutToOpen, proceed; - public RegionOpenListener( - CountDownLatch aboutToOpen, CountDownLatch proceed) + public RegionOpenListener(CountDownLatch aboutToOpen, CountDownLatch proceed) { this.aboutToOpen = aboutToOpen; this.proceed = proceed; } @Override - public boolean process(HServerInfo serverInfo, HMsg incomingMsg) { - if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_OPEN)) { - return true; + public void afterProcess(HBaseEventHandler event) { + if (event.getHBEvent() != HBaseEventType.RS2ZK_REGION_OPENED) { + return; } try { aboutToOpen.countDown(); @@ -112,16 +115,11 @@ public class TestMaster { } catch (InterruptedException ie) { throw new RuntimeException(ie); } - return true; + return; } @Override - public boolean process(RegionServerOperation op) throws IOException { - return true; - } - - @Override - public void processed(RegionServerOperation op) { + public void beforeProcess(HBaseEventHandler event) { } } diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java b/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java new file mode 100644 index 00000000000..d0a94039d5c --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java @@ -0,0 +1,241 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ProcessRegionClose; +import org.apache.hadoop.hbase.master.RegionServerOperation; +import org.apache.hadoop.hbase.master.RegionServerOperationListener; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; 
+import org.junit.Test; + +public class TestZKBasedCloseRegion { + private static final Log LOG = LogFactory.getLog(TestZKBasedCloseRegion.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final String TABLENAME = "master_transitions"; + private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"), + Bytes.toBytes("b"), Bytes.toBytes("c")}; + + @BeforeClass public static void beforeAllTests() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + c.setBoolean("dfs.support.append", true); + c.setInt("hbase.regionserver.info.port", 0); + c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000); + TEST_UTIL.startMiniCluster(2); + TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); + HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); + int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); + waitUntilAllRegionsAssigned(countOfRegions); + addToEachStartKey(countOfRegions); + } + + @AfterClass public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before public void setup() throws IOException { + if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) { + // Need at least two servers. 
+ LOG.info("Started new server=" + + TEST_UTIL.getHBaseCluster().startRegionServer()); + + } + } + + @Test (timeout=300000) public void testCloseRegion() + throws Exception { + LOG.info("Running testCloseRegion"); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size()); + + int rsIdx = 0; + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx); + Collection regions = regionServer.getOnlineRegions(); + HRegion region = regions.iterator().next(); + LOG.debug("Asking RS to close region " + region.getRegionNameAsString()); + + AtomicBoolean closeEventProcessed = new AtomicBoolean(false); + RegionServerOperationListener listener = + new CloseRegionEventListener(region.getRegionNameAsString(), closeEventProcessed); + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener); + HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, + region.getRegionInfo(), + Bytes.toBytes("Forcing close in test") + ); + TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg); + + synchronized(closeEventProcessed) { + // wait up to 3 minutes, but only if the listener has not already reported the close + if(!closeEventProcessed.get()) { closeEventProcessed.wait(3*60*1000); } + } + if(!closeEventProcessed.get()) { + throw new Exception("Timed out, close event not called on master."); + } + else { + LOG.info("Done with test, RS informed master successfully."); + } + } + + public static class CloseRegionEventListener implements RegionServerOperationListener { + + private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class); + String regionToClose; + AtomicBoolean closeEventProcessed; + + public CloseRegionEventListener(String regionToClose, AtomicBoolean closeEventProcessed) { + this.regionToClose = regionToClose; + this.closeEventProcessed = closeEventProcessed; + } + + @Override + public boolean process(HServerInfo serverInfo, HMsg 
incomingMsg) { + return true; + } + + @Override + public boolean process(RegionServerOperation op) throws IOException { + return true; + } + + @Override + public void processed(RegionServerOperation op) { + LOG.debug("Master processing object: " + op.getClass().getCanonicalName()); + if(op instanceof ProcessRegionClose) { + ProcessRegionClose regionCloseOp = (ProcessRegionClose)op; + String region = regionCloseOp.getRegionInfo().getRegionNameAsString(); + LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose); + if(regionToClose.equals(region)) { // wake the waiting test only for the region it asked to close + closeEventProcessed.set(true); + synchronized(closeEventProcessed) { + closeEventProcessed.notifyAll(); + } + } + } + } + + } + + + private static void waitUntilAllRegionsAssigned(final int countOfRegions) + throws IOException { + HTable meta = new HTable(TEST_UTIL.getConfiguration(), + HConstants.META_TABLE_NAME); + while (true) { + int rows = 0; + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + for (Result r = null; (r = s.next()) != null;) { + byte [] b = + r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + if (b == null || b.length <= 0) break; + rows++; + } + s.close(); + // If I get to here and all rows have a Server, then all have been assigned. + if (rows == countOfRegions) break; + LOG.info("Found=" + rows); + Threads.sleep(1000); + } + } + + /* + * Add to each of the regions in .META. a value. Key is the startrow of the + * region (except its 'aaa' for first region). Actual value is the row name. 
+ * @param expected + * @return + * @throws IOException + */ + private static int addToEachStartKey(final int expected) throws IOException { + HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); + HTable meta = new HTable(TEST_UTIL.getConfiguration(), + HConstants.META_TABLE_NAME); + int rows = 0; + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + for (Result r = null; (r = s.next()) != null;) { + byte [] b = + r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + if (b == null || b.length <= 0) break; + HRegionInfo hri = Writables.getHRegionInfo(b); + // If start key, add 'aaa'. + byte [] row = getStartKey(hri); + Put p = new Put(row); + p.add(getTestFamily(), getTestQualifier(), row); + t.put(p); + rows++; + } + s.close(); + Assert.assertEquals(expected, rows); + return rows; + } + + private static byte [] getStartKey(final HRegionInfo hri) { + return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())? 
+ Bytes.toBytes("aaa"): hri.getStartKey(); + } + + private static byte [] getTestFamily() { + return FAMILIES[0]; + } + + private static byte [] getTestQualifier() { + return getTestFamily(); + } + + public static void main(String args[]) throws Exception { + TestZKBasedCloseRegion.beforeAllTests(); + + TestZKBasedCloseRegion test = new TestZKBasedCloseRegion(); + test.setup(); + test.testCloseRegion(); + + TestZKBasedCloseRegion.afterAllTests(); + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java b/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java new file mode 100644 index 00000000000..4fb4c6586d4 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java @@ -0,0 +1,268 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ProcessRegionClose; +import org.apache.hadoop.hbase.master.ProcessRegionOpen; +import org.apache.hadoop.hbase.master.RegionServerOperation; +import org.apache.hadoop.hbase.master.RegionServerOperationListener; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestZKBasedReopenRegion { + private static final Log LOG = LogFactory.getLog(TestZKBasedReopenRegion.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final String TABLENAME = "master_transitions"; + private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"), + Bytes.toBytes("b"), Bytes.toBytes("c")}; + + @BeforeClass public static void beforeAllTests() throws Exception { + Configuration c = 
TEST_UTIL.getConfiguration(); + c.setBoolean("dfs.support.append", true); + c.setInt("hbase.regionserver.info.port", 0); + c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000); + TEST_UTIL.startMiniCluster(2); + TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); + HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); + int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); + waitUntilAllRegionsAssigned(countOfRegions); + addToEachStartKey(countOfRegions); + } + + @AfterClass public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before public void setup() throws IOException { + if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) { + // Need at least two servers. + LOG.info("Started new server=" + + TEST_UTIL.getHBaseCluster().startRegionServer()); + + } + } + + @Test (timeout=300000) public void testOpenRegion() + throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size()); + + int rsIdx = 0; + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx); + Collection regions = regionServer.getOnlineRegions(); + HRegion region = regions.iterator().next(); + LOG.debug("Asking RS to close region " + region.getRegionNameAsString()); + + AtomicBoolean closeEventProcessed = new AtomicBoolean(false); + AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); + RegionServerOperationListener listener = + new ReopenRegionEventListener(region.getRegionNameAsString(), + closeEventProcessed, + reopenEventProcessed); + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener); + HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, + region.getRegionInfo(), + Bytes.toBytes("Forcing close in test") + ); + 
TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg); + + synchronized(closeEventProcessed) { + if(!closeEventProcessed.get()) { closeEventProcessed.wait(3*60*1000); } // skip the wait if the close was already reported + } + if(!closeEventProcessed.get()) { + throw new Exception("Timed out, close event not called on master."); + } + + synchronized(reopenEventProcessed) { + if(!reopenEventProcessed.get()) { reopenEventProcessed.wait(3*60*1000); } // skip the wait if the reopen was already reported + } + if(!reopenEventProcessed.get()) { + throw new Exception("Timed out, open event not called on master after region close."); + } + + LOG.info("Done with test, RS informed master successfully."); + } + + public static class ReopenRegionEventListener implements RegionServerOperationListener { + + private static final Log LOG = LogFactory.getLog(ReopenRegionEventListener.class); + String regionToClose; + AtomicBoolean closeEventProcessed; + AtomicBoolean reopenEventProcessed; + + public ReopenRegionEventListener(String regionToClose, + AtomicBoolean closeEventProcessed, + AtomicBoolean reopenEventProcessed) { + this.regionToClose = regionToClose; + this.closeEventProcessed = closeEventProcessed; + this.reopenEventProcessed = reopenEventProcessed; + } + + @Override + public boolean process(HServerInfo serverInfo, HMsg incomingMsg) { + return true; + } + + @Override + public boolean process(RegionServerOperation op) throws IOException { + return true; + } + + @Override + public void processed(RegionServerOperation op) { + LOG.debug("Master processing object: " + op.getClass().getCanonicalName()); + if(op instanceof ProcessRegionClose) { + ProcessRegionClose regionCloseOp = (ProcessRegionClose)op; + String region = regionCloseOp.getRegionInfo().getRegionNameAsString(); + LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose); + if(regionToClose.equals(region)) { // wake the waiting test only for the region it asked to close + closeEventProcessed.set(true); + synchronized(closeEventProcessed) { + closeEventProcessed.notifyAll(); + } + } + } + // Wait for open event AFTER we have closed the region + if(closeEventProcessed.get()) { + if(op 
instanceof ProcessRegionOpen) { + ProcessRegionOpen regionOpenOp = (ProcessRegionOpen)op; + String region = regionOpenOp.getRegionInfo().getRegionNameAsString(); + LOG.debug("Finished opening region " + region + ", expected to reopen region " + regionToClose); + if(regionToClose.equals(region)) { // wake the waiting test only for the region it expects to reopen + reopenEventProcessed.set(true); + synchronized(reopenEventProcessed) { + reopenEventProcessed.notifyAll(); + } + } + } + } + + } + + } + + + private static void waitUntilAllRegionsAssigned(final int countOfRegions) + throws IOException { + HTable meta = new HTable(TEST_UTIL.getConfiguration(), + HConstants.META_TABLE_NAME); + while (true) { + int rows = 0; + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + for (Result r = null; (r = s.next()) != null;) { + byte [] b = + r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + if (b == null || b.length <= 0) break; + rows++; + } + s.close(); + // If I get to here and all rows have a Server, then all have been assigned. + if (rows == countOfRegions) break; + LOG.info("Found=" + rows); + Threads.sleep(1000); + } + } + + /* + * Add to each of the regions in .META. a value. Key is the startrow of the + * region (except its 'aaa' for first region). Actual value is the row name. 
+ * @param expected + * @return + * @throws IOException + */ + private static int addToEachStartKey(final int expected) throws IOException { + HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); + HTable meta = new HTable(TEST_UTIL.getConfiguration(), + HConstants.META_TABLE_NAME); + int rows = 0; + Scan scan = new Scan(); + scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + ResultScanner s = meta.getScanner(scan); + for (Result r = null; (r = s.next()) != null;) { + byte [] b = + r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + if (b == null || b.length <= 0) break; + HRegionInfo hri = Writables.getHRegionInfo(b); + // If start key, add 'aaa'. + byte [] row = getStartKey(hri); + Put p = new Put(row); + p.add(getTestFamily(), getTestQualifier(), row); + t.put(p); + rows++; + } + s.close(); + Assert.assertEquals(expected, rows); + return rows; + } + + private static byte [] getStartKey(final HRegionInfo hri) { + return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())? + Bytes.toBytes("aaa"): hri.getStartKey(); + } + + private static byte [] getTestFamily() { + return FAMILIES[0]; + } + + private static byte [] getTestQualifier() { + return getTestFamily(); + } + + public static void main(String args[]) throws Exception { + TestZKBasedReopenRegion.beforeAllTests(); + + TestZKBasedReopenRegion test = new TestZKBasedReopenRegion(); + test.setup(); + test.testOpenRegion(); + + TestZKBasedReopenRegion.afterAllTests(); + } +}