From fa86731a5bc01ee63a650a072c7e19a8733679f3 Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Fri, 8 Aug 2008 18:55:46 +0000 Subject: [PATCH] HBASE-805 Remove unnecessary getRow overloads in HRS (Jonathan Gray via Jim Kellerman) git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@684054 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../apache/hadoop/hbase/client/HTable.java | 9 +- .../hadoop/hbase/ipc/HRegionInterface.java | 442 ++- .../hbase/regionserver/HRegionServer.java | 3294 ++++++++--------- 4 files changed, 1853 insertions(+), 1895 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 29e8aa810d3..ce8bbb65f0f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,9 @@ Release 0.3.0 - Unreleased INCOMPATIBLE CHANGES BUG FIXES + HBASE-805 Remove unnecessary getRow overloads in HRS (Jonathan Gray via + Jim Kellerman) + IMPROVEMENTS HBASE-801 When a table haven't disable, shell could response in a "user friendly" way. diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java index 2d3b616076a..5e96834a3b6 100644 --- a/src/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/java/org/apache/hadoop/hbase/client/HTable.java @@ -554,14 +554,7 @@ public class HTable { */ public RowResult getRow(final byte [] row, final long ts) throws IOException { - return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public RowResult call() throws IOException { - return server.getRow(location.getRegionInfo().getRegionName(), row, - ts); - } - } - ); + return getRow(row,null,ts); } /** diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 9de2c613846..e3a00e3ead2 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -1,234 +1,210 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; - -import org.apache.hadoop.hbase.filter.RowFilterInterface; -import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.Cell; -import org.apache.hadoop.hbase.io.RowResult; - -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.NotServingRegionException; - -/** - * Clients interact with HRegionServers using a handle to the HRegionInterface. - */ -public interface HRegionInterface extends VersionedProtocol { - /** - * Protocol version. - * Upped to 3 when we went from Text to byte arrays for row and column names. - */ - public static final long versionID = 3L; - - /** - * Get metainfo about an HRegion - * - * @param regionName name of the region - * @return HRegionInfo object for region - * @throws NotServingRegionException - */ - public HRegionInfo getRegionInfo(final byte [] regionName) - throws NotServingRegionException; - - /** - * Retrieve a single value from the specified region for the specified row - * and column keys - * - * @param regionName name of region - * @param row row key - * @param column column key - * @return alue for that region/row/column - * @throws IOException - */ - public Cell get(final byte [] regionName, final byte [] row, final byte [] column) - throws IOException; - - /** - * Get the specified number of versions of the specified row and column - * - * @param regionName region name - * @param row row key - * @param column column key - * @param numVersions number of versions to return - * @return array of values - * @throws IOException - */ - public Cell[] get(final byte [] regionName, final byte [] row, - final byte [] column, final int numVersions) - throws IOException; - - /** - * Get the specified number of versions of the specified row and column with - * the specified timestamp. - * - * @param regionName region name - * @param row row key - * @param column column key - * @param timestamp timestamp - * @param numVersions number of versions to return - * @return array of values - * @throws IOException - */ - public Cell[] get(final byte [] regionName, final byte [] row, - final byte [] column, final long timestamp, final int numVersions) - throws IOException; - - /** - * Get all the data for the specified row at a given timestamp - * - * @param regionName region name - * @param row row key - * @return map of values - * @throws IOException - */ - public RowResult getRow(final byte [] regionName, final byte [] row, - final long ts) - throws IOException; - - /** - * Return all the data for the row that matches row exactly, - * or the one that immediately preceeds it. - * - * @param regionName region name - * @param row row key - * @return map of values - * @throws IOException - */ - public RowResult getClosestRowBefore(final byte [] regionName, - final byte [] row) - throws IOException; - - /** - * Get selected columns for the specified row at a given timestamp. - * - * @param regionName region name - * @param row row key - * @return map of values - * @throws IOException - */ - public RowResult getRow(final byte [] regionName, final byte [] row, - final byte[][] columns, final long ts) - throws IOException; - - /** - * Get selected columns for the specified row at the latest timestamp. - * - * @param regionName region name - * @param row row key - * @return map of values - * @throws IOException - */ - public RowResult getRow(final byte [] regionName, final byte [] row, - final byte[][] columns) - throws IOException; - - /** - * Applies a batch of updates via one RPC - * - * @param regionName name of the region to update - * @param b BatchUpdate - * @throws IOException - */ - public void batchUpdate(final byte [] regionName, final BatchUpdate b) - throws IOException; - - /** - * Delete all cells that match the passed row and column and whose - * timestamp is equal-to or older than the passed timestamp. - * - * @param regionName region name - * @param row row key - * @param column column key - * @param timestamp Delete all entries that have this timestamp or older - * @throws IOException - */ - public void deleteAll(byte [] regionName, byte [] row, byte [] column, - long timestamp) - throws IOException; - - /** - * Delete all cells that match the passed row and whose - * timestamp is equal-to or older than the passed timestamp. - * - * @param regionName region name - * @param row row key - * @param timestamp Delete all entries that have this timestamp or older - * @throws IOException - */ - public void deleteAll(byte [] regionName, byte [] row, long timestamp) - throws IOException; - - /** - * Delete all cells for a row with matching column family with timestamps - * less than or equal to timestamp. - * - * @param regionName The name of the region to operate on - * @param row The row to operate on - * @param family The column family to match - * @param timestamp Timestamp to match - */ - public void deleteFamily(byte [] regionName, byte [] row, byte [] family, - long timestamp) - throws IOException; - - - // - // remote scanner interface - // - - /** - * Opens a remote scanner with a RowFilter. - * - * @param regionName name of region to scan - * @param columns columns to scan. If column name is a column family, all - * columns of the specified column family are returned. Its also possible - * to pass a regex for column family name. A column name is judged to be - * regex if it contains at least one of the following characters: - * \+|^&*$[]]}{)(. - * @param startRow starting row to scan - * @param timestamp only return values whose timestamp is <= this value - * @param filter RowFilter for filtering results at the row-level. - * - * @return scannerId scanner identifier used in other calls - * @throws IOException - */ - public long openScanner(final byte [] regionName, final byte [][] columns, - final byte [] startRow, long timestamp, RowFilterInterface filter) - throws IOException; - - /** - * Get the next set of values - * @param scannerId clientId passed to openScanner - * @return map of values - * @throws IOException - */ - public RowResult next(long scannerId) throws IOException; - - /** - * Close a scanner - * - * @param scannerId the scanner id returned by openScanner - * @throws IOException - */ - public void close(long scannerId) throws IOException; +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.RowResult; + +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.NotServingRegionException; + +/** + * Clients interact with HRegionServers using a handle to the HRegionInterface. + */ +public interface HRegionInterface extends VersionedProtocol { + /** + * Protocol version. + * Upped to 3 when we went from Text to byte arrays for row and column names. + */ + public static final long versionID = 3L; + + /** + * Get metainfo about an HRegion + * + * @param regionName name of the region + * @return HRegionInfo object for region + * @throws NotServingRegionException + */ + public HRegionInfo getRegionInfo(final byte [] regionName) + throws NotServingRegionException; + + /** + * Retrieve a single value from the specified region for the specified row + * and column keys + * + * @param regionName name of region + * @param row row key + * @param column column key + * @return alue for that region/row/column + * @throws IOException + */ + public Cell get(final byte [] regionName, final byte [] row, final byte [] column) + throws IOException; + + /** + * Get the specified number of versions of the specified row and column + * + * @param regionName region name + * @param row row key + * @param column column key + * @param numVersions number of versions to return + * @return array of values + * @throws IOException + */ + public Cell[] get(final byte [] regionName, final byte [] row, + final byte [] column, final int numVersions) + throws IOException; + + /** + * Get the specified number of versions of the specified row and column with + * the specified timestamp. + * + * @param regionName region name + * @param row row key + * @param column column key + * @param timestamp timestamp + * @param numVersions number of versions to return + * @return array of values + * @throws IOException + */ + public Cell[] get(final byte [] regionName, final byte [] row, + final byte [] column, final long timestamp, final int numVersions) + throws IOException; + + /** + * Return all the data for the row that matches row exactly, + * or the one that immediately preceeds it. + * + * @param regionName region name + * @param row row key + * @return map of values + * @throws IOException + */ + public RowResult getClosestRowBefore(final byte [] regionName, + final byte [] row) + throws IOException; + + /** + * Get selected columns for the specified row at a given timestamp. + * + * @param regionName region name + * @param row row key + * @return map of values + * @throws IOException + */ + public RowResult getRow(final byte [] regionName, final byte [] row, + final byte[][] columns, final long ts) + throws IOException; + + /** + * Applies a batch of updates via one RPC + * + * @param regionName name of the region to update + * @param b BatchUpdate + * @throws IOException + */ + public void batchUpdate(final byte [] regionName, final BatchUpdate b) + throws IOException; + + /** + * Delete all cells that match the passed row and column and whose + * timestamp is equal-to or older than the passed timestamp. + * + * @param regionName region name + * @param row row key + * @param column column key + * @param timestamp Delete all entries that have this timestamp or older + * @throws IOException + */ + public void deleteAll(byte [] regionName, byte [] row, byte [] column, + long timestamp) + throws IOException; + + /** + * Delete all cells that match the passed row and whose + * timestamp is equal-to or older than the passed timestamp. + * + * @param regionName region name + * @param row row key + * @param timestamp Delete all entries that have this timestamp or older + * @throws IOException + */ + public void deleteAll(byte [] regionName, byte [] row, long timestamp) + throws IOException; + + /** + * Delete all cells for a row with matching column family with timestamps + * less than or equal to timestamp. + * + * @param regionName The name of the region to operate on + * @param row The row to operate on + * @param family The column family to match + * @param timestamp Timestamp to match + */ + public void deleteFamily(byte [] regionName, byte [] row, byte [] family, + long timestamp) + throws IOException; + + + // + // remote scanner interface + // + + /** + * Opens a remote scanner with a RowFilter. + * + * @param regionName name of region to scan + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible + * to pass a regex for column family name. A column name is judged to be + * regex if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param startRow starting row to scan + * @param timestamp only return values whose timestamp is <= this value + * @param filter RowFilter for filtering results at the row-level. + * + * @return scannerId scanner identifier used in other calls + * @throws IOException + */ + public long openScanner(final byte [] regionName, final byte [][] columns, + final byte [] startRow, long timestamp, RowFilterInterface filter) + throws IOException; + + /** + * Get the next set of values + * @param scannerId clientId passed to openScanner + * @return map of values + * @throws IOException + */ + public RowResult next(long scannerId) throws IOException; + + /** + * Close a scanner + * + * @param scannerId the scanner id returned by openScanner + * @throws IOException + */ + public void close(long scannerId) throws IOException; } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 94ffeae3844..2d100559eae 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1,1654 +1,1640 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.reflect.Constructor; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.HServerLoad; -import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.LeaseListener; -import org.apache.hadoop.hbase.Leases; -import org.apache.hadoop.hbase.LocalHBaseCluster; -import org.apache.hadoop.hbase.NotServingRegionException; -import org.apache.hadoop.hbase.RegionHistorian; -import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; -import org.apache.hadoop.hbase.filter.RowFilterInterface; -import org.apache.hadoop.hbase.io.BatchOperation; -import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.Cell; -import org.apache.hadoop.hbase.io.HbaseMapWritable; -import org.apache.hadoop.hbase.io.RowResult; -import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; -import org.apache.hadoop.hbase.ipc.HRegionInterface; -import org.apache.hadoop.hbase.ipc.HbaseRPC; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.InfoServer; -import org.apache.hadoop.hbase.util.Sleeper; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.StringUtils; - -/** - * HRegionServer makes a set of HRegions available to clients. It checks in with - * the HMaster. There are many HRegionServers in a single HBase deployment. - */ -public class HRegionServer implements HConstants, HRegionInterface, Runnable { - static final Log LOG = LogFactory.getLog(HRegionServer.class); - - // Set when a report to the master comes back with a message asking us to - // shutdown. Also set by call to stop when debugging or running unit tests - // of HRegionServer in isolation. We use AtomicBoolean rather than - // plain boolean so we can pass a reference to Chore threads. Otherwise, - // Chore threads need to know about the hosting class. - protected final AtomicBoolean stopRequested = new AtomicBoolean(false); - - protected final AtomicBoolean quiesced = new AtomicBoolean(false); - - // Go down hard. Used if file system becomes unavailable and also in - // debugging and unit tests. - protected volatile boolean abortRequested; - - // If false, the file system has become unavailable - protected volatile boolean fsOk; - - protected final HServerInfo serverInfo; - protected final HBaseConfiguration conf; - private FileSystem fs; - private Path rootDir; - private final Random rand = new Random(); - - // Key is Bytes.hashCode of region name byte array and the value is HRegion - // in both of the maps below. Use Bytes.mapKey(byte []) generating key for - // below maps. - protected final Map onlineRegions = - new ConcurrentHashMap(); - - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final List outboundMsgs = - Collections.synchronizedList(new ArrayList()); - - final int numRetries; - protected final int threadWakeFrequency; - private final int msgInterval; - private final int serverLeaseTimeout; - - protected final int numRegionsToReport; - - // Remote HMaster - private HMasterRegionInterface hbaseMaster; - - // Server to handle client requests. Default access so can be accessed by - // unit tests. - final Server server; - - // Leases - private final Leases leases; - - // Request counter - private volatile AtomicInteger requestCount = new AtomicInteger(); - - // Info server. Default access so can be used by unit tests. REGIONSERVER - // is name of the webapp and the attribute name used stuffing this instance - // into web context. - InfoServer infoServer; - - /** region server process name */ - public static final String REGIONSERVER = "regionserver"; - - /** - * Space is reserved in HRS constructor and then released when aborting - * to recover from an OOME. See HBASE-706. - */ - private final LinkedList reservedSpace = new LinkedList(); - - /** - * Thread to shutdown the region server in an orderly manner. This thread - * is registered as a shutdown hook in the HRegionServer constructor and is - * only called when the HRegionServer receives a kill signal. - */ - class ShutdownThread extends Thread { - private final HRegionServer instance; - - /** - * @param instance - */ - public ShutdownThread(HRegionServer instance) { - this.instance = instance; - } - - /** {@inheritDoc} */ - @Override - public void run() { - LOG.info("Starting shutdown thread."); - - // tell the region server to stop and wait for it to complete - instance.stop(); - instance.join(); - LOG.info("Shutdown thread complete"); - } - } - - // Compactions - final CompactSplitThread compactSplitThread; - - // Cache flushing - final Flusher cacheFlusher; - - // HLog and HLog roller. log is protected rather than private to avoid - // eclipse warning when accessed by inner classes - protected HLog log; - final LogRoller logRoller; - - // flag set after we're done setting up server threads (used for testing) - protected volatile boolean isOnline; - - /** - * Starts a HRegionServer at the default location - * @param conf - * @throws IOException - */ - public HRegionServer(HBaseConfiguration conf) throws IOException { - this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS, - DEFAULT_REGIONSERVER_ADDRESS)), conf); - } - - /** - * Starts a HRegionServer at the specified location - * @param address - * @param conf - * @throws IOException - */ - public HRegionServer(HServerAddress address, HBaseConfiguration conf) - throws IOException { - this.abortRequested = false; - this.fsOk = true; - this.conf = conf; - - this.isOnline = false; - - // Config'ed params - this.numRetries = conf.getInt("hbase.client.retries.number", 2); - this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000); - this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); - this.serverLeaseTimeout = - conf.getInt("hbase.master.lease.period", 120 * 1000); - - // Cache flushing thread. - this.cacheFlusher = new Flusher(conf, this); - - // Compaction thread - this.compactSplitThread = new CompactSplitThread(this); - - // Log rolling thread - this.logRoller = new LogRoller(this); - - // Task thread to process requests from Master - this.worker = new Worker(); - this.workerThread = new Thread(worker); - - // Server to handle client requests - this.server = HbaseRPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); - // Address is givin a default IP for the moment. Will be changed after - // calling the master. - this.serverInfo = new HServerInfo(new HServerAddress( - new InetSocketAddress(DEFAULT_HOST, - this.server.getListenerAddress().getPort())), System.currentTimeMillis(), - this.conf.getInt("hbase.regionserver.info.port", 60030)); - this.numRegionsToReport = - conf.getInt("hbase.regionserver.numregionstoreport", 10); - - this.leases = new Leases( - conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000), - this.threadWakeFrequency); - - int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4); - for(int i = 0; i < nbBlocks; i++) { - reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]); - } - - // Register shutdown hook for HRegionServer, runs an orderly shutdown - // when a kill signal is recieved - Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); - } - - /** - * The HRegionServer sticks in this loop until closed. It repeatedly checks - * in with the HMaster, sending heartbeats & reports, and receiving HRegion - * load/unload instructions. - */ - public void run() { - boolean quiesceRequested = false; - // A sleeper that sleeps for msgInterval. - Sleeper sleeper = - new Sleeper(this.msgInterval, this.stopRequested); - try { - init(reportForDuty(sleeper)); - long lastMsg = 0; - // Now ask master what it wants us to do and tell it what we have done - for (int tries = 0; !stopRequested.get() && isHealthy();) { - long now = System.currentTimeMillis(); - if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) { - // It has been way too long since we last reported to the master. - // Commit suicide. - LOG.fatal("unable to report to master for " + (now - lastMsg) + - " milliseconds - aborting server"); - abort(); - break; - } - if ((now - lastMsg) >= msgInterval) { - HMsg outboundArray[] = null; - synchronized(this.outboundMsgs) { - outboundArray = - this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]); - this.outboundMsgs.clear(); - } - try { - this.serverInfo.setLoad(new HServerLoad(requestCount.get(), - onlineRegions.size())); - this.requestCount.set(0); - HMsg msgs[] = hbaseMaster.regionServerReport( - serverInfo, outboundArray, getMostLoadedRegions()); - lastMsg = System.currentTimeMillis(); - if (this.quiesced.get() && onlineRegions.size() == 0) { - // We've just told the master we're exiting because we aren't - // serving any regions. So set the stop bit and exit. - LOG.info("Server quiesced and not serving any regions. " + - "Starting shutdown"); - stopRequested.set(true); - this.outboundMsgs.clear(); - continue; - } - - // Queue up the HMaster's instruction stream for processing - boolean restart = false; - for(int i = 0; - !restart && !stopRequested.get() && i < msgs.length; - i++) { - LOG.info(msgs[i].toString()); - switch(msgs[i].getType()) { - case MSG_CALL_SERVER_STARTUP: - // We the MSG_CALL_SERVER_STARTUP on startup but we can also - // get it when the master is panicing because for instance - // the HDFS has been yanked out from under it. Be wary of - // this message. - if (checkFileSystem()) { - closeAllRegions(); - try { - log.closeAndDelete(); - } catch (Exception e) { - LOG.error("error closing and deleting HLog", e); - } - try { - serverInfo.setStartCode(System.currentTimeMillis()); - log = setupHLog(); - } catch (IOException e) { - this.abortRequested = true; - this.stopRequested.set(true); - e = RemoteExceptionHandler.checkIOException(e); - LOG.fatal("error restarting server", e); - break; - } - reportForDuty(sleeper); - restart = true; - } else { - LOG.fatal("file system available check failed. " + - "Shutting down server."); - } - break; - - case MSG_REGIONSERVER_STOP: - stopRequested.set(true); - break; - - case MSG_REGIONSERVER_QUIESCE: - if (!quiesceRequested) { - try { - toDo.put(new ToDoEntry(msgs[i])); - } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was " + - "interrupted.", e); - } - quiesceRequested = true; - } - break; - - default: - if (fsOk) { - try { - toDo.put(new ToDoEntry(msgs[i])); - } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was " + - "interrupted.", e); - } - } - } - } - // Reset tries count if we had a successful transaction. - tries = 0; - - if (restart || this.stopRequested.get()) { - toDo.clear(); - continue; - } - } catch (Exception e) { - if (e instanceof IOException) { - e = RemoteExceptionHandler.checkIOException((IOException) e); - } - if (tries < this.numRetries) { - LOG.warn("Processing message (Retry: " + tries + ")", e); - tries++; - } else { - LOG.fatal("Exceeded max retries: " + this.numRetries, e); - if (!checkFileSystem()) { - continue; - } - // Something seriously wrong. Shutdown. - stop(); - } - } - } - // Do some housekeeping before going to sleep - housekeeping(); - sleeper.sleep(lastMsg); - } // for - } catch (OutOfMemoryError error) { - abort(); - LOG.fatal("Ran out of memory", error); - } catch (Throwable t) { - LOG.fatal("Unhandled exception. Aborting...", t); - abort(); - } - RegionHistorian.getInstance().offline(); - this.leases.closeAfterLeasesExpire(); - this.worker.stop(); - this.server.stop(); - if (this.infoServer != null) { - LOG.info("Stopping infoServer"); - try { - this.infoServer.stop(); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - } - - // Send interrupts to wake up threads if sleeping so they notice shutdown. - // TODO: Should we check they are alive? If OOME could have exited already - cacheFlusher.interruptIfNecessary(); - compactSplitThread.interruptIfNecessary(); - this.logRoller.interruptIfNecessary(); - - if (abortRequested) { - if (this.fsOk) { - // Only try to clean up if the file system is available - try { - this.log.close(); - LOG.info("On abort, closed hlog"); - } catch (IOException e) { - LOG.error("Unable to close log in abort", - RemoteExceptionHandler.checkIOException(e)); - } - closeAllRegions(); // Don't leave any open file handles - } - LOG.info("aborting server at: " + - serverInfo.getServerAddress().toString()); - } else { - ArrayList closedRegions = closeAllRegions(); - try { - log.closeAndDelete(); - } catch (IOException e) { - LOG.error("Close and delete failed", - RemoteExceptionHandler.checkIOException(e)); - } - try { - HMsg[] exitMsg = new HMsg[closedRegions.size() + 1]; - exitMsg[0] = HMsg.REPORT_EXITING; - // Tell the master what regions we are/were serving - int i = 1; - for (HRegion region: closedRegions) { - exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, - region.getRegionInfo()); - } - - LOG.info("telling master that region server is shutting down at: " + - serverInfo.getServerAddress().toString()); - hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null); - } catch (IOException e) { - LOG.warn("Failed to send exiting message to master: ", - RemoteExceptionHandler.checkIOException(e)); - } - LOG.info("stopping server at: " + - serverInfo.getServerAddress().toString()); - } - join(); - LOG.info(Thread.currentThread().getName() + " exiting"); - } - - /* - * Run init. Sets up hlog and starts up all server threads. - * @param c Extra configuration. - */ - protected void init(final MapWritable c) throws IOException { - try { - for (Map.Entry e: c.entrySet()) { - String key = e.getKey().toString(); - String value = e.getValue().toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("Config from master: " + key + "=" + value); - } - this.conf.set(key, value); - } - // Master may have sent us a new address with the other configs. - // Update our address in this case. See HBASE-719 - if(conf.get("hbase.regionserver.address") != null) - serverInfo.setServerAddress(new HServerAddress - (conf.get("hbase.regionserver.address"), - serverInfo.getServerAddress().getPort())); - // Master sent us hbase.rootdir to use. Should be fully qualified - // path with file system specification included. Set 'fs.default.name' - // to match the filesystem on hbase.rootdir else underlying hadoop hdfs - // accessors will be going against wrong filesystem (unless all is set - // to defaults). - this.conf.set("fs.default.name", this.conf.get("hbase.rootdir")); - this.fs = FileSystem.get(this.conf); - this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR)); - this.log = setupHLog(); - startServiceThreads(); - isOnline = true; - } catch (IOException e) { - this.stopRequested.set(true); - isOnline = false; - e = RemoteExceptionHandler.checkIOException(e); - LOG.fatal("Failed init", e); - IOException ex = new IOException("region server startup failed"); - ex.initCause(e); - throw ex; - } - } - - /** - * Report the status of the server. A server is online once all the startup - * is completed (setting up filesystem, starting service threads, etc.). This - * method is designed mostly to be useful in tests. - * @return true if online, false if not. - */ - public boolean isOnline() { - return isOnline; - } - - private HLog setupHLog() throws RegionServerRunningException, - IOException { - - Path logdir = new Path(rootDir, "log" + "_" + - serverInfo.getServerAddress().getBindAddress() + "_" + - this.serverInfo.getStartCode() + "_" + - this.serverInfo.getServerAddress().getPort()); - if (LOG.isDebugEnabled()) { - LOG.debug("Log dir " + logdir); - } - if (fs.exists(logdir)) { - throw new RegionServerRunningException("region server already " + - "running at " + this.serverInfo.getServerAddress().toString() + - " because logdir " + logdir.toString() + " exists"); - } - return new HLog(fs, logdir, conf, logRoller); - } - - /* - * Start Chore Threads, Server, Worker and lease checker threads. Install an - * UncaughtExceptionHandler that calls abort of RegionServer if we get - * an unhandled exception. We cannot set the handler on all threads. - * Server's internal Listener thread is off limits. For Server, if an OOME, - * it waits a while then retries. Meantime, a flush or a compaction that - * tries to run should trigger same critical condition and the shutdown will - * run. On its way out, this server will shut down Server. Leases are sort - * of inbetween. It has an internal thread that while it inherits from - * Chore, it keeps its own internal stop mechanism so needs to be stopped - * by this hosting server. Worker logs the exception and exits. - */ - private void startServiceThreads() throws IOException { - String n = Thread.currentThread().getName(); - UncaughtExceptionHandler handler = new UncaughtExceptionHandler() { - public void uncaughtException(Thread t, Throwable e) { - abort(); - LOG.fatal("Set stop flag in " + t.getName(), e); - } - }; - Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller", - handler); - Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", - handler); - Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", - handler); - Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); - // Leases is not a Thread. Internally it runs a daemon thread. If it gets - // an unhandled exception, it will just exit. - this.leases.setName(n + ".leaseChecker"); - this.leases.start(); - // Put up info server. - int port = this.conf.getInt("hbase.regionserver.info.port", 60030); - if (port >= 0) { - String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); - this.infoServer = new InfoServer("regionserver", a, port, false); - this.infoServer.setAttribute("regionserver", this); - this.infoServer.start(); - } - // Start Server. This service is like leases in that it internally runs - // a thread. - this.server.start(); - LOG.info("HRegionServer started at: " + - serverInfo.getServerAddress().toString()); - } - - /* - * Verify that server is healthy - */ - private boolean isHealthy() { - if (!fsOk) { - // File system problem - return false; - } - // Verify that all threads are alive - if (!(leases.isAlive() && compactSplitThread.isAlive() && - cacheFlusher.isAlive() && logRoller.isAlive() && - workerThread.isAlive())) { - // One or more threads are no longer alive - shut down - stop(); - return false; - } - return true; - } - /* - * Run some housekeeping tasks before we go into 'hibernation' sleeping at - * the end of the main HRegionServer run loop. - */ - private void housekeeping() { - // If the todo list has > 0 messages, iterate looking for open region - // messages. Send the master a message that we're working on its - // processing so it doesn't assign the region elsewhere. - if (this.toDo.size() <= 0) { - return; - } - // This iterator is 'safe'. We are guaranteed a view on state of the - // queue at time iterator was taken out. Apparently goes from oldest. - for (ToDoEntry e: this.toDo) { - if (e.msg.isType(HMsg.Type.MSG_REGION_OPEN)) { - addProcessingMessage(e.msg.getRegionInfo()); - } - } - } - - /** @return the HLog */ - HLog getLog() { - return this.log; - } - - /** - * Sets a flag that will cause all the HRegionServer threads to shut down - * in an orderly fashion. Used by unit tests. - */ - public void stop() { - this.stopRequested.set(true); - synchronized(this) { - notifyAll(); // Wakes run() if it is sleeping - } - } - - /** - * Cause the server to exit without closing the regions it is serving, the - * log it is using and without notifying the master. - * Used unit testing and on catastrophic events such as HDFS is yanked out - * from under hbase or we OOME. - */ - public void abort() { - reservedSpace.clear(); - this.abortRequested = true; - stop(); - } - - /** - * Wait on all threads to finish. - * Presumption is that all closes and stops have already been called. - */ - void join() { - join(this.workerThread); - join(this.cacheFlusher); - join(this.compactSplitThread); - join(this.logRoller); - } - - private void join(final Thread t) { - while (t.isAlive()) { - try { - t.join(); - } catch (InterruptedException e) { - // continue - } - } - } - - /* - * Let the master know we're here - * Run initialization using parameters passed us by the master. - */ - private MapWritable reportForDuty(final Sleeper sleeper) - throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Telling master at " + - conf.get(MASTER_ADDRESS) + " that we are up"); - } - // Do initial RPC setup. The final argument indicates that the RPC should retry indefinitely. - this.hbaseMaster = (HMasterRegionInterface)HbaseRPC.waitForProxy( - HMasterRegionInterface.class, HMasterRegionInterface.versionID, - new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), - this.conf, -1); - MapWritable result = null; - long lastMsg = 0; - while(!stopRequested.get()) { - try { - this.requestCount.set(0); - this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size())); - lastMsg = System.currentTimeMillis(); - result = this.hbaseMaster.regionServerStartup(serverInfo); - break; - } catch (Leases.LeaseStillHeldException e) { - LOG.info("Lease " + e.getName() + " already held on master. Check " + - "DNS configuration so that all region servers are" + - "reporting their true IPs and not 127.0.0.1. Otherwise, this" + - "problem should resolve itself after the lease period of " + - this.conf.get("hbase.master.lease.period") - + " seconds expires over on the master"); - } catch (IOException e) { - LOG.warn("error telling master we are up", e); - } - sleeper.sleep(lastMsg); - } - return result; - } - - /* Add to the outbound message buffer */ - private void reportOpen(HRegionInfo region) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); - } - - /* Add to the outbound message buffer */ - private void reportClose(HRegionInfo region) { - reportClose(region, null); - } - - /* Add to the outbound message buffer */ - private void reportClose(final HRegionInfo region, final Text message) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); - } - - - /** - * Add to the outbound message buffer - * - * When a region splits, we need to tell the master that there are two new - * regions that need to be assigned. - * - * We do not need to inform the master about the old region, because we've - * updated the meta or root regions, and the master will pick that up on its - * next rescan of the root or meta tables. - */ - void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, - HRegionInfo newRegionB) { - - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, - new Text(oldRegion.getRegionNameAsString() + " split; daughters: " + - newRegionA.getRegionNameAsString() + ", " + - newRegionB.getRegionNameAsString()))); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); - } - - ////////////////////////////////////////////////////////////////////////////// - // HMaster-given operations - ////////////////////////////////////////////////////////////////////////////// - - /* - * Data structure to hold a HMsg and retries count. - */ - private static class ToDoEntry { - private int tries; - private final HMsg msg; - ToDoEntry(HMsg msg) { - this.tries = 0; - this.msg = msg; - } - } - - final BlockingQueue toDo = new LinkedBlockingQueue(); - private Worker worker; - private Thread workerThread; - - /** Thread that performs long running requests from the master */ - class Worker implements Runnable { - void stop() { - synchronized(toDo) { - toDo.notifyAll(); - } - } - - /** {@inheritDoc} */ - public void run() { - try { - while(!stopRequested.get()) { - ToDoEntry e = null; - try { - e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if(e == null || stopRequested.get()) { - continue; - } - LOG.info(e.msg); - switch(e.msg.getType()) { - - case MSG_REGIONSERVER_QUIESCE: - closeUserRegions(); - break; - - case MSG_REGION_OPEN: - // Open a region - openRegion(e.msg.getRegionInfo()); - break; - - case MSG_REGION_CLOSE: - // Close a region - closeRegion(e.msg.getRegionInfo(), true); - break; - - case MSG_REGION_CLOSE_WITHOUT_REPORT: - // Close a region, don't reply - closeRegion(e.msg.getRegionInfo(), false); - break; - - default: - throw new AssertionError( - "Impossible state during msg processing. Instruction: " - + e.msg.toString()); - } - } catch (InterruptedException ex) { - // continue - } catch (Exception ex) { - if (ex instanceof IOException) { - ex = RemoteExceptionHandler.checkIOException((IOException) ex); - } - if(e != null && e.tries < numRetries) { - LOG.warn(ex); - e.tries++; - try { - toDo.put(e); - } catch (InterruptedException ie) { - throw new RuntimeException("Putting into msgQueue was " + - "interrupted.", ex); - } - } else { - LOG.error("unable to process message" + - (e != null ? (": " + e.msg.toString()) : ""), ex); - if (!checkFileSystem()) { - break; - } - } - } - } - } catch(Throwable t) { - LOG.fatal("Unhandled exception", t); - } finally { - LOG.info("worker thread exiting"); - } - } - } - - void openRegion(final HRegionInfo regionInfo) { - // If historian is not online and this is not a meta region, online it. - if (!regionInfo.isMetaRegion() && - !RegionHistorian.getInstance().isOnline()) { - RegionHistorian.getInstance().online(this.conf); - } - Integer mapKey = Bytes.mapKey(regionInfo.getRegionName()); - HRegion region = this.onlineRegions.get(mapKey); - if (region == null) { - try { - region = instantiateRegion(regionInfo); - // Startup a compaction early if one is needed. - this.compactSplitThread.compactionRequested(region); - } catch (IOException e) { - LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e); - - // TODO: add an extra field in HRegionInfo to indicate that there is - // an error. We can't do that now because that would be an incompatible - // change that would require a migration - reportClose(regionInfo, new Text(StringUtils.stringifyException(e))); - return; - } - this.lock.writeLock().lock(); - try { - this.log.setSequenceNumber(region.getMinSequenceId()); - this.onlineRegions.put(mapKey, region); - } finally { - this.lock.writeLock().unlock(); - } - } - reportOpen(regionInfo); - } - - protected HRegion instantiateRegion(final HRegionInfo regionInfo) - throws IOException { - return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo - .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null, - this.cacheFlusher, new Progressable() { - public void progress() { - addProcessingMessage(regionInfo); - } - }); - } - - /* - * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue. - * This method is called while region is in the queue of regions to process - * and then while the region is being opened, it is called from the Worker - * thread that is running the region open. - * @param hri Region to add the message for - */ - protected void addProcessingMessage(final HRegionInfo hri) { - getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri)); - } - - void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) - throws IOException { - HRegion region = this.removeFromOnlineRegions(hri); - if (region != null) { - region.close(); - if(reportWhenCompleted) { - reportClose(hri); - } - } - } - - /** Called either when the master tells us to restart or from stop() */ - ArrayList closeAllRegions() { - ArrayList regionsToClose = new ArrayList(); - this.lock.writeLock().lock(); - try { - regionsToClose.addAll(onlineRegions.values()); - onlineRegions.clear(); - } finally { - this.lock.writeLock().unlock(); - } - for(HRegion region: regionsToClose) { - if (LOG.isDebugEnabled()) { - LOG.debug("closing region " + Bytes.toString(region.getRegionName())); - } - try { - region.close(abortRequested); - } catch (IOException e) { - LOG.error("error closing region " + - Bytes.toString(region.getRegionName()), - RemoteExceptionHandler.checkIOException(e)); - } - } - return regionsToClose; - } - - /** Called as the first stage of cluster shutdown. */ - void closeUserRegions() { - ArrayList regionsToClose = new ArrayList(); - this.lock.writeLock().lock(); - try { - synchronized (onlineRegions) { - for (Iterator> i = - onlineRegions.entrySet().iterator(); i.hasNext();) { - Map.Entry e = i.next(); - HRegion r = e.getValue(); - if (!r.getRegionInfo().isMetaRegion()) { - regionsToClose.add(r); - i.remove(); - } - } - } - } finally { - this.lock.writeLock().unlock(); - } - for(HRegion region: regionsToClose) { - if (LOG.isDebugEnabled()) { - LOG.debug("closing region " + Bytes.toString(region.getRegionName())); - } - try { - region.close(); - } catch (IOException e) { - LOG.error("error closing region " + region.getRegionName(), - RemoteExceptionHandler.checkIOException(e)); - } - } - this.quiesced.set(true); - if (onlineRegions.size() == 0) { - outboundMsgs.add(HMsg.REPORT_EXITING); - } else { - outboundMsgs.add(HMsg.REPORT_QUIESCED); - } - } - - // - // HRegionInterface - // - - /** {@inheritDoc} */ - public HRegionInfo getRegionInfo(final byte [] regionName) - throws NotServingRegionException { - requestCount.incrementAndGet(); - return getRegion(regionName).getRegionInfo(); - } - - /** {@inheritDoc} */ - public Cell get(final byte [] regionName, final byte [] row, - final byte [] column) - throws IOException { - checkOpen(); - requestCount.incrementAndGet(); - try { - return getRegion(regionName).get(row, column); - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public Cell[] get(final byte [] regionName, final byte [] row, - final byte [] column, final int numVersions) - throws IOException { - checkOpen(); - requestCount.incrementAndGet(); - try { - return getRegion(regionName).get(row, column, numVersions); - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public Cell[] get(final byte [] regionName, final byte [] row, - final byte [] column, final long timestamp, final int numVersions) - throws IOException { - checkOpen(); - requestCount.incrementAndGet(); - try { - return getRegion(regionName).get(row, column, timestamp, numVersions); - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public RowResult getRow(final byte [] regionName, final byte [] row, - final long ts) - throws IOException { - return getRow(regionName, row, null, ts); - } - - /** {@inheritDoc} */ - public RowResult getRow(final byte [] regionName, final byte [] row, - final byte [][] columns) - throws IOException { - return getRow(regionName, row, columns, HConstants.LATEST_TIMESTAMP); - } - - /** {@inheritDoc} */ - public RowResult getRow(final byte [] regionName, final byte [] row, - final byte [][] columns, final long ts) - throws IOException { - checkOpen(); - requestCount.incrementAndGet(); - try { - // convert the columns array into a set so it's easy to check later. - Set columnSet = null; - if (columns != null) { - columnSet = new TreeSet(Bytes.BYTES_COMPARATOR); - columnSet.addAll(Arrays.asList(columns)); - } - - HRegion region = getRegion(regionName); - Map map = region.getFull(row, columnSet, ts); - HbaseMapWritable result = - new HbaseMapWritable(); - result.putAll(map); - return new RowResult(row, result); - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public RowResult getClosestRowBefore(final byte [] regionName, - final byte [] row) - throws IOException { - checkOpen(); - requestCount.incrementAndGet(); - try { - // locate the region we're operating on - HRegion region = getRegion(regionName); - // ask the region for all the data - RowResult rr = region.getClosestRowBefore(row); - return rr; - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public RowResult next(final long scannerId) throws IOException { - checkOpen(); - requestCount.incrementAndGet(); - try { - String scannerName = String.valueOf(scannerId); - InternalScanner s = scanners.get(scannerName); - if (s == null) { - throw new UnknownScannerException("Name: " + scannerName); - } - this.leases.renewLease(scannerName); - - // Collect values to be returned here - HbaseMapWritable values - = new HbaseMapWritable(); - HStoreKey key = new HStoreKey(); - TreeMap results = - new TreeMap(Bytes.BYTES_COMPARATOR); - while (s.next(key, results)) { - values.putAll(results); - if (values.size() > 0) { - // Row has something in it. Return the value. - break; - } - - // No data for this row, go get another. - results.clear(); - } - return values.size() == 0 ? null : new RowResult(key.getRow(), values); - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public void batchUpdate(final byte [] regionName, BatchUpdate b) - throws IOException { - checkOpen(); - this.requestCount.incrementAndGet(); - HRegion region = getRegion(regionName); - validateValuesLength(b, region); - try { - cacheFlusher.reclaimMemcacheMemory(); - region.batchUpdate(b); - } catch (OutOfMemoryError error) { - abort(); - LOG.fatal("Ran out of memory", error); - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** - * Utility method to verify values length - * @param batchUpdate The update to verify - * @throws IOException Thrown if a value is too long - */ - private void validateValuesLength(BatchUpdate batchUpdate, - HRegion region) throws IOException { - HTableDescriptor desc = region.getTableDesc(); - for (Iterator iter = - batchUpdate.iterator(); iter.hasNext();) { - - BatchOperation operation = iter.next(); - int maxLength = - desc.getFamily(HStoreKey.getFamily(operation.getColumn())). - getMaxValueLength(); - if(operation.getValue() != null) - if(operation.getValue().length > maxLength) { - throw new IOException("Value in column " + - Bytes.toString(operation.getColumn()) + " is too long. " + - operation.getValue().length + " instead of " + maxLength); - } - } - } - - // - // remote scanner interface - // - - /** {@inheritDoc} */ - public long openScanner(byte [] regionName, byte [][] cols, byte [] firstRow, - final long timestamp, final RowFilterInterface filter) - throws IOException { - checkOpen(); - NullPointerException npe = null; - if (regionName == null) { - npe = new NullPointerException("regionName is null"); - } else if (cols == null) { - npe = new NullPointerException("columns to scan is null"); - } else if (firstRow == null) { - npe = new NullPointerException("firstRow for scanner is null"); - } - if (npe != null) { - IOException io = new IOException("Invalid arguments to openScanner"); - io.initCause(npe); - throw io; - } - requestCount.incrementAndGet(); - try { - HRegion r = getRegion(regionName); - InternalScanner s = - r.getScanner(cols, firstRow, timestamp, filter); - long scannerId = addScanner(s); - return scannerId; - } catch (IOException e) { - LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", - RemoteExceptionHandler.checkIOException(e)); - checkFileSystem(); - throw e; - } - } - - protected long addScanner(InternalScanner s) throws LeaseStillHeldException { - long scannerId = -1L; - scannerId = rand.nextLong(); - String scannerName = String.valueOf(scannerId); - synchronized(scanners) { - scanners.put(scannerName, s); - } - this.leases. - createLease(scannerName, new ScannerListener(scannerName)); - return scannerId; - } - - /** {@inheritDoc} */ - public void close(final long scannerId) throws IOException { - checkOpen(); - requestCount.incrementAndGet(); - try { - String scannerName = String.valueOf(scannerId); - InternalScanner s = null; - synchronized(scanners) { - s = scanners.remove(scannerName); - } - if(s == null) { - throw new UnknownScannerException(scannerName); - } - s.close(); - this.leases.cancelLease(scannerName); - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - Map scanners = - Collections.synchronizedMap(new HashMap()); - - /** - * Instantiated as a scanner lease. - * If the lease times out, the scanner is closed - */ - private class ScannerListener implements LeaseListener { - private final String scannerName; - - ScannerListener(final String n) { - this.scannerName = n; - } - - /** {@inheritDoc} */ - public void leaseExpired() { - LOG.info("Scanner " + this.scannerName + " lease expired"); - InternalScanner s = null; - synchronized(scanners) { - s = scanners.remove(this.scannerName); - } - if (s != null) { - try { - s.close(); - } catch (IOException e) { - LOG.error("Closing scanner", e); - } - } - } - } - - // - // Methods that do the actual work for the remote API - // - - /** {@inheritDoc} */ - public void deleteAll(final byte [] regionName, final byte [] row, - final byte [] column, final long timestamp) - throws IOException { - HRegion region = getRegion(regionName); - region.deleteAll(row, column, timestamp); - } - - /** {@inheritDoc} */ - public void deleteAll(final byte [] regionName, final byte [] row, - final long timestamp) - throws IOException { - HRegion region = getRegion(regionName); - region.deleteAll(row, timestamp); - } - - /** {@inheritDoc} */ - public void deleteFamily(byte [] regionName, byte [] row, byte [] family, - long timestamp) throws IOException{ - getRegion(regionName).deleteFamily(row, family, timestamp); - } - - - /** - * @return Info on this server. - */ - public HServerInfo getServerInfo() { - return this.serverInfo; - } - - /** @return the info server */ - public InfoServer getInfoServer() { - return infoServer; - } - - /** - * @return true if a stop has been requested. - */ - public boolean isStopRequested() { - return stopRequested.get(); - } - - /** - * - * @return the configuration - */ - public HBaseConfiguration getConfiguration() { - return conf; - } - - /** @return the write lock for the server */ - ReentrantReadWriteLock.WriteLock getWriteLock() { - return lock.writeLock(); - } - - /** - * @return Immutable list of this servers regions. - */ - public Collection getOnlineRegions() { - return Collections.unmodifiableCollection(onlineRegions.values()); - } - - /** - * @return The HRegionInfos from online regions sorted - */ - public SortedSet getSortedOnlineRegionInfos() { - SortedSet result = new TreeSet(); - synchronized(this.onlineRegions) { - for (HRegion r: this.onlineRegions.values()) { - result.add(r.getRegionInfo()); - } - } - return result; - } - - /** - * This method removes HRegion corresponding to hri from the Map of onlineRegions. - * - * @param hri the HRegionInfo corresponding to the HRegion to-be-removed. - * @return the removed HRegion, or null if the HRegion was not in onlineRegions. - */ - HRegion removeFromOnlineRegions(HRegionInfo hri) { - this.lock.writeLock().lock(); - HRegion toReturn = null; - try { - toReturn = onlineRegions.remove(Bytes.mapKey(hri.getRegionName())); - } finally { - this.lock.writeLock().unlock(); - } - return toReturn; - } - - /** - * @return A new Map of online regions sorted by region size with the first - * entry being the biggest. - */ - public SortedMap getCopyOfOnlineRegionsSortedBySize() { - // we'll sort the regions in reverse - SortedMap sortedRegions = new TreeMap( - new Comparator() { - public int compare(Long a, Long b) { - return -1 * a.compareTo(b); - } - }); - // Copy over all regions. Regions are sorted by size with biggest first. - synchronized (this.onlineRegions) { - for (HRegion region : this.onlineRegions.values()) { - sortedRegions.put(region.memcacheSize.get(), region); - } - } - return sortedRegions; - } - - /** - * @param regionName - * @return HRegion for the passed regionName or null if named - * region is not member of the online regions. - */ - public HRegion getOnlineRegion(final byte [] regionName) { - return onlineRegions.get(Bytes.mapKey(regionName)); - } - - /** @return the request count */ - public AtomicInteger getRequestCount() { - return this.requestCount; - } - - /** @return reference to FlushRequester */ - public FlushRequester getFlushRequester() { - return this.cacheFlusher; - } - - /** - * Protected utility method for safely obtaining an HRegion handle. - * @param regionName Name of online {@link HRegion} to return - * @return {@link HRegion} for regionName - * @throws NotServingRegionException - */ - protected HRegion getRegion(final byte [] regionName) - throws NotServingRegionException { - HRegion region = null; - this.lock.readLock().lock(); - try { - Integer key = Integer.valueOf(Bytes.hashCode(regionName)); - region = onlineRegions.get(key); - if (region == null) { - throw new NotServingRegionException(regionName); - } - return region; - } finally { - this.lock.readLock().unlock(); - } - } - - /** - * Get the top N most loaded regions this server is serving so we can - * tell the master which regions it can reallocate if we're overloaded. - * TODO: actually calculate which regions are most loaded. (Right now, we're - * just grabbing the first N regions being served regardless of load.) - */ - protected HRegionInfo[] getMostLoadedRegions() { - ArrayList regions = new ArrayList(); - synchronized (onlineRegions) { - for (HRegion r : onlineRegions.values()) { - if (regions.size() < numRegionsToReport) { - regions.add(r.getRegionInfo()); - } else { - break; - } - } - } - return regions.toArray(new HRegionInfo[regions.size()]); - } - - /** - * Called to verify that this server is up and running. - * - * @throws IOException - */ - protected void checkOpen() throws IOException { - if (this.stopRequested.get() || this.abortRequested) { - throw new IOException("Server not running"); - } - if (!fsOk) { - throw new IOException("File system not available"); - } - } - - /** - * Checks to see if the file system is still accessible. - * If not, sets abortRequested and stopRequested - * - * @return false if file system is not available - */ - protected boolean checkFileSystem() { - if (this.fsOk && fs != null) { - try { - FSUtils.checkFileSystemAvailable(fs); - } catch (IOException e) { - LOG.fatal("Shutting down HRegionServer: file system not available", e); - abort(); - fsOk = false; - } - } - return this.fsOk; - } - - /** - * @return Returns list of non-closed regions hosted on this server. If no - * regions to check, returns an empty list. - */ - protected Set getRegionsToCheck() { - HashSet regionsToCheck = new HashSet(); - //TODO: is this locking necessary? - lock.readLock().lock(); - try { - regionsToCheck.addAll(this.onlineRegions.values()); - } finally { - lock.readLock().unlock(); - } - // Purge closed regions. - for (final Iterator i = regionsToCheck.iterator(); i.hasNext();) { - HRegion r = i.next(); - if (r.isClosed()) { - i.remove(); - } - } - return regionsToCheck; - } - - /** {@inheritDoc} */ - public long getProtocolVersion(final String protocol, - @SuppressWarnings("unused") final long clientVersion) - throws IOException { - if (protocol.equals(HRegionInterface.class.getName())) { - return HRegionInterface.versionID; - } - throw new IOException("Unknown protocol to name node: " + protocol); - } - - /** - * @return Queue to which you can add outbound messages. - */ - protected List getOutboundMsgs() { - return this.outboundMsgs; - } - - /** - * Return the total size of all memcaches in every region. - * @return memcache size in bytes - */ - public long getGlobalMemcacheSize() { - long total = 0; - synchronized (onlineRegions) { - for (HRegion region : onlineRegions.values()) { - total += region.memcacheSize.get(); - } - } - return total; - } - - /** - * @return Return the leases. - */ - protected Leases getLeases() { - return leases; - } - - /** - * @return Return the rootDir. - */ - protected Path getRootDir() { - return rootDir; - } - - /** - * @return Return the fs. - */ - protected FileSystem getFileSystem() { - return fs; - } - - // - // Main program and support routines - // - - private static void printUsageAndExit() { - printUsageAndExit(null); - } - - private static void printUsageAndExit(final String message) { - if (message != null) { - System.err.println(message); - } - System.err.println("Usage: java " + - "org.apache.hbase.HRegionServer [--bind=hostname:port] start"); - System.exit(0); - } - - /** - * Do class main. - * @param args - * @param regionServerClass HRegionServer to instantiate. - */ - protected static void doMain(final String [] args, - final Class regionServerClass) { - if (args.length < 1) { - printUsageAndExit(); - } - Configuration conf = new HBaseConfiguration(); - - // Process command-line args. TODO: Better cmd-line processing - // (but hopefully something not as painful as cli options). - final String addressArgKey = "--bind="; - for (String cmd: args) { - if (cmd.startsWith(addressArgKey)) { - conf.set(REGIONSERVER_ADDRESS, cmd.substring(addressArgKey.length())); - continue; - } - - if (cmd.equals("start")) { - try { - // If 'local', don't start a region server here. Defer to - // LocalHBaseCluster. It manages 'local' clusters. - if (LocalHBaseCluster.isLocal(conf)) { - LOG.warn("Not starting a distinct region server because " + - "hbase.master is set to 'local' mode"); - } else { - Constructor c = - regionServerClass.getConstructor(HBaseConfiguration.class); - HRegionServer hrs = c.newInstance(conf); - Thread t = new Thread(hrs); - t.setName("regionserver" + hrs.server.getListenerAddress()); - t.start(); - } - } catch (Throwable t) { - LOG.error( "Can not start region server because "+ - StringUtils.stringifyException(t) ); - System.exit(-1); - } - break; - } - - if (cmd.equals("stop")) { - printUsageAndExit("To shutdown the regionserver run " + - "bin/hbase-daemon.sh stop regionserver or send a kill signal to" + - "the regionserver pid"); - } - - // Print out usage if we get to here. - printUsageAndExit(); - } - } - - /** - * @param args - */ - public static void main(String [] args) { - Configuration conf = new HBaseConfiguration(); - @SuppressWarnings("unchecked") - Class regionServerClass = (Class) conf - .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); - doMain(args, regionServerClass); - } -} +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.reflect.Constructor; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HServerLoad; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LeaseListener; +import org.apache.hadoop.hbase.Leases; +import org.apache.hadoop.hbase.LocalHBaseCluster; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionHistorian; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchOperation; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.HbaseRPC; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.InfoServer; +import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; + +/** + * HRegionServer makes a set of HRegions available to clients. It checks in with + * the HMaster. There are many HRegionServers in a single HBase deployment. + */ +public class HRegionServer implements HConstants, HRegionInterface, Runnable { + static final Log LOG = LogFactory.getLog(HRegionServer.class); + + // Set when a report to the master comes back with a message asking us to + // shutdown. Also set by call to stop when debugging or running unit tests + // of HRegionServer in isolation. We use AtomicBoolean rather than + // plain boolean so we can pass a reference to Chore threads. Otherwise, + // Chore threads need to know about the hosting class. + protected final AtomicBoolean stopRequested = new AtomicBoolean(false); + + protected final AtomicBoolean quiesced = new AtomicBoolean(false); + + // Go down hard. Used if file system becomes unavailable and also in + // debugging and unit tests. + protected volatile boolean abortRequested; + + // If false, the file system has become unavailable + protected volatile boolean fsOk; + + protected final HServerInfo serverInfo; + protected final HBaseConfiguration conf; + private FileSystem fs; + private Path rootDir; + private final Random rand = new Random(); + + // Key is Bytes.hashCode of region name byte array and the value is HRegion + // in both of the maps below. Use Bytes.mapKey(byte []) generating key for + // below maps. + protected final Map onlineRegions = + new ConcurrentHashMap(); + + protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final List outboundMsgs = + Collections.synchronizedList(new ArrayList()); + + final int numRetries; + protected final int threadWakeFrequency; + private final int msgInterval; + private final int serverLeaseTimeout; + + protected final int numRegionsToReport; + + // Remote HMaster + private HMasterRegionInterface hbaseMaster; + + // Server to handle client requests. Default access so can be accessed by + // unit tests. + final Server server; + + // Leases + private final Leases leases; + + // Request counter + private volatile AtomicInteger requestCount = new AtomicInteger(); + + // Info server. Default access so can be used by unit tests. REGIONSERVER + // is name of the webapp and the attribute name used stuffing this instance + // into web context. + InfoServer infoServer; + + /** region server process name */ + public static final String REGIONSERVER = "regionserver"; + + /** + * Space is reserved in HRS constructor and then released when aborting + * to recover from an OOME. See HBASE-706. + */ + private final LinkedList reservedSpace = new LinkedList(); + + /** + * Thread to shutdown the region server in an orderly manner. This thread + * is registered as a shutdown hook in the HRegionServer constructor and is + * only called when the HRegionServer receives a kill signal. + */ + class ShutdownThread extends Thread { + private final HRegionServer instance; + + /** + * @param instance + */ + public ShutdownThread(HRegionServer instance) { + this.instance = instance; + } + + /** {@inheritDoc} */ + @Override + public void run() { + LOG.info("Starting shutdown thread."); + + // tell the region server to stop and wait for it to complete + instance.stop(); + instance.join(); + LOG.info("Shutdown thread complete"); + } + } + + // Compactions + final CompactSplitThread compactSplitThread; + + // Cache flushing + final Flusher cacheFlusher; + + // HLog and HLog roller. log is protected rather than private to avoid + // eclipse warning when accessed by inner classes + protected HLog log; + final LogRoller logRoller; + + // flag set after we're done setting up server threads (used for testing) + protected volatile boolean isOnline; + + /** + * Starts a HRegionServer at the default location + * @param conf + * @throws IOException + */ + public HRegionServer(HBaseConfiguration conf) throws IOException { + this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS, + DEFAULT_REGIONSERVER_ADDRESS)), conf); + } + + /** + * Starts a HRegionServer at the specified location + * @param address + * @param conf + * @throws IOException + */ + public HRegionServer(HServerAddress address, HBaseConfiguration conf) + throws IOException { + this.abortRequested = false; + this.fsOk = true; + this.conf = conf; + + this.isOnline = false; + + // Config'ed params + this.numRetries = conf.getInt("hbase.client.retries.number", 2); + this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000); + this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + this.serverLeaseTimeout = + conf.getInt("hbase.master.lease.period", 120 * 1000); + + // Cache flushing thread. + this.cacheFlusher = new Flusher(conf, this); + + // Compaction thread + this.compactSplitThread = new CompactSplitThread(this); + + // Log rolling thread + this.logRoller = new LogRoller(this); + + // Task thread to process requests from Master + this.worker = new Worker(); + this.workerThread = new Thread(worker); + + // Server to handle client requests + this.server = HbaseRPC.getServer(this, address.getBindAddress(), + address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), + false, conf); + // Address is givin a default IP for the moment. Will be changed after + // calling the master. + this.serverInfo = new HServerInfo(new HServerAddress( + new InetSocketAddress(DEFAULT_HOST, + this.server.getListenerAddress().getPort())), System.currentTimeMillis(), + this.conf.getInt("hbase.regionserver.info.port", 60030)); + this.numRegionsToReport = + conf.getInt("hbase.regionserver.numregionstoreport", 10); + + this.leases = new Leases( + conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000), + this.threadWakeFrequency); + + int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4); + for(int i = 0; i < nbBlocks; i++) { + reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]); + } + + // Register shutdown hook for HRegionServer, runs an orderly shutdown + // when a kill signal is recieved + Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); + } + + /** + * The HRegionServer sticks in this loop until closed. It repeatedly checks + * in with the HMaster, sending heartbeats & reports, and receiving HRegion + * load/unload instructions. + */ + public void run() { + boolean quiesceRequested = false; + // A sleeper that sleeps for msgInterval. + Sleeper sleeper = + new Sleeper(this.msgInterval, this.stopRequested); + try { + init(reportForDuty(sleeper)); + long lastMsg = 0; + // Now ask master what it wants us to do and tell it what we have done + for (int tries = 0; !stopRequested.get() && isHealthy();) { + long now = System.currentTimeMillis(); + if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) { + // It has been way too long since we last reported to the master. + // Commit suicide. + LOG.fatal("unable to report to master for " + (now - lastMsg) + + " milliseconds - aborting server"); + abort(); + break; + } + if ((now - lastMsg) >= msgInterval) { + HMsg outboundArray[] = null; + synchronized(this.outboundMsgs) { + outboundArray = + this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]); + this.outboundMsgs.clear(); + } + try { + this.serverInfo.setLoad(new HServerLoad(requestCount.get(), + onlineRegions.size())); + this.requestCount.set(0); + HMsg msgs[] = hbaseMaster.regionServerReport( + serverInfo, outboundArray, getMostLoadedRegions()); + lastMsg = System.currentTimeMillis(); + if (this.quiesced.get() && onlineRegions.size() == 0) { + // We've just told the master we're exiting because we aren't + // serving any regions. So set the stop bit and exit. + LOG.info("Server quiesced and not serving any regions. " + + "Starting shutdown"); + stopRequested.set(true); + this.outboundMsgs.clear(); + continue; + } + + // Queue up the HMaster's instruction stream for processing + boolean restart = false; + for(int i = 0; + !restart && !stopRequested.get() && i < msgs.length; + i++) { + LOG.info(msgs[i].toString()); + switch(msgs[i].getType()) { + case MSG_CALL_SERVER_STARTUP: + // We the MSG_CALL_SERVER_STARTUP on startup but we can also + // get it when the master is panicing because for instance + // the HDFS has been yanked out from under it. Be wary of + // this message. + if (checkFileSystem()) { + closeAllRegions(); + try { + log.closeAndDelete(); + } catch (Exception e) { + LOG.error("error closing and deleting HLog", e); + } + try { + serverInfo.setStartCode(System.currentTimeMillis()); + log = setupHLog(); + } catch (IOException e) { + this.abortRequested = true; + this.stopRequested.set(true); + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal("error restarting server", e); + break; + } + reportForDuty(sleeper); + restart = true; + } else { + LOG.fatal("file system available check failed. " + + "Shutting down server."); + } + break; + + case MSG_REGIONSERVER_STOP: + stopRequested.set(true); + break; + + case MSG_REGIONSERVER_QUIESCE: + if (!quiesceRequested) { + try { + toDo.put(new ToDoEntry(msgs[i])); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was " + + "interrupted.", e); + } + quiesceRequested = true; + } + break; + + default: + if (fsOk) { + try { + toDo.put(new ToDoEntry(msgs[i])); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was " + + "interrupted.", e); + } + } + } + } + // Reset tries count if we had a successful transaction. + tries = 0; + + if (restart || this.stopRequested.get()) { + toDo.clear(); + continue; + } + } catch (Exception e) { + if (e instanceof IOException) { + e = RemoteExceptionHandler.checkIOException((IOException) e); + } + if (tries < this.numRetries) { + LOG.warn("Processing message (Retry: " + tries + ")", e); + tries++; + } else { + LOG.fatal("Exceeded max retries: " + this.numRetries, e); + if (!checkFileSystem()) { + continue; + } + // Something seriously wrong. Shutdown. + stop(); + } + } + } + // Do some housekeeping before going to sleep + housekeeping(); + sleeper.sleep(lastMsg); + } // for + } catch (OutOfMemoryError error) { + abort(); + LOG.fatal("Ran out of memory", error); + } catch (Throwable t) { + LOG.fatal("Unhandled exception. Aborting...", t); + abort(); + } + RegionHistorian.getInstance().offline(); + this.leases.closeAfterLeasesExpire(); + this.worker.stop(); + this.server.stop(); + if (this.infoServer != null) { + LOG.info("Stopping infoServer"); + try { + this.infoServer.stop(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + + // Send interrupts to wake up threads if sleeping so they notice shutdown. + // TODO: Should we check they are alive? If OOME could have exited already + cacheFlusher.interruptIfNecessary(); + compactSplitThread.interruptIfNecessary(); + this.logRoller.interruptIfNecessary(); + + if (abortRequested) { + if (this.fsOk) { + // Only try to clean up if the file system is available + try { + this.log.close(); + LOG.info("On abort, closed hlog"); + } catch (IOException e) { + LOG.error("Unable to close log in abort", + RemoteExceptionHandler.checkIOException(e)); + } + closeAllRegions(); // Don't leave any open file handles + } + LOG.info("aborting server at: " + + serverInfo.getServerAddress().toString()); + } else { + ArrayList closedRegions = closeAllRegions(); + try { + log.closeAndDelete(); + } catch (IOException e) { + LOG.error("Close and delete failed", + RemoteExceptionHandler.checkIOException(e)); + } + try { + HMsg[] exitMsg = new HMsg[closedRegions.size() + 1]; + exitMsg[0] = HMsg.REPORT_EXITING; + // Tell the master what regions we are/were serving + int i = 1; + for (HRegion region: closedRegions) { + exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, + region.getRegionInfo()); + } + + LOG.info("telling master that region server is shutting down at: " + + serverInfo.getServerAddress().toString()); + hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null); + } catch (IOException e) { + LOG.warn("Failed to send exiting message to master: ", + RemoteExceptionHandler.checkIOException(e)); + } + LOG.info("stopping server at: " + + serverInfo.getServerAddress().toString()); + } + join(); + LOG.info(Thread.currentThread().getName() + " exiting"); + } + + /* + * Run init. Sets up hlog and starts up all server threads. + * @param c Extra configuration. + */ + protected void init(final MapWritable c) throws IOException { + try { + for (Map.Entry e: c.entrySet()) { + String key = e.getKey().toString(); + String value = e.getValue().toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("Config from master: " + key + "=" + value); + } + this.conf.set(key, value); + } + // Master may have sent us a new address with the other configs. + // Update our address in this case. See HBASE-719 + if(conf.get("hbase.regionserver.address") != null) + serverInfo.setServerAddress(new HServerAddress + (conf.get("hbase.regionserver.address"), + serverInfo.getServerAddress().getPort())); + // Master sent us hbase.rootdir to use. Should be fully qualified + // path with file system specification included. Set 'fs.default.name' + // to match the filesystem on hbase.rootdir else underlying hadoop hdfs + // accessors will be going against wrong filesystem (unless all is set + // to defaults). + this.conf.set("fs.default.name", this.conf.get("hbase.rootdir")); + this.fs = FileSystem.get(this.conf); + this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR)); + this.log = setupHLog(); + startServiceThreads(); + isOnline = true; + } catch (IOException e) { + this.stopRequested.set(true); + isOnline = false; + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal("Failed init", e); + IOException ex = new IOException("region server startup failed"); + ex.initCause(e); + throw ex; + } + } + + /** + * Report the status of the server. A server is online once all the startup + * is completed (setting up filesystem, starting service threads, etc.). This + * method is designed mostly to be useful in tests. + * @return true if online, false if not. + */ + public boolean isOnline() { + return isOnline; + } + + private HLog setupHLog() throws RegionServerRunningException, + IOException { + + Path logdir = new Path(rootDir, "log" + "_" + + serverInfo.getServerAddress().getBindAddress() + "_" + + this.serverInfo.getStartCode() + "_" + + this.serverInfo.getServerAddress().getPort()); + if (LOG.isDebugEnabled()) { + LOG.debug("Log dir " + logdir); + } + if (fs.exists(logdir)) { + throw new RegionServerRunningException("region server already " + + "running at " + this.serverInfo.getServerAddress().toString() + + " because logdir " + logdir.toString() + " exists"); + } + return new HLog(fs, logdir, conf, logRoller); + } + + /* + * Start Chore Threads, Server, Worker and lease checker threads. Install an + * UncaughtExceptionHandler that calls abort of RegionServer if we get + * an unhandled exception. We cannot set the handler on all threads. + * Server's internal Listener thread is off limits. For Server, if an OOME, + * it waits a while then retries. Meantime, a flush or a compaction that + * tries to run should trigger same critical condition and the shutdown will + * run. On its way out, this server will shut down Server. Leases are sort + * of inbetween. It has an internal thread that while it inherits from + * Chore, it keeps its own internal stop mechanism so needs to be stopped + * by this hosting server. Worker logs the exception and exits. + */ + private void startServiceThreads() throws IOException { + String n = Thread.currentThread().getName(); + UncaughtExceptionHandler handler = new UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + abort(); + LOG.fatal("Set stop flag in " + t.getName(), e); + } + }; + Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller", + handler); + Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", + handler); + Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", + handler); + Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); + // Leases is not a Thread. Internally it runs a daemon thread. If it gets + // an unhandled exception, it will just exit. + this.leases.setName(n + ".leaseChecker"); + this.leases.start(); + // Put up info server. + int port = this.conf.getInt("hbase.regionserver.info.port", 60030); + if (port >= 0) { + String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); + this.infoServer = new InfoServer("regionserver", a, port, false); + this.infoServer.setAttribute("regionserver", this); + this.infoServer.start(); + } + // Start Server. This service is like leases in that it internally runs + // a thread. + this.server.start(); + LOG.info("HRegionServer started at: " + + serverInfo.getServerAddress().toString()); + } + + /* + * Verify that server is healthy + */ + private boolean isHealthy() { + if (!fsOk) { + // File system problem + return false; + } + // Verify that all threads are alive + if (!(leases.isAlive() && compactSplitThread.isAlive() && + cacheFlusher.isAlive() && logRoller.isAlive() && + workerThread.isAlive())) { + // One or more threads are no longer alive - shut down + stop(); + return false; + } + return true; + } + /* + * Run some housekeeping tasks before we go into 'hibernation' sleeping at + * the end of the main HRegionServer run loop. + */ + private void housekeeping() { + // If the todo list has > 0 messages, iterate looking for open region + // messages. Send the master a message that we're working on its + // processing so it doesn't assign the region elsewhere. + if (this.toDo.size() <= 0) { + return; + } + // This iterator is 'safe'. We are guaranteed a view on state of the + // queue at time iterator was taken out. Apparently goes from oldest. + for (ToDoEntry e: this.toDo) { + if (e.msg.isType(HMsg.Type.MSG_REGION_OPEN)) { + addProcessingMessage(e.msg.getRegionInfo()); + } + } + } + + /** @return the HLog */ + HLog getLog() { + return this.log; + } + + /** + * Sets a flag that will cause all the HRegionServer threads to shut down + * in an orderly fashion. Used by unit tests. + */ + public void stop() { + this.stopRequested.set(true); + synchronized(this) { + notifyAll(); // Wakes run() if it is sleeping + } + } + + /** + * Cause the server to exit without closing the regions it is serving, the + * log it is using and without notifying the master. + * Used unit testing and on catastrophic events such as HDFS is yanked out + * from under hbase or we OOME. + */ + public void abort() { + reservedSpace.clear(); + this.abortRequested = true; + stop(); + } + + /** + * Wait on all threads to finish. + * Presumption is that all closes and stops have already been called. + */ + void join() { + join(this.workerThread); + join(this.cacheFlusher); + join(this.compactSplitThread); + join(this.logRoller); + } + + private void join(final Thread t) { + while (t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + // continue + } + } + } + + /* + * Let the master know we're here + * Run initialization using parameters passed us by the master. + */ + private MapWritable reportForDuty(final Sleeper sleeper) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Telling master at " + + conf.get(MASTER_ADDRESS) + " that we are up"); + } + // Do initial RPC setup. The final argument indicates that the RPC should retry indefinitely. + this.hbaseMaster = (HMasterRegionInterface)HbaseRPC.waitForProxy( + HMasterRegionInterface.class, HMasterRegionInterface.versionID, + new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), + this.conf, -1); + MapWritable result = null; + long lastMsg = 0; + while(!stopRequested.get()) { + try { + this.requestCount.set(0); + this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size())); + lastMsg = System.currentTimeMillis(); + result = this.hbaseMaster.regionServerStartup(serverInfo); + break; + } catch (Leases.LeaseStillHeldException e) { + LOG.info("Lease " + e.getName() + " already held on master. Check " + + "DNS configuration so that all region servers are" + + "reporting their true IPs and not 127.0.0.1. Otherwise, this" + + "problem should resolve itself after the lease period of " + + this.conf.get("hbase.master.lease.period") + + " seconds expires over on the master"); + } catch (IOException e) { + LOG.warn("error telling master we are up", e); + } + sleeper.sleep(lastMsg); + } + return result; + } + + /* Add to the outbound message buffer */ + private void reportOpen(HRegionInfo region) { + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); + } + + /* Add to the outbound message buffer */ + private void reportClose(HRegionInfo region) { + reportClose(region, null); + } + + /* Add to the outbound message buffer */ + private void reportClose(final HRegionInfo region, final Text message) { + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + } + + + /** + * Add to the outbound message buffer + * + * When a region splits, we need to tell the master that there are two new + * regions that need to be assigned. + * + * We do not need to inform the master about the old region, because we've + * updated the meta or root regions, and the master will pick that up on its + * next rescan of the root or meta tables. + */ + void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, + HRegionInfo newRegionB) { + + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, + new Text(oldRegion.getRegionNameAsString() + " split; daughters: " + + newRegionA.getRegionNameAsString() + ", " + + newRegionB.getRegionNameAsString()))); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + } + + ////////////////////////////////////////////////////////////////////////////// + // HMaster-given operations + ////////////////////////////////////////////////////////////////////////////// + + /* + * Data structure to hold a HMsg and retries count. + */ + private static class ToDoEntry { + private int tries; + private final HMsg msg; + ToDoEntry(HMsg msg) { + this.tries = 0; + this.msg = msg; + } + } + + final BlockingQueue toDo = new LinkedBlockingQueue(); + private Worker worker; + private Thread workerThread; + + /** Thread that performs long running requests from the master */ + class Worker implements Runnable { + void stop() { + synchronized(toDo) { + toDo.notifyAll(); + } + } + + /** {@inheritDoc} */ + public void run() { + try { + while(!stopRequested.get()) { + ToDoEntry e = null; + try { + e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + if(e == null || stopRequested.get()) { + continue; + } + LOG.info(e.msg); + switch(e.msg.getType()) { + + case MSG_REGIONSERVER_QUIESCE: + closeUserRegions(); + break; + + case MSG_REGION_OPEN: + // Open a region + openRegion(e.msg.getRegionInfo()); + break; + + case MSG_REGION_CLOSE: + // Close a region + closeRegion(e.msg.getRegionInfo(), true); + break; + + case MSG_REGION_CLOSE_WITHOUT_REPORT: + // Close a region, don't reply + closeRegion(e.msg.getRegionInfo(), false); + break; + + default: + throw new AssertionError( + "Impossible state during msg processing. Instruction: " + + e.msg.toString()); + } + } catch (InterruptedException ex) { + // continue + } catch (Exception ex) { + if (ex instanceof IOException) { + ex = RemoteExceptionHandler.checkIOException((IOException) ex); + } + if(e != null && e.tries < numRetries) { + LOG.warn(ex); + e.tries++; + try { + toDo.put(e); + } catch (InterruptedException ie) { + throw new RuntimeException("Putting into msgQueue was " + + "interrupted.", ex); + } + } else { + LOG.error("unable to process message" + + (e != null ? (": " + e.msg.toString()) : ""), ex); + if (!checkFileSystem()) { + break; + } + } + } + } + } catch(Throwable t) { + LOG.fatal("Unhandled exception", t); + } finally { + LOG.info("worker thread exiting"); + } + } + } + + void openRegion(final HRegionInfo regionInfo) { + // If historian is not online and this is not a meta region, online it. + if (!regionInfo.isMetaRegion() && + !RegionHistorian.getInstance().isOnline()) { + RegionHistorian.getInstance().online(this.conf); + } + Integer mapKey = Bytes.mapKey(regionInfo.getRegionName()); + HRegion region = this.onlineRegions.get(mapKey); + if (region == null) { + try { + region = instantiateRegion(regionInfo); + // Startup a compaction early if one is needed. + this.compactSplitThread.compactionRequested(region); + } catch (IOException e) { + LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e); + + // TODO: add an extra field in HRegionInfo to indicate that there is + // an error. We can't do that now because that would be an incompatible + // change that would require a migration + reportClose(regionInfo, new Text(StringUtils.stringifyException(e))); + return; + } + this.lock.writeLock().lock(); + try { + this.log.setSequenceNumber(region.getMinSequenceId()); + this.onlineRegions.put(mapKey, region); + } finally { + this.lock.writeLock().unlock(); + } + } + reportOpen(regionInfo); + } + + protected HRegion instantiateRegion(final HRegionInfo regionInfo) + throws IOException { + return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo + .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null, + this.cacheFlusher, new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + } + + /* + * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue. + * This method is called while region is in the queue of regions to process + * and then while the region is being opened, it is called from the Worker + * thread that is running the region open. + * @param hri Region to add the message for + */ + protected void addProcessingMessage(final HRegionInfo hri) { + getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri)); + } + + void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) + throws IOException { + HRegion region = this.removeFromOnlineRegions(hri); + if (region != null) { + region.close(); + if(reportWhenCompleted) { + reportClose(hri); + } + } + } + + /** Called either when the master tells us to restart or from stop() */ + ArrayList closeAllRegions() { + ArrayList regionsToClose = new ArrayList(); + this.lock.writeLock().lock(); + try { + regionsToClose.addAll(onlineRegions.values()); + onlineRegions.clear(); + } finally { + this.lock.writeLock().unlock(); + } + for(HRegion region: regionsToClose) { + if (LOG.isDebugEnabled()) { + LOG.debug("closing region " + Bytes.toString(region.getRegionName())); + } + try { + region.close(abortRequested); + } catch (IOException e) { + LOG.error("error closing region " + + Bytes.toString(region.getRegionName()), + RemoteExceptionHandler.checkIOException(e)); + } + } + return regionsToClose; + } + + /** Called as the first stage of cluster shutdown. */ + void closeUserRegions() { + ArrayList regionsToClose = new ArrayList(); + this.lock.writeLock().lock(); + try { + synchronized (onlineRegions) { + for (Iterator> i = + onlineRegions.entrySet().iterator(); i.hasNext();) { + Map.Entry e = i.next(); + HRegion r = e.getValue(); + if (!r.getRegionInfo().isMetaRegion()) { + regionsToClose.add(r); + i.remove(); + } + } + } + } finally { + this.lock.writeLock().unlock(); + } + for(HRegion region: regionsToClose) { + if (LOG.isDebugEnabled()) { + LOG.debug("closing region " + Bytes.toString(region.getRegionName())); + } + try { + region.close(); + } catch (IOException e) { + LOG.error("error closing region " + region.getRegionName(), + RemoteExceptionHandler.checkIOException(e)); + } + } + this.quiesced.set(true); + if (onlineRegions.size() == 0) { + outboundMsgs.add(HMsg.REPORT_EXITING); + } else { + outboundMsgs.add(HMsg.REPORT_QUIESCED); + } + } + + // + // HRegionInterface + // + + /** {@inheritDoc} */ + public HRegionInfo getRegionInfo(final byte [] regionName) + throws NotServingRegionException { + requestCount.incrementAndGet(); + return getRegion(regionName).getRegionInfo(); + } + + /** {@inheritDoc} */ + public Cell get(final byte [] regionName, final byte [] row, + final byte [] column) + throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + try { + return getRegion(regionName).get(row, column); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public Cell[] get(final byte [] regionName, final byte [] row, + final byte [] column, final int numVersions) + throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + try { + return getRegion(regionName).get(row, column, numVersions); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public Cell[] get(final byte [] regionName, final byte [] row, + final byte [] column, final long timestamp, final int numVersions) + throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + try { + return getRegion(regionName).get(row, column, timestamp, numVersions); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public RowResult getRow(final byte [] regionName, final byte [] row, + final byte [][] columns, final long ts) + throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + try { + // convert the columns array into a set so it's easy to check later. + Set columnSet = null; + if (columns != null) { + columnSet = new TreeSet(Bytes.BYTES_COMPARATOR); + columnSet.addAll(Arrays.asList(columns)); + } + + HRegion region = getRegion(regionName); + Map map = region.getFull(row, columnSet, ts); + HbaseMapWritable result = + new HbaseMapWritable(); + result.putAll(map); + return new RowResult(row, result); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public RowResult getClosestRowBefore(final byte [] regionName, + final byte [] row) + throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + try { + // locate the region we're operating on + HRegion region = getRegion(regionName); + // ask the region for all the data + RowResult rr = region.getClosestRowBefore(row); + return rr; + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public RowResult next(final long scannerId) throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + try { + String scannerName = String.valueOf(scannerId); + InternalScanner s = scanners.get(scannerName); + if (s == null) { + throw new UnknownScannerException("Name: " + scannerName); + } + this.leases.renewLease(scannerName); + + // Collect values to be returned here + HbaseMapWritable values + = new HbaseMapWritable(); + HStoreKey key = new HStoreKey(); + TreeMap results = + new TreeMap(Bytes.BYTES_COMPARATOR); + while (s.next(key, results)) { + values.putAll(results); + if (values.size() > 0) { + // Row has something in it. Return the value. + break; + } + + // No data for this row, go get another. + results.clear(); + } + return values.size() == 0 ? null : new RowResult(key.getRow(), values); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public void batchUpdate(final byte [] regionName, BatchUpdate b) + throws IOException { + checkOpen(); + this.requestCount.incrementAndGet(); + HRegion region = getRegion(regionName); + validateValuesLength(b, region); + try { + cacheFlusher.reclaimMemcacheMemory(); + region.batchUpdate(b); + } catch (OutOfMemoryError error) { + abort(); + LOG.fatal("Ran out of memory", error); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** + * Utility method to verify values length + * @param batchUpdate The update to verify + * @throws IOException Thrown if a value is too long + */ + private void validateValuesLength(BatchUpdate batchUpdate, + HRegion region) throws IOException { + HTableDescriptor desc = region.getTableDesc(); + for (Iterator iter = + batchUpdate.iterator(); iter.hasNext();) { + + BatchOperation operation = iter.next(); + int maxLength = + desc.getFamily(HStoreKey.getFamily(operation.getColumn())). + getMaxValueLength(); + if(operation.getValue() != null) + if(operation.getValue().length > maxLength) { + throw new IOException("Value in column " + + Bytes.toString(operation.getColumn()) + " is too long. " + + operation.getValue().length + " instead of " + maxLength); + } + } + } + + // + // remote scanner interface + // + + /** {@inheritDoc} */ + public long openScanner(byte [] regionName, byte [][] cols, byte [] firstRow, + final long timestamp, final RowFilterInterface filter) + throws IOException { + checkOpen(); + NullPointerException npe = null; + if (regionName == null) { + npe = new NullPointerException("regionName is null"); + } else if (cols == null) { + npe = new NullPointerException("columns to scan is null"); + } else if (firstRow == null) { + npe = new NullPointerException("firstRow for scanner is null"); + } + if (npe != null) { + IOException io = new IOException("Invalid arguments to openScanner"); + io.initCause(npe); + throw io; + } + requestCount.incrementAndGet(); + try { + HRegion r = getRegion(regionName); + InternalScanner s = + r.getScanner(cols, firstRow, timestamp, filter); + long scannerId = addScanner(s); + return scannerId; + } catch (IOException e) { + LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", + RemoteExceptionHandler.checkIOException(e)); + checkFileSystem(); + throw e; + } + } + + protected long addScanner(InternalScanner s) throws LeaseStillHeldException { + long scannerId = -1L; + scannerId = rand.nextLong(); + String scannerName = String.valueOf(scannerId); + synchronized(scanners) { + scanners.put(scannerName, s); + } + this.leases. + createLease(scannerName, new ScannerListener(scannerName)); + return scannerId; + } + + /** {@inheritDoc} */ + public void close(final long scannerId) throws IOException { + checkOpen(); + requestCount.incrementAndGet(); + try { + String scannerName = String.valueOf(scannerId); + InternalScanner s = null; + synchronized(scanners) { + s = scanners.remove(scannerName); + } + if(s == null) { + throw new UnknownScannerException(scannerName); + } + s.close(); + this.leases.cancelLease(scannerName); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + Map scanners = + Collections.synchronizedMap(new HashMap()); + + /** + * Instantiated as a scanner lease. + * If the lease times out, the scanner is closed + */ + private class ScannerListener implements LeaseListener { + private final String scannerName; + + ScannerListener(final String n) { + this.scannerName = n; + } + + /** {@inheritDoc} */ + public void leaseExpired() { + LOG.info("Scanner " + this.scannerName + " lease expired"); + InternalScanner s = null; + synchronized(scanners) { + s = scanners.remove(this.scannerName); + } + if (s != null) { + try { + s.close(); + } catch (IOException e) { + LOG.error("Closing scanner", e); + } + } + } + } + + // + // Methods that do the actual work for the remote API + // + + /** {@inheritDoc} */ + public void deleteAll(final byte [] regionName, final byte [] row, + final byte [] column, final long timestamp) + throws IOException { + HRegion region = getRegion(regionName); + region.deleteAll(row, column, timestamp); + } + + /** {@inheritDoc} */ + public void deleteAll(final byte [] regionName, final byte [] row, + final long timestamp) + throws IOException { + HRegion region = getRegion(regionName); + region.deleteAll(row, timestamp); + } + + /** {@inheritDoc} */ + public void deleteFamily(byte [] regionName, byte [] row, byte [] family, + long timestamp) throws IOException{ + getRegion(regionName).deleteFamily(row, family, timestamp); + } + + + /** + * @return Info on this server. + */ + public HServerInfo getServerInfo() { + return this.serverInfo; + } + + /** @return the info server */ + public InfoServer getInfoServer() { + return infoServer; + } + + /** + * @return true if a stop has been requested. + */ + public boolean isStopRequested() { + return stopRequested.get(); + } + + /** + * + * @return the configuration + */ + public HBaseConfiguration getConfiguration() { + return conf; + } + + /** @return the write lock for the server */ + ReentrantReadWriteLock.WriteLock getWriteLock() { + return lock.writeLock(); + } + + /** + * @return Immutable list of this servers regions. + */ + public Collection getOnlineRegions() { + return Collections.unmodifiableCollection(onlineRegions.values()); + } + + /** + * @return The HRegionInfos from online regions sorted + */ + public SortedSet getSortedOnlineRegionInfos() { + SortedSet result = new TreeSet(); + synchronized(this.onlineRegions) { + for (HRegion r: this.onlineRegions.values()) { + result.add(r.getRegionInfo()); + } + } + return result; + } + + /** + * This method removes HRegion corresponding to hri from the Map of onlineRegions. + * + * @param hri the HRegionInfo corresponding to the HRegion to-be-removed. + * @return the removed HRegion, or null if the HRegion was not in onlineRegions. + */ + HRegion removeFromOnlineRegions(HRegionInfo hri) { + this.lock.writeLock().lock(); + HRegion toReturn = null; + try { + toReturn = onlineRegions.remove(Bytes.mapKey(hri.getRegionName())); + } finally { + this.lock.writeLock().unlock(); + } + return toReturn; + } + + /** + * @return A new Map of online regions sorted by region size with the first + * entry being the biggest. + */ + public SortedMap getCopyOfOnlineRegionsSortedBySize() { + // we'll sort the regions in reverse + SortedMap sortedRegions = new TreeMap( + new Comparator() { + public int compare(Long a, Long b) { + return -1 * a.compareTo(b); + } + }); + // Copy over all regions. Regions are sorted by size with biggest first. + synchronized (this.onlineRegions) { + for (HRegion region : this.onlineRegions.values()) { + sortedRegions.put(region.memcacheSize.get(), region); + } + } + return sortedRegions; + } + + /** + * @param regionName + * @return HRegion for the passed regionName or null if named + * region is not member of the online regions. + */ + public HRegion getOnlineRegion(final byte [] regionName) { + return onlineRegions.get(Bytes.mapKey(regionName)); + } + + /** @return the request count */ + public AtomicInteger getRequestCount() { + return this.requestCount; + } + + /** @return reference to FlushRequester */ + public FlushRequester getFlushRequester() { + return this.cacheFlusher; + } + + /** + * Protected utility method for safely obtaining an HRegion handle. + * @param regionName Name of online {@link HRegion} to return + * @return {@link HRegion} for regionName + * @throws NotServingRegionException + */ + protected HRegion getRegion(final byte [] regionName) + throws NotServingRegionException { + HRegion region = null; + this.lock.readLock().lock(); + try { + Integer key = Integer.valueOf(Bytes.hashCode(regionName)); + region = onlineRegions.get(key); + if (region == null) { + throw new NotServingRegionException(regionName); + } + return region; + } finally { + this.lock.readLock().unlock(); + } + } + + /** + * Get the top N most loaded regions this server is serving so we can + * tell the master which regions it can reallocate if we're overloaded. + * TODO: actually calculate which regions are most loaded. (Right now, we're + * just grabbing the first N regions being served regardless of load.) + */ + protected HRegionInfo[] getMostLoadedRegions() { + ArrayList regions = new ArrayList(); + synchronized (onlineRegions) { + for (HRegion r : onlineRegions.values()) { + if (regions.size() < numRegionsToReport) { + regions.add(r.getRegionInfo()); + } else { + break; + } + } + } + return regions.toArray(new HRegionInfo[regions.size()]); + } + + /** + * Called to verify that this server is up and running. + * + * @throws IOException + */ + protected void checkOpen() throws IOException { + if (this.stopRequested.get() || this.abortRequested) { + throw new IOException("Server not running"); + } + if (!fsOk) { + throw new IOException("File system not available"); + } + } + + /** + * Checks to see if the file system is still accessible. + * If not, sets abortRequested and stopRequested + * + * @return false if file system is not available + */ + protected boolean checkFileSystem() { + if (this.fsOk && fs != null) { + try { + FSUtils.checkFileSystemAvailable(fs); + } catch (IOException e) { + LOG.fatal("Shutting down HRegionServer: file system not available", e); + abort(); + fsOk = false; + } + } + return this.fsOk; + } + + /** + * @return Returns list of non-closed regions hosted on this server. If no + * regions to check, returns an empty list. + */ + protected Set getRegionsToCheck() { + HashSet regionsToCheck = new HashSet(); + //TODO: is this locking necessary? + lock.readLock().lock(); + try { + regionsToCheck.addAll(this.onlineRegions.values()); + } finally { + lock.readLock().unlock(); + } + // Purge closed regions. + for (final Iterator i = regionsToCheck.iterator(); i.hasNext();) { + HRegion r = i.next(); + if (r.isClosed()) { + i.remove(); + } + } + return regionsToCheck; + } + + /** {@inheritDoc} */ + public long getProtocolVersion(final String protocol, + @SuppressWarnings("unused") final long clientVersion) + throws IOException { + if (protocol.equals(HRegionInterface.class.getName())) { + return HRegionInterface.versionID; + } + throw new IOException("Unknown protocol to name node: " + protocol); + } + + /** + * @return Queue to which you can add outbound messages. + */ + protected List getOutboundMsgs() { + return this.outboundMsgs; + } + + /** + * Return the total size of all memcaches in every region. + * @return memcache size in bytes + */ + public long getGlobalMemcacheSize() { + long total = 0; + synchronized (onlineRegions) { + for (HRegion region : onlineRegions.values()) { + total += region.memcacheSize.get(); + } + } + return total; + } + + /** + * @return Return the leases. + */ + protected Leases getLeases() { + return leases; + } + + /** + * @return Return the rootDir. + */ + protected Path getRootDir() { + return rootDir; + } + + /** + * @return Return the fs. + */ + protected FileSystem getFileSystem() { + return fs; + } + + // + // Main program and support routines + // + + private static void printUsageAndExit() { + printUsageAndExit(null); + } + + private static void printUsageAndExit(final String message) { + if (message != null) { + System.err.println(message); + } + System.err.println("Usage: java " + + "org.apache.hbase.HRegionServer [--bind=hostname:port] start"); + System.exit(0); + } + + /** + * Do class main. + * @param args + * @param regionServerClass HRegionServer to instantiate. + */ + protected static void doMain(final String [] args, + final Class regionServerClass) { + if (args.length < 1) { + printUsageAndExit(); + } + Configuration conf = new HBaseConfiguration(); + + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). + final String addressArgKey = "--bind="; + for (String cmd: args) { + if (cmd.startsWith(addressArgKey)) { + conf.set(REGIONSERVER_ADDRESS, cmd.substring(addressArgKey.length())); + continue; + } + + if (cmd.equals("start")) { + try { + // If 'local', don't start a region server here. Defer to + // LocalHBaseCluster. It manages 'local' clusters. + if (LocalHBaseCluster.isLocal(conf)) { + LOG.warn("Not starting a distinct region server because " + + "hbase.master is set to 'local' mode"); + } else { + Constructor c = + regionServerClass.getConstructor(HBaseConfiguration.class); + HRegionServer hrs = c.newInstance(conf); + Thread t = new Thread(hrs); + t.setName("regionserver" + hrs.server.getListenerAddress()); + t.start(); + } + } catch (Throwable t) { + LOG.error( "Can not start region server because "+ + StringUtils.stringifyException(t) ); + System.exit(-1); + } + break; + } + + if (cmd.equals("stop")) { + printUsageAndExit("To shutdown the regionserver run " + + "bin/hbase-daemon.sh stop regionserver or send a kill signal to" + + "the regionserver pid"); + } + + // Print out usage if we get to here. + printUsageAndExit(); + } + } + + /** + * @param args + */ + public static void main(String [] args) { + Configuration conf = new HBaseConfiguration(); + @SuppressWarnings("unchecked") + Class regionServerClass = (Class) conf + .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); + doMain(args, regionServerClass); + } +}