HBASE-4495 CatalogTracker has an identity crisis; needs to be cut-back in scope (Mikhail Antonov)
This commit is contained in:
parent
24a0a2a2bf
commit
3b2de6233b
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,404 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.NoRouteToHostException;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.rmi.UnknownHostException;
|
||||
|
||||
/**
|
||||
* Utility class to perform operation (get/wait for/verify/set/delete) on znode in ZooKeeper
|
||||
* which keeps hbase:meta region server location.
|
||||
*
|
||||
* Stateless class with a bunch of static methods. Doesn't manage resources passed in
|
||||
* (e.g. HConnection, ZooKeeperWatcher etc).
|
||||
*
|
||||
* Meta region location is set by <code>RegionServerServices</code>.
|
||||
* This class doesn't use ZK watchers, rather accesses ZK directly.
|
||||
*
|
||||
* This class it stateless. The only reason it's not made a non-instantiable util class
|
||||
* with a collection of static methods is that it'd be rather hard to mock properly in tests.
|
||||
*
|
||||
* TODO: rewrite using RPC calls to master to find out about hbase:meta.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetaTableLocator {
|
||||
private static final Log LOG = LogFactory.getLog(MetaTableLocator.class);
|
||||
|
||||
static final byte [] META_REGION_NAME =
|
||||
HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
|
||||
|
||||
// only needed to allow non-timeout infinite waits to stop when cluster shuts down
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
/**
|
||||
* Checks if the meta region location is available.
|
||||
* @return true if meta region location is available, false if not
|
||||
*/
|
||||
public boolean isLocationAvailable(ZooKeeperWatcher zkw) {
|
||||
try {
|
||||
return ZKUtil.getData(zkw, zkw.metaServerZNode) != null;
|
||||
} catch(KeeperException e) {
|
||||
LOG.error("ZK error trying to get hbase:meta from ZooKeeper");
|
||||
return false;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("ZK error trying to get hbase:meta from ZooKeeper");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the meta region location, if available. Does not block.
|
||||
* @param zkw zookeeper connection to use
|
||||
* @return server name or null if we failed to get the data.
|
||||
*/
|
||||
public ServerName getMetaRegionLocation(final ZooKeeperWatcher zkw) {
|
||||
try {
|
||||
try {
|
||||
return ServerName.parseFrom(ZKUtil.getData(zkw, zkw.metaServerZNode));
|
||||
} catch (DeserializationException e) {
|
||||
throw ZKUtil.convert(e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the meta region location, if available, and waits for up to the
|
||||
* specified timeout if not immediately available.
|
||||
* Given the zookeeper notification could be delayed, we will try to
|
||||
* get the latest data.
|
||||
* @param timeout maximum time to wait, in millis
|
||||
* @return server name for server hosting meta region formatted as per
|
||||
* {@link ServerName}, or null if none available
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
public ServerName waitMetaRegionLocation(ZooKeeperWatcher zkw, long timeout)
|
||||
throws InterruptedException, NotAllMetaRegionsOnlineException {
|
||||
try {
|
||||
if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
|
||||
String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. "
|
||||
+ "There could be a mismatch with the one configured in the master.";
|
||||
LOG.error(errorMsg);
|
||||
throw new IllegalArgumentException(errorMsg);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
|
||||
}
|
||||
ServerName sn = blockUntilAvailable(zkw, timeout);
|
||||
|
||||
if (sn == null) {
|
||||
throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
|
||||
}
|
||||
|
||||
return sn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits indefinitely for availability of <code>hbase:meta</code>. Used during
|
||||
* cluster startup. Does not verify meta, just that something has been
|
||||
* set up in zk.
|
||||
* @see #waitMetaRegionLocation(org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher, long)
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
public void waitMetaRegionLocation(ZooKeeperWatcher zkw) throws InterruptedException {
|
||||
Stopwatch stopwatch = new Stopwatch().start();
|
||||
while (!stopped) {
|
||||
try {
|
||||
if (waitMetaRegionLocation(zkw, 100) != null) break;
|
||||
long sleepTime = stopwatch.elapsedMillis();
|
||||
// +1 in case sleepTime=0
|
||||
if ((sleepTime + 1) % 10000 == 0) {
|
||||
LOG.warn("Have been waiting for meta to be assigned for " + sleepTime + "ms");
|
||||
}
|
||||
} catch (NotAllMetaRegionsOnlineException e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("hbase:meta still not available, sleeping and retrying." +
|
||||
" Reason: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify <code>hbase:meta</code> is deployed and accessible.
|
||||
* @param timeout How long to wait on zk for meta address (passed through to
|
||||
* the internal call to {@link #getMetaServerConnection}.
|
||||
* @return True if the <code>hbase:meta</code> location is healthy.
|
||||
* @throws java.io.IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public boolean verifyMetaRegionLocation(HConnection hConnection,
|
||||
ZooKeeperWatcher zkw, final long timeout)
|
||||
throws InterruptedException, IOException {
|
||||
AdminProtos.AdminService.BlockingInterface service = null;
|
||||
try {
|
||||
service = getMetaServerConnection(hConnection, zkw, timeout);
|
||||
} catch (NotAllMetaRegionsOnlineException e) {
|
||||
// Pass
|
||||
} catch (ServerNotRunningYetException e) {
|
||||
// Pass -- remote server is not up so can't be carrying root
|
||||
} catch (UnknownHostException e) {
|
||||
// Pass -- server name doesn't resolve so it can't be assigned anything.
|
||||
} catch (RegionServerStoppedException e) {
|
||||
// Pass -- server name sends us to a server that is dying or already dead.
|
||||
}
|
||||
return (service == null)? false:
|
||||
verifyRegionLocation(service,
|
||||
getMetaRegionLocation(zkw), META_REGION_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify we can connect to <code>hostingServer</code> and that its carrying
|
||||
* <code>regionName</code>.
|
||||
* @param hostingServer Interface to the server hosting <code>regionName</code>
|
||||
* @param address The servername that goes with the <code>metaServer</code>
|
||||
* Interface. Used logging.
|
||||
* @param regionName The regionname we are interested in.
|
||||
* @return True if we were able to verify the region located at other side of
|
||||
* the Interface.
|
||||
* @throws IOException
|
||||
*/
|
||||
// TODO: We should be able to get the ServerName from the AdminProtocol
|
||||
// rather than have to pass it in. Its made awkward by the fact that the
|
||||
// HRI is likely a proxy against remote server so the getServerName needs
|
||||
// to be fixed to go to a local method or to a cache before we can do this.
|
||||
private boolean verifyRegionLocation(AdminService.BlockingInterface hostingServer,
|
||||
final ServerName address, final byte [] regionName)
|
||||
throws IOException {
|
||||
if (hostingServer == null) {
|
||||
LOG.info("Passed hostingServer is null");
|
||||
return false;
|
||||
}
|
||||
Throwable t;
|
||||
try {
|
||||
// Try and get regioninfo from the hosting server.
|
||||
return ProtobufUtil.getRegionInfo(hostingServer, regionName) != null;
|
||||
} catch (ConnectException e) {
|
||||
t = e;
|
||||
} catch (RetriesExhaustedException e) {
|
||||
t = e;
|
||||
} catch (RemoteException e) {
|
||||
IOException ioe = e.unwrapRemoteException();
|
||||
t = ioe;
|
||||
} catch (IOException e) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause != null && cause instanceof EOFException) {
|
||||
t = cause;
|
||||
} else if (cause != null && cause.getMessage() != null
|
||||
&& cause.getMessage().contains("Connection reset")) {
|
||||
t = cause;
|
||||
} else {
|
||||
t = e;
|
||||
}
|
||||
}
|
||||
LOG.info("Failed verification of " + Bytes.toStringBinary(regionName) +
|
||||
" at address=" + address + ", exception=" + t);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a connection to the server hosting meta, as reported by ZooKeeper,
|
||||
* waiting up to the specified timeout for availability.
|
||||
* <p>WARNING: Does not retry. Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
|
||||
* @param timeout How long to wait on meta location
|
||||
* @return connection to server hosting meta
|
||||
* @throws InterruptedException
|
||||
* @throws NotAllMetaRegionsOnlineException if timed out waiting
|
||||
* @throws IOException
|
||||
*/
|
||||
private AdminService.BlockingInterface getMetaServerConnection(HConnection hConnection,
|
||||
ZooKeeperWatcher zkw, long timeout)
|
||||
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
|
||||
return getCachedConnection(hConnection, waitMetaRegionLocation(zkw, timeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sn ServerName to get a connection against.
|
||||
* @return The AdminProtocol we got when we connected to <code>sn</code>
|
||||
* May have come from cache, may not be good, may have been setup by this
|
||||
* invocation, or may be null.
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
private static AdminService.BlockingInterface getCachedConnection(HConnection hConnection,
|
||||
ServerName sn)
|
||||
throws IOException {
|
||||
if (sn == null) {
|
||||
return null;
|
||||
}
|
||||
AdminService.BlockingInterface service = null;
|
||||
try {
|
||||
service = hConnection.getAdmin(sn);
|
||||
} catch (RetriesExhaustedException e) {
|
||||
if (e.getCause() != null && e.getCause() instanceof ConnectException) {
|
||||
// Catch this; presume it means the cached connection has gone bad.
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.debug("Timed out connecting to " + sn);
|
||||
} catch (NoRouteToHostException e) {
|
||||
LOG.debug("Connecting to " + sn, e);
|
||||
} catch (SocketException e) {
|
||||
LOG.debug("Exception connecting to " + sn);
|
||||
} catch (UnknownHostException e) {
|
||||
LOG.debug("Unknown host exception connecting to " + sn);
|
||||
} catch (RpcClient.FailedServerException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Server " + sn + " is in failed server list.");
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
Throwable cause = ioe.getCause();
|
||||
if (ioe instanceof ConnectException) {
|
||||
// Catch. Connect refused.
|
||||
} else if (cause != null && cause instanceof EOFException) {
|
||||
// Catch. Other end disconnected us.
|
||||
} else if (cause != null && cause.getMessage() != null &&
|
||||
cause.getMessage().toLowerCase().contains("connection reset")) {
|
||||
// Catch. Connection reset.
|
||||
} else {
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
}
|
||||
return service;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the location of <code>hbase:meta</code> in ZooKeeper to the
|
||||
* specified server address.
|
||||
* @param zookeeper zookeeper reference
|
||||
* @param location The server hosting <code>hbase:meta</code>
|
||||
* @throws KeeperException unexpected zookeeper exception
|
||||
*/
|
||||
public static void setMetaLocation(ZooKeeperWatcher zookeeper,
|
||||
final ServerName location)
|
||||
throws KeeperException {
|
||||
LOG.info("Setting hbase:meta region location in ZooKeeper as " + location);
|
||||
// Make the MetaRegionServer pb and then get its bytes and save this as
|
||||
// the znode content.
|
||||
byte [] data = toByteArray(location);
|
||||
try {
|
||||
ZKUtil.createAndWatch(zookeeper, zookeeper.metaServerZNode, data);
|
||||
} catch(KeeperException.NodeExistsException nee) {
|
||||
LOG.debug("META region location already existed, updated location");
|
||||
ZKUtil.setData(zookeeper, zookeeper.metaServerZNode, data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build up the znode content.
|
||||
* @param sn What to put into the znode.
|
||||
* @return The content of the meta-region-server znode
|
||||
*/
|
||||
private static byte [] toByteArray(final ServerName sn) {
|
||||
// ZNode content is a pb message preceded by some pb magic.
|
||||
HBaseProtos.ServerName pbsn =
|
||||
HBaseProtos.ServerName.newBuilder()
|
||||
.setHostName(sn.getHostname())
|
||||
.setPort(sn.getPort())
|
||||
.setStartCode(sn.getStartcode())
|
||||
.build();
|
||||
|
||||
ZooKeeperProtos.MetaRegionServer pbrsr =
|
||||
ZooKeeperProtos.MetaRegionServer.newBuilder()
|
||||
.setServer(pbsn)
|
||||
.setRpcVersion(HConstants.RPC_CURRENT_VERSION)
|
||||
.build();
|
||||
return ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the location of <code>hbase:meta</code> in ZooKeeper.
|
||||
* @param zookeeper zookeeper reference
|
||||
* @throws KeeperException unexpected zookeeper exception
|
||||
*/
|
||||
public void deleteMetaLocation(ZooKeeperWatcher zookeeper)
|
||||
throws KeeperException {
|
||||
LOG.info("Unsetting hbase:meta region location in ZooKeeper");
|
||||
try {
|
||||
// Just delete the node. Don't need any watches.
|
||||
ZKUtil.deleteNode(zookeeper, zookeeper.metaServerZNode);
|
||||
} catch(KeeperException.NoNodeException nne) {
|
||||
// Has already been deleted
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the meta region is available.
|
||||
* @param zkw zookeeper connection to use
|
||||
* @param timeout maximum time to wait, in millis
|
||||
* @return ServerName or null if we timed out.
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public ServerName blockUntilAvailable(final ZooKeeperWatcher zkw,
|
||||
final long timeout)
|
||||
throws InterruptedException {
|
||||
byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.metaServerZNode, timeout);
|
||||
if (data == null) return null;
|
||||
try {
|
||||
return ServerName.parseFrom(data);
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed parse", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop working.
|
||||
* Interrupts any ongoing waits.
|
||||
*/
|
||||
public void stop() {
|
||||
if (!stopped) {
|
||||
LOG.debug("Stopping MetaTableLocator");
|
||||
stopped = true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor.Visitor;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* A tool to migrate the data stored in hbase:meta table to pbuf serialization.
|
||||
* Supports migrating from 0.92.x and 0.94.x to 0.96.x for the catalog table.
|
||||
* @deprecated will be removed for the major release after 0.96.
|
||||
*/
|
||||
@Deprecated
|
||||
public class MetaMigrationConvertingToPB {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MetaMigrationConvertingToPB.class);
|
||||
|
||||
private static class ConvertToPBMetaVisitor implements Visitor {
|
||||
private final MasterServices services;
|
||||
private long numMigratedRows;
|
||||
|
||||
public ConvertToPBMetaVisitor(MasterServices services) {
|
||||
this.services = services;
|
||||
numMigratedRows = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean visit(Result r) throws IOException {
|
||||
if (r == null || r.isEmpty()) return true;
|
||||
// Check info:regioninfo, info:splitA, and info:splitB. Make sure all
|
||||
// have migrated HRegionInfos.
|
||||
byte [] hriBytes = getBytes(r, HConstants.REGIONINFO_QUALIFIER);
|
||||
// Presumes that an edit updating all three cells either succeeds or
|
||||
// doesn't -- that we don't have case of info:regioninfo migrated but not
|
||||
// info:splitA.
|
||||
if (isMigrated(hriBytes)) return true;
|
||||
// OK. Need to migrate this row in meta.
|
||||
|
||||
//This will 'migrate' the HRI from 092.x and 0.94.x to 0.96+ by reading the
|
||||
//writable serialization
|
||||
HRegionInfo hri = parseFrom(hriBytes);
|
||||
|
||||
// Now make a put to write back to meta.
|
||||
Put p = MetaTableAccessor.makePutFromRegionInfo(hri);
|
||||
|
||||
// Now migrate info:splitA and info:splitB if they are not null
|
||||
migrateSplitIfNecessary(r, p, HConstants.SPLITA_QUALIFIER);
|
||||
migrateSplitIfNecessary(r, p, HConstants.SPLITB_QUALIFIER);
|
||||
|
||||
MetaTableAccessor.putToMetaTable(this.services.getShortCircuitConnection(), p);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Migrated " + Bytes.toString(p.getRow()));
|
||||
}
|
||||
numMigratedRows++;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
static void migrateSplitIfNecessary(final Result r, final Put p, final byte [] which)
|
||||
throws IOException {
|
||||
byte [] hriSplitBytes = getBytes(r, which);
|
||||
if (!isMigrated(hriSplitBytes)) {
|
||||
//This will 'migrate' the HRI from 092.x and 0.94.x to 0.96+ by reading the
|
||||
//writable serialization
|
||||
HRegionInfo hri = parseFrom(hriSplitBytes);
|
||||
p.addImmutable(HConstants.CATALOG_FAMILY, which, hri.toByteArray());
|
||||
}
|
||||
}
|
||||
|
||||
static HRegionInfo parseFrom(byte[] hriBytes) throws IOException {
|
||||
try {
|
||||
return HRegionInfo.parseFrom(hriBytes);
|
||||
} catch (DeserializationException ex) {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param r Result to dig in.
|
||||
* @param qualifier Qualifier to look at in the passed <code>r</code>.
|
||||
* @return Bytes for an HRegionInfo or null if no bytes or empty bytes found.
|
||||
*/
|
||||
static byte [] getBytes(final Result r, final byte [] qualifier) {
|
||||
byte [] hriBytes = r.getValue(HConstants.CATALOG_FAMILY, qualifier);
|
||||
if (hriBytes == null || hriBytes.length <= 0) return null;
|
||||
return hriBytes;
|
||||
}
|
||||
|
||||
static boolean isMigrated(final byte [] hriBytes) {
|
||||
if (hriBytes == null || hriBytes.length <= 0) return true;
|
||||
|
||||
return ProtobufUtil.isPBMagicPrefix(hriBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converting writable serialization to PB, if it is needed.
|
||||
* @param services MasterServices to get a handle on master
|
||||
* @return num migrated rows
|
||||
* @throws IOException or RuntimeException if something goes wrong
|
||||
*/
|
||||
public static long updateMetaIfNecessary(final MasterServices services)
|
||||
throws IOException {
|
||||
if (isMetaTableUpdated(services.getShortCircuitConnection())) {
|
||||
LOG.info("META already up-to date with PB serialization");
|
||||
return 0;
|
||||
}
|
||||
LOG.info("META has Writable serializations, migrating hbase:meta to PB serialization");
|
||||
try {
|
||||
long rows = updateMeta(services);
|
||||
LOG.info("META updated with PB serialization. Total rows updated: " + rows);
|
||||
return rows;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Update hbase:meta with PB serialization failed." + "Master startup aborted.");
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update hbase:meta rows, converting writable serialization to PB
|
||||
* @return num migrated rows
|
||||
*/
|
||||
static long updateMeta(final MasterServices masterServices) throws IOException {
|
||||
LOG.info("Starting update of META");
|
||||
ConvertToPBMetaVisitor v = new ConvertToPBMetaVisitor(masterServices);
|
||||
MetaTableAccessor.fullScan(masterServices.getShortCircuitConnection(), v);
|
||||
LOG.info("Finished update of META. Total rows updated:" + v.numMigratedRows);
|
||||
return v.numMigratedRows;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hConnection connection to be used
|
||||
* @return True if the meta table has been migrated.
|
||||
* @throws IOException
|
||||
*/
|
||||
static boolean isMetaTableUpdated(final HConnection hConnection) throws IOException {
|
||||
List<Result> results = MetaTableAccessor.fullScanOfMeta(hConnection);
|
||||
if (results == null || results.isEmpty()) {
|
||||
LOG.info("hbase:meta doesn't have any entries to update.");
|
||||
return true;
|
||||
}
|
||||
for (Result r : results) {
|
||||
byte[] value = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||
if (!isMigrated(value)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* Mocking utility for common hbase:meta functionality
|
||||
*/
|
||||
public class MetaMockingUtil {
|
||||
|
||||
/**
|
||||
* Returns a Result object constructed from the given region information simulating
|
||||
* a catalog table result.
|
||||
* @param region the HRegionInfo object or null
|
||||
* @return A mocked up Result that fakes a Get on a row in the <code>hbase:meta</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Result getMetaTableRowResult(final HRegionInfo region)
|
||||
throws IOException {
|
||||
return getMetaTableRowResult(region, null, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Result object constructed from the given region information simulating
|
||||
* a catalog table result.
|
||||
* @param region the HRegionInfo object or null
|
||||
* @param ServerName to use making startcode and server hostname:port in meta or null
|
||||
* @return A mocked up Result that fakes a Get on a row in the <code>hbase:meta</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Result getMetaTableRowResult(final HRegionInfo region, final ServerName sn)
|
||||
throws IOException {
|
||||
return getMetaTableRowResult(region, sn, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Result object constructed from the given region information simulating
|
||||
* a catalog table result.
|
||||
* @param region the HRegionInfo object or null
|
||||
* @param ServerName to use making startcode and server hostname:port in meta or null
|
||||
* @param splita daughter region or null
|
||||
* @param splitb daughter region or null
|
||||
* @return A mocked up Result that fakes a Get on a row in the <code>hbase:meta</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Result getMetaTableRowResult(HRegionInfo region, final ServerName sn,
|
||||
HRegionInfo splita, HRegionInfo splitb) throws IOException {
|
||||
List<Cell> kvs = new ArrayList<Cell>();
|
||||
if (region != null) {
|
||||
kvs.add(new KeyValue(
|
||||
region.getRegionName(),
|
||||
HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
|
||||
region.toByteArray()));
|
||||
}
|
||||
|
||||
if (sn != null) {
|
||||
kvs.add(new KeyValue(region.getRegionName(),
|
||||
HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
|
||||
Bytes.toBytes(sn.getHostAndPort())));
|
||||
kvs.add(new KeyValue(region.getRegionName(),
|
||||
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
|
||||
Bytes.toBytes(sn.getStartcode())));
|
||||
}
|
||||
|
||||
if (splita != null) {
|
||||
kvs.add(new KeyValue(
|
||||
region.getRegionName(),
|
||||
HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER,
|
||||
splita.toByteArray()));
|
||||
}
|
||||
|
||||
if (splitb != null) {
|
||||
kvs.add(new KeyValue(
|
||||
region.getRegionName(),
|
||||
HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
|
||||
splitb.toByteArray()));
|
||||
}
|
||||
|
||||
//important: sort the kvs so that binary search work
|
||||
Collections.sort(kvs, KeyValue.META_COMPARATOR);
|
||||
|
||||
return Result.create(kvs);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sn ServerName to use making startcode and server in meta
|
||||
* @param hri Region to serialize into HRegionInfo
|
||||
* @return A mocked up Result that fakes a Get on a row in the <code>hbase:meta</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Result getMetaTableRowResultAsSplitRegion(final HRegionInfo hri,
|
||||
final ServerName sn) throws IOException {
|
||||
hri.setOffline(true);
|
||||
hri.setSplit(true);
|
||||
return getMetaTableRowResult(hri, sn);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,432 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.FsShell;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.migration.NamespaceUpgrade;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test migration that changes HRI serialization into PB. Tests by bringing up a cluster from actual
|
||||
* data from a 0.92 cluster, as well as manually downgrading and then upgrading the hbase:meta info.
|
||||
* @deprecated Remove after 0.96
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
@Deprecated
|
||||
public class TestMetaMigrationConvertingToPB {
|
||||
static final Log LOG = LogFactory.getLog(TestMetaMigrationConvertingToPB.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private final static String TESTTABLE = "TestTable";
|
||||
|
||||
private final static int ROW_COUNT = 100;
|
||||
private final static int REGION_COUNT = 9; //initial number of regions of the TestTable
|
||||
|
||||
private static final int META_VERSION_092 = 0;
|
||||
|
||||
/*
|
||||
* This test uses a tgz file named "TestMetaMigrationConvertingToPB.tgz" under
|
||||
* hbase-server/src/test/data which contains file data from a 0.92 cluster.
|
||||
* The cluster has a table named "TestTable", which has 100 rows. 0.94 has same
|
||||
* hbase:meta structure, so it should be the same.
|
||||
*
|
||||
* hbase(main):001:0> create 'TestTable', 'f1'
|
||||
* hbase(main):002:0> for i in 1..100
|
||||
* hbase(main):003:1> put 'TestTable', "row#{i}", "f1:c1", i
|
||||
* hbase(main):004:1> end
|
||||
*
|
||||
* There are 9 regions in the table
|
||||
*/
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
// Start up our mini cluster on top of an 0.92 root.dir that has data from
|
||||
// a 0.92 hbase run -- it has a table with 100 rows in it -- and see if
|
||||
// we can migrate from 0.92
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
TEST_UTIL.startMiniDFSCluster(1);
|
||||
Path testdir = TEST_UTIL.getDataTestDir("TestMetaMigrationConvertToPB");
|
||||
// Untar our test dir.
|
||||
File untar = untar(new File(testdir.toString()));
|
||||
// Now copy the untar up into hdfs so when we start hbase, we'll run from it.
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
FsShell shell = new FsShell(conf);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
// find where hbase will root itself, so we can copy filesystem there
|
||||
Path hbaseRootDir = TEST_UTIL.getDefaultRootDirPath();
|
||||
if (!fs.isDirectory(hbaseRootDir.getParent())) {
|
||||
// mkdir at first
|
||||
fs.mkdirs(hbaseRootDir.getParent());
|
||||
}
|
||||
doFsCommand(shell,
|
||||
new String [] {"-put", untar.toURI().toString(), hbaseRootDir.toString()});
|
||||
|
||||
// windows fix: tgz file has hbase:meta directory renamed as -META- since the original
|
||||
// is an illegal name under windows. So we rename it back.
|
||||
// See src/test/data//TestMetaMigrationConvertingToPB.README and
|
||||
// https://issues.apache.org/jira/browse/HBASE-6821
|
||||
doFsCommand(shell, new String [] {"-mv", new Path(hbaseRootDir, "-META-").toString(),
|
||||
new Path(hbaseRootDir, ".META.").toString()});
|
||||
// See whats in minihdfs.
|
||||
doFsCommand(shell, new String [] {"-lsr", "/"});
|
||||
|
||||
//upgrade to namespace as well
|
||||
Configuration toolConf = TEST_UTIL.getConfiguration();
|
||||
conf.set(HConstants.HBASE_DIR, TEST_UTIL.getDefaultRootDirPath().toString());
|
||||
ToolRunner.run(toolConf, new NamespaceUpgrade(), new String[]{"--upgrade"});
|
||||
|
||||
TEST_UTIL.startMiniHBaseCluster(1, 1);
|
||||
// Assert we are running against the copied-up filesystem. The copied-up
|
||||
// rootdir should have had a table named 'TestTable' in it. Assert it
|
||||
// present.
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TESTTABLE);
|
||||
ResultScanner scanner = t.getScanner(new Scan());
|
||||
int count = 0;
|
||||
while (scanner.next() != null) {
|
||||
count++;
|
||||
}
|
||||
// Assert that we find all 100 rows that are in the data we loaded. If
|
||||
// so then we must have migrated it from 0.90 to 0.92.
|
||||
Assert.assertEquals(ROW_COUNT, count);
|
||||
scanner.close();
|
||||
t.close();
|
||||
}
|
||||
|
||||
private static File untar(final File testdir) throws IOException {
|
||||
// Find the src data under src/test/data
|
||||
final String datafile = "TestMetaMigrationConvertToPB";
|
||||
String srcTarFile =
|
||||
System.getProperty("project.build.testSourceDirectory", "src/test") +
|
||||
File.separator + "data" + File.separator + datafile + ".tgz";
|
||||
File homedir = new File(testdir.toString());
|
||||
File tgtUntarDir = new File(homedir, datafile);
|
||||
if (tgtUntarDir.exists()) {
|
||||
if (!FileUtil.fullyDelete(tgtUntarDir)) {
|
||||
throw new IOException("Failed delete of " + tgtUntarDir.toString());
|
||||
}
|
||||
}
|
||||
LOG.info("Untarring " + srcTarFile + " into " + homedir.toString());
|
||||
FileUtil.unTar(new File(srcTarFile), homedir);
|
||||
Assert.assertTrue(tgtUntarDir.exists());
|
||||
return tgtUntarDir;
|
||||
}
|
||||
|
||||
private static void doFsCommand(final FsShell shell, final String [] args)
|
||||
throws Exception {
|
||||
// Run the 'put' command.
|
||||
int errcode = shell.run(args);
|
||||
if (errcode != 0) throw new IOException("Failed put; errcode=" + errcode);
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaUpdatedFlagInROOT() throws Exception {
|
||||
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
|
||||
boolean metaUpdated = MetaMigrationConvertingToPB.
|
||||
isMetaTableUpdated(master.getShortCircuitConnection());
|
||||
assertEquals(true, metaUpdated);
|
||||
verifyMetaRowsAreUpdated(master.getShortCircuitConnection());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaMigration() throws Exception {
|
||||
LOG.info("Starting testMetaMigration");
|
||||
final byte [] FAMILY = Bytes.toBytes("family");
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testMetaMigration"));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
|
||||
htd.addFamily(hcd);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
byte[][] regionNames = new byte[][]{
|
||||
HConstants.EMPTY_START_ROW,
|
||||
Bytes.toBytes("region_a"),
|
||||
Bytes.toBytes("region_b")};
|
||||
createMultiRegionsWithWritableSerialization(conf,
|
||||
htd.getTableName().getName(),
|
||||
regionNames);
|
||||
HConnection masterHConnection =
|
||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getShortCircuitConnection();
|
||||
// Erase the current version of root meta for this test.
|
||||
undoVersionInRoot();
|
||||
MetaTableAccessor.fullScanMetaAndPrint(masterHConnection);
|
||||
LOG.info("Meta Print completed.testMetaMigration");
|
||||
|
||||
long numMigratedRows = MetaMigrationConvertingToPB.updateMeta(
|
||||
TEST_UTIL.getHBaseCluster().getMaster());
|
||||
MetaTableAccessor.fullScanMetaAndPrint(masterHConnection);
|
||||
|
||||
// Should be one entry only and it should be for the table we just added.
|
||||
assertEquals(regionNames.length, numMigratedRows);
|
||||
|
||||
// Assert that the flag in ROOT is updated to reflect the correct status
|
||||
boolean metaUpdated = MetaMigrationConvertingToPB.isMetaTableUpdated(masterHConnection);
|
||||
assertEquals(true, metaUpdated);
|
||||
verifyMetaRowsAreUpdated(masterHConnection);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test assumes a master crash/failure during the meta migration process
|
||||
* and attempts to continue the meta migration process when a new master takes over.
|
||||
* When a master dies during the meta migration we will have some rows of
|
||||
* META.CatalogFamily updated with PB serialization and some
|
||||
* still hanging with writable serialization. When the backup master/ or
|
||||
* fresh start of master attempts the migration it will encounter some rows of META
|
||||
* already updated with new HRI and some still legacy. This test will simulate this
|
||||
* scenario and validates that the migration process can safely skip the updated
|
||||
* rows and migrate any pending rows at startup.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testMasterCrashDuringMetaMigration() throws Exception {
|
||||
final byte[] FAMILY = Bytes.toBytes("family");
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf
|
||||
("testMasterCrashDuringMetaMigration"));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
|
||||
htd.addFamily(hcd);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
// Create 10 New regions.
|
||||
createMultiRegionsWithPBSerialization(conf, htd.getTableName().getName(), 10);
|
||||
// Create 10 Legacy regions.
|
||||
createMultiRegionsWithWritableSerialization(conf,
|
||||
htd.getTableName().getName(), 10);
|
||||
HConnection masterHConnection =
|
||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getShortCircuitConnection();
|
||||
// Erase the current version of root meta for this test.
|
||||
undoVersionInRoot();
|
||||
|
||||
MetaTableAccessor.fullScanMetaAndPrint(masterHConnection);
|
||||
LOG.info("Meta Print completed.testUpdatesOnMetaWithLegacyHRI");
|
||||
|
||||
long numMigratedRows =
|
||||
MetaMigrationConvertingToPB.updateMetaIfNecessary(
|
||||
TEST_UTIL.getHBaseCluster().getMaster());
|
||||
assertEquals(numMigratedRows, 10);
|
||||
|
||||
// Assert that the flag in ROOT is updated to reflect the correct status
|
||||
boolean metaUpdated = MetaMigrationConvertingToPB.isMetaTableUpdated(masterHConnection);
|
||||
assertEquals(true, metaUpdated);
|
||||
|
||||
verifyMetaRowsAreUpdated(masterHConnection);
|
||||
|
||||
LOG.info("END testMasterCrashDuringMetaMigration");
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that every hbase:meta row is updated
|
||||
*/
|
||||
void verifyMetaRowsAreUpdated(HConnection hConnection)
|
||||
throws IOException {
|
||||
List<Result> results = MetaTableAccessor.fullScan(hConnection);
|
||||
assertTrue(results.size() >= REGION_COUNT);
|
||||
|
||||
for (Result result : results) {
|
||||
byte[] hriBytes = result.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.REGIONINFO_QUALIFIER);
|
||||
assertTrue(hriBytes != null && hriBytes.length > 0);
|
||||
assertTrue(MetaMigrationConvertingToPB.isMigrated(hriBytes));
|
||||
|
||||
byte[] splitA = result.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SPLITA_QUALIFIER);
|
||||
if (splitA != null && splitA.length > 0) {
|
||||
assertTrue(MetaMigrationConvertingToPB.isMigrated(splitA));
|
||||
}
|
||||
|
||||
byte[] splitB = result.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SPLITB_QUALIFIER);
|
||||
if (splitB != null && splitB.length > 0) {
|
||||
assertTrue(MetaMigrationConvertingToPB.isMigrated(splitB));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Changes the version of hbase:meta to 0 to simulate 0.92 and 0.94 clusters*/
|
||||
private void undoVersionInRoot() throws IOException {
|
||||
Put p = new Put(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
|
||||
|
||||
p.add(HConstants.CATALOG_FAMILY, HConstants.META_VERSION_QUALIFIER,
|
||||
Bytes.toBytes(META_VERSION_092));
|
||||
|
||||
// TODO wire this MetaEditor.putToRootTable(ct, p);
|
||||
LOG.info("Downgraded -ROOT- meta version=" + META_VERSION_092);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts multiple regions into hbase:meta using Writable serialization instead of PB
|
||||
*/
|
||||
public int createMultiRegionsWithWritableSerialization(final Configuration c,
|
||||
final byte[] tableName, int numRegions) throws IOException {
|
||||
if (numRegions < 3) throw new IOException("Must create at least 3 regions");
|
||||
byte [] startKey = Bytes.toBytes("aaaaa");
|
||||
byte [] endKey = Bytes.toBytes("zzzzz");
|
||||
byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
|
||||
byte [][] regionStartKeys = new byte[splitKeys.length+1][];
|
||||
for (int i=0;i<splitKeys.length;i++) {
|
||||
regionStartKeys[i+1] = splitKeys[i];
|
||||
}
|
||||
regionStartKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
|
||||
return createMultiRegionsWithWritableSerialization(c, tableName, regionStartKeys);
|
||||
}
|
||||
|
||||
public int createMultiRegionsWithWritableSerialization(final Configuration c,
|
||||
final byte[] tableName, byte [][] startKeys)
|
||||
throws IOException {
|
||||
return createMultiRegionsWithWritableSerialization(c,
|
||||
TableName.valueOf(tableName), startKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts multiple regions into hbase:meta using Writable serialization instead of PB
|
||||
*/
|
||||
public int createMultiRegionsWithWritableSerialization(final Configuration c,
|
||||
final TableName tableName, byte [][] startKeys)
|
||||
throws IOException {
|
||||
Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
|
||||
HTable meta = new HTable(c, TableName.META_TABLE_NAME);
|
||||
|
||||
List<HRegionInfo> newRegions
|
||||
= new ArrayList<HRegionInfo>(startKeys.length);
|
||||
int count = 0;
|
||||
for (int i = 0; i < startKeys.length; i++) {
|
||||
int j = (i + 1) % startKeys.length;
|
||||
HRegionInfo hri = new HRegionInfo(tableName, startKeys[i], startKeys[j]);
|
||||
Put put = new Put(hri.getRegionName());
|
||||
put.setDurability(Durability.SKIP_WAL);
|
||||
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
|
||||
getBytes(hri)); //this is the old Writable serialization
|
||||
|
||||
//also add the region as it's daughters
|
||||
put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER,
|
||||
getBytes(hri)); //this is the old Writable serialization
|
||||
|
||||
put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
|
||||
getBytes(hri)); //this is the old Writable serialization
|
||||
|
||||
meta.put(put);
|
||||
LOG.info("createMultiRegionsWithWritableSerialization: PUT inserted " + hri.toString());
|
||||
|
||||
newRegions.add(hri);
|
||||
count++;
|
||||
}
|
||||
meta.close();
|
||||
return count;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
private byte[] getBytes(HRegionInfo hri) throws IOException {
|
||||
DataOutputBuffer out = new DataOutputBuffer();
|
||||
try {
|
||||
hri.write(out);
|
||||
return out.getData();
|
||||
} finally {
|
||||
if (out != null) {
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts multiple regions into hbase:meta using PB serialization
|
||||
*/
|
||||
int createMultiRegionsWithPBSerialization(final Configuration c,
|
||||
final byte[] tableName, int numRegions)
|
||||
throws IOException {
|
||||
if (numRegions < 3) throw new IOException("Must create at least 3 regions");
|
||||
byte [] startKey = Bytes.toBytes("aaaaa");
|
||||
byte [] endKey = Bytes.toBytes("zzzzz");
|
||||
byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
|
||||
byte [][] regionStartKeys = new byte[splitKeys.length+1][];
|
||||
for (int i=0;i<splitKeys.length;i++) {
|
||||
regionStartKeys[i+1] = splitKeys[i];
|
||||
}
|
||||
regionStartKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
|
||||
return createMultiRegionsWithPBSerialization(c, tableName, regionStartKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts multiple regions into hbase:meta using PB serialization
|
||||
*/
|
||||
int createMultiRegionsWithPBSerialization(final Configuration c, final byte[] tableName,
|
||||
byte [][] startKeys) throws IOException {
|
||||
return createMultiRegionsWithPBSerialization(c,
|
||||
TableName.valueOf(tableName), startKeys);
|
||||
}
|
||||
|
||||
int createMultiRegionsWithPBSerialization(final Configuration c,
|
||||
final TableName tableName,
|
||||
byte [][] startKeys) throws IOException {
|
||||
Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
|
||||
HTable meta = new HTable(c, TableName.META_TABLE_NAME);
|
||||
|
||||
List<HRegionInfo> newRegions
|
||||
= new ArrayList<HRegionInfo>(startKeys.length);
|
||||
int count = 0;
|
||||
for (int i = 0; i < startKeys.length; i++) {
|
||||
int j = (i + 1) % startKeys.length;
|
||||
HRegionInfo hri = new HRegionInfo(tableName, startKeys[i], startKeys[j]);
|
||||
Put put = MetaTableAccessor.makePutFromRegionInfo(hri);
|
||||
put.setDurability(Durability.SKIP_WAL);
|
||||
meta.put(put);
|
||||
LOG.info("createMultiRegionsWithPBSerialization: PUT inserted " + hri.toString());
|
||||
|
||||
newRegions.add(hri);
|
||||
count++;
|
||||
}
|
||||
meta.close();
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,378 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test {@link org.apache.hadoop.hbase.MetaTableAccessor}.
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestMetaTableAccessor {
|
||||
private static final Log LOG = LogFactory.getLog(TestMetaTableAccessor.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static HConnection hConnection;
|
||||
|
||||
@BeforeClass public static void beforeClass() throws Exception {
|
||||
UTIL.startMiniCluster(3);
|
||||
|
||||
Configuration c = new Configuration(UTIL.getConfiguration());
|
||||
// Tests to 4 retries every 5 seconds. Make it try every 1 second so more
|
||||
// responsive. 1 second is default as is ten retries.
|
||||
c.setLong("hbase.client.pause", 1000);
|
||||
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
|
||||
hConnection = HConnectionManager.getConnection(c);
|
||||
}
|
||||
|
||||
@AfterClass public static void afterClass() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Does {@link MetaTableAccessor#getRegion(HConnection, byte[])} and a write
|
||||
* against hbase:meta while its hosted server is restarted to prove our retrying
|
||||
* works.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test public void testRetrying()
|
||||
throws IOException, InterruptedException {
|
||||
final TableName name =
|
||||
TableName.valueOf("testRetrying");
|
||||
LOG.info("Started " + name);
|
||||
HTable t = UTIL.createTable(name, HConstants.CATALOG_FAMILY);
|
||||
int regionCount = UTIL.createMultiRegions(t, HConstants.CATALOG_FAMILY);
|
||||
// Test it works getting a region from just made user table.
|
||||
final List<HRegionInfo> regions =
|
||||
testGettingTableRegions(hConnection, name, regionCount);
|
||||
MetaTask reader = new MetaTask(hConnection, "reader") {
|
||||
@Override
|
||||
void metaTask() throws Throwable {
|
||||
testGetRegion(hConnection, regions.get(0));
|
||||
LOG.info("Read " + regions.get(0).getEncodedName());
|
||||
}
|
||||
};
|
||||
MetaTask writer = new MetaTask(hConnection, "writer") {
|
||||
@Override
|
||||
void metaTask() throws Throwable {
|
||||
MetaTableAccessor.addRegionToMeta(hConnection, regions.get(0));
|
||||
LOG.info("Wrote " + regions.get(0).getEncodedName());
|
||||
}
|
||||
};
|
||||
reader.start();
|
||||
writer.start();
|
||||
|
||||
// We're gonna check how it takes. If it takes too long, we will consider
|
||||
// it as a fail. We can't put that in the @Test tag as we want to close
|
||||
// the threads nicely
|
||||
final long timeOut = 180000;
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
// Make sure reader and writer are working.
|
||||
assertTrue(reader.isProgressing());
|
||||
assertTrue(writer.isProgressing());
|
||||
|
||||
// Kill server hosting meta -- twice . See if our reader/writer ride over the
|
||||
// meta moves. They'll need to retry.
|
||||
for (int i = 0; i < 2; i++) {
|
||||
LOG.info("Restart=" + i);
|
||||
UTIL.ensureSomeRegionServersAvailable(2);
|
||||
int index = -1;
|
||||
do {
|
||||
index = UTIL.getMiniHBaseCluster().getServerWithMeta();
|
||||
} while (index == -1 &&
|
||||
startTime + timeOut < System.currentTimeMillis());
|
||||
|
||||
if (index != -1){
|
||||
UTIL.getMiniHBaseCluster().abortRegionServer(index);
|
||||
UTIL.getMiniHBaseCluster().waitOnRegionServer(index);
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("reader: " + reader.toString(), reader.isProgressing());
|
||||
assertTrue("writer: " + writer.toString(), writer.isProgressing());
|
||||
} catch (IOException e) {
|
||||
throw e;
|
||||
} finally {
|
||||
reader.stop = true;
|
||||
writer.stop = true;
|
||||
reader.join();
|
||||
writer.join();
|
||||
t.close();
|
||||
}
|
||||
long exeTime = System.currentTimeMillis() - startTime;
|
||||
assertTrue("Timeout: test took " + exeTime / 1000 + " sec", exeTime < timeOut);
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread that runs a MetaTableAccessor task until asked stop.
|
||||
*/
|
||||
abstract static class MetaTask extends Thread {
|
||||
boolean stop = false;
|
||||
int count = 0;
|
||||
Throwable t = null;
|
||||
final HConnection hConnection;
|
||||
|
||||
MetaTask(final HConnection hConnection, final String name) {
|
||||
super(name);
|
||||
this.hConnection = hConnection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while(!this.stop) {
|
||||
LOG.info("Before " + this.getName()+ ", count=" + this.count);
|
||||
metaTask();
|
||||
this.count += 1;
|
||||
LOG.info("After " + this.getName() + ", count=" + this.count);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.info(this.getName() + " failed", t);
|
||||
this.t = t;
|
||||
}
|
||||
}
|
||||
|
||||
boolean isProgressing() throws InterruptedException {
|
||||
int currentCount = this.count;
|
||||
while(currentCount == this.count) {
|
||||
if (!isAlive()) return false;
|
||||
if (this.t != null) return false;
|
||||
Thread.sleep(10);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "count=" + this.count + ", t=" +
|
||||
(this.t == null? "null": this.t.toString());
|
||||
}
|
||||
|
||||
abstract void metaTask() throws Throwable;
|
||||
}
|
||||
|
||||
@Test public void testGetRegionsCatalogTables()
|
||||
throws IOException, InterruptedException {
|
||||
List<HRegionInfo> regions =
|
||||
MetaTableAccessor.getTableRegions(UTIL.getZooKeeperWatcher(),
|
||||
hConnection, TableName.META_TABLE_NAME);
|
||||
assertTrue(regions.size() >= 1);
|
||||
assertTrue(MetaTableAccessor.getTableRegionsAndLocations(UTIL.getZooKeeperWatcher(),
|
||||
hConnection,TableName.META_TABLE_NAME).size() >= 1);
|
||||
}
|
||||
|
||||
@Test public void testTableExists() throws IOException {
|
||||
final TableName name =
|
||||
TableName.valueOf("testTableExists");
|
||||
assertFalse(MetaTableAccessor.tableExists(hConnection, name));
|
||||
UTIL.createTable(name, HConstants.CATALOG_FAMILY);
|
||||
assertTrue(MetaTableAccessor.tableExists(hConnection, name));
|
||||
HBaseAdmin admin = UTIL.getHBaseAdmin();
|
||||
admin.disableTable(name);
|
||||
admin.deleteTable(name);
|
||||
assertFalse(MetaTableAccessor.tableExists(hConnection, name));
|
||||
assertTrue(MetaTableAccessor.tableExists(hConnection,
|
||||
TableName.META_TABLE_NAME));
|
||||
}
|
||||
|
||||
@Test public void testGetRegion() throws IOException, InterruptedException {
|
||||
final String name = "testGetRegion";
|
||||
LOG.info("Started " + name);
|
||||
// Test get on non-existent region.
|
||||
Pair<HRegionInfo, ServerName> pair =
|
||||
MetaTableAccessor.getRegion(hConnection, Bytes.toBytes("nonexistent-region"));
|
||||
assertNull(pair);
|
||||
LOG.info("Finished " + name);
|
||||
}
|
||||
|
||||
// Test for the optimization made in HBASE-3650
|
||||
@Test public void testScanMetaForTable()
|
||||
throws IOException, InterruptedException {
|
||||
final TableName name =
|
||||
TableName.valueOf("testScanMetaForTable");
|
||||
LOG.info("Started " + name);
|
||||
|
||||
/** Create 2 tables
|
||||
- testScanMetaForTable
|
||||
- testScanMetaForTablf
|
||||
**/
|
||||
|
||||
UTIL.createTable(name, HConstants.CATALOG_FAMILY);
|
||||
// name that is +1 greater than the first one (e+1=f)
|
||||
TableName greaterName =
|
||||
TableName.valueOf("testScanMetaForTablf");
|
||||
UTIL.createTable(greaterName, HConstants.CATALOG_FAMILY);
|
||||
|
||||
// Now make sure we only get the regions from 1 of the tables at a time
|
||||
|
||||
assertEquals(1, MetaTableAccessor.getTableRegions(UTIL.getZooKeeperWatcher(),
|
||||
hConnection, name).size());
|
||||
assertEquals(1, MetaTableAccessor.getTableRegions(UTIL.getZooKeeperWatcher(),
|
||||
hConnection, greaterName).size());
|
||||
}
|
||||
|
||||
private static List<HRegionInfo> testGettingTableRegions(final HConnection hConnection,
|
||||
final TableName name, final int regionCount)
|
||||
throws IOException, InterruptedException {
|
||||
List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(UTIL.getZooKeeperWatcher(),
|
||||
hConnection, name);
|
||||
assertEquals(regionCount, regions.size());
|
||||
Pair<HRegionInfo, ServerName> pair =
|
||||
MetaTableAccessor.getRegion(hConnection, regions.get(0).getRegionName());
|
||||
assertEquals(regions.get(0).getEncodedName(),
|
||||
pair.getFirst().getEncodedName());
|
||||
return regions;
|
||||
}
|
||||
|
||||
private static void testGetRegion(final HConnection hConnection,
|
||||
final HRegionInfo region)
|
||||
throws IOException, InterruptedException {
|
||||
Pair<HRegionInfo, ServerName> pair =
|
||||
MetaTableAccessor.getRegion(hConnection, region.getRegionName());
|
||||
assertEquals(region.getEncodedName(),
|
||||
pair.getFirst().getEncodedName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseReplicaIdFromServerColumn() {
|
||||
String column1 = HConstants.SERVER_QUALIFIER_STR;
|
||||
assertEquals(0, MetaTableAccessor.parseReplicaIdFromServerColumn(Bytes.toBytes(column1)));
|
||||
String column2 = column1 + MetaTableAccessor.META_REPLICA_ID_DELIMITER;
|
||||
assertEquals(-1, MetaTableAccessor.parseReplicaIdFromServerColumn(Bytes.toBytes(column2)));
|
||||
String column3 = column2 + "00";
|
||||
assertEquals(-1, MetaTableAccessor.parseReplicaIdFromServerColumn(Bytes.toBytes(column3)));
|
||||
String column4 = column3 + "2A";
|
||||
assertEquals(42, MetaTableAccessor.parseReplicaIdFromServerColumn(Bytes.toBytes(column4)));
|
||||
String column5 = column4 + "2A";
|
||||
assertEquals(-1, MetaTableAccessor.parseReplicaIdFromServerColumn(Bytes.toBytes(column5)));
|
||||
String column6 = HConstants.STARTCODE_QUALIFIER_STR;
|
||||
assertEquals(-1, MetaTableAccessor.parseReplicaIdFromServerColumn(Bytes.toBytes(column6)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaReaderGetColumnMethods() {
|
||||
Assert.assertArrayEquals(HConstants.SERVER_QUALIFIER, MetaTableAccessor.getServerColumn(0));
|
||||
Assert.assertArrayEquals(Bytes.toBytes(HConstants.SERVER_QUALIFIER_STR
|
||||
+ MetaTableAccessor.META_REPLICA_ID_DELIMITER + "002A"),
|
||||
MetaTableAccessor.getServerColumn(42));
|
||||
|
||||
Assert.assertArrayEquals(HConstants.STARTCODE_QUALIFIER,
|
||||
MetaTableAccessor.getStartCodeColumn(0));
|
||||
Assert.assertArrayEquals(Bytes.toBytes(HConstants.STARTCODE_QUALIFIER_STR
|
||||
+ MetaTableAccessor.META_REPLICA_ID_DELIMITER + "002A"),
|
||||
MetaTableAccessor.getStartCodeColumn(42));
|
||||
|
||||
Assert.assertArrayEquals(HConstants.SEQNUM_QUALIFIER,
|
||||
MetaTableAccessor.getSeqNumColumn(0));
|
||||
Assert.assertArrayEquals(Bytes.toBytes(HConstants.SEQNUM_QUALIFIER_STR
|
||||
+ MetaTableAccessor.META_REPLICA_ID_DELIMITER + "002A"),
|
||||
MetaTableAccessor.getSeqNumColumn(42));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaLocationsForRegionReplicas() throws IOException {
|
||||
Random random = new Random();
|
||||
ServerName serverName0 = ServerName.valueOf("foo", 60010, random.nextLong());
|
||||
ServerName serverName1 = ServerName.valueOf("bar", 60010, random.nextLong());
|
||||
ServerName serverName100 = ServerName.valueOf("baz", 60010, random.nextLong());
|
||||
|
||||
long regionId = System.currentTimeMillis();
|
||||
HRegionInfo primary = new HRegionInfo(TableName.valueOf("table_foo"),
|
||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 0);
|
||||
HRegionInfo replica1 = new HRegionInfo(TableName.valueOf("table_foo"),
|
||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 1);
|
||||
HRegionInfo replica100 = new HRegionInfo(TableName.valueOf("table_foo"),
|
||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false, regionId, 100);
|
||||
|
||||
long seqNum0 = random.nextLong();
|
||||
long seqNum1 = random.nextLong();
|
||||
long seqNum100 = random.nextLong();
|
||||
|
||||
|
||||
HTable meta = MetaTableAccessor.getMetaHTable(hConnection);
|
||||
try {
|
||||
MetaTableAccessor.updateRegionLocation(hConnection, primary, serverName0, seqNum0);
|
||||
|
||||
// assert that the server, startcode and seqNum columns are there for the primary region
|
||||
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
|
||||
|
||||
// add replica = 1
|
||||
MetaTableAccessor.updateRegionLocation(hConnection, replica1, serverName1, seqNum1);
|
||||
// check whether the primary is still there
|
||||
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
|
||||
// now check for replica 1
|
||||
assertMetaLocation(meta, primary.getRegionName(), serverName1, seqNum1, 1, true);
|
||||
|
||||
// add replica = 1
|
||||
MetaTableAccessor.updateRegionLocation(hConnection, replica100, serverName100, seqNum100);
|
||||
// check whether the primary is still there
|
||||
assertMetaLocation(meta, primary.getRegionName(), serverName0, seqNum0, 0, true);
|
||||
// check whether the replica 1 is still there
|
||||
assertMetaLocation(meta, primary.getRegionName(), serverName1, seqNum1, 1, true);
|
||||
// now check for replica 1
|
||||
assertMetaLocation(meta, primary.getRegionName(), serverName100, seqNum100, 100, true);
|
||||
} finally {
|
||||
meta.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertMetaLocation(HTable meta, byte[] row, ServerName serverName,
|
||||
long seqNum, int replicaId, boolean checkSeqNum) throws IOException {
|
||||
Get get = new Get(row);
|
||||
Result result = meta.get(get);
|
||||
assertTrue(Bytes.equals(
|
||||
result.getValue(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId)),
|
||||
Bytes.toBytes(serverName.getHostAndPort())));
|
||||
assertTrue(Bytes.equals(
|
||||
result.getValue(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId)),
|
||||
Bytes.toBytes(serverName.getStartcode())));
|
||||
if (checkSeqNum) {
|
||||
assertTrue(Bytes.equals(
|
||||
result.getValue(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId)),
|
||||
Bytes.toBytes(seqNum)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,211 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Test MetaTableAccessor but without spinning up a cluster.
|
||||
* We mock regionserver back and forth (we do spin up a zk cluster).
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestMetaTableAccessorNoCluster {
|
||||
private static final Log LOG = LogFactory.getLog(TestMetaTableAccessorNoCluster.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static final Abortable ABORTABLE = new Abortable() {
|
||||
boolean aborted = false;
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.info(why, e);
|
||||
this.aborted = true;
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return this.aborted;
|
||||
}
|
||||
};
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
UTIL.startMiniZKCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws IOException {
|
||||
UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetHRegionInfo() throws IOException {
|
||||
assertNull(HRegionInfo.getHRegionInfo(new Result()));
|
||||
|
||||
List<Cell> kvs = new ArrayList<Cell>();
|
||||
Result r = Result.create(kvs);
|
||||
assertNull(HRegionInfo.getHRegionInfo(r));
|
||||
|
||||
byte [] f = HConstants.CATALOG_FAMILY;
|
||||
// Make a key value that doesn't have the expected qualifier.
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f,
|
||||
HConstants.SERVER_QUALIFIER, f));
|
||||
r = Result.create(kvs);
|
||||
assertNull(HRegionInfo.getHRegionInfo(r));
|
||||
// Make a key that does not have a regioninfo value.
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f,
|
||||
HConstants.REGIONINFO_QUALIFIER, f));
|
||||
HRegionInfo hri = HRegionInfo.getHRegionInfo(Result.create(kvs));
|
||||
assertTrue(hri == null);
|
||||
// OK, give it what it expects
|
||||
kvs.clear();
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f,
|
||||
HConstants.REGIONINFO_QUALIFIER,
|
||||
HRegionInfo.FIRST_META_REGIONINFO.toByteArray()));
|
||||
hri = HRegionInfo.getHRegionInfo(Result.create(kvs));
|
||||
assertNotNull(hri);
|
||||
assertTrue(hri.equals(HRegionInfo.FIRST_META_REGIONINFO));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that MetaTableAccessor will ride over server throwing
|
||||
* "Server not running" IOEs.
|
||||
* @see @link {https://issues.apache.org/jira/browse/HBASE-3446}
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testRideOverServerNotRunning()
|
||||
throws IOException, InterruptedException, ServiceException {
|
||||
// Need a zk watcher.
|
||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(UTIL.getConfiguration(),
|
||||
this.getClass().getSimpleName(), ABORTABLE, true);
|
||||
// This is a servername we use in a few places below.
|
||||
ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis());
|
||||
|
||||
HConnection connection;
|
||||
try {
|
||||
// Mock an ClientProtocol. Our mock implementation will fail a few
|
||||
// times when we go to open a scanner.
|
||||
final ClientProtos.ClientService.BlockingInterface implementation =
|
||||
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
|
||||
// When scan called throw IOE 'Server not running' a few times
|
||||
// before we return a scanner id. Whats WEIRD is that these
|
||||
// exceptions do not show in the log because they are caught and only
|
||||
// printed if we FAIL. We eventually succeed after retry so these don't
|
||||
// show. We will know if they happened or not because we will ask
|
||||
// mockito at the end of this test to verify that scan was indeed
|
||||
// called the wanted number of times.
|
||||
List<Cell> kvs = new ArrayList<Cell>();
|
||||
final byte [] rowToVerify = Bytes.toBytes("rowToVerify");
|
||||
kvs.add(new KeyValue(rowToVerify,
|
||||
HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
|
||||
HRegionInfo.FIRST_META_REGIONINFO.toByteArray()));
|
||||
kvs.add(new KeyValue(rowToVerify,
|
||||
HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
|
||||
Bytes.toBytes(sn.getHostAndPort())));
|
||||
kvs.add(new KeyValue(rowToVerify,
|
||||
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
|
||||
Bytes.toBytes(sn.getStartcode())));
|
||||
final List<CellScannable> cellScannables = new ArrayList<CellScannable>(1);
|
||||
cellScannables.add(Result.create(kvs));
|
||||
final ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
for (CellScannable result : cellScannables) {
|
||||
builder.addCellsPerResult(((Result)result).size());
|
||||
}
|
||||
Mockito.when(implementation.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any()))
|
||||
.thenThrow(new ServiceException("Server not running (1 of 3)"))
|
||||
.thenThrow(new ServiceException("Server not running (2 of 3)"))
|
||||
.thenThrow(new ServiceException("Server not running (3 of 3)"))
|
||||
.thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build())
|
||||
.thenAnswer(new Answer<ScanResponse>() {
|
||||
public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
((PayloadCarryingRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil
|
||||
.createCellScanner(cellScannables));
|
||||
return builder.build();
|
||||
}
|
||||
}).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build());
|
||||
// Associate a spied-upon HConnection with UTIL.getConfiguration. Need
|
||||
// to shove this in here first so it gets picked up all over; e.g. by
|
||||
// HTable.
|
||||
connection = HConnectionTestingUtility.getSpiedConnection(UTIL.getConfiguration());
|
||||
|
||||
// Fix the location lookup so it 'works' though no network. First
|
||||
// make an 'any location' object.
|
||||
final HRegionLocation anyLocation =
|
||||
new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, sn);
|
||||
final RegionLocations rl = new RegionLocations(anyLocation);
|
||||
// Return the RegionLocations object when locateRegion
|
||||
// The ugly format below comes of 'Important gotcha on spying real objects!' from
|
||||
// http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html
|
||||
ClusterConnection cConnection =
|
||||
HConnectionTestingUtility.getSpiedClusterConnection(UTIL.getConfiguration());
|
||||
Mockito.doReturn(rl).when
|
||||
(cConnection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(),
|
||||
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt());
|
||||
|
||||
// Now shove our HRI implementation into the spied-upon connection.
|
||||
Mockito.doReturn(implementation).
|
||||
when(connection).getClient(Mockito.any(ServerName.class));
|
||||
|
||||
// Scan meta for user tables and verify we got back expected answer.
|
||||
NavigableMap<HRegionInfo, Result> hris =
|
||||
MetaTableAccessor.getServerUserRegions(connection, sn);
|
||||
assertEquals(1, hris.size());
|
||||
assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO));
|
||||
assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow()));
|
||||
// Finally verify that scan was called four times -- three times
|
||||
// with exception and then on 4th, 5th and 6th attempt we succeed
|
||||
Mockito.verify(implementation, Mockito.times(6)).
|
||||
scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
|
||||
} finally {
|
||||
HConnectionManager.deleteConnection(UTIL.getConfiguration());
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,335 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
/**
|
||||
* Test {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestMetaTableLocator {
|
||||
private static final Log LOG = LogFactory.getLog(TestMetaTableLocator.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static final ServerName SN =
|
||||
ServerName.valueOf("example.org", 1234, System.currentTimeMillis());
|
||||
private ZooKeeperWatcher watcher;
|
||||
private Abortable abortable;
|
||||
|
||||
@BeforeClass public static void beforeClass() throws Exception {
|
||||
// Set this down so tests run quicker
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
||||
UTIL.startMiniZKCluster();
|
||||
}
|
||||
|
||||
@AfterClass public static void afterClass() throws IOException {
|
||||
UTIL.getZkCluster().shutdown();
|
||||
}
|
||||
|
||||
@Before public void before() throws IOException {
|
||||
this.abortable = new Abortable() {
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.info(why, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
this.watcher = new ZooKeeperWatcher(UTIL.getConfiguration(),
|
||||
this.getClass().getSimpleName(), this.abortable, true);
|
||||
}
|
||||
|
||||
@After public void after() {
|
||||
try {
|
||||
// Clean out meta location or later tests will be confused... they presume
|
||||
// start fresh in zk.
|
||||
new MetaTableLocator().deleteMetaLocation(this.watcher);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Unable to delete hbase:meta location", e);
|
||||
}
|
||||
|
||||
// Clear out our doctored connection or could mess up subsequent tests.
|
||||
HConnectionManager.deleteConnection(UTIL.getConfiguration());
|
||||
|
||||
this.watcher.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test interruptable while blocking wait on meta.
|
||||
* @throws IOException
|
||||
* @throws ServiceException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test public void testInterruptWaitOnMeta()
|
||||
throws IOException, InterruptedException, ServiceException {
|
||||
final ClientProtos.ClientService.BlockingInterface client =
|
||||
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
|
||||
|
||||
Mockito.when(client.get((RpcController)Mockito.any(), (GetRequest)Mockito.any())).
|
||||
thenReturn(GetResponse.newBuilder().build());
|
||||
|
||||
final MetaTableLocator mtl = new MetaTableLocator();
|
||||
ServerName meta = new MetaTableLocator().getMetaRegionLocation(this.watcher);
|
||||
Assert.assertNull(meta);
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
mtl.waitMetaRegionLocation(watcher);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Interrupted", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
while (!t.isAlive())
|
||||
Threads.sleep(1);
|
||||
Threads.sleep(1);
|
||||
assertTrue(t.isAlive());
|
||||
mtl.stop();
|
||||
// Join the thread... should exit shortly.
|
||||
t.join();
|
||||
}
|
||||
|
||||
private void testVerifyMetaRegionLocationWithException(Exception ex)
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
// Mock an ClientProtocol.
|
||||
final ClientProtos.ClientService.BlockingInterface implementation =
|
||||
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
|
||||
HConnection connection = mockConnection(null, implementation);
|
||||
|
||||
// If a 'get' is called on mocked interface, throw connection refused.
|
||||
Mockito.when(implementation.get((RpcController) Mockito.any(), (GetRequest) Mockito.any())).
|
||||
thenThrow(new ServiceException(ex));
|
||||
|
||||
MetaTableLocator.setMetaLocation(this.watcher, SN);
|
||||
long timeout = UTIL.getConfiguration().
|
||||
getLong("hbase.catalog.verification.timeout", 1000);
|
||||
Assert.assertFalse(new MetaTableLocator().verifyMetaRegionLocation(
|
||||
connection, watcher, timeout));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test we survive a connection refused {@link ConnectException}
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws KeeperException
|
||||
* @throws ServiceException
|
||||
*/
|
||||
@Test
|
||||
public void testGetMetaServerConnectionFails()
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
testVerifyMetaRegionLocationWithException(new ConnectException("Connection refused"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that verifyMetaRegionLocation properly handles getting a
|
||||
* ServerNotRunningException. See HBASE-4470.
|
||||
* Note this doesn't check the exact exception thrown in the
|
||||
* HBASE-4470 as there it is thrown from getHConnection() and
|
||||
* here it is thrown from get() -- but those are both called
|
||||
* from the same function anyway, and this way is less invasive than
|
||||
* throwing from getHConnection would be.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws KeeperException
|
||||
* @throws ServiceException
|
||||
*/
|
||||
@Test
|
||||
public void testVerifyMetaRegionServerNotRunning()
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
testVerifyMetaRegionLocationWithException(new ServerNotRunningYetException("mock"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test get of meta region fails properly if nothing to connect to.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws KeeperException
|
||||
* @throws ServiceException
|
||||
*/
|
||||
@Test
|
||||
public void testVerifyMetaRegionLocationFails()
|
||||
throws IOException, InterruptedException, KeeperException, ServiceException {
|
||||
HConnection connection = Mockito.mock(HConnection.class);
|
||||
ServiceException connectException =
|
||||
new ServiceException(new ConnectException("Connection refused"));
|
||||
final AdminProtos.AdminService.BlockingInterface implementation =
|
||||
Mockito.mock(AdminProtos.AdminService.BlockingInterface.class);
|
||||
Mockito.when(implementation.getRegionInfo((RpcController)Mockito.any(),
|
||||
(GetRegionInfoRequest)Mockito.any())).thenThrow(connectException);
|
||||
Mockito.when(connection.getAdmin(Mockito.any(ServerName.class), Mockito.anyBoolean())).
|
||||
thenReturn(implementation);
|
||||
|
||||
MetaTableLocator.setMetaLocation(this.watcher,
|
||||
ServerName.valueOf("example.com", 1234, System.currentTimeMillis()));
|
||||
Assert.assertFalse(new MetaTableLocator().verifyMetaRegionLocation(connection, watcher, 100));
|
||||
}
|
||||
|
||||
@Test (expected = NotAllMetaRegionsOnlineException.class)
|
||||
public void testTimeoutWaitForMeta()
|
||||
throws IOException, InterruptedException {
|
||||
new MetaTableLocator().waitMetaRegionLocation(watcher, 100);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test waiting on meat w/ no timeout specified.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws KeeperException
|
||||
*/
|
||||
@Test public void testNoTimeoutWaitForMeta()
|
||||
throws IOException, InterruptedException, KeeperException {
|
||||
final MetaTableLocator mtl = new MetaTableLocator();
|
||||
ServerName hsa = mtl.getMetaRegionLocation(watcher);
|
||||
Assert.assertNull(hsa);
|
||||
|
||||
// Now test waiting on meta location getting set.
|
||||
Thread t = new WaitOnMetaThread();
|
||||
startWaitAliveThenWaitItLives(t, 1);
|
||||
// Set a meta location.
|
||||
hsa = setMetaLocation();
|
||||
// Join the thread... should exit shortly.
|
||||
t.join();
|
||||
// Now meta is available.
|
||||
Assert.assertTrue(mtl.getMetaRegionLocation(watcher).equals(hsa));
|
||||
}
|
||||
|
||||
private ServerName setMetaLocation() throws KeeperException {
|
||||
MetaTableLocator.setMetaLocation(this.watcher, SN);
|
||||
return SN;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param admin An {@link AdminProtos.AdminService.BlockingInterface} instance; you'll likely
|
||||
* want to pass a mocked HRS; can be null.
|
||||
* @param client A mocked ClientProtocol instance, can be null
|
||||
* @return Mock up a connection that returns a {@link Configuration} when
|
||||
* {@link HConnection#getConfiguration()} is called, a 'location' when
|
||||
* {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
|
||||
* and that returns the passed {@link AdminProtos.AdminService.BlockingInterface} instance when
|
||||
* {@link HConnection#getAdmin(ServerName)} is called, returns the passed
|
||||
* {@link ClientProtos.ClientService.BlockingInterface} instance when
|
||||
* {@link HConnection#getClient(ServerName)} is called (Be sure to call
|
||||
* {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration)}
|
||||
* when done with this mocked Connection.
|
||||
* @throws IOException
|
||||
*/
|
||||
private HConnection mockConnection(final AdminProtos.AdminService.BlockingInterface admin,
|
||||
final ClientProtos.ClientService.BlockingInterface client)
|
||||
throws IOException {
|
||||
HConnection connection =
|
||||
HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
|
||||
Mockito.doNothing().when(connection).close();
|
||||
// Make it so we return any old location when asked.
|
||||
final HRegionLocation anyLocation =
|
||||
new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, SN);
|
||||
Mockito.when(connection.getRegionLocation((TableName) Mockito.any(),
|
||||
(byte[]) Mockito.any(), Mockito.anyBoolean())).
|
||||
thenReturn(anyLocation);
|
||||
Mockito.when(connection.locateRegion((TableName) Mockito.any(),
|
||||
(byte[]) Mockito.any())).
|
||||
thenReturn(anyLocation);
|
||||
if (admin != null) {
|
||||
// If a call to getHRegionConnection, return this implementation.
|
||||
Mockito.when(connection.getAdmin(Mockito.any(ServerName.class))).
|
||||
thenReturn(admin);
|
||||
}
|
||||
if (client != null) {
|
||||
// If a call to getClient, return this implementation.
|
||||
Mockito.when(connection.getClient(Mockito.any(ServerName.class))).
|
||||
thenReturn(client);
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
|
||||
t.start();
|
||||
while(!t.isAlive()) {
|
||||
// Wait
|
||||
}
|
||||
// Wait one second.
|
||||
Threads.sleep(ms);
|
||||
Assert.assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait on META.
|
||||
*/
|
||||
class WaitOnMetaThread extends Thread {
|
||||
|
||||
WaitOnMetaThread() {
|
||||
super("WaitOnMeta");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
doWaiting();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Failed wait", e);
|
||||
}
|
||||
LOG.info("Exiting " + getName());
|
||||
}
|
||||
|
||||
void doWaiting() throws InterruptedException {
|
||||
try {
|
||||
while (new MetaTableLocator().waitMetaRegionLocation(watcher, 100) == null);
|
||||
} catch (NotAllMetaRegionsOnlineException e) {
|
||||
//Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue