HBASE-14154 DFS Replication should be configurable at column family level

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Ashish Singhi 2015-07-31 17:03:29 -07:00 committed by Andrew Purtell
parent 377bf1937f
commit f504e4b4ed
12 changed files with 140 additions and 19 deletions

View File

@ -129,6 +129,9 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
public static final byte[] MOB_THRESHOLD_BYTES = Bytes.toBytes(MOB_THRESHOLD); public static final byte[] MOB_THRESHOLD_BYTES = Bytes.toBytes(MOB_THRESHOLD);
public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
public static final String DFS_REPLICATION = "DFS_REPLICATION";
public static final short DEFAULT_DFS_REPLICATION = 0;
/** /**
* Default compression type. * Default compression type.
*/ */
@ -1226,4 +1229,32 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
setValue(IS_MOB_BYTES, Bytes.toBytes(isMobEnabled)); setValue(IS_MOB_BYTES, Bytes.toBytes(isMobEnabled));
return this; return this;
} }
/**
* @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set.
* <p>
* {@link #DEFAULT_DFS_REPLICATION} value indicates that user has explicitly not set any
* block replication factor for this CF, hence use the default replication factor set in
* the file system.
*/
public short getDFSReplication() {
String rf = getValue(DFS_REPLICATION);
return rf == null ? DEFAULT_DFS_REPLICATION : Short.valueOf(rf);
}
/**
* Set the replication factor to hfile(s) belonging to this family
* @param replication number of replicas the blocks(s) belonging to this CF should have, or
* {@link #DEFAULT_DFS_REPLICATION} for the default replication factor set in the
* filesystem
* @return this (for chained invocation)
*/
public HColumnDescriptor setDFSReplication(short replication) {
if (replication < 1 && replication != DEFAULT_DFS_REPLICATION) {
throw new IllegalArgumentException(
"DFS replication factor cannot be less than 1 if explictly set.");
}
setValue(DFS_REPLICATION, Short.toString(replication));
return this;
}
} }

View File

@ -62,7 +62,7 @@ public class TestHColumnDescriptor {
hcd.setCompressionType(Algorithm.SNAPPY); hcd.setCompressionType(Algorithm.SNAPPY);
hcd.setMobEnabled(true); hcd.setMobEnabled(true);
hcd.setMobThreshold(1000L); hcd.setMobThreshold(1000L);
hcd.setDFSReplication((short) v);
byte [] bytes = hcd.toByteArray(); byte [] bytes = hcd.toByteArray();
HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes); HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes);
@ -80,6 +80,7 @@ public class TestHColumnDescriptor {
assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW)); assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
assertEquals(hcd.isMobEnabled(), deserializedHcd.isMobEnabled()); assertEquals(hcd.isMobEnabled(), deserializedHcd.isMobEnabled());
assertEquals(hcd.getMobThreshold(), deserializedHcd.getMobThreshold()); assertEquals(hcd.getMobThreshold(), deserializedHcd.getMobThreshold());
assertEquals(v, deserializedHcd.getDFSReplication());
} }
@Test @Test

View File

@ -232,12 +232,16 @@ public class TestHTableDescriptor {
byte[] familyName = Bytes.toBytes("cf"); byte[] familyName = Bytes.toBytes("cf");
HColumnDescriptor hcd = new HColumnDescriptor(familyName); HColumnDescriptor hcd = new HColumnDescriptor(familyName);
hcd.setBlocksize(1000); hcd.setBlocksize(1000);
hcd.setDFSReplication((short) 3);
htd.addFamily(hcd); htd.addFamily(hcd);
assertEquals(1000, htd.getFamily(familyName).getBlocksize()); assertEquals(1000, htd.getFamily(familyName).getBlocksize());
assertEquals(3, htd.getFamily(familyName).getDFSReplication());
hcd = new HColumnDescriptor(familyName); hcd = new HColumnDescriptor(familyName);
hcd.setBlocksize(2000); hcd.setBlocksize(2000);
hcd.setDFSReplication((short) 1);
htd.modifyFamily(hcd); htd.modifyFamily(hcd);
assertEquals(2000, htd.getFamily(familyName).getBlocksize()); assertEquals(2000, htd.getFamily(familyName).getBlocksize());
assertEquals(1, htd.getFamily(familyName).getDFSReplication());
} }
@Test(expected=IllegalArgumentException.class) @Test(expected=IllegalArgumentException.class)

View File

@ -268,7 +268,7 @@ public class HFileWriterImpl implements HFile.Writer {
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException { FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
FsPermission perms = FSUtils.getFilePermissions(fs, conf, FsPermission perms = FSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY); HConstants.DATA_FILE_UMASK_KEY);
return FSUtils.create(fs, path, perms, favoredNodes); return FSUtils.create(conf, fs, path, perms, favoredNodes);
} }
/** Additional initialization steps */ /** Additional initialization steps */

View File

@ -1583,6 +1583,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null); warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
} }
// check data replication factor, it can be 0(default value) when user has not explicitly
// set the value, in this case we use default replication factor set in the file system.
if (hcd.getDFSReplication() < 0) {
String message = "HFile Replication for column family " + hcd.getNameAsString()
+ " must be greater than zero.";
warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
}
// TODO: should we check coprocessors and encryption ? // TODO: should we check coprocessors and encryption ?
} }
} }

View File

@ -770,7 +770,7 @@ public class HRegionFileSystem {
// First check to get the permissions // First check to get the permissions
FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
// Write the RegionInfo file content // Write the RegionInfo file content
FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null); FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
try { try {
out.write(content); out.write(content);
} finally { } finally {

View File

@ -363,11 +363,12 @@ public abstract class FSUtils {
* <li>overwrite the file if it exists</li> * <li>overwrite the file if it exists</li>
* <li>apply the umask in the configuration (if it is enabled)</li> * <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li> * <li>use the fs configured buffer size (or 4096 if not set)</li>
* <li>use the default replication</li> * <li>use the configured column family replication or default replication if
* {@link HColumnDescriptor#DEFAULT_DFS_REPLICATION}</li>
* <li>use the default block size</li> * <li>use the default block size</li>
* <li>not track progress</li> * <li>not track progress</li>
* </ol> * </ol>
* * @param conf configurations
* @param fs {@link FileSystem} on which to write the file * @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write * @param path {@link Path} to the file to write
* @param perm permissions * @param perm permissions
@ -375,23 +376,22 @@ public abstract class FSUtils {
* @return output stream to the created file * @return output stream to the created file
* @throws IOException if the file cannot be created * @throws IOException if the file cannot be created
*/ */
public static FSDataOutputStream create(FileSystem fs, Path path, public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
if (fs instanceof HFileSystem) { if (fs instanceof HFileSystem) {
FileSystem backingFs = ((HFileSystem)fs).getBackingFs(); FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
if (backingFs instanceof DistributedFileSystem) { if (backingFs instanceof DistributedFileSystem) {
// Try to use the favoredNodes version via reflection to allow backwards- // Try to use the favoredNodes version via reflection to allow backwards-
// compatibility. // compatibility.
short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION,
String.valueOf(HColumnDescriptor.DEFAULT_DFS_REPLICATION)));
try { try {
return (FSDataOutputStream) (DistributedFileSystem.class return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
.getDeclaredMethod("create", Path.class, FsPermission.class, Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
boolean.class, int.class, short.class, long.class, Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
Progressable.class, InetSocketAddress[].class) getDefaultBufferSize(backingFs),
.invoke(backingFs, path, perm, true, replication > 0 ? replication : getDefaultReplication(backingFs, path),
getDefaultBufferSize(backingFs), getDefaultBlockSize(backingFs, path), null, favoredNodes));
getDefaultReplication(backingFs, path),
getDefaultBlockSize(backingFs, path),
null, favoredNodes));
} catch (InvocationTargetException ite) { } catch (InvocationTargetException ite) {
// Function was properly called, but threw it's own exception. // Function was properly called, but threw it's own exception.
throw new IOException(ite.getCause()); throw new IOException(ite.getCause());

View File

@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -57,9 +56,12 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -1336,4 +1338,59 @@ public class TestAdmin1 {
this.admin.deleteTable(tableName); this.admin.deleteTable(tableName);
} }
/*
* Test DFS replication for column families, where one CF has default replication(3) and the other
* is set to 1.
*/
@Test(timeout = 300000)
public void testHFileReplication() throws Exception {
TableName name = TableName.valueOf("testHFileReplication");
String fn1 = "rep1";
HColumnDescriptor hcd1 = new HColumnDescriptor(fn1);
hcd1.setDFSReplication((short) 1);
String fn = "defaultRep";
HColumnDescriptor hcd = new HColumnDescriptor(fn);
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(hcd);
htd.addFamily(hcd1);
Table table = TEST_UTIL.createTable(htd, null);
TEST_UTIL.waitTableAvailable(name);
Put p = new Put(Bytes.toBytes("defaultRep_rk"));
byte[] q1 = Bytes.toBytes("q1");
byte[] v1 = Bytes.toBytes("v1");
p.addColumn(Bytes.toBytes(fn), q1, v1);
List<Put> puts = new ArrayList<Put>(2);
puts.add(p);
p = new Put(Bytes.toBytes("rep1_rk"));
p.addColumn(Bytes.toBytes(fn1), q1, v1);
puts.add(p);
try {
table.put(puts);
admin.flush(name);
List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(name);
for (HRegion r : regions) {
Store store = r.getStore(Bytes.toBytes(fn));
for (StoreFile sf : store.getStorefiles()) {
assertTrue(sf.toString().contains(fn));
assertTrue("Column family " + fn + " should have 3 copies",
FSUtils.getDefaultReplication(TEST_UTIL.getTestFileSystem(), sf.getPath()) == (sf
.getFileInfo().getFileStatus().getReplication()));
}
store = r.getStore(Bytes.toBytes(fn1));
for (StoreFile sf : store.getStorefiles()) {
assertTrue(sf.toString().contains(fn1));
assertTrue("Column family " + fn1 + " should have only 1 copy", 1 == sf.getFileInfo()
.getFileStatus().getReplication());
}
}
} finally {
if (admin.isTableEnabled(name)) {
this.admin.disableTable(name);
this.admin.deleteTable(name);
}
}
}
} }

View File

@ -5521,6 +5521,22 @@ public class TestFromClientSide {
hcd.setScope(0); hcd.setScope(0);
checkTableIsLegal(htd); checkTableIsLegal(htd);
try {
hcd.setDFSReplication((short) -1);
fail("Illegal value for setDFSReplication did not throw");
} catch (IllegalArgumentException e) {
// pass
}
// set an illegal DFS replication value by hand
hcd.setValue(HColumnDescriptor.DFS_REPLICATION, "-1");
checkTableIsIllegal(htd);
try {
hcd.setDFSReplication((short) -1);
fail("Should throw exception if an illegal value is explicitly being set");
} catch (IllegalArgumentException e) {
// pass
}
// check the conf settings to disable sanity checks // check the conf settings to disable sanity checks
htd.setMemStoreFlushSize(0); htd.setMemStoreFlushSize(0);

View File

@ -266,7 +266,7 @@ public class TestFSUtils {
// then that the correct file is created // then that the correct file is created
Path p = new Path("target" + File.separator + UUID.randomUUID().toString()); Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
try { try {
FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null); FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
out.close(); out.close();
FileStatus stat = fs.getFileStatus(p); FileStatus stat = fs.getFileStatus(p);
assertEquals(new FsPermission("700"), stat.getPermission()); assertEquals(new FsPermission("700"), stat.getPermission());
@ -288,13 +288,13 @@ public class TestFSUtils {
Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file); Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file); Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
try { try {
FSDataOutputStream out = FSUtils.create(fs, p, perms, null); FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
out.close(); out.close();
assertTrue("The created file should be present", FSUtils.isExists(fs, p)); assertTrue("The created file should be present", FSUtils.isExists(fs, p));
// delete the file with recursion as false. Only the file will be deleted. // delete the file with recursion as false. Only the file will be deleted.
FSUtils.delete(fs, p, false); FSUtils.delete(fs, p, false);
// Create another file // Create another file
FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null); FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
out1.close(); out1.close();
// delete the file with recursion as false. Still the file only will be deleted // delete the file with recursion as false. Still the file only will be deleted
FSUtils.delete(fs, p1, true); FSUtils.delete(fs, p1, true);

View File

@ -790,6 +790,9 @@ module Hbase
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA] set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.
HColumnDescriptor::DFS_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.
HColumnDescriptor::DFS_REPLICATION)
arg.each_key do |unknown_key| arg.each_key do |unknown_key|
puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key]) puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key])

View File

@ -50,6 +50,7 @@ Examples:
hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname) hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'} hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}} hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
You can also keep around a reference to the created table: You can also keep around a reference to the created table: