HBASE-3194 HBase should run on both secure and vanilla versions of Hadoop 0.20
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1032848 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4dce2b26bf
commit
43f81f1943
|
@ -1122,6 +1122,8 @@ Release 0.90.0 - Unreleased
|
|||
HBASE-3083 Major compaction check should use new timestamp meta
|
||||
information in HFiles (rather than dfs timestamp) along with
|
||||
TTL to allow major even if single file
|
||||
HBASE-3194 HBase should run on both secure and vanilla versions of Hadoop 0.20
|
||||
(Gary Helmling via Stack)
|
||||
|
||||
|
||||
NEW FEATURES
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -29,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
|
||||
|
@ -148,6 +150,7 @@ public class LocalHBaseCluster {
|
|||
this.regionServerClass =
|
||||
(Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
|
||||
regionServerClass);
|
||||
|
||||
for (int i = 0; i < noRegionServers; i++) {
|
||||
addRegionServer(i);
|
||||
}
|
||||
|
@ -169,6 +172,17 @@ public class LocalHBaseCluster {
|
|||
return rst;
|
||||
}
|
||||
|
||||
public JVMClusterUtil.RegionServerThread addRegionServer(
|
||||
final int index, User user)
|
||||
throws IOException, InterruptedException {
|
||||
return user.runAs(
|
||||
new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
|
||||
public JVMClusterUtil.RegionServerThread run() throws Exception {
|
||||
return addRegionServer(index);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public JVMClusterUtil.MasterThread addMaster() throws IOException {
|
||||
return addMaster(this.masterThreads.size());
|
||||
}
|
||||
|
@ -185,6 +199,17 @@ public class LocalHBaseCluster {
|
|||
return mt;
|
||||
}
|
||||
|
||||
public JVMClusterUtil.MasterThread addMaster(
|
||||
final int index, User user)
|
||||
throws IOException, InterruptedException {
|
||||
return user.runAs(
|
||||
new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
|
||||
public JVMClusterUtil.MasterThread run() throws Exception {
|
||||
return addMaster(index);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param serverNumber
|
||||
* @return region server
|
||||
|
|
|
@ -1020,8 +1020,6 @@ public abstract class HBaseServer {
|
|||
Writable value = null;
|
||||
|
||||
CurCall.set(call);
|
||||
UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
|
||||
UserGroupInformation.setCurrentUser(call.connection.ticket);
|
||||
try {
|
||||
if (!started)
|
||||
throw new ServerNotRunningException("Server is not running yet");
|
||||
|
@ -1031,7 +1029,6 @@ public abstract class HBaseServer {
|
|||
errorClass = e.getClass().getName();
|
||||
error = StringUtils.stringifyException(e);
|
||||
}
|
||||
UserGroupInformation.setCurrentUser(previous);
|
||||
CurCall.set(null);
|
||||
|
||||
if (buf.size() > buffersize) {
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -67,10 +68,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||
import org.apache.hadoop.security.UnixUserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Facility for testing HBase. Replacement for
|
||||
|
@ -357,11 +355,12 @@ public class HBaseTestingUtility {
|
|||
* @param numSlaves
|
||||
* @return Reference to the hbase mini hbase cluster.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @see {@link #startMiniCluster()}
|
||||
*/
|
||||
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
|
||||
final int numSlaves)
|
||||
throws IOException {
|
||||
throws IOException, InterruptedException {
|
||||
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
|
||||
createRootDir();
|
||||
Configuration c = new Configuration(this.conf);
|
||||
|
@ -382,7 +381,7 @@ public class HBaseTestingUtility {
|
|||
* @param servers number of region servers
|
||||
* @throws IOException
|
||||
*/
|
||||
public void restartHBaseCluster(int servers) throws IOException {
|
||||
public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
|
||||
this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
|
||||
// Don't leave here till we've done a successful scan of the .META.
|
||||
HTable t = new HTable(new Configuration(this.conf), HConstants.META_TABLE_NAME);
|
||||
|
@ -576,6 +575,16 @@ public class HBaseTestingUtility {
|
|||
return new HTable(new Configuration(getConfiguration()), tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop an existing table
|
||||
* @param tableName existing table
|
||||
*/
|
||||
public void deleteTable(byte[] tableName) throws IOException {
|
||||
HBaseAdmin admin = new HBaseAdmin(getConfiguration());
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide an existing table name to truncate
|
||||
* @param tableName existing table
|
||||
|
@ -1127,20 +1136,20 @@ public class HBaseTestingUtility {
|
|||
* @return A new configuration instance with a different user set into it.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Configuration setDifferentUser(final Configuration c,
|
||||
public static User getDifferentUser(final Configuration c,
|
||||
final String differentiatingSuffix)
|
||||
throws IOException {
|
||||
FileSystem currentfs = FileSystem.get(c);
|
||||
Preconditions.checkArgument(currentfs instanceof DistributedFileSystem);
|
||||
if (!(currentfs instanceof DistributedFileSystem)) {
|
||||
return User.getCurrent();
|
||||
}
|
||||
// Else distributed filesystem. Make a new instance per daemon. Below
|
||||
// code is taken from the AppendTestUtil over in hdfs.
|
||||
Configuration c2 = new Configuration(c);
|
||||
String username = UserGroupInformation.getCurrentUGI().getUserName() +
|
||||
String username = User.getCurrent().getName() +
|
||||
differentiatingSuffix;
|
||||
UnixUserGroupInformation.saveToConf(c2,
|
||||
UnixUserGroupInformation.UGI_PROPERTY_NAME,
|
||||
new UnixUserGroupInformation(username, new String[]{"supergroup"}));
|
||||
return c2;
|
||||
User user = User.createUserForTesting(c, username,
|
||||
new String[]{"supergroup"});
|
||||
return user;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -33,13 +35,12 @@ import org.apache.hadoop.hbase.client.HConnectionManager;
|
|||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.io.MapWritable;
|
||||
import org.apache.hadoop.security.UnixUserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
|
@ -52,12 +53,7 @@ public class MiniHBaseCluster {
|
|||
static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
|
||||
private Configuration conf;
|
||||
public LocalHBaseCluster hbaseCluster;
|
||||
// Cache this. For some reason only works first time I get it. TODO: Figure
|
||||
// out why.
|
||||
private final static UserGroupInformation UGI;
|
||||
static {
|
||||
UGI = UserGroupInformation.getCurrentUGI();
|
||||
}
|
||||
private static int index;
|
||||
|
||||
/**
|
||||
* Start a MiniHBaseCluster.
|
||||
|
@ -66,7 +62,7 @@ public class MiniHBaseCluster {
|
|||
* @throws IOException
|
||||
*/
|
||||
public MiniHBaseCluster(Configuration conf, int numRegionServers)
|
||||
throws IOException {
|
||||
throws IOException, InterruptedException {
|
||||
this(conf, 1, numRegionServers);
|
||||
}
|
||||
|
||||
|
@ -79,7 +75,7 @@ public class MiniHBaseCluster {
|
|||
*/
|
||||
public MiniHBaseCluster(Configuration conf, int numMasters,
|
||||
int numRegionServers)
|
||||
throws IOException {
|
||||
throws IOException, InterruptedException {
|
||||
this.conf = conf;
|
||||
conf.set(HConstants.MASTER_PORT, "0");
|
||||
init(numMasters, numRegionServers);
|
||||
|
@ -165,12 +161,13 @@ public class MiniHBaseCluster {
|
|||
* the FileSystem system exit hook does.
|
||||
*/
|
||||
public static class MiniHBaseClusterRegionServer extends HRegionServer {
|
||||
private static int index = 0;
|
||||
private Thread shutdownThread = null;
|
||||
private User user = null;
|
||||
|
||||
public MiniHBaseClusterRegionServer(Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
super(setDifferentUser(conf));
|
||||
super(conf);
|
||||
this.user = User.getCurrent();
|
||||
}
|
||||
|
||||
public void setHServerInfo(final HServerInfo hsi) {
|
||||
|
@ -184,19 +181,6 @@ public class MiniHBaseCluster {
|
|||
* @return A new fs instance if we are up on DistributeFileSystem.
|
||||
* @throws IOException
|
||||
*/
|
||||
private static Configuration setDifferentUser(final Configuration c)
|
||||
throws IOException {
|
||||
FileSystem currentfs = FileSystem.get(c);
|
||||
if (!(currentfs instanceof DistributedFileSystem)) return c;
|
||||
// Else distributed filesystem. Make a new instance per daemon. Below
|
||||
// code is taken from the AppendTestUtil over in hdfs.
|
||||
Configuration c2 = new Configuration(c);
|
||||
String username = UGI.getUserName() + ".hrs." + index++;
|
||||
UnixUserGroupInformation.saveToConf(c2,
|
||||
UnixUserGroupInformation.UGI_PROPERTY_NAME,
|
||||
new UnixUserGroupInformation(username, new String[]{"supergroup"}));
|
||||
return c2;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleReportForDutyResponse(MapWritable c) throws IOException {
|
||||
|
@ -208,7 +192,12 @@ public class MiniHBaseCluster {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
super.run();
|
||||
this.user.runAs(new PrivilegedAction<Object>(){
|
||||
public Object run() {
|
||||
runRegionServer();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Exception in run", t);
|
||||
} finally {
|
||||
|
@ -220,10 +209,27 @@ public class MiniHBaseCluster {
|
|||
}
|
||||
}
|
||||
|
||||
private void runRegionServer() {
|
||||
super.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void kill() {
|
||||
super.kill();
|
||||
}
|
||||
|
||||
public void abort(final String reason, final Throwable cause) {
|
||||
this.user.runAs(new PrivilegedAction<Object>() {
|
||||
public Object run() {
|
||||
abortRegionServer(reason, cause);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void abortRegionServer(String reason, Throwable cause) {
|
||||
super.abort(reason, cause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -250,17 +256,26 @@ public class MiniHBaseCluster {
|
|||
}
|
||||
|
||||
private void init(final int nMasterNodes, final int nRegionNodes)
|
||||
throws IOException {
|
||||
throws IOException, InterruptedException {
|
||||
try {
|
||||
// start up a LocalHBaseCluster
|
||||
hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, nRegionNodes,
|
||||
hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
|
||||
MiniHBaseCluster.MiniHBaseClusterMaster.class,
|
||||
MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
|
||||
|
||||
// manually add the regionservers as other users
|
||||
for (int i=0; i<nRegionNodes; i++) {
|
||||
User user = HBaseTestingUtility.getDifferentUser(conf,
|
||||
".hfs."+index++);
|
||||
hbaseCluster.addRegionServer(i, user);
|
||||
}
|
||||
|
||||
hbaseCluster.startup();
|
||||
} catch (IOException e) {
|
||||
shutdown();
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Error starting cluster", t);
|
||||
shutdown();
|
||||
throw new IOException("Shutting down", t);
|
||||
}
|
||||
|
@ -272,10 +287,23 @@ public class MiniHBaseCluster {
|
|||
* @throws IOException
|
||||
* @return New RegionServerThread
|
||||
*/
|
||||
public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
|
||||
JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer();
|
||||
t.start();
|
||||
t.waitForServerOnline();
|
||||
public JVMClusterUtil.RegionServerThread startRegionServer()
|
||||
throws IOException {
|
||||
User rsUser =
|
||||
HBaseTestingUtility.getDifferentUser(conf, ".hfs."+index++);
|
||||
JVMClusterUtil.RegionServerThread t = null;
|
||||
try {
|
||||
t = rsUser.runAs(
|
||||
new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
|
||||
public JVMClusterUtil.RegionServerThread run() throws Exception {
|
||||
return hbaseCluster.addRegionServer();
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
t.waitForServerOnline();
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException("Interrupted executing UserGroupInformation.doAs()", ie);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
@ -50,12 +51,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.apache.hadoop.security.UnixUserGroupInformation;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
|
@ -460,45 +461,49 @@ public class TestStore extends TestCase {
|
|||
public void testHandleErrorsInFlush() throws Exception {
|
||||
LOG.info("Setting up a faulty file system that cannot write");
|
||||
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Set a different UGI so we don't get the same cached LocalFS instance
|
||||
conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
|
||||
"testhandleerrorsinflush,foo");
|
||||
final Configuration conf = HBaseConfiguration.create();
|
||||
User user = User.createUserForTesting(conf,
|
||||
"testhandleerrorsinflush", new String[]{"foo"});
|
||||
// Inject our faulty LocalFileSystem
|
||||
conf.setClass("fs.file.impl", FaultyFileSystem.class,
|
||||
FileSystem.class);
|
||||
// Make sure it worked (above is sensitive to caching details in hadoop core)
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
assertEquals(FaultyFileSystem.class, fs.getClass());
|
||||
user.runAs(new PrivilegedExceptionAction<Object>() {
|
||||
public Object run() throws Exception {
|
||||
// Make sure it worked (above is sensitive to caching details in hadoop core)
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
assertEquals(FaultyFileSystem.class, fs.getClass());
|
||||
|
||||
// Initialize region
|
||||
init(getName(), conf);
|
||||
// Initialize region
|
||||
init(getName(), conf);
|
||||
|
||||
LOG.info("Adding some data");
|
||||
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
|
||||
this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
|
||||
LOG.info("Adding some data");
|
||||
store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
|
||||
store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
|
||||
|
||||
LOG.info("Before flush, we should have no files");
|
||||
FileStatus[] files = fs.listStatus(store.getHomedir());
|
||||
Path[] paths = FileUtil.stat2Paths(files);
|
||||
System.err.println("Got paths: " + Joiner.on(",").join(paths));
|
||||
assertEquals(0, paths.length);
|
||||
LOG.info("Before flush, we should have no files");
|
||||
FileStatus[] files = fs.listStatus(store.getHomedir());
|
||||
Path[] paths = FileUtil.stat2Paths(files);
|
||||
System.err.println("Got paths: " + Joiner.on(",").join(paths));
|
||||
assertEquals(0, paths.length);
|
||||
|
||||
//flush
|
||||
try {
|
||||
LOG.info("Flushing");
|
||||
flush(1);
|
||||
fail("Didn't bubble up IOE!");
|
||||
} catch (IOException ioe) {
|
||||
assertTrue(ioe.getMessage().contains("Fault injected"));
|
||||
}
|
||||
//flush
|
||||
try {
|
||||
LOG.info("Flushing");
|
||||
flush(1);
|
||||
fail("Didn't bubble up IOE!");
|
||||
} catch (IOException ioe) {
|
||||
assertTrue(ioe.getMessage().contains("Fault injected"));
|
||||
}
|
||||
|
||||
LOG.info("After failed flush, we should still have no files!");
|
||||
files = fs.listStatus(store.getHomedir());
|
||||
paths = FileUtil.stat2Paths(files);
|
||||
System.err.println("Got paths: " + Joiner.on(",").join(paths));
|
||||
assertEquals(0, paths.length);
|
||||
LOG.info("After failed flush, we should still have no files!");
|
||||
files = fs.listStatus(store.getHomedir());
|
||||
paths = FileUtil.stat2Paths(files);
|
||||
System.err.println("Got paths: " + Joiner.on(",").join(paths));
|
||||
assertEquals(0, paths.length);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
|||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -179,10 +181,10 @@ public class TestWALReplay {
|
|||
@Test
|
||||
public void testRegionMadeOfBulkLoadedFilesOnly()
|
||||
throws IOException, SecurityException, IllegalArgumentException,
|
||||
NoSuchFieldException, IllegalAccessException {
|
||||
NoSuchFieldException, IllegalAccessException, InterruptedException {
|
||||
final String tableNameStr = "testReplayEditsWrittenViaHRegion";
|
||||
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
|
||||
Path basedir = new Path(this.hbaseRootDir, tableNameStr);
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
|
||||
final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
|
||||
deleteDir(basedir);
|
||||
HLog wal = createWAL(this.conf);
|
||||
HRegion region = HRegion.openHRegion(hri, wal, this.conf);
|
||||
|
@ -198,18 +200,24 @@ public class TestWALReplay {
|
|||
wal.sync();
|
||||
|
||||
// Now 'crash' the region by stealing its wal
|
||||
Configuration newConf = HBaseTestingUtility.setDifferentUser(this.conf,
|
||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||
User user = HBaseTestingUtility.getDifferentUser(newConf,
|
||||
tableNameStr);
|
||||
runWALSplit(newConf);
|
||||
HLog wal2 = createWAL(newConf);
|
||||
HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf),
|
||||
newConf, hri, null);
|
||||
long seqid2 = region2.initialize();
|
||||
assertTrue(seqid2 > -1);
|
||||
user.runAs(new PrivilegedExceptionAction() {
|
||||
public Object run() throws Exception {
|
||||
runWALSplit(newConf);
|
||||
HLog wal2 = createWAL(newConf);
|
||||
HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf),
|
||||
newConf, hri, null);
|
||||
long seqid2 = region2.initialize();
|
||||
assertTrue(seqid2 > -1);
|
||||
|
||||
// I can't close wal1. Its been appropriated when we split.
|
||||
region2.close();
|
||||
wal2.closeAndDelete();
|
||||
// I can't close wal1. Its been appropriated when we split.
|
||||
region2.close();
|
||||
wal2.closeAndDelete();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -224,10 +232,10 @@ public class TestWALReplay {
|
|||
@Test
|
||||
public void testReplayEditsWrittenViaHRegion()
|
||||
throws IOException, SecurityException, IllegalArgumentException,
|
||||
NoSuchFieldException, IllegalAccessException {
|
||||
NoSuchFieldException, IllegalAccessException, InterruptedException {
|
||||
final String tableNameStr = "testReplayEditsWrittenViaHRegion";
|
||||
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
|
||||
Path basedir = new Path(this.hbaseRootDir, tableNameStr);
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
|
||||
final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
|
||||
deleteDir(basedir);
|
||||
final byte[] rowName = Bytes.toBytes(tableNameStr);
|
||||
final int countPerFamily = 10;
|
||||
|
@ -250,7 +258,7 @@ public class TestWALReplay {
|
|||
}
|
||||
}
|
||||
// Now assert edits made it in.
|
||||
Get g = new Get(rowName);
|
||||
final Get g = new Get(rowName);
|
||||
Result result = region.get(g, null);
|
||||
assertEquals(countPerFamily * hri.getTableDesc().getFamilies().size(),
|
||||
result.size());
|
||||
|
@ -280,39 +288,45 @@ public class TestWALReplay {
|
|||
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
|
||||
}
|
||||
// Get count of edits.
|
||||
Result result2 = region2.get(g, null);
|
||||
final Result result2 = region2.get(g, null);
|
||||
assertEquals(2 * result.size(), result2.size());
|
||||
wal2.sync();
|
||||
// Set down maximum recovery so we dfsclient doesn't linger retrying something
|
||||
// long gone.
|
||||
HBaseTestingUtility.setMaxRecoveryErrorCount(wal2.getOutputStream(), 1);
|
||||
Configuration newConf = HBaseTestingUtility.setDifferentUser(this.conf,
|
||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||
User user = HBaseTestingUtility.getDifferentUser(newConf,
|
||||
tableNameStr);
|
||||
runWALSplit(newConf);
|
||||
FileSystem newFS = FileSystem.get(newConf);
|
||||
// Make a new wal for new region open.
|
||||
HLog wal3 = createWAL(newConf);
|
||||
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
|
||||
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
|
||||
@Override
|
||||
protected boolean restoreEdit(Store s, KeyValue kv) {
|
||||
boolean b = super.restoreEdit(s, kv);
|
||||
countOfRestoredEdits.incrementAndGet();
|
||||
return b;
|
||||
}
|
||||
};
|
||||
long seqid3 = region3.initialize();
|
||||
// HRegionServer usually does this. It knows the largest seqid across all regions.
|
||||
wal3.setSequenceNumber(seqid3);
|
||||
Result result3 = region3.get(g, null);
|
||||
// Assert that count of cells is same as before crash.
|
||||
assertEquals(result2.size(), result3.size());
|
||||
assertEquals(hri.getTableDesc().getFamilies().size() * countPerFamily,
|
||||
countOfRestoredEdits.get());
|
||||
user.runAs(new PrivilegedExceptionAction() {
|
||||
public Object run() throws Exception {
|
||||
runWALSplit(newConf);
|
||||
FileSystem newFS = FileSystem.get(newConf);
|
||||
// Make a new wal for new region open.
|
||||
HLog wal3 = createWAL(newConf);
|
||||
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
|
||||
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
|
||||
@Override
|
||||
protected boolean restoreEdit(Store s, KeyValue kv) {
|
||||
boolean b = super.restoreEdit(s, kv);
|
||||
countOfRestoredEdits.incrementAndGet();
|
||||
return b;
|
||||
}
|
||||
};
|
||||
long seqid3 = region3.initialize();
|
||||
// HRegionServer usually does this. It knows the largest seqid across all regions.
|
||||
wal3.setSequenceNumber(seqid3);
|
||||
Result result3 = region3.get(g, null);
|
||||
// Assert that count of cells is same as before crash.
|
||||
assertEquals(result2.size(), result3.size());
|
||||
assertEquals(hri.getTableDesc().getFamilies().size() * countPerFamily,
|
||||
countOfRestoredEdits.get());
|
||||
|
||||
// I can't close wal1. Its been appropriated when we split.
|
||||
region3.close();
|
||||
wal3.closeAndDelete();
|
||||
// I can't close wal1. Its been appropriated when we split.
|
||||
region3.close();
|
||||
wal3.closeAndDelete();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -323,11 +337,11 @@ public class TestWALReplay {
|
|||
@Test
|
||||
public void testReplayEditsWrittenIntoWAL() throws Exception {
|
||||
final String tableNameStr = "testReplayEditsWrittenIntoWAL";
|
||||
HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
|
||||
Path basedir = new Path(hbaseRootDir, tableNameStr);
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
|
||||
final Path basedir = new Path(hbaseRootDir, tableNameStr);
|
||||
deleteDir(basedir);
|
||||
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
|
||||
HLog wal = createWAL(this.conf);
|
||||
final HLog wal = createWAL(this.conf);
|
||||
final byte[] tableName = Bytes.toBytes(tableNameStr);
|
||||
final byte[] rowName = tableName;
|
||||
final byte[] regionName = hri.getEncodedNameAsBytes();
|
||||
|
@ -363,39 +377,45 @@ public class TestWALReplay {
|
|||
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
|
||||
// Make a new conf and a new fs for the splitter to run on so we can take
|
||||
// over old wal.
|
||||
Configuration newConf = HBaseTestingUtility.setDifferentUser(this.conf,
|
||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||
User user = HBaseTestingUtility.getDifferentUser(newConf,
|
||||
".replay.wal.secondtime");
|
||||
runWALSplit(newConf);
|
||||
FileSystem newFS = FileSystem.get(newConf);
|
||||
// 100k seems to make for about 4 flushes during HRegion#initialize.
|
||||
newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
|
||||
// Make a new wal for new region.
|
||||
HLog newWal = createWAL(newConf);
|
||||
final AtomicInteger flushcount = new AtomicInteger(0);
|
||||
try {
|
||||
final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
|
||||
null) {
|
||||
protected boolean internalFlushcache(HLog wal, long myseqid)
|
||||
throws IOException {
|
||||
boolean b = super.internalFlushcache(wal, myseqid);
|
||||
flushcount.incrementAndGet();
|
||||
return b;
|
||||
};
|
||||
};
|
||||
long seqid = region.initialize();
|
||||
// We flushed during init.
|
||||
assertTrue(flushcount.get() > 0);
|
||||
assertTrue(seqid > wal.getSequenceNumber());
|
||||
user.runAs(new PrivilegedExceptionAction(){
|
||||
public Object run() throws Exception {
|
||||
runWALSplit(newConf);
|
||||
FileSystem newFS = FileSystem.get(newConf);
|
||||
// 100k seems to make for about 4 flushes during HRegion#initialize.
|
||||
newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
|
||||
// Make a new wal for new region.
|
||||
HLog newWal = createWAL(newConf);
|
||||
final AtomicInteger flushcount = new AtomicInteger(0);
|
||||
try {
|
||||
final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
|
||||
null) {
|
||||
protected boolean internalFlushcache(HLog wal, long myseqid)
|
||||
throws IOException {
|
||||
boolean b = super.internalFlushcache(wal, myseqid);
|
||||
flushcount.incrementAndGet();
|
||||
return b;
|
||||
};
|
||||
};
|
||||
long seqid = region.initialize();
|
||||
// We flushed during init.
|
||||
assertTrue(flushcount.get() > 0);
|
||||
assertTrue(seqid > wal.getSequenceNumber());
|
||||
|
||||
Get get = new Get(rowName);
|
||||
Result result = region.get(get, -1);
|
||||
// Make sure we only see the good edits
|
||||
assertEquals(countPerFamily * (hri.getTableDesc().getFamilies().size() - 1),
|
||||
result.size());
|
||||
region.close();
|
||||
} finally {
|
||||
newWal.closeAndDelete();
|
||||
}
|
||||
Get get = new Get(rowName);
|
||||
Result result = region.get(get, -1);
|
||||
// Make sure we only see the good edits
|
||||
assertEquals(countPerFamily * (hri.getTableDesc().getFamilies().size() - 1),
|
||||
result.size());
|
||||
region.close();
|
||||
} finally {
|
||||
newWal.closeAndDelete();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Flusher used in this test. Keep count of how often we are called and
|
||||
|
|
Loading…
Reference in New Issue