HBASE-2961 Close zookeeper when done with it (HCM, Master, and RS)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@995612 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-09-10 00:03:57 +00:00
parent 5b31d2cd72
commit 9453a813d0
20 changed files with 162 additions and 360 deletions

View File

@ -29,6 +29,7 @@ Release 0.21.0 - Unreleased
HBASE-2553 Revisit IncrementColumnValue implementation in 0.22
HBASE-2692 Master rewrite and cleanup for 0.90
(Karthik Ranganathan, Jon Gray & Stack)
HBASE-2961 Close zookeeper when done with it (HCM, Master, and RS)
BUG FIXES

View File

@ -126,7 +126,11 @@ public class LocalHBaseCluster {
public JVMClusterUtil.RegionServerThread addRegionServer(final int index)
throws IOException {
JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.createRegionServerThread(this.conf,
// Create each regionserver with its own Configuration instance so each has
// its HConnection instance rather than share (see HBASE_INSTANCES down in
// the guts of HConnectionManager.
JVMClusterUtil.RegionServerThread rst =
JVMClusterUtil.createRegionServerThread(new Configuration(this.conf),
this.regionServerClass, index);
this.regionThreads.add(rst);
return rst;
@ -254,4 +258,4 @@ public class LocalHBaseCluster {
admin.createTable(htd);
cluster.shutdown();
}
}
}

View File

@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
@ -53,7 +53,7 @@ import org.apache.zookeeper.KeeperException;
public class CatalogTracker {
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
private final ServerConnection connection;
private final HConnection connection;
private final ZooKeeperWatcher zookeeper;
@ -80,8 +80,8 @@ public class CatalogTracker {
* @param abortable if fatal exception
* @throws IOException
*/
public CatalogTracker(final ZooKeeperWatcher zk,
final ServerConnection connection, final Abortable abortable)
public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
final Abortable abortable)
throws IOException {
this(zk, connection, abortable, 0);
}
@ -95,9 +95,8 @@ public class CatalogTracker {
* @param defaultTimeout Timeout to use.
* @throws IOException
*/
public CatalogTracker(final ZooKeeperWatcher zk,
final ServerConnection connection, final Abortable abortable,
final int defaultTimeout)
public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
final Abortable abortable, final int defaultTimeout)
throws IOException {
this.zookeeper = zk;
this.connection = connection;

View File

@ -92,7 +92,7 @@ public class HBaseAdmin implements Abortable {
throws ZooKeeperConnectionException, IOException {
if (this.catalogTracker == null) {
this.catalogTracker = new CatalogTracker(this.connection.getZooKeeperWatcher(),
ServerConnectionManager.getConnection(conf), this,
HConnectionManager.getConnection(conf), this,
this.conf.getInt("hbase.admin.catalog.timeout", 10 * 1000));
try {
this.catalogTracker.start();
@ -408,7 +408,7 @@ public class HBaseAdmin implements Abortable {
}
}
// Delete cached information to prevent clients from using old locations
HConnectionManager.deleteConnection(conf, false);
this.connection.clearRegionCache(tableName);
LOG.info("Deleted " + Bytes.toString(tableName));
}

View File

@ -121,6 +121,14 @@ public interface HConnection {
*/
public void clearRegionCache();
/**
* Allows flushing the region cache of all locations that pertain to
* <code>tableName</code>
* @param tableName Name of the table whose regions we are to remove from
* cache.
*/
public void clearRegionCache(final byte [] tableName);
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in, ignoring any value that might be in the cache.

View File

@ -22,22 +22,19 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@ -53,7 +50,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterAddressTracker;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@ -63,11 +59,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@ -93,11 +90,11 @@ public class HConnectionManager {
// A LRU Map of Configuration hashcode -> TableServers. We set instances to 31.
// The zk default max connections to the ensemble from the one client is 30 so
// should run into zk issues before hit this value of 31.
private static final Map<Configuration, TableServers> HBASE_INSTANCES =
new LinkedHashMap<Configuration, TableServers>
private static final Map<Configuration, HConnectionImplementation> HBASE_INSTANCES =
new LinkedHashMap<Configuration, HConnectionImplementation>
((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<Configuration, TableServers> eldest) {
protected boolean removeEldestEntry(Map.Entry<Configuration, HConnectionImplementation> eldest) {
return size() > MAX_CACHED_HBASE_INSTANCES;
}
};
@ -119,11 +116,11 @@ public class HConnectionManager {
*/
public static HConnection getConnection(Configuration conf)
throws ZooKeeperConnectionException {
TableServers connection;
HConnectionImplementation connection;
synchronized (HBASE_INSTANCES) {
connection = HBASE_INSTANCES.get(conf);
if (connection == null) {
connection = new TableServers(conf);
connection = new HConnectionImplementation(conf);
HBASE_INSTANCES.put(conf, connection);
}
}
@ -137,7 +134,7 @@ public class HConnectionManager {
*/
public static void deleteConnection(Configuration conf, boolean stopProxy) {
synchronized (HBASE_INSTANCES) {
TableServers t = HBASE_INSTANCES.remove(conf);
HConnectionImplementation t = HBASE_INSTANCES.remove(conf);
if (t != null) {
t.close(stopProxy);
}
@ -151,7 +148,7 @@ public class HConnectionManager {
*/
public static void deleteAllConnections(boolean stopProxy) {
synchronized (HBASE_INSTANCES) {
for (TableServers t : HBASE_INSTANCES.values()) {
for (HConnectionImplementation t : HBASE_INSTANCES.values()) {
if (t != null) {
t.close(stopProxy);
}
@ -168,7 +165,7 @@ public class HConnectionManager {
static int getCachedRegionCount(Configuration conf,
byte[] tableName)
throws ZooKeeperConnectionException {
TableServers connection = (TableServers)getConnection(conf);
HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
return connection.getNumberOfCachedRegionLocations(tableName);
}
@ -180,13 +177,13 @@ public class HConnectionManager {
*/
static boolean isRegionCached(Configuration conf,
byte[] tableName, byte[] row) throws ZooKeeperConnectionException {
TableServers connection = (TableServers)getConnection(conf);
HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
return connection.isRegionCached(tableName, row);
}
/* Encapsulates connection to zookeeper and regionservers.*/
static class TableServers implements ServerConnection, Abortable {
static final Log LOG = LogFactory.getLog(TableServers.class);
static class HConnectionImplementation implements HConnection, Abortable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
private final long pause;
private final int numRetries;
@ -203,7 +200,6 @@ public class HConnectionManager {
// ZooKeeper-based master address tracker
private MasterAddressTracker masterAddressTracker;
private final Object rootRegionLock = new Object();
private final Object metaRegionLock = new Object();
private final Object userRegionLock = new Object();
@ -213,9 +209,12 @@ public class HConnectionManager {
private final Map<String, HRegionInterface> servers =
new ConcurrentHashMap<String, HRegionInterface>();
// Used by master and region servers during safe mode only
private volatile HRegionLocation rootRegionLocation;
private final RootRegionTracker rootRegionTracker;
/**
* Map of table to table {@link HRegionLocation}s. The table key is made
* by doing a {@link Bytes#mapKey(byte[])} of the table's name.
*/
private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>>
cachedRegionLocations =
new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
@ -230,7 +229,7 @@ public class HConnectionManager {
* @param conf Configuration object
*/
@SuppressWarnings("unchecked")
public TableServers(Configuration conf)
public HConnectionImplementation(Configuration conf)
throws ZooKeeperConnectionException {
this.conf = conf;
@ -260,11 +259,14 @@ public class HConnectionManager {
10);
// initialize zookeeper and master address manager
getZooKeeperWatcher();
masterAddressTracker = new MasterAddressTracker(zooKeeper, this);
this.zooKeeper = getZooKeeperWatcher();
masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
zooKeeper.registerListener(masterAddressTracker);
masterAddressTracker.start();
this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this);
this.rootRegionTracker.start();
this.master = null;
this.masterChecked = false;
}
@ -277,20 +279,6 @@ public class HConnectionManager {
return this.pause * HConstants.RETRY_BACKOFF[ntries];
}
// Used by master and region servers during safe mode only
public void unsetRootRegionLocation() {
this.rootRegionLocation = null;
}
// Used by master and region servers during safe mode only
public void setRootRegionLocation(HRegionLocation rootRegion) {
if (rootRegion == null) {
throw new IllegalArgumentException(
"Cannot set root region location to null.");
}
this.rootRegionLocation = rootRegion;
}
public HMasterInterface getMaster()
throws MasterNotRunningException, ZooKeeperConnectionException {
@ -528,15 +516,14 @@ public class HConnectionManager {
}
if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
synchronized (rootRegionLock) {
// This block guards against two threads trying to find the root
// region at the same time. One will go do the find while the
// second waits. The second thread will not do find.
if (!useCache || rootRegionLocation == null) {
this.rootRegionLocation = locateRootRegion();
}
return this.rootRegionLocation;
try {
HServerAddress hsa =
this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);
if (hsa == null) return null;
return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
} else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,
@ -811,15 +798,15 @@ public class HConnectionManager {
return null;
}
/*
* Delete a cached location, if it satisfies the table name and row
* requirements.
/**
* Delete a cached location
* @param tableName tableName
* @param row
*/
void deleteCachedLocation(final byte [] tableName, final byte [] row) {
synchronized (this.cachedRegionLocations) {
SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
getTableLocations(tableName);
// start to examine the cache. we can only do cache actions
// if there's something in the cache for this table.
if (!tableLocations.isEmpty()) {
@ -828,9 +815,9 @@ public class HConnectionManager {
tableLocations.remove(rl.getRegionInfo().getStartKey());
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " +
rl.getRegionInfo().getRegionNameAsString() +
" for tableName=" + Bytes.toString(tableName) +
" from cache " + "because of " + Bytes.toStringBinary(row));
rl.getRegionInfo().getRegionNameAsString() +
" for tableName=" + Bytes.toString(tableName) +
" from cache " + "because of " + Bytes.toStringBinary(row));
}
}
}
@ -858,11 +845,18 @@ public class HConnectionManager {
return result;
}
/**
* Allows flushing the region cache.
*/
@Override
public void clearRegionCache() {
cachedRegionLocations.clear();
synchronized(this.cachedRegionLocations) {
this.cachedRegionLocations.clear();
}
}
@Override
public void clearRegionCache(final byte [] tableName) {
synchronized (this.cachedRegionLocations) {
this.cachedRegionLocations.remove(Bytes.mapKey(tableName));
}
}
/*
@ -923,8 +917,10 @@ public class HConnectionManager {
throws ZooKeeperConnectionException {
if(zooKeeper == null) {
try {
zooKeeper = new ZooKeeperWatcher(conf,
ZKUtil.getZooKeeperClusterKey(conf), this);
this.zooKeeper = new ZooKeeperWatcher(conf,
ZKUtil.getZooKeeperClusterKey(conf), this);
LOG.debug("zkw created, sessionid=0x" +
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
} catch (IOException e) {
throw new ZooKeeperConnectionException(e);
}
@ -932,105 +928,6 @@ public class HConnectionManager {
return zooKeeper;
}
/**
* Repeatedly try to find the root region in ZK
* @return HRegionLocation for root region if found
* @throws NoServerForRegionException - if the root region can not be
* located after retrying
* @throws IOException
*/
private HRegionLocation locateRootRegion()
throws IOException {
// We lazily instantiate the ZooKeeper object because we don't want to
// make the constructor have to throw IOException or handle it itself.
ZooKeeperWatcher zk;
try {
zk = getZooKeeperWatcher();
} catch (IOException e) {
throw new ZooKeeperConnectionException(e);
}
HServerAddress rootRegionAddress = null;
for (int tries = 0; tries < numRetries; tries++) {
int localTimeouts = 0;
// ask the master which server has the root region
while (rootRegionAddress == null && localTimeouts < numRetries) {
// Don't read root region until we're out of safe mode so we know
// that the meta regions have been assigned.
try {
rootRegionAddress = ZKUtil.getDataAsAddress(zk, zk.rootServerZNode);
} catch (KeeperException e) {
LOG.error("Unexpected ZooKeeper error attempting to read the root " +
"region server address");
throw new IOException(e);
}
if (rootRegionAddress == null) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping " + getPauseTime(tries) +
"ms, waiting for root region.");
}
Thread.sleep(getPauseTime(tries));
} catch (InterruptedException iex) {
// continue
}
localTimeouts++;
}
}
if (rootRegionAddress == null) {
throw new NoServerForRegionException(
"Timed out trying to locate root region");
}
try {
// Get a connection to the region server
HRegionInterface server = getHRegionConnection(rootRegionAddress);
// if this works, then we're good, and we have an acceptable address,
// so we can stop doing retries and return the result.
server.getRegionInfo(HRegionInfo.ROOT_REGIONINFO.getRegionName());
if (LOG.isDebugEnabled()) {
LOG.debug("Found ROOT at " + rootRegionAddress);
}
break;
} catch (Throwable t) {
t = translateException(t);
if (tries == numRetries - 1) {
throw new NoServerForRegionException("Timed out trying to locate "+
"root region because: " + t.getMessage());
}
// Sleep and retry finding root region.
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Root region location changed. Sleeping.");
}
Thread.sleep(getPauseTime(tries));
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch (InterruptedException iex) {
// continue
}
}
rootRegionAddress = null;
}
// if the address is null by this point, then the retries have failed,
// and we're sort of sunk
if (rootRegionAddress == null) {
throw new NoServerForRegionException(
"unable to locate root region server");
}
// return the region location
return new HRegionLocation(
HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
}
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
List<Throwable> exceptions = new ArrayList<Throwable>();
@ -1070,40 +967,6 @@ public class HConnectionManager {
}
}
private HRegionLocation
getRegionLocationForRowWithRetries(byte[] tableName, byte[] rowKey,
boolean reload)
throws IOException {
boolean reloadFlag = reload;
List<Throwable> exceptions = new ArrayList<Throwable>();
HRegionLocation location = null;
int tries = 0;
for (; tries < numRetries;) {
try {
location = getRegionLocation(tableName, rowKey, reloadFlag);
} catch (Throwable t) {
exceptions.add(t);
}
if (location != null) {
break;
}
reloadFlag = true;
tries++;
try {
Thread.sleep(getPauseTime(tries));
} catch (InterruptedException e) {
// continue
}
}
if (location == null) {
throw new RetriesExhaustedException(" -- nothing found, no 'location' returned," +
" tableName=" + Bytes.toString(tableName) +
", reload=" + reload + " --",
HConstants.EMPTY_BYTE_ARRAY, rowKey, tries, exceptions);
}
return location;
}
/**
* @deprecated Use HConnectionManager::processBatch instead.
*/
@ -1150,6 +1013,12 @@ public class HConnectionManager {
HBaseRPC.stopProxy(i);
}
}
if (this.zooKeeper != null) {
LOG.debug("Closed zookeeper sessionid=0x" +
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
this.zooKeeper.close();
this.zooKeeper = null;
}
}
private Callable<MultiResponse> createCallable(
@ -1249,7 +1118,7 @@ public class HConnectionManager {
// Not really sure what a reasonable timeout value is. Here's a first try.
MultiResponse resp = future.get(1000, TimeUnit.MILLISECONDS);
MultiResponse resp = future.get();
if (resp == null) {
// Entire server failed
@ -1271,8 +1140,6 @@ public class HConnectionManager {
}
}
}
} catch (TimeoutException e) {
LOG.debug("Timeout for region server: " + address + ", removing from cache");
} catch (InterruptedException e) {
LOG.debug("Failed all from " + address, e);
Thread.currentThread().interrupt();
@ -1412,10 +1279,6 @@ public class HConnectionManager {
public void abort(final String msg, Throwable t) {
if (t != null) LOG.fatal(msg, t);
else LOG.fatal(msg);
if(zooKeeper != null) {
zooKeeper.close();
zooKeeper = null;
}
}
}
}

View File

@ -1,41 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HRegionLocation;
/**
* Used by master and region server, so that they do not need to wait for the
* cluster to be up to get a connection.
*/
public interface ServerConnection extends HConnection {
/**
* Set root region location in connection
* @param rootRegion region location for root region
*/
public void setRootRegionLocation(HRegionLocation rootRegion);
/**
* Unset the root region location in the connection. Called by
* ServerManager.processRegionClose.
*/
public void unsetRootRegionLocation();
}

View File

@ -1,47 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
/**
* Used by server processes to expose HServerConnection method
* so can call HConnectionManager#setRootRegionLocation
*/
public class ServerConnectionManager extends HConnectionManager {
/*
* Not instantiable
*/
private ServerConnectionManager() {}
/**
* Get the connection object for the instance specified by the configuration
* If no current connection exists, create a new connection for that instance
* @param conf configuration
* @return HConnection object for the instance specified by the configuration
* @throws ZooKeeperConnectionException
*/
public static ServerConnection getConnection(Configuration conf) throws ZooKeeperConnectionException {
return (ServerConnection) HConnectionManager.getConnection(conf);
}
}

View File

@ -62,10 +62,10 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
@ -146,7 +146,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
// file system manager for the master FS operations
private final MasterFileSystem fileSystemManager;
private final ServerConnection connection;
private final HConnection connection;
// server manager to deal with region server info
private final ServerManager serverManager;
@ -243,7 +244,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
*/
// TODO: Do this using Dependency Injection, using PicoContainer or Spring.
this.fileSystemManager = new MasterFileSystem(this);
this.connection = ServerConnectionManager.getConnection(conf);
this.connection = HConnectionManager.getConnection(conf);
this.executorService = new ExecutorService(getServerName());
this.serverManager = new ServerManager(this, this);
@ -266,7 +267,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.clusterStatusTracker.start();
LOG.info("Server active/primary master; " + this.address +
"; clusterStarter=" + this.clusterStarter);
"; clusterStarter=" + this.clusterStarter + ", sessionid=0x" +
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
}
/**
@ -320,8 +322,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.rpcServer.stop();
if (this.balancerChore != null) this.balancerChore.interrupt();
this.activeMasterManager.stop();
this.zooKeeper.close();
this.executorService.shutdown();
HConnectionManager.deleteConnection(this.conf, true);
this.zooKeeper.close();
LOG.info("HMaster main thread exiting");
}

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
@ -530,8 +530,8 @@ public class ServerManager {
private HRegionInterface getServerConnection(HServerInfo info) {
try {
ServerConnection connection =
ServerConnectionManager.getConnection(this.master.getConfiguration());
HConnection connection =
HConnectionManager.getConnection(this.master.getConfiguration());
HRegionInterface hri = serverConnections.get(info.getServerName());
if(hri == null) {
LOG.info("new connection");

View File

@ -61,9 +61,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
@ -77,23 +77,22 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
@ -154,7 +153,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
protected HServerInfo serverInfo;
protected final Configuration conf;
private final ServerConnection connection;
private final HConnection connection;
protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
private FileSystem fs;
private Path rootDir;
@ -286,7 +285,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.fsOk = true;
this.conf = conf;
this.connection = ServerConnectionManager.getConnection(conf);
this.connection = HConnectionManager.getConnection(conf);
this.isOnline = false;
// Config'ed params
@ -418,7 +417,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
}
// Try to get the root region location from zookeeper.
checkRootRegionLocation();
this.catalogTracker.waitForRoot();
long now = System.currentTimeMillis();
// Drop into the send loop if msgInterval has elapsed or if something
// to send. If we fail talking to the master, then we'll sleep below
@ -508,9 +507,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
HBaseRPC.stopProxy(this.hbaseMaster);
this.hbaseMaster = null;
}
this.leases.close();
HConnectionManager.deleteConnection(conf, true);
this.zooKeeper.close();
if (!killed) {
join();
}
@ -557,7 +556,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
stop("Received " + msgs[i]);
continue;
}
this.connection.unsetRootRegionLocation();
LOG.warn("NOT PROCESSING " + msgs[i] + " -- WHY IS MASTER SENDING IT TO US?");
}
return outboundMessages;
@ -574,19 +572,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
return hsl;
}
private void checkRootRegionLocation() throws InterruptedException {
if (this.haveRootRegion.get()) return;
HServerAddress rootServer = catalogTracker.getRootLocation();
if (rootServer != null) {
// By setting the root region location, we bypass the wait imposed on
// HTable for all regions being assigned.
HRegionLocation hrl =
new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootServer);
this.connection.setRootRegionLocation(hrl);
this.haveRootRegion.set(true);
}
}
private void closeWAL(final boolean delete) {
try {
if (this.hlog != null) {
@ -708,6 +693,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
startServiceThreads();
LOG.info("Serving as " + this.serverInfo.getServerName() +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
isOnline = true;
} catch (Throwable e) {
this.isOnline = false;

View File

@ -1862,12 +1862,12 @@ public class HLog implements Syncable {
Configuration conf = HBaseConfiguration.create();
for (int i = 1; i < args.length; i++) {
try {
Path logPath = new Path(args[i]);
if (dump) {
dump(conf, logPath);
} else {
split(conf, logPath);
}
Path logPath = new Path(args[i]);
if (dump) {
dump(conf, logPath);
} else {
split(conf, logPath);
}
} catch (Throwable t) {
t.printStackTrace(System.err);
System.exit(-1);

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@ -162,7 +163,15 @@ public class SequenceFileLogReader implements HLog.Reader {
} catch (IOException e) {
Log.warn("Failed getting position to add to throw", e);
}
return new IOException((this.path == null? "": this.path.toString()) +
", pos=" + pos + ", edit=" + this.edit, ioe);
// Preserve EOFE because these are treated differently if it comes up during
// a split of logs
String msg = (this.path == null? "": this.path.toString()) +
", pos=" + pos + ", edit=" + this.edit;
if (ioe instanceof EOFException) {
EOFException eof = new EOFException(msg);
eof.initCause(ioe);
return eof;
}
return new IOException(msg, ioe);
}
}

View File

@ -351,7 +351,7 @@ public class ZooKeeperWatcher implements Watcher {
*/
public void close() {
try {
if(zooKeeper != null) {
if (zooKeeper != null) {
zooKeeper.close();
// super.close();
}

View File

@ -19,6 +19,9 @@
*/
package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@ -33,7 +36,8 @@ import java.util.List;
import java.util.ArrayList;
public class TestMultiParallel extends MultiRegionTable {
// This test needs to be rewritten to use HBaseTestingUtility -- St.Ack 20100910
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
private static final byte[] VALUE = Bytes.toBytes("value");
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
private static final String FAMILY = "family";
@ -82,6 +86,7 @@ public class TestMultiParallel extends MultiRegionTable {
}
public void testBatchWithGet() throws Exception {
LOG.info("test=testBatchWithGet");
HTable table = new HTable(conf, TEST_TABLE);
// load test data
@ -125,6 +130,7 @@ public class TestMultiParallel extends MultiRegionTable {
* @throws Exception
*/
public void testFlushCommitsWithAbort() throws Exception {
LOG.info("test=testFlushCommitsWithAbort");
doTestFlushCommits(true);
}
@ -133,8 +139,11 @@ public class TestMultiParallel extends MultiRegionTable {
}
public void doTestFlushCommits(boolean doAbort) throws Exception {
LOG.info("test=doTestFlushCommits");
// Load the data
HTable table = new HTable(conf, TEST_TABLE);
Configuration newconf = new Configuration(conf);
newconf.setInt("hbase.client.retries.number", 10);
HTable table = new HTable(newconf, TEST_TABLE);
table.setAutoFlush(false);
table.setWriteBufferSize(10 * 1024 * 1024);
@ -170,8 +179,10 @@ public class TestMultiParallel extends MultiRegionTable {
}
public void testBatchWithPut() throws Exception {
HTable table = new HTable(conf, TEST_TABLE);
LOG.info("test=testBatchWithPut");
Configuration newconf = new Configuration(conf);
newconf.setInt("hbase.client.retries.number", 10);
HTable table = new HTable(newconf, TEST_TABLE);
// put multiple rows using a batch
List<Row> puts = constructPutRequests();
@ -191,7 +202,7 @@ public class TestMultiParallel extends MultiRegionTable {
}
public void testBatchWithDelete() throws Exception {
LOG.info("test=testBatchWithDelete");
HTable table = new HTable(conf, TEST_TABLE);
// Load some data
@ -219,7 +230,7 @@ public class TestMultiParallel extends MultiRegionTable {
}
public void testHTableDeleteWithList() throws Exception {
LOG.info("test=testHTableDeleteWithList");
HTable table = new HTable(conf, TEST_TABLE);
// Load some data
@ -247,6 +258,7 @@ public class TestMultiParallel extends MultiRegionTable {
}
public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
HTable table = new HTable(conf, TEST_TABLE);
List<Row> puts = new ArrayList<Row>();
@ -282,6 +294,7 @@ public class TestMultiParallel extends MultiRegionTable {
}
public void testBatchWithMixedActions() throws Exception {
LOG.info("test=testBatchWithMixedActions");
HTable table = new HTable(conf, TEST_TABLE);
// Load some data to start

View File

@ -32,9 +32,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -64,8 +64,8 @@ public class TestMetaReaderEditor {
UTIL.startMiniCluster();
ZKW = new ZooKeeperWatcher(UTIL.getConfiguration(),
"TestMetaReaderEditor", ABORTABLE);
ServerConnection connection =
ServerConnectionManager.getConnection(UTIL.getConfiguration());
HConnection connection =
HConnectionManager.getConnection(UTIL.getConfiguration());
CT = new CatalogTracker(ZKW, connection, ABORTABLE);
CT.start();
}

View File

@ -151,8 +151,7 @@ public class TestFromClientSide {
putRows(ht, 3, value2, keyPrefix1);
putRows(ht, 3, value2, keyPrefix2);
putRows(ht, 3, value2, keyPrefix3);
HTable table = new HTable(TEST_UTIL.getConfiguration(),
Bytes.toBytes("testWeirdCacheBehaviour"));
HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE);
System.out.println("Checking values for key: " + keyPrefix1);
assertEquals("Got back incorrect number of rows from scan", 3,
getNumberOfRows(keyPrefix1, value2, table));

View File

@ -147,8 +147,8 @@ public class TestHCM {
Put put = new Put(ROW);
put.add(FAM_NAM, ROW, ROW);
table.put(put);
HConnectionManager.TableServers conn =
(HConnectionManager.TableServers) table.getConnection();
HConnectionManager.HConnectionImplementation conn =
(HConnectionManager.HConnectionImplementation)table.getConnection();
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
conn.deleteCachedLocation(TABLE_NAME, ROW);
HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);

View File

@ -32,6 +32,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
@ -162,7 +163,9 @@ public class TestFSErrorsExposed {
admin.createTable(desc);
// Make it fail faster.
util.getConfiguration().setInt("hbase.client.retries.number", 1);
HTable table = new HTable(util.getConfiguration(), tableName);
// Make a new Configuration so it makes a new connection that has the
// above configuration on it; else we use the old one w/ 10 as default.
HTable table = new HTable(new Configuration(util.getConfiguration()), tableName);
// Load some data
util.loadTable(table, fam);

View File

@ -324,7 +324,7 @@ public class TestHLogSplit {
HLog.Entry entry;
while ((entry = in.next()) != null) ++actualCount;
assertEquals(entryCount-1, actualCount);
// should not have stored the EOF files as corrupt
FileStatus[] archivedLogs = fs.listStatus(corruptDir);
assertEquals(archivedLogs.length, 0);