HBASE-2669 HCM.shutdownHook causes data loss with hbase.client.write.buffer != 0
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1024067 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 2fe608efd3
commit 113d533ad7
@@ -594,6 +594,8 @@ Release 0.21.0 - Unreleased
    HBASE-3110 TestReplicationSink failing in TRUNK up on Hudson
    HBASE-3101 bin assembly doesn't include -tests or -source jars
    HBASE-3121 [rest] Do not perform cache control when returning results
+   HBASE-2669 HCM.shutdownHook causes data loss with
+              hbase.client.write.buffer != 0

  IMPROVEMENTS
    HBASE-1760 Cleanup TODOs in HTable
@@ -108,7 +108,7 @@ public class HBaseAdmin implements Abortable {

   private void cleanupCatalogTracker(final CatalogTracker ct) {
     ct.stop();
-    HConnectionManager.deleteConnection(ct.getConnection());
+    HConnectionManager.deleteConnection(ct.getConnection().getConfiguration(), true);
   }

   @Override
@@ -33,13 +33,27 @@ import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

 /**
- * Cluster connection.
+ * Cluster connection. Hosts a connection to the ZooKeeper ensemble and
+ * thereafter into the HBase cluster. Knows how to locate regions out on the cluster,
+ * keeps a cache of locations and then knows how to recalibrate after they move.
  * {@link HConnectionManager} manages instances of this class.
+ *
+ * <p>HConnections are used by {@link HTable} mostly but also by
+ * {@link HBaseAdmin}, {@link CatalogTracker},
+ * and {@link ZooKeeperWatcher}. HConnection instances can be shared. Sharing
+ * is usually what you want because rather than each HConnection instance
+ * having to do its own discovery of regions out on the cluster, instead, all
+ * clients get to share the one cache of locations. Sharing makes cleanup of
+ * HConnections awkward. See {@link HConnectionManager} for cleanup
+ * discussion.
+ *
+ * @see HConnectionManager
  */
 public interface HConnection extends Abortable {
   /**
@@ -48,7 +62,7 @@ public interface HConnection extends Abortable {
   public Configuration getConfiguration();

   /**
-   * Retrieve ZooKeeperWatcher used by the connection.
+   * Retrieve ZooKeeperWatcher used by this connection.
    * @return ZooKeeperWatcher handle being used by the connection.
    * @throws IOException if a remote or network exception occurs
    */
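Editor's note, not part of this commit: the sharing behavior the HConnection javadoc above describes can be seen in a minimal sketch like the following. The table name is hypothetical, and cleanup uses the deleteConnection(Configuration, boolean) form this change standardizes on.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTable;

    public class SharedConnectionExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "usertable");    // hypothetical table name
        HBaseAdmin admin = new HBaseAdmin(conf);         // same conf, so same cached HConnection
        HConnection shared = HConnectionManager.getConnection(conf); // the instance both are using
        // ... use table and admin ...
        HConnectionManager.deleteConnection(conf, true); // release it when this client is done
      }
    }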
@@ -68,21 +68,54 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;

 /**
- * A non-instantiable class that manages connections to tables.
- * Used by {@link HTable} and {@link HBaseAdmin}
+ * A non-instantiable class that manages {@link HConnection}s.
+ * This class has a static Map of {@link HConnection} instances keyed by
+ * {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
+ * that pass the same {@link Configuration} instance will be returned the same
+ * {@link HConnection} instance (Adding properties to a Configuration
+ * instance does not change its object identity).  Sharing {@link HConnection}
+ * instances is usually what you want; all clients of the {@link HConnection}
+ * instances share the HConnections' cache of Region locations rather than each
+ * having to discover for itself the location of meta, root, etc.  It makes
+ * sense for the likes of the pool of HTables class {@link HTablePool}, for
+ * instance (If concerned that a single {@link HConnection} is insufficient
+ * for sharing amongst clients in say an heavily-multithreaded environment,
+ * in practise its not proven to be an issue.  Besides, {@link HConnection} is
+ * implemented atop Hadoop RPC and as of this writing, Hadoop RPC does a
+ * connection per cluster-member, exclusively).
+ *
+ * <p>But sharing connections
+ * makes clean up of {@link HConnection} instances a little awkward.  Currently,
+ * clients cleanup by calling
+ * {@link #deleteConnection(Configuration, boolean)}.  This will shutdown the
+ * zookeeper connection the HConnection was using and clean up all
+ * HConnection resources as well as stopping proxies to servers out on the
+ * cluster.  Not running the cleanup will not end the world; it'll
+ * just stall the closeup some and spew some zookeeper connection failed
+ * messages into the log.  Running the cleanup on a {@link HConnection} that is
+ * subsequently used by another will cause breakage so be careful running
+ * cleanup.
+ * <p>To create a {@link HConnection} that is not shared by others, you can
+ * create a new {@link Configuration} instance, pass this new instance to
+ * {@link #getConnection(Configuration)}, and then when done, close it up by
+ * doing something like the following:
+ * <pre>
+ * {@code
+ * Configuration newConfig = new Configuration(originalConf);
+ * HConnection connection = HConnectionManager.getConnection(newConfig);
+ * // Use the connection to your hearts' delight and then when done...
+ * HConnectionManager.deleteConnection(newConfig, true);
+ * }
+ * </pre>
+ * <p>Cleanup used to be done inside in a shutdown hook.  On startup we'd
+ * register a shutdown hook that called {@link #deleteAllConnections(boolean)}
+ * on its way out but the order in which shutdown hooks run is not defined so
+ * were problematic for clients of HConnection that wanted to register their
+ * own shutdown hooks so we removed ours though this shifts the onus for
+ * cleanup to the client.
  */
 @SuppressWarnings("serial")
 public class HConnectionManager {
-  // Register a shutdown hook, one that cleans up RPC and closes zk sessions.
-  static {
-    Runtime.getRuntime().addShutdownHook(new Thread("HCM.shutdownHook") {
-      @Override
-      public void run() {
-        HConnectionManager.deleteAllConnections(true);
-      }
-    });
-  }

   static final int MAX_CACHED_HBASE_INSTANCES = 31;

   // A LRU Map of Configuration hashcode -> TableServers. We set instances to 31.
@@ -105,7 +138,8 @@ public class HConnectionManager {
   }

   /**
-   * Get the connection that goes with the passed <code>conf</code> configuration.
+   * Get the connection that goes with the passed <code>conf</code>
+   * configuration instance.
    * If no current connection exists, method creates a new connection for the
    * passed <code>conf</code> instance.
    * @param conf configuration
@@ -126,9 +160,13 @@ public class HConnectionManager {
   }

   /**
-   * Delete connection information for the instance specified by configuration
-   * @param conf configuration
-   * @param stopProxy stop the proxy as well
+   * Delete connection information for the instance specified by configuration.
+   * This will close connection to the zookeeper ensemble and let go of all
+   * resources.
+   * @param conf configuration whose identity is used to find {@link HConnection}
+   * instance.
+   * @param stopProxy Shuts down all the proxy's put up to cluster members
+   * including to cluster HMaster. Calls {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}.
    */
   public static void deleteConnection(Configuration conf, boolean stopProxy) {
     synchronized (HBASE_INSTANCES) {
@@ -139,14 +177,6 @@ public class HConnectionManager {
     }
   }

-  /**
-   * Delete connection information for the instance
-   * @param connection configuration
-   */
-  public static void deleteConnection(HConnection connection) {
-    deleteConnection(connection.getConfiguration(), false);
-  }
-
   /**
    * Delete information for all connections.
    * @param stopProxy stop the proxy as well
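Editor's note, not part of this commit: removing the HCM.shutdownHook above is the crux of HBASE-2669; with hbase.client.write.buffer != 0 the hook could tear connections down before buffered puts were flushed, so a buffering client must now flush and clean up itself. A minimal sketch under that assumption, with hypothetical table, family, qualifier, and row names:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedWriterExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "usertable");
        table.setAutoFlush(false);   // puts accumulate up to hbase.client.write.buffer bytes
        try {
          Put put = new Put(Bytes.toBytes("row1"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          table.put(put);
        } finally {
          table.flushCommits();                          // push any still-buffered edits
          HConnectionManager.deleteAllConnections(true); // no shutdown hook will do this now
        }
      }
    }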
@@ -62,11 +62,13 @@ import org.apache.hadoop.hbase.util.Writables;
  * be corrupted if multiple threads contend over a single HTable instance.
  *
  * <p>Instances of HTable passed the same {@link Configuration} instance will
- * share connections to master and the zookeeper ensemble as well as caches of
- * region locations. This happens because they will all share the same
- * {@link HConnection} instance (internally we keep a Map of {@link HConnection}
- * instances keyed by {@link Configuration}).
- * {@link HConnection} will read most of the
+ * share connections to servers out on the cluster and to the zookeeper ensemble
+ * as well as caches of region locations. This is usually a *good* thing.
+ * This happens because they will all share the same underlying
+ * {@link HConnection} instance. See {@link HConnectionManager} for more on
+ * how this mechanism works.
+ *
+ * <p>{@link HConnection} will read most of the
  * configuration it needs from the passed {@link Configuration} on initial
  * construction. Thereafter, for settings such as
  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
@@ -77,6 +79,8 @@ import org.apache.hadoop.hbase.util.Writables;
  * new configuration.
  *
  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
+ * @see HConnection
+ * @see HConnectionManager
  */
 public class HTable implements HTableInterface {
   private static final Log LOG = LogFactory.getLog(HTable.class);
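Editor's note, not part of this commit: a sketch of the HTable javadoc above in practice; tables handed the same Configuration share one HConnection, while a copied Configuration gets its own, so per-connection settings and cleanup stay isolated. The table name and retry value are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTable;

    public class PerTableConfigExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable shared = new HTable(conf, "usertable");     // uses the HConnection keyed by conf

        Configuration myConf = new Configuration(conf);    // new identity, so new HConnection
        myConf.setInt("hbase.client.retries.number", 2);   // takes effect for this connection only
        HTable impatient = new HTable(myConf, "usertable");

        // ... work with both tables ...
        HConnectionManager.deleteConnection(myConf, true); // clean up only the private connection
      }
    }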
@@ -19,35 +19,34 @@
  */
 package org.apache.hadoop.hbase.client;

+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.util.Bytes;

-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-
 /**
  * A simple pool of HTable instances.<p>
  *
  * Each HTablePool acts as a pool for all tables. To use, instantiate an
  * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
- * Once you are done with it, return it to the pool with {@link #putTable(HTableInterface)}.<p>
+ * Once you are done with it, return it to the pool with {@link #putTable(HTableInterface)}.
  *
- * A pool can be created with a <i>maxSize</i> which defines the most HTable
+ * <p>A pool can be created with a <i>maxSize</i> which defines the most HTable
  * references that will ever be retained for each table. Otherwise the default
- * is {@link Integer#MAX_VALUE}.<p>
+ * is {@link Integer#MAX_VALUE}.
+ *
+ * <p>Pool will manage its own cluster to the cluster. See {@link HConnectionManager}.
  */
 public class HTablePool {
   private final ConcurrentMap<String, LinkedList<HTableInterface>> tables =
       new ConcurrentHashMap<String, LinkedList<HTableInterface>>();
   private final Configuration config;
   private final int maxSize;
-  private HTableInterfaceFactory tableFactory = new HTableFactory();
+  private final HTableInterfaceFactory tableFactory;

   /**
    * Default Constructor. Default HBaseConfiguration and no limit on pool size.
@@ -61,15 +60,17 @@ public class HTablePool {
    * @param config configuration
    * @param maxSize maximum number of references to keep for each table
    */
-  public HTablePool(Configuration config, int maxSize) {
-    this.config = config;
-    this.maxSize = maxSize;
+  public HTablePool(final Configuration config, final int maxSize) {
+    this(config, maxSize, null);
   }

-  public HTablePool(Configuration config, int maxSize, HTableInterfaceFactory tableFactory) {
-    this.config = config;
+  public HTablePool(final Configuration config, final int maxSize,
+      final HTableInterfaceFactory tableFactory) {
+    // Make a new configuration instance so I can safely cleanup when
+    // done with the pool.
+    this.config = new Configuration(config);
     this.maxSize = maxSize;
-    this.tableFactory = tableFactory;
+    this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory;
   }

   /**
@@ -146,7 +147,7 @@ public class HTablePool {
         table = queue.poll();
       }
     }
-
+    HConnectionManager.deleteConnection(this.config, true);
   }

   /**
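Editor's note, not part of this commit: a short usage sketch of the pool amended above, using the getTable/putTable methods its javadoc references; the table and row names are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.HTablePool;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PoolExample {
      public static void main(String[] args) throws IOException {
        // The pool now copies the Configuration it is given (see the constructor above),
        // so it can later delete its own HConnection without touching anyone else's.
        HTablePool pool = new HTablePool(HBaseConfiguration.create(), 10);
        HTableInterface table = pool.getTable("usertable");
        try {
          table.get(new Get(Bytes.toBytes("row1")));
        } finally {
          pool.putTable(table);   // return the handle to the pool rather than closing it
        }
      }
    }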
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.io.Writable;
@@ -102,6 +103,12 @@ implements Configurable {
     public void close(TaskAttemptContext context)
     throws IOException {
       table.flushCommits();
+      // The following call will shutdown all connections to the cluster from
+      // this JVM.  It will close out our zk session otherwise zk wil log
+      // expired sessions rather than closed ones.  If any other HTable instance
+      // running in this JVM, this next call will cause it damage.  Presumption
+      // is that the above this.table is only instance.
+      HConnectionManager.deleteAllConnections(true);
     }

     /**
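Editor's note, not part of this commit: for context on when the close() above runs, a rough sketch of wiring a job to write through TableOutputFormat; the job and table names are hypothetical, and the OUTPUT_TABLE constant is assumed from the mapreduce package of this era rather than shown in this diff.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class TableOutputJobSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "write-to-usertable");       // hypothetical job name
        job.setOutputFormatClass(TableOutputFormat.class);
        // Name the destination table; reduce tasks emit Put/Delete values, and when each
        // task finishes, the RecordWriter close() above flushes and tears down the JVM's
        // cluster connections.
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "usertable");
      }
    }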
@@ -1322,7 +1322,6 @@ public class AssignmentManager extends ZooKeeperListener {
       // If bulkAssign in progress, suspend checks
       if (this.bulkAssign) return;
       synchronized (regionsInTransition) {
-        LOG.debug("Checking for timed out RIT");
         // Iterate all regions in transition checking for time outs
         long now = System.currentTimeMillis();
         for (RegionState regionState : regionsInTransition.values()) {
@@ -178,7 +178,7 @@ public class HMasterCommandLine extends ServerCommandLine {
   /*
    * Version of master that will shutdown the passed zk cluster on its way out.
    */
-  static class LocalHMaster extends HMaster {
+  public static class LocalHMaster extends HMaster {
     private MiniZooKeeperCluster zkcluster = null;

     public LocalHMaster(Configuration conf)
@@ -153,4 +153,19 @@ public class LogCleaner extends Chore {
       LOG.warn("Error while cleaning the logs", e);
     }
   }
+
+  @Override
+  public void run() {
+    try {
+      super.run();
+    } finally {
+      for (LogCleanerDelegate lc: this.logCleanersChain) {
+        try {
+          lc.stop("Exiting");
+        } catch (Throwable t) {
+          LOG.warn("Stopping", t);
+        }
+      }
+    }
+  }
 }
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;

 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Stoppable;

 /**
  * Interface for the log cleaning function inside the master. By default, three
@@ -30,15 +31,14 @@ import org.apache.hadoop.fs.Path;
  * "hbase.master.logcleaner.plugins", which is a comma-separated list of fully
  * qualified class names. LogsCleaner will add it to the chain.
  *
- * HBase ships with LogsCleaner as the default implementation.
+ * <p>HBase ships with LogsCleaner as the default implementation.
  *
- * This interface extends Configurable, so setConf needs to be called once
+ * <p>This interface extends Configurable, so setConf needs to be called once
  * before using the cleaner.
  * Since LogCleanerDelegates are created in LogsCleaner by reflection. Classes
  * that implements this interface should provide a default constructor.
  */
-public interface LogCleanerDelegate extends Configurable {
-
+public interface LogCleanerDelegate extends Configurable, Stoppable {
   /**
    * Should the master delete the log or keep it?
    * @param filePath full path to log.
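Editor's note, not part of this commit: since LogCleanerDelegate now also extends Stoppable, an implementation must provide stop(String) and isStopped() alongside isLogDeletable and the Configurable methods, as in this hypothetical delegate (it mirrors the TimeToLiveLogCleaner change later in this diff).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.master.LogCleanerDelegate;

    public class KeepEverythingLogCleaner implements LogCleanerDelegate {
      private Configuration conf;
      private volatile boolean stopped = false;

      @Override
      public boolean isLogDeletable(Path filePath) {
        return false;                  // never let the master delete a log
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public void stop(String why) {   // required now that the interface is Stoppable
        this.stopped = true;
      }

      @Override
      public boolean isStopped() {
        return this.stopped;
      }
    }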
@@ -29,12 +29,11 @@ import org.apache.commons.logging.LogFactory;
 * be deleted. By default they are allowed to live for 10 minutes.
 */
 public class TimeToLiveLogCleaner implements LogCleanerDelegate {
-  static final Log LOG =
-    LogFactory.getLog(TimeToLiveLogCleaner.class.getName());
+  static final Log LOG = LogFactory.getLog(TimeToLiveLogCleaner.class.getName());
   private Configuration conf;
   // Configured time a log can be kept after it was closed
   private long ttl;
+  private boolean stopped = false;

   @Override
   public boolean isLogDeletable(Path filePath) {
@@ -67,4 +66,14 @@ public class TimeToLiveLogCleaner implements LogCleanerDelegate {
   public Configuration getConf() {
     return conf;
   }
+
+  @Override
+  public void stop(String why) {
+    this.stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
 }
@@ -1243,6 +1243,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
           r.hasReferences()? "Region has references on open" :
           "Region has too many store files");
     }
+
+    // Add to online regions if all above was successful.
+    addToOnlineRegions(r);
+
     // Update ZK, ROOT or META
     if (r.getRegionInfo().isRootRegion()) {
       RootLocationEditor.setRootLocation(getZooKeeper(),
@@ -1257,8 +1261,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
         MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
       }
     }
-    // Add to online regions if all above was successful.
-    addToOnlineRegions(r);
   }

   /**
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -38,12 +39,11 @@ import java.util.Set;
 * replication before deleting it when its TTL is over.
 */
 public class ReplicationLogCleaner implements LogCleanerDelegate {
-  private static final Log LOG =
-    LogFactory.getLog(ReplicationLogCleaner.class);
+  private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private Configuration conf;
   private ReplicationZookeeper zkHelper;
   private Set<String> hlogs = new HashSet<String>();
+  private boolean stopped = false;

   /**
    * Instantiates the cleaner, does nothing more.
@@ -105,11 +105,13 @@ public class ReplicationLogCleaner implements LogCleanerDelegate {

   @Override
   public void setConf(Configuration conf) {
-    this.conf = conf;
+    // Make my own Configuration.  Then I'll have my own connection to zk that
+    // I can close myself when comes time.
+    this.conf = new Configuration(conf);
     try {
       ZooKeeperWatcher zkw =
-        new ZooKeeperWatcher(conf, this.getClass().getName(), null);
-      this.zkHelper = new ReplicationZookeeper(conf, zkw);
+        new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null);
+      this.zkHelper = new ReplicationZookeeper(this.conf, zkw);
     } catch (KeeperException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {
@@ -122,4 +124,20 @@ public class ReplicationLogCleaner implements LogCleanerDelegate {
   public Configuration getConf() {
     return conf;
   }
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) return;
+    this.stopped = true;
+    if (this.zkHelper != null) {
+      LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher());
+      this.zkHelper.getZookeeperWatcher().close();
+    }
+    HConnectionManager.deleteConnection(this.conf, true);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
 }
@@ -106,7 +106,7 @@ class HMerge {
       HConnection connection = HConnectionManager.getConnection(conf);
       masterIsRunning = connection.isMasterRunning();
     }
-    HConnectionManager.deleteConnection(conf, false);
+    HConnectionManager.deleteConnection(conf, true);
     if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
       if (masterIsRunning) {
         throw new IllegalStateException(
@@ -91,7 +91,7 @@ public class ZKUtil {
       throw new IOException("Unable to determine ZooKeeper ensemble");
     }
     int timeout = conf.getInt("zookeeper.session.timeout", 60 * 1000);
-    LOG.info(descriptor + " opening connection to ZooKeeper with ensemble (" +
+    LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
         ensemble + ")");
     return new ZooKeeper(ensemble, timeout, watcher);
   }
@@ -194,7 +194,7 @@ public class ZKUtil {
   throws KeeperException {
     try {
       Stat s = zkw.getZooKeeper().exists(znode, zkw);
-      LOG.info(zkw.prefix("Set watcher on existing znode " + znode));
+      LOG.debug(zkw.prefix("Set watcher on existing znode " + znode));
       return s != null ? true : false;
     } catch (KeeperException e) {
       LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -283,7 +282,7 @@ public class ZooKeeperWatcher implements Watcher {
     switch(event.getState()) {
       case SyncConnected:
         // Update our identifier.  Otherwise ignore.
-        LOG.info(this.identifier + " connected");
+        LOG.debug(this.identifier + " connected");
         // Now, this callback can be invoked before the this.zookeeper is set.
         // Wait a little while.
         long finished = System.currentTimeMillis() +
|
|
Loading…
Reference in New Issue